[R] Rmpi - send/receive multiple objects to slaves

Nathan S. Watson-Haigh nathan.watson-haigh at csiro.au
Wed Mar 25 01:35:00 CET 2009



I've written a function that uses Rmpi to perform a calculation in parallel. It
works fine, but I'm trying to improve its efficiency, both in memory usage and
in the amount of data passed back and forth between the master and the slaves.

Calculations are performed on a symmetric matrix in order to zero out some of
its cells.

In the parallel version of my function (relevant code below), the slaves return
only the indices of the cells they zeroed out, and the master zeroes those cells
in a copy of the original matrix (the result matrix). This way the slaves do all
the computationally intensive work and the master simply puts the pieces
together. At the moment, I broadcast the whole matrix to every slave. I split
the job so that each slave has exactly one task to perform, with tasks of
approximately equal size. Since each slave's task operates on a subset of the
original matrix, and the matrix is large (nrow/ncol = 10-15k), it makes sense to
send each slave only the subset it needs. How do I do this?
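For concreteness, here is a hypothetical sketch of a defineTasks() that produces tasks with the start/end fields used when subsetting m in the example that follows. The real defineTasks(m) is not shown in the post; this version adds an n_tasks argument (e.g. n_slaves) and simply splits 1..n into contiguous blocks of roughly equal size, using integer division so the block boundaries are exact.

```r
# Hypothetical defineTasks(): split rows/columns 1..n of m into n_tasks
# contiguous blocks of roughly equal size. Each task stores the first and
# last index of its block.
defineTasks <- function(m, n_tasks) {
  n <- nrow(m)
  n_tasks <- min(n_tasks, n)                    # avoid empty blocks
  breaks <- ((0:n_tasks) * n) %/% n_tasks       # exact integer boundaries
  lapply(seq_len(n_tasks), function(k) {
    list(start = breaks[k] + 1L, end = breaks[k + 1])
  })
}
```

Note that a task built this way only covers the diagonal block m[idx, idx]; whether that is enough depends on what .myFun() needs to see.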

For example, rather than broadcasting the whole of matrix m to all slaves:
mpi.bcast.Robj2slave(m)

Can I send a subset of the matrix (depending on the task) at the same time as I
send the task to the slave, e.g.:
mpi.send.Robj(tasks[[1]], slave_id, 1)
idx <- tasks[[1]]$start:tasks[[1]]$end
mpi.send.Robj(m[idx,idx], slave_id, 1)
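A minimal sketch of one way to do this, assuming it is acceptable to change the message format: mpi.send.Robj() serializes an arbitrary R object, so the task and its sub-matrix can be wrapped in a single list and sent as one message. make_payload() and send_task() are hypothetical helpers, and the field names task, idx and m_sub are illustrative choices.

```r
# Master side: bundle a task with the block of m it needs, so a single
# mpi.send.Robj() call delivers both objects to the slave.
make_payload <- function(task, m) {
  idx <- task$start:task$end
  list(task = task, idx = idx, m_sub = m[idx, idx, drop = FALSE])
}

# Sending wrapper (needs Rmpi and running slaves; defined here, not called).
# In the master loop it would replace mpi.send.Robj(tasks[[1]], slave_id, 1).
send_task <- function(task, m, slave_id) {
  mpi.send.Robj(make_payload(task, m), slave_id, 1)
}
```

With this design the slave needs only one receive per task, and the initial mpi.bcast.Robj2slave(m) is no longer required.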

If so, how do I receive the two objects in the slave? That is, how do I modify
the following to receive both the task and the subset of matrix m:
task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
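On the slave side, a sketch assuming the master sends the task and sub-matrix together as one list with (hypothetical) fields task, idx and m_sub: one mpi.recv.Robj() call returns the whole list, which the slave unpacks. process_payload() is a hypothetical helper and fun stands in for .myFun(). The comments also show the two-message alternative, which relies on MPI delivering messages between the same pair of ranks, with the same tag and communicator, in the order they were sent.

```r
# Slave side: unpack a received payload and compute only on the sub-matrix.
# Returns linear indices that are local to m_sub, not to the full matrix.
process_payload <- function(payload, fun) {
  m_sub <- payload$m_sub
  m_p <- fun(m_sub, x = payload$task$x)
  which(m_p == 0 & m_sub != 0)
}

# Inside slave(), replacing the original receive (needs Rmpi):
#   payload <- mpi.recv.Robj(mpi.any.source(), mpi.any.tag())
#   tag <- mpi.get.sourcetag()[2]
#   if (tag == 1) idx_local <- process_payload(payload, .myFun)
#
# Two-message alternative: after receiving the task, explicitly receive the
# second object from the master (rank 0) with the same tag:
#   task <- mpi.recv.Robj(mpi.any.source(), mpi.any.tag())
#   tag <- mpi.get.sourcetag()[2]
#   if (tag == 1) m_sub <- mpi.recv.Robj(0, 1)
```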

Once I have this figured out, I could send only the upper/lower triangle to
halve the amount of data being passed around, but one step at a time!
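For the triangle idea, a minimal sketch assuming the blocks are symmetric: pack the upper triangle (including the diagonal) into a vector and rebuild the full matrix on the receiving side. For an n x n block this sends n*(n+1)/2 values instead of n^2. pack_upper() and unpack_upper() are hypothetical helper names; dimnames are not preserved.

```r
# Pack the upper triangle (with diagonal) of a symmetric matrix.
pack_upper <- function(m) {
  list(n = nrow(m), vals = m[upper.tri(m, diag = TRUE)])
}

# Rebuild the full symmetric matrix from the packed values.
unpack_upper <- function(p) {
  m <- matrix(0, p$n, p$n)
  m[upper.tri(m, diag = TRUE)] <- p$vals   # same column-major order as packing
  m[lower.tri(m)] <- t(m)[lower.tri(m)]    # mirror upper triangle into lower
  m
}
```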

Cheers,
Nathan


<code>
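  # Assumes library(Rmpi) is loaded and the slaves have already been spawned
  # (e.g. with mpi.spawn.Rslaves()) before this point
  n_slaves <- mpi.comm.size()-1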
  slave <- function() {
    # load my library in each slave
    library(my_lib)

    # Note the use of the tag for sent messages:
    #     1=ready_for_task, 2=done_task, 3=exiting
    # Note the use of the tag for received messages:
    #     1=task, 2=done_tasks
    junk <- 0
    done <- 0
    while (done != 1) {
      # Signal being ready to receive a new task
      mpi.send.Robj(junk,0,1)

      # Receive a task
      task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
      task_info <- mpi.get.sourcetag()
      tag <- task_info[2]

      if (tag == 1) {
        # do a task, taking values from the tasks list
        m_p <- .myFun(m, x=task$x)

        # Send a results message back to the master
        # only send back to the master the indices which have been zeroed out
        # by this slave
        idx <- which(m_p == 0 & m != 0)
        results <- list(result=idx)
        mpi.send.Robj(results,0,2)

      } else if (tag == 2) {
        done <- 1
      }
    # We'll just ignore any unknown messages
    }

    mpi.send.Robj(junk,0,3)
  }

  mpi.bcast.Robj2slave(m)

  # Send functions to the slaves
  mpi.bcast.Robj2slave(slave)

  # Call the function in all the slaves to get them ready to
  # undertake tasks
  mpi.bcast.cmd(slave())


  # Create a list of tasks that will be executed in order by the slaves
  tasks <- defineTasks(m)

  # Create data structure to store the results
  m_p <- m

  junk <- 0
  closed_slaves <- 0
  while (closed_slaves < n_slaves) {
    # Receive a message from a slave
    message <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
    message_info <- mpi.get.sourcetag()
    slave_id <- message_info[1]
    tag <- message_info[2]

    if (tag == 1) {
      # slave is ready for a task. Give it the next task, or tell it tasks
      # are done if there are none.
      if (length(tasks) > 0) {
        # Send a task, and then remove it from the task list
        mpi.send.Robj(tasks[[1]], slave_id, 1)
        tasks[[1]] <- NULL
      } else {
        mpi.send.Robj(junk, slave_id, 2)
      }

    } else if (tag == 2) {
      # The message contains results. Do something with the results.
      # Store them in the data structure
      # myFun() will change some cells in the matrix to zero, need to copy all
      # zero values from this node's result to the master result object
      m_p[message$result] <- 0

    } else if (tag == 3) {
      # A slave has closed down.
      closed_slaves <- closed_slaves + 1
    }
  }

  mpi.close.Rslaves()
</code>
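If the slaves work on m[idx, idx] instead of the whole of m, the indices returned by which() refer to positions inside the sub-matrix, so assigning zeros in the full result matrix with message$result directly would hit the wrong cells. A sketch of the conversion, assuming the slave also sends back its idx (e.g. results <- list(result = idx_local, idx = payload$idx)); local_to_global() is a hypothetical helper built on base R's arrayInd().

```r
# Convert linear indices into the sub-matrix m[idx, idx] into (row, col)
# subscripts of the full matrix, as a two-column matrix usable for indexing.
local_to_global <- function(idx_local, idx) {
  k <- length(idx)
  rc <- arrayInd(idx_local, c(k, k))   # local row/column subscripts
  cbind(idx[rc[, 1]], idx[rc[, 2]])    # map back to full-matrix positions
}

# Master side, when results are local to the sub-matrix:
#   m_p[local_to_global(message$result, message$idx)] <- 0
```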


--
----------------------------------------------------------
Dr. Nathan S. Watson-Haigh
OCE Post Doctoral Fellow
CSIRO Livestock Industries
Queensland Bioscience Precinct
St Lucia, QLD 4067
Australia

Tel: +61 (0)7 3214 2922
Fax: +61 (0)7 3214 2900
Web: http://www.csiro.au/people/Nathan.Watson-Haigh.html
----------------------------------------------------------





More information about the R-help mailing list