Chapter 5 R Vectorization in Distributed R Computing Framework
5.2 SparkR Background
SparkR is an open-source project that integrates GNU R with Apache Spark [12]. It provides a lightweight front end for using Apache Spark’s distributed computation power from R.
5.2.1 Basic Structure
The architecture of SparkR follows the structure described in Figure 5.2. The runtime system of SparkR is Apache Spark, and the data can be stored in HDFS [72], in memory, or in other distributed storage systems. Spark uses the RDD (Resilient Distributed Dataset) to represent a large data object stored in the underlying storage layer, and exposes APIs to manipulate the RDD. SparkR exposes the RDD to the R user by defining it as an R S4 object type. It also provides R APIs to manipulate the R RDD object.
[Figure: the R console holds the YX handle and reaches the Java Spark Context through JNI. Worker 1 and Worker 2 each run a Spark Executor (Java) connected to an R process by a pipe. The YX RDD consists of part 1 (elements 1 to 3) and part 2 (elements 4 to 6). Each part is held as a byte-array (serialized R object) in the Spark runtime and as R list storage in memory inside the R process.]
Figure 5.3: SparkR Architecture and Object Storage
Because lapply is the basic building block of SparkR, the RDD is internally a distributed R list object. Each partition of an R RDD object that resides in the memory of a single R instance is a normal R list object. Figure 5.3 shows SparkR’s architecture. YX is an R RDD object. The RDD object in the console is just a handle (reference) to this distributed object. The real data is stored as Java byte-arrays (serialized R objects) and distributed among the Spark Executors across the whole system. In each R computation operation, the Java byte-array is de-serialized into an R instance, and the corresponding R operations are invoked to process the object.
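The storage scheme described above can be imitated in a single R process. The following is only a minimal sketch in base R, not SparkR's implementation: the names parts, stored, and process_partition are hypothetical, an R raw vector stands in for the Java byte-array, and no Spark executor or pipe is involved.

```r
# Hypothetical stand-in for one RDD split into two partitions.
# Each partition is a normal R list of elements.
parts <- list(
  part1 = list(c(1, 2), c(3, 4), c(5, 6)),
  part2 = list(c(7, 8), c(9, 10), c(11, 12))
)

# Storage side: keep each partition as a byte array (an R raw vector),
# analogous to the serialized R object held by a Spark Executor.
stored <- lapply(parts, function(p) serialize(p, connection = NULL))

# Compute side: de-serialize the bytes back into an R list, then apply
# the element-wise function with the ordinary base lapply.
process_partition <- function(bytes, f) {
  elems <- unserialize(bytes)
  lapply(elems, f)
}

result <- lapply(stored, function(b) process_partition(b, sum))
```

Every operation pays for one de-serialization per partition before any element is processed, which is the data exchange cost between the executor and the R instance.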
5.2.2 SparkR APIs
SparkR provides a set of APIs to create, transform, and aggregate R RDD objects. The full list of APIs can be found at [15]. Among them, the most important is lapply, which overrides the R base package’s lapply to process the RDD data type. Another important API is reduce, which is similar to the R base package’s Reduce but operates on the RDD data type. With SparkR’s APIs, a user can transform an original sequential lapply-based application into a distributed application that processes large data sets. Listing 5.1 shows the Linear Regression example in SparkR.
grad.func <- function(yx) {
  y <- yx[1]
  x <- c(1, yx[2])             # Add 1 to estimate the intercept
  error <- sum(x * theta) - y
  delta <- error * x
  return(delta)
}

sc <- ...      # A Spark context
YX <- ...      # An RDD list in the sc context. Each element is a [y x] vector
alpha <- ...   # Controls the change in each iteration
for (iter in 1:niter) {
  delta <- lapply(YX, grad.func)
  theta <- theta - alpha * reduce(delta, '+')
}
Listing 5.1: Linear Regression with lapply in SparkR
Compared with the standalone R Linear Regression example in Listing 4.1, there are only a few changes: YX is now an RDD object, and Reduce is replaced by reduce with a different parameter order. After the application is launched, SparkR creates the distributed YX object and distributes the function grad.func and the console’s data theta to all the workers. The runtime also handles the reduction from the workers to the console. This model greatly reduces the programmer’s effort in distributed computing. The programmer only needs to think about the algorithm, and does not need to worry about how to schedule the data and the tasks.
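Listing 5.1 leaves the context, the data, and the step size elided. For readers who want to run the loop, the following is a local stand-in that uses only base R lapply and Reduce on an ordinary list. The data set, the initial theta, alpha, and niter are all assumptions chosen for illustration (the data satisfy y = 2 + 3x exactly); nothing here runs on Spark.

```r
# Synthetic data (an assumption): y = 2 + 3 * x with no noise.
xs <- c(-1, -0.5, 0, 0.5, 1)
YX <- lapply(xs, function(x) c(2 + 3 * x, x))  # each element is c(y, x)

theta <- c(0, 0)   # assumed initial coefficients (intercept, slope)
alpha <- 0.1       # assumed step size
niter <- 100       # assumed iteration count

grad.func <- function(yx) {
  y <- yx[1]
  x <- c(1, yx[2])              # prepend 1 for the intercept term
  error <- sum(x * theta) - y   # theta is read from the enclosing environment
  error * x
}

for (iter in 1:niter) {
  delta <- lapply(YX, grad.func)
  # Base R Reduce takes the function first; SparkR's reduce takes the RDD first.
  theta <- theta - alpha * Reduce(`+`, delta)
}
```

With these assumed values the loop converges towards an intercept of 2 and a slope of 3. In the SparkR version, only the creation of YX and the call to reduce differ.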
5.2.3 Performance Problems
SparkR is still in a very early development phase, and one major problem is that it runs very slowly. In some workloads, SparkR on a cluster is even slower than sequential R on a single standalone node. Although SparkR can process large data sets, its adoption is hampered if the processing speed is too low.
There are at least four sources of the slow performance: (1) the slow processing speed of GNU R itself; (2) the data exchange between the Spark Java executor and the R instance in the worker; (3) the overhead of launching an R instance for each SparkR blocking operation; and (4) the slow lapply interpretation mechanism in each worker. The first problem also exists in single-node GNU R, and can be solved by an efficient R implementation, for example ORBIT, introduced in Chapter 3. The second problem can be solved by an efficient R-Java interaction mechanism and fast serialization and de-serialization implementations. The third problem can be solved by reusing R instances. The SparkR project is working on the second and third problems. The last problem is exactly the one described in Chapter 4.
To address the slow interpretation speed of lapply, SparkR provides an additional API called lapplyPartition. Like lapply, this API accepts two parameters: the data, which is now a chunk of data (typically the whole portion of an RDD in a worker), and the mapper function, which processes the whole data chunk. The API itself does not run faster. However, if the programmer organizes the chunk of data into vector storage and provides a vector function to process the vector data, the mapper is invoked only once per worker, and the underlying invocation is a vector function over vector data. The interpretation overhead is then much smaller than in the original lapply-based case.
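The difference between the two mapper styles can be shown on a single partition in plain R. This is a sketch under assumptions, not SparkR code: theta, the four data elements, and the names part_list, part_mat, and grad.func.vec are hypothetical, and the partition is converted to a matrix by hand rather than by lapplyPartition.

```r
theta <- c(0.5, -1)   # arbitrary coefficients (an assumption)

# One partition as a list of c(y, x) elements: the lapply-style storage.
part_list <- list(c(1, 0), c(3, 1), c(5, 2), c(7, 3))

# The same partition reorganized as vector storage: one row per element.
part_mat <- do.call(rbind, part_list)

# Element-wise mapper: interpreted once per element.
grad.func <- function(yx) {
  x <- c(1, yx[2])
  (sum(x * theta) - yx[1]) * x
}

# Vector mapper: interpreted once per partition, returns one row per element.
grad.func.vec <- function(chunk) {
  y <- chunk[, 1]
  X <- cbind(1, chunk[, 2])            # prepend the intercept column
  error <- as.vector(X %*% theta) - y  # all errors in one matrix product
  error * X                            # row i is scaled by error[i]
}

per_element <- do.call(rbind, lapply(part_list, grad.func))
per_chunk <- grad.func.vec(part_mat)
```

Both variants compute the same per-element gradients, but the vector mapper enters the interpreter once for the whole chunk instead of once per element.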
However, to use lapplyPartition, the programmer needs to think about how to organize the data and how to write the vector function, which is tedious and complex. Furthermore, the algorithm must also change compared with the original lapply-based implementation. For example, if YX in Listing 5.1 is changed to vector partition storage and grad.func is changed to a vector function, the lapply can be replaced by lapplyPartition. But the reduce operation must change as well, because its input is no longer a list of elements, for which the reduction semantics are clear, but a few vector values. The programmer must take care of the new reduction semantics within a worker, across workers, or both.
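One way to handle the changed reduction, for the gradient of Listing 5.1, is to reduce twice: once inside each worker and once across workers. The sketch below imitates this in base R under assumptions. The two partitions, theta, and the name partial.grad are hypothetical, and plain lapply and Reduce stand in for lapplyPartition and reduce.

```r
theta <- c(0.5, -1)   # arbitrary coefficients (an assumption)

# Two hypothetical worker partitions in vector storage; columns are y and x.
parts <- list(
  rbind(c(1, 0), c(3, 1)),
  rbind(c(5, 2), c(7, 3))
)

# Vector mapper with an in-worker reduction: the per-row gradients are
# collapsed into a single partial gradient before leaving the worker.
partial.grad <- function(chunk) {
  X <- cbind(1, chunk[, 2])
  error <- as.vector(X %*% theta) - chunk[, 1]
  colSums(error * X)
}

partials <- lapply(parts, partial.grad)  # one vector per worker
total <- Reduce(`+`, partials)           # across-worker reduction
```

The across-worker step now adds one vector per worker rather than one vector per data element. The in-worker step (colSums here) is exactly the part the programmer has to derive by hand from the original '+' reduction.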
All of these changes place a large burden on the programmer. An automatic solution is desirable to handle these transformations and improve SparkR’s performance.