Introduction
Running serial R code on the cluster is similar to running any other serial job. We give a serial example, and then demonstrate how to run parallel jobs using the snow and Rmpi packages. Make sure you’ve read the tutorial for C programs first, to understand the basics of serial and parallel programming on maya.
For more information about the software, see the R website.
To use R, load the R module.
[araim1@maya-usr1 ~]$ module load R/3.1.2 [araim1@maya-usr1 ~]$
You can check the version using the command below.
[araim1@maya-usr1 ~]$ R --version R version 3.1.2 (2014-10-31) -- "Pumpkin Helmet" Copyright (C) 2014 The R Foundation for Statistical Computing Platform: x86_64-unknown-linux-gnu (64-bit) R is free software and comes with ABSOLUTELY NO WARRANTY. You are welcome to redistribute it under the terms of the GNU General Public License versions 2 or 3. For more information about these matters see http://www.gnu.org/licenses/. [araim1@maya-usr1 ~]$
Serial example
Let us demonstrate running a simple serial example on the compute nodes. Consider the following R script.
Download: ../code/serial-R/driver.R
Here is a batch script to submit the job. We use the Rscript command to execute the script. There are other possible methods such as “R CMD BATCH driver.R”, whose behavior may vary (e.g. echoing your commands to the output and automatically saving your workspace before exiting).
#!/bin/bash #SBATCH --job-name=serialR #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop Rscript driver.R
Download: ../code/serial-R/run.slurm
Submitting the batch script and checking the output, we have the following.
[araim1@maya-usr1 serial-R]$ sbatch run.slurm Submitted batch job 39 [araim1@maya-usr1 serial-R]$ cat slurm.err [araim1@maya-usr1 serial-R]$ cat slurm.out My sample from N(0,1) is: [1] -0.356303202 0.419682287 -0.230608279 0.135259560 1.822515337 [6] -0.449762634 -2.105769487 1.098466496 1.529508269 0.026835656 [11] -0.897587458 -1.288914041 0.220773489 0.376240073 0.864344607 [16] -1.327439568 0.422681814 -1.397241776 -1.654353308 -0.609916217 [21] -0.950127463 0.679627735 -0.392008111 0.682678308 -1.220454702 [26] -0.003588201 0.389992611 0.985849810 0.490708090 1.091854375 [31] 1.801349886 -0.085661133 0.788164285 -1.140966336 -1.598769489 [36] -0.089101948 0.276440157 1.419193659 -0.695946417 -0.101343436 [41] -2.650310815 1.860333545 -0.752810917 -1.203240435 -1.233831773 [46] -0.103542902 0.988442107 0.298530395 0.804604428 0.839184497 [araim1@maya-usr1 serial-R]$
Serial example with plot
On maya, some special steps are needed to produce some types of graphics such as PNG and JPEG. (PDF graphics do not require these steps). We will demonstrate with the following script which creates a scatter plot and adds a regression line.
lm.out <- lm(mpg ~ disp, data = mtcars) png("mtcars.png", width = 5, height = 5, units = "in", res = 100) plot(mtcars$disp, mtcars$mpg, main = "Displacement vs mpg in mtcars dataset", col = "blue", pch = 16, cex = 1.2) abline(coef(lm.out), col = "green", lwd = 2) dev.off()
Download: ../code/plot-R/driver.R
Our batch script will take a special step to ensure that a “virtual display” is available on the compute nodes. This allows graphics operations to be carried out even though no display is available.
#!/bin/bash #SBATCH --job-name=plotR #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop Xvfb & export DISPLAY=:0 Rscript driver.R
Download: ../code/plot-R/run.slurm
Submitting the batch script gives the following result.
[araim1@maya-usr1 plot-R]$ sbatch run.slurm sbatch: Submitted batch job 2623 [araim1@maya-usr1 plot-R]$ cat slurm.err Initializing built-in extension Generic Event Extension ... DBE_WINDOW: 0 objects of 24 bytes = 0 total bytes 0 private allocs TOTAL: 4 objects, 64 bytes, 0 allocs [araim1@maya-usr1 plot-R]$ cat slurm.out null device 1 [araim1@maya-usr1 plot-R]$ ls -l total 20 -rw-r----- 1 araim1 pi_nagaraj 275 May 14 10:37 driver.R -rw-r----- 1 araim1 pi_nagaraj 3922 May 14 10:38 mtcars.png -rw-r----- 1 araim1 pi_nagaraj 162 May 14 10:37 run.slurm -rw-rw---- 1 araim1 pi_nagaraj 2277 May 14 10:38 slurm.err -rw-rw---- 1 araim1 pi_nagaraj 72 May 14 10:38 slurm.out [araim1@maya-usr1 plot-R]$
The messages in slurm.err are status messages from Xvfb. Notice that we have the graphic mtcars.png in the output. If mtcars.png is zero bytes, it likely means that the virtual display did not work correctly. Viewing the image should produce a result as follows.
Refer to this page for help with viewing images on maya.
Preparing to use parallel libraries
We have made several parallel R packages available for all cluster users. They have all been prepared with OpenMPI 1.4.3 + GCC (module “openmpi/gcc/64”). You may install your own versions of the libraries locally as well, but for the rest of this page we will assume you are using ours. To prepare your environment for the cluster-wide parallel R libraries, ensure that the following modules are loaded.
[araim1@maya-usr1 ~]$ module load openmpi/gcc/64 [araim1@maya-usr1 ~]$ module load R/3.1.2
To make sure you can access the necessary libraries, run the following in R.
[araim1@maya-usr1 ~]$ R R version 3.0.2 (2013-09-25) Copyright (C) 2013 The R Foundation for Statistical Computing Platform: x86_64-unknown-linux-gnu (64-bit) ... > .libPaths() [1] "/usr/cluster/contrib/R/3.1.2/R-3.1.2/library" [2] "/home/araim1/R/x86_64-unknown-linux-gnu-library/3.0" > library(snow) > library(Rmpi) library(pbdMPI) library(pbdSLAP) library(pbdBASE) library(pbdDMAT) library(pbdDEMO)
Your .libPaths() may not be exactly the same, but should have similar entries. The first entry contains cluster-wide libraries, and the second entry contains the user’s locally installed libraries. The local library directory may not exist for you if you haven’t installed any libraries for yourself yet.
A snow example
Now we’ll look at running parallel R jobs with snow. snow (Simple Network Of Workstations) provides a simple but powerful programming model for solving “embarassingly parallel” problems with R. For more information about programming snow, see snow Simplified or Luke Tierney’s web site.
Two versions of simple snow scripts will be presented: one using socket-based communication, and one using MPI communication. The programmer interacts with the same high-level snow interface using either communication method. However, the code to initialize the snow cluster, as well as the performance, differs between methods. In our current setup, socket-based seems to outperform MPI significantly, so that is the recommended method. However, note that socket-based clusters are limited to a maximum of 126 workers.
snow with socket communication
snow with sockets is implemented specially on maya because most users don’t have access to SSH to compute nodes. Instead of SSH, srun is used to launch snow workers. This has been implemented in a simple R package called snowslurm, which is already set up on the cluster. We’ll demonstrate by running the following code.
library(snowslurm) # Initialize SNOW using SOCKET communication, customized for SLURM # Everything else is standard SNOW cluster <- makeSLURMcluster() # Print the hostname for each cluster member sayhello <- function() { info <- Sys.info()[c("nodename", "machine")] paste("Hello from", info[1], "with CPU type", info[2]) } names <- clusterCall(cluster, sayhello) print(unlist(names)) # Compute row sums in parallel using all processes, # then a grand sum at the end on the master process parallelSum <- function(m, n) { A <- matrix(rnorm(m*n), nrow = m, ncol = n) row.sums <- parApply(cluster, A, 1, sum) print(sum(row.sums)) } parallelSum(500, 500) stopCluster(cluster)
Download: ../code/R-snow-sock-test2/snow-test.R
We set up a snow cluster from our assigned nodes, print out the hostnames of the nodes, and perform a few simple parallel calculations before stopping the snow cluster and exiting. Notice that we use the “snowslurm” library rather than “snow” on the first line. This is our customized version of snow with sockets for maya.
When using “makeSLURMcluster” to create a cluster, we expect to be running in batch mode from a SLURM submission script. If we try to run this script from the R prompt like a normal R script, it will fail. Here is a sample batch script.
#!/bin/bash #SBATCH --job-name=snow-sock-test #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=2 #SBATCH --ntasks-per-node=3 R --no-save < snow-test.R
Download: ../code/R-snow-sock-test2/run.slurm
Notice that we’re requesting six processes in the develop queue, and launching R as a batch process.
[araim1@maya-usr1 R-snow-sock-test2]$ sbatch run.slurm sbatch: Submitted batch job 2648 [araim1@maya-usr1 R-snow-sock-test2]$
Once the job completes, we should see something like the following in our slurm.out output (and slurm.err should be empty):
R version 3.0.2 (2013-09-25) -- "Frisbee Sailing" Copyright (C) 2013 The R Foundation for Statistical Computing Platform: x86_64-unknown-linux-gnu (64-bit) R is free software and comes with ABSOLUTELY NO WARRANTY. You are welcome to redistribute it under certain conditions. Type 'license()' or 'licence()' for distribution details. Natural language support but running in an English locale R is a collaborative project with many contributors. Type 'contributors()' for more information and 'citation()' on how to cite R or R packages in publications. Type 'demo()' for some demos, 'help()' for on-line help, or 'help.start()' for an HTML browser interface to help. Type 'q()' to quit R. > library(snowslurm) > > # Initialize SNOW using SOCKET communication, customized for SLURM > # Everything else is standard SNOW > cluster <- makeSLURMcluster() 2011-08-04 05:26:25 - About to spawn 6 remote slaves for SLURM SOCKET cluster 2011-08-04 05:26:26 - Remote cluster is constructed > > # Print the hostname for each cluster member > sayhello <- function() + { + info <- Sys.info()[c("nodename", "machine")] + paste("Hello from", info[1], "with CPU type", info[2]) + } > > names <- clusterCall(cluster, sayhello) > print(unlist(names)) [1] "Hello from n1 with CPU type x86_64" "Hello from n1 with CPU type x86_64" [3] "Hello from n1 with CPU type x86_64" "Hello from n2 with CPU type x86_64" [5] "Hello from n2 with CPU type x86_64" "Hello from n2 with CPU type x86_64" > > # Compute row sums in parallel using all processes, > # then a grand sum at the end on the master process > parallelSum <- function(m, n) + { + A <- matrix(rnorm(m*n), nrow = m, ncol = n) + row.sums <- parApply(cluster, A, 1, sum) + print(sum(row.sums)) + } > > parallelSum(500, 500) [1] -341.2832 > > stopCluster(cluster) >
The package “snowslurm” resides in /usr/cluster/contrib/Rlibs/snowslurm/. The code for makeSLURMcluster can be viewed here if you’re curious.
snow with MPI communication
First save the following R script to your account.
library(Rmpi) library(snow) # Initialize SNOW using MPI communication. The first line will get the # number of MPI processes the scheduler assigned to us. Everything else # is standard SNOW np <- mpi.universe.size() cluster <- makeMPIcluster(np) # Print the hostname for each cluster member sayhello <- function() { info <- Sys.info()[c("nodename", "machine")] paste("Hello from", info[1], "with CPU type", info[2]) } names <- clusterCall(cluster, sayhello) print(unlist(names)) # Compute row sums in parallel using all processes, # then a grand sum at the end on the master process parallelSum <- function(m, n) { A <- matrix(rnorm(m*n), nrow = m, ncol = n) row.sums <- parApply(cluster, A, 1, sum) print(sum(row.sums)) } parallelSum(500, 500) stopCluster(cluster) mpi.exit()
Download: ../code/R-snow-mpi-test/snow-test.R
Notice that we first load the snow library, then we find our number of assigned MPI processes from an MPI call (using the Rmpi interface, introduced briefly in the next section). We set up a snow cluster from our assigned nodes, print out some information from the cluster members, and perform a few simple parallel calculations before stopping the snow cluster and exiting.
In order to run our code on the cluster, we’ll need a batch script as usual
#!/bin/bash #SBATCH --job-name=snow-mpi-test #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=2 #SBATCH --ntasks-per-node=3 mpirun -np 1 R --no-save < snow-test.R
Download: ../code/R-snow-mpi-test/run.slurm
Notice that we’re requesting six processes in the develop queue, and launching R as a batch process. There is one notable difference from usual SLURM scripts here – we have only requested mpirun to launch a single process. That process will be responsible for asking MPI to start the remaining processes that we had requested. Now let’s submit this script to the batch system.
[araim1@maya-usr1 R-snow-mpi-test]$ sbatch run.slurm sbatch: Submitted batch job 2648 [araim1@maya-usr1 R-snow-mpi-test]$
Once the job completes, we should see something like the following in our slurm.out output (and slurm.err should be empty):
R version 3.0.2 (2013-09-25) -- "Frisbee Sailing" Copyright (C) 2013 The R Foundation for Statistical Computing Platform: x86_64-unknown-linux-gnu (64-bit) R is free software and comes with ABSOLUTELY NO WARRANTY. You are welcome to redistribute it under certain conditions. Type 'license()' or 'licence()' for distribution details. Natural language support but running in an English locale R is a collaborative project with many contributors. Type 'contributors()' for more information and 'citation()' on how to cite R or R packages in publications. Type 'demo()' for some demos, 'help()' for on-line help, or 'help.start()' for an HTML browser interface to help. Type 'q()' to quit R. > library(Rmpi) > library(snow) > > # Initialize SNOW using MPI communication. The first line will get the > # number of MPI processes the scheduler assigned to us. Everything else > # is standard SNOW > > np <- mpi.universe.size() > cluster <- makeMPIcluster(np) 6 slaves are spawned successfully. 0 failed. > > # Print the hostname for each cluster member > sayhello <- function() + { + info <- Sys.info()[c("nodename", "machine")] + paste("Hello from", info[1], "with CPU type", info[2]) + } > > names <- clusterCall(cluster, sayhello) > print(unlist(names)) [1] "Hello from n1 with CPU type x86_64" "Hello from n1 with CPU type x86_64" [3] "Hello from n2 with CPU type x86_64" "Hello from n2 with CPU type x86_64" [5] "Hello from n2 with CPU type x86_64" "Hello from n1 with CPU type x86_64" > > # Compute row sums in parallel using all processes, > # then a grand sum at the end on the master process > parallelSum <- function(m, n) + { + A <- matrix(rnorm(m*n), nrow = m, ncol = n) + row.sums <- parApply(cluster, A, 1, sum) + print(sum(row.sums)) + } > > parallelSum(500, 500) [1] 166.6455 > > stopCluster(cluster) [1] 1 > mpi.exit() [1] "Detaching Rmpi. Rmpi cannot be used unless relaunching R." > >
Rmpi examples: master/slave mode
Rmpi is another popular R package for parallel computing. It’s a bit more involved to use than snow, but also offers more control. The usual use of Rmpi follows a slightly different paradigm than traditional MPI. In Rmpi there is a master process that spawns slaves to work in parallel; the master usually maintains control of the overall execution. In contrast, the traditional MPI paradigm is “single program multiple data” (SPMD) where all processes are treated as equal peers, but processes may take on specific roles during the course of a program.
In the current section, we show how to run Rmpi in the master/slave mode. In the next section, we switch to the single program multiple data paradigm. We have noted that the performance of master/slave mode on maya is significantly worse than SPMD mode, but the reason is not yet apparent.
Rmpi features many familiar communications like “bcast” and “send”. For more information about using Rmpi, a good place to check is the reference manual on CRAN.
Hello example
To see Rmpi in action, we will run the following parallel “hello world” R script
library(Rmpi) mpi.spawn.Rslaves(needlog = FALSE) mpi.bcast.cmd( id <- mpi.comm.rank() ) mpi.bcast.cmd( np <- mpi.comm.size() ) mpi.bcast.cmd( host <- mpi.get.processor.name() ) result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) print(unlist(result)) mpi.close.Rslaves(dellog = FALSE) mpi.exit()
Download: ../code/Rmpi-test/hello.R
Note that if the option the option “dellog = FALSE” is not specified, a warning will be returned from OpenMPI when the Rmpi tries to clean up:
An MPI process has executed an operation involving a call to the “fork()” system call to create a child process. Open MPI is currently operating in a condition that could result in memory corruption or other system errors…
The option “needlog = FALSE” tells Rmpi not to create the extra log files associated with this cleanup. If you would like to see the log files along with your results, do not specify this option.
To run this code, we will use the following SLURM script, which launches only the master process initially, as in the snow example from earlier. When the master calls “mpi.spawn.Rslaves”, the remaining MPI processes will be initialized.
#!/bin/bash #SBATCH --job-name=Rmpi_hello #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=2 #SBATCH --ntasks-per-node=4 mpirun -np 1 R --no-save < hello.R
Download: ../code/Rmpi-test/run.slurm
After submitting the job it should run to completion, producing an empty slurm.err file and the following slurm.out
[araim1@maya-usr1 Rmpi-test]$ cat slurm.out [... R disclaimer message ...] > library(Rmpi) > mpi.spawn.Rslaves(needlog = FALSE) 8 slaves are spawned successfully. 0 failed. master (rank 0, comm 1) of size 9 is running on: n1 slave1 (rank 1, comm 1) of size 9 is running on: n1 slave2 (rank 2, comm 1) of size 9 is running on: n1 slave3 (rank 3, comm 1) of size 9 is running on: n1 slave4 (rank 4, comm 1) of size 9 is running on: n2 slave5 (rank 5, comm 1) of size 9 is running on: n2 slave6 (rank 6, comm 1) of size 9 is running on: n2 slave7 (rank 7, comm 1) of size 9 is running on: n2 slave8 (rank 8, comm 1) of size 9 is running on: n1 > > mpi.bcast.cmd( id <- mpi.comm.rank() ) > mpi.bcast.cmd( np <- mpi.comm.size() ) > mpi.bcast.cmd( host <- mpi.get.processor.name() ) > result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) > > print(unlist(result)) slave1 slave2 "I am 1 of 9 running on n1" "I am 2 of 9 running on n1" slave3 slave4 "I am 3 of 9 running on n1" "I am 4 of 9 running on n2" slave5 slave6 "I am 5 of 9 running on n2" "I am 6 of 9 running on n2" slave7 slave8 "I am 7 of 9 running on n2" "I am 8 of 9 running on n1" > > mpi.close.Rslaves(dellog = FALSE) [1] 1 > mpi.exit() [1] "Detaching Rmpi. Rmpi cannot be used unless relaunching R." >
Gather & reduce example
Let’s look at a slightly more interesting example of Rmpi, using the “gather” and “reduce” communications. Notice especially that the master process must be involved in the communication, which is invoked separately from the slaves. (If you forget to include the master, the program will hang until it is killed). Here is the R code
library(Rmpi) mpi.spawn.Rslaves(needlog = FALSE) mpi.bcast.cmd( id <- mpi.comm.rank() ) mpi.bcast.cmd( np <- mpi.comm.size() ) mpi.bcast.cmd( host <- mpi.get.processor.name() ) result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) print(unlist(result)) # Sample one normal observation on the master and each slave x <- rnorm(1) mpi.bcast.cmd(x <- rnorm(1)) # Gather the entire x vector (by default to process 0, the master) mpi.bcast.cmd(mpi.gather.Robj(x)) y <- mpi.gather.Robj(x) print(unlist(y)) # Sum the x vector together, storing the result on process 0 by default mpi.bcast.cmd(mpi.reduce(x, op = "sum")) z <- mpi.reduce(x, op = "sum") print(z) mpi.close.Rslaves(dellog = FALSE) mpi.exit()
Download: ../code/Rmpi-comm-test/driver.R
The submission script is similar to the previous example
#!/bin/bash #SBATCH --job-name=Rmpi_test #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=1 #SBATCH --ntasks-per-node=8 mpirun -np 1 R --no-save < driver.R
Download: ../code/Rmpi-comm-test/run.slurm
And here are the results
[araim1@maya-usr1 Rmpi-comm-test]$ cat slurm.out [... R disclaimer message ...] > library(Rmpi) > > mpi.spawn.Rslaves(needlog = FALSE) 8 slaves are spawned successfully. 0 failed. master (rank 0, comm 1) of size 9 is running on: n1 slave1 (rank 1, comm 1) of size 9 is running on: n1 ... ... ... slave8 (rank 8, comm 1) of size 9 is running on: n1 > > mpi.bcast.cmd( id <- mpi.comm.rank() ) > mpi.bcast.cmd( np <- mpi.comm.size() ) > mpi.bcast.cmd( host <- mpi.get.processor.name() ) > result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) > print(unlist(result)) slave1 slave2 "I am 1 of 9 running on n1" "I am 2 of 9 running on n1" slave3 slave4 "I am 3 of 9 running on n1" "I am 4 of 9 running on n1" slave5 slave6 "I am 5 of 9 running on n1" "I am 6 of 9 running on n1" slave7 slave8 "I am 7 of 9 running on n1" "I am 8 of 9 running on n1" > > # Sample one normal observation on the master and each slave > x <- rnorm(1) > mpi.bcast.cmd(x <- rnorm(1)) > > # Gather the entire x vector (by default to process 0, the master) > mpi.bcast.cmd(mpi.gather.Robj(x)) > y <- mpi.gather.Robj(x) > print(unlist(y)) [1] -2.6498050 0.5241441 -0.6747354 0.5915066 0.7660781 0.3608518 -2.7048508 [8] -0.4686277 0.5241441 > > # Sum the x vector together, storing the result on process 0 by default > mpi.bcast.cmd(mpi.reduce(x, op = "sum")) > z <- mpi.reduce(x, op = "sum") > print(z) [1] -3.731294 > > mpi.close.Rslaves(dellog = FALSE) [1] 1 > mpi.exit() [1] "Detaching Rmpi. Rmpi cannot be used unless relaunching R." >
Rmpi examples: SPMD mode
Now we present examples similar to the previous section, except in an SPMD mode. Notice that commands here are issued from the perspective of the current process, and some of the master/slave commands may not make sense here.
Hello example
library(Rmpi) id <- mpi.comm.rank(comm = 0) np <- mpi.comm.size(comm = 0) hostname <- mpi.get.processor.name() msg <- sprintf("Hello world from process %03d of %03d, on host %s\n", id, np, hostname) cat(msg) mpi.barrier(comm = 0) mpi.finalize()
Download: ../code/Rmpi-hello-spmd/hello.R
#!/bin/bash #SBATCH --job-name=R_MPI #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=2 #SBATCH --ntasks-per-node=4 mpirun Rscript hello.R
Download: ../code/Rmpi-hello-spmd/run.slurm
[araim1@maya-usr1 Rmpi-hello-spmd]$ sbatch run.slurm [araim1@maya-usr1 Rmpi-hello-spmd]$ cat slurm.err [araim1@maya-usr1 Rmpi-hello-spmd]$ cat slurm.out Hello world from process 001 of 016, on host n1 Hello world from process 003 of 016, on host n1 Hello world from process 004 of 016, on host n1 Hello world from process 010 of 016, on host n2 Hello world from process 011 of 016, on host n2 Hello world from process 013 of 016, on host n2 Hello world from process 009 of 016, on host n2 Hello world from process 002 of 016, on host n1 Hello world from process 008 of 016, on host n2 Hello world from process 012 of 016, on host n2 Hello world from process 014 of 016, on host n2 Hello world from process 000 of 016, on host n1 Hello world from process 015 of 016, on host n2 Hello world from process 007 of 016, on host n1 [araim1@maya-usr1 Rmpi-hello-spmd]$
Hello example: send & receive
library(Rmpi) id <- mpi.comm.rank(comm = 0) np <- mpi.comm.size(comm = 0) hostname <- mpi.get.processor.name() if (id == 0) { msg <- sprintf("Hello world from process %03d", id) cat("Process 0: Received msg from process 0 saying:", msg, "\n") for (i in seq(1, np-1)) { # buf is just a buffer to receive a string buf <- paste(rep(" ", 64), collapse="") recv <- mpi.recv(x = buf, type = 3, source = i, tag = 0, comm = 0) cat("Process 0: Received msg from process", i, "saying:", recv, "\n") } } else { msg <- sprintf("Hello world from process %03d", id) mpi.send(msg, 3, dest = 0, tag = 0, comm = 0) } mpi.barrier(comm = 0) mpi.finalize()
Download: ../code/Rmpi-hellosendrecv-spmd/driver.R
#!/bin/bash #SBATCH --job-name=R_MPI #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=2 #SBATCH --ntasks-per-node=8 mpirun Rscript driver.R
[araim1@maya-usr1 Rmpi-hellosendrecv-spmd]$ sbatch run.slurm [araim1@maya-usr1 Rmpi-hellosendrecv-spmd]$ cat slurm.err [araim1@maya-usr1 Rmpi-hellosendrecv-spmd]$ cat slurm.out Process 0: Received msg from process 0 saying: Hello world from process 000 Process 0: Received msg from process 1 saying: Hello world from process 001 Process 0: Received msg from process 2 saying: Hello world from process 002 Process 0: Received msg from process 3 saying: Hello world from process 003 Process 0: Received msg from process 4 saying: Hello world from process 004 Process 0: Received msg from process 5 saying: Hello world from process 005 Process 0: Received msg from process 6 saying: Hello world from process 006 Process 0: Received msg from process 7 saying: Hello world from process 007 Process 0: Received msg from process 8 saying: Hello world from process 008 Process 0: Received msg from process 9 saying: Hello world from process 009 Process 0: Received msg from process 10 saying: Hello world from process 010 Process 0: Received msg from process 11 saying: Hello world from process 011 Process 0: Received msg from process 12 saying: Hello world from process 012 Process 0: Received msg from process 13 saying: Hello world from process 013 Process 0: Received msg from process 14 saying: Hello world from process 014 Process 0: Received msg from process 15 saying: Hello world from process 015 [araim1@maya-usr1 Rmpi-hellosendrecv-spmd]$
Gather example
The Rmpi package provides a function called mpi.gather that provides the functionality of MPI’s Gather operation. In the example below, five standard normal random variables are generated on each of the three processes and are ‘gathered’ to the process with rank 0. Each process outputs the five random numbers it generates and gathered result in the form an array of 15 real numbers to its own log file. Note that only process 0 prints the gathered result.
Download: testgather.R
Here is a batch script to submit the job.
Download: run.slurm
Running the above slurm script using the sbatch command produces three .log files in addition to slurm.err and slurm.out. If the program runs successfully, slurm.err files should be empty and slurm.out might contain some information related to R or MPI. The output we are interested in is shown below. Note that process-000.log shows
the gathered result.
[saiku1@maya-usr1 gather]$ cat process-000.log local x: [1] 1.3162732 0.9533861 1.0263475 1.3543835 0.2781883 gather.result: [1] 1.3162732 0.9533861 1.0263475 1.3543835 0.2781883 0.2750832 [7] 1.8240492 1.1123469 -0.7478470 -0.6101217 0.6130688 1.2249780 [13] -0.2659300 1.0182077 0.5539022 [saiku1@maya-usr1 gather]$ cat process-001.log local x: [1] 0.2750832 1.8240492 1.1123469 -0.7478470 -0.6101217 gather.result: [1] 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 [saiku1@maya-usr1 gather]$ cat process-002.log local x: [1] 0.6130688 1.2249780 -0.2659300 1.0182077 0.5539022 gather.result: [1] 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
Allgather example
While Rmpi’s mpi.gather function gathers the results from all the processes to one process (with rank 0 as in the previous example), the mpi.allgather function makes the gathered result available on all the processes. In this example, we will again generate five standard normal variables on three processes and using the mpi.allgather function, make the gathered result available on all the processes. Note that the only difference between the code below and the code from the previous example is that the function call mpi.gather is replaced with mpi.allgather.
library(Rmpi) id <- mpi.comm.rank(comm = 0) np <- mpi.comm.size(comm = 0) hostname <- mpi.get.processor.name() # Generate k normal observations per process k <- 5 x <- rnorm(k) # Combine the x vectors together on every process gather.result <- mpi.allgather(x, type = 2, rdata = numeric(k * np), comm = 0) # Log out for each process: # 1) The local part x # 2) The combined vector gather.result logfile <- sprintf("process-%03d.log", id) sink(logfile) cat("local x:\n") print(x) cat("\ngather.result:\n") print(gather.result) sink(NULL) mpi.barrier(comm = 0) mpi.finalize()
Download: ../code/Rmpi-allgather-spmd/driver.R
#!/bin/bash #SBATCH --job-name=R_MPI #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=1 #SBATCH --ntasks-per-node=3 mpirun Rscript driver.R
Download: ../code/Rmpi-allgather-spmd/run.slurm
Below is the output from our program. Note that all the three processes output the gathered result, unlike process 0 alone as in the example on mpi.gather.
[araim1@maya-usr1 Rmpi-allgather-spmd]$ sbatch run.slurm [araim1@maya-usr1 Rmpi-allgather-spmd]$ cat slurm.err [araim1@maya-usr1 Rmpi-allgather-spmd]$ cat slurm.out [1] 1 [1] 1 [1] 1 [1][1] "Exiting Rmpi. Rmpi cannot be used unless relaunching R." "Exiting Rmpi. Rmpi cannot be used unless relaunching R." [1] "Exiting Rmpi. Rmpi cannot be used unless relaunching R." [1] 1 [1] 1 [1] 1 [araim1@maya-usr1 all-gather]$ cat process-000.log local x: [1] 0.5997779 0.6245255 -0.8847268 -3.1556043 -0.5805308 gather.result: [1] 0.59977794 0.62452554 -0.88472684 -3.15560425 -0.58053082 -0.82290441 [7] -0.75349182 -0.08821349 -0.97613294 0.88190176 0.59724681 -1.91898345 [13] -0.09221058 -0.11054082 0.85566243 [araim1@maya-usr1 all-gather]$ cat process-001.log local x: [1] -0.82290441 -0.75349182 -0.08821349 -0.97613294 0.88190176 gather.result: [1] 0.59977794 0.62452554 -0.88472684 -3.15560425 -0.58053082 -0.82290441 [7] -0.75349182 -0.08821349 -0.97613294 0.88190176 0.59724681 -1.91898345 [13] -0.09221058 -0.11054082 0.85566243 [araim1@maya-usr1 all-gather]$ cat process-002.log local x: [1] 0.59724681 -1.91898345 -0.09221058 -0.11054082 0.85566243 gather.result: [1] 0.59977794 0.62452554 -0.88472684 -3.15560425 -0.58053082 -0.82290441 [7] -0.75349182 -0.08821349 -0.97613294 0.88190176 0.59724681 -1.91898345 [13] -0.09221058 -0.11054082 0.8556624 [araim1@maya-usr1 Rmpi-allgather-spmd]$
pbdR: Programming with Big Data in R
pbdR is a more recent package for high performance computing in R. pdbR capabilities include:
- pbdMPI – an interface to MPI through R, like the older Rmpi package but with improved usability
- pbdDMAT – operate on dense distributed matrices in parallel
Some initial examples for pbdR are given below. For more information on using pbdR on maya, see Technical Report HPCF-2013-2.
MPI Example
The following example says hello from multiple processes, and quickly demonstrates creating local vectors which are combined into a matrix using MPI communication.
library(pbdMPI, quiet = TRUE) init() .comm.size <- comm.size() .comm.rank <- comm.rank() msg <- sprintf("Hello world from process %d\n", .comm.rank) comm.cat("Say hello:\n", quiet = TRUE) comm.cat(msg, all.rank = TRUE) k <- 10 x <- rep(.comm.rank, k) comm.cat("\nOriginal x vector:\n", quiet = TRUE) comm.print(x, all.rank = TRUE) y <- allgather(x, unlist = TRUE) A <- matrix(y, nrow = k, byrow = FALSE) comm.cat("\nAllgather matrix (only showing process 0):\n", quiet = TRUE) comm.print(A) finalize()
Download: ../code/pbdR-hello/driver.R
#!/bin/bash #SBATCH --job-name=pbdMPI #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=2 #SBATCH --ntasks-per-node=3 mpirun Rscript driver.R
Download: ../code/pbdR-hello/run.slurm
[araim1@maya-usr1 helloMPI]$ sbatch run.slurm [araim1@maya-usr1 helloMPI]$ cat slurm.err [araim1@maya-usr1 helloMPI]$ cat slurm.out Say hello: COMM.RANK = 0 Hello world from process 0 COMM.RANK = 1 Hello world from process 1 COMM.RANK = 2 Hello world from process 2 COMM.RANK = 3 Hello world from process 3 COMM.RANK = 4 Hello world from process 4 COMM.RANK = 5 Hello world from process 5 Original x vector: COMM.RANK = 0 [1] 0 0 0 0 0 0 0 0 0 0 COMM.RANK = 1 [1] 1 1 1 1 1 1 1 1 1 1 COMM.RANK = 2 [1] 2 2 2 2 2 2 2 2 2 2 COMM.RANK = 3 [1] 3 3 3 3 3 3 3 3 3 3 COMM.RANK = 4 [1] 4 4 4 4 4 4 4 4 4 4 COMM.RANK = 5 [1] 5 5 5 5 5 5 5 5 5 5 Allgather matrix (only showing process 0): COMM.RANK = 0 [,1] [,2] [,3] [,4] [,5] [,6] [1,] 0 1 2 3 4 5 [2,] 0 1 2 3 4 5 [3,] 0 1 2 3 4 5 [4,] 0 1 2 3 4 5 [5,] 0 1 2 3 4 5 [6,] 0 1 2 3 4 5 [7,] 0 1 2 3 4 5 [8,] 0 1 2 3 4 5 [9,] 0 1 2 3 4 5 [10,] 0 1 2 3 4 5 [araim1@maya-usr1 helloMPI]$
Demos
Many demos are provided with pbdR. To see what’s available, start R and issue any of the following commands.
[araim1@maya-usr1 ~]$ R [... R disclaimer message ...] > demo(package = "pbdMPI") > demo(package = "pbdDMAT") > demo(package = "pbdSLAP") > demo(package = "pbdDEMO")
For example, the current list of demos for pbdMPI is displayed as
Demos in package 'pbdMPI': allgather pbdMPI An example of allgather. allreduce pbdMPI An example of allreduce. any_all pbdMPI An example of any and all. bcast pbdMPI An example of bcast. divide pbdMPI An example of dividing jobs. gather pbdMPI An example of gather. pbdApply pbdMPI An example of parallel apply. pbdLapply pbdMPI An example of parallel lapply. reduce pbdMPI An example of reduce. scatter pbdMPI An example of scatter. seed pbdMPI An example of random number generation. sort pbdMPI An example of sorting.
To run a demo on maya (say, “scatter”), we just need a SLURM script
#!/bin/bash #SBATCH --job-name=pbdDemo #SBATCH --output=slurm.out #SBATCH --error=slurm.err #SBATCH --partition=develop #SBATCH --nodes=1 #SBATCH --ntasks-per-node=3 mpirun Rscript -e "demo(scatter, 'pbdMPI', ask=F, echo=F)"
Download: ../code/pbdR-demo/run.slurm
[araim1@maya-usr1 pbdR-demo]$ sbatch run.slurm Submitted batch job 1327033 [araim1@maya-usr1 pbdR-demo]$ cat slurm.err [araim1@maya-usr1 pbdR-demo]$ cat slurm.out Original x: COMM.RANK = 0 [1] 1 2 3 4 5 6 Scatter list: COMM.RANK = 0 [1] 1 Scatterv integer: COMM.RANK = 1 [1] 2 3 Scatterv double: COMM.RANK = 1 [1] 2 3 [araim1@maya-usr1 pbdR-demo]$