• No results found

Writing Standalone Spark Programs

N/A
N/A
Protected

Academic year: 2021

Share "Writing Standalone Spark Programs"

Copied!
33
0
0

Loading.... (view fulltext now)

Full text

(1)

Matei  Zaharia  

  UC  Berkeley     www.spark-­‐project.org    

Writing  Standalone  

Spark  Programs  

UC  BERKELEY  

(2)

Outline  

Setting  up  for  Spark  development   Example:  PageRank  

PageRank  in  Java  

Testing  and  debugging    

(3)

Building  Spark  

Requires:  Java  6+,  Scala  2.9.1+  

git clone git://github.com/mesos/spark cd spark

sbt/sbt compile

# Build Spark + dependencies into single JAR # (gives core/target/spark*assembly*.jar)

sbt/sbt assembly

# Publish Spark to local Maven cache

(4)

Adding  it  to  Your  Project  

Either  include  the  Spark  assembly  JAR,  or  add  a   Maven  dependency  on:  

groupId:        org.spark-project artifactId:  spark-core_2.9.1   version:        0.5.1-SNAPSHOT

(5)

Creating  a  SparkContext  

import spark.SparkContext import spark.SparkContext._ val sc = new SparkContext(

“masterUrl”, “name”, “sparkHome”, Seq(“job.jar”))

Mesos  cluster  URL,  

or  local  /  local[N]   name  Job   path  on  cluster  Spark  install   your  code  (to  ship)  List  of  JARs  with   Important  to  get  some  

(6)

Complete  Example  

import spark.SparkContext import spark.SparkContext._ object WordCount {

def main(args: Array[String]) { val sc = new SparkContext(

“local”, “WordCount”, args(0), Seq(args(1))) val file = sc.textFile(args(2))

file.map(_.split(“ ”))

.flatMap(word => (word, 1)) .reduceByKey(_ + _)

.saveAsTextFile(args(3)) }

}

(7)

Outline  

Setting  up  for  Spark  development   Example:  PageRank  

PageRank  in  Java  

Testing  and  debugging    

(8)

Why  PageRank?  

Good  example  of  a  more  complex  algorithm  

» Multiple  stages  of  map  &  reduce  

Benefits  from  Spark’s  in-­‐memory  caching  

(9)

Basic  Idea  

Give  pages  ranks  (scores)  based  on  links  to  them  

» Links  from  many  pages  è  high  rank  

» Link  from  a  high-­‐rank  page  è  high  rank  

(10)

Algorithm  

1.0   1.0  

1.0  

1.0  

1.  Start  each  page  at  a  rank  of  1  

2.  On  each  iteration,  have  page  p  contribute   rankp  /  |neighborsp|  to  its  neighbors  

3.  Set  each  page’s  rank  to  0.15  +  0.85  ×  contribs    

(11)

Algorithm  

1.  Start  each  page  at  a  rank  of  1  

2.  On  each  iteration,  have  page  p  contribute   rankp  /  |neighborsp|  to  its  neighbors  

3.  Set  each  page’s  rank  to  0.15  +  0.85  ×  contribs     1.0   1.0   1.0   1.0   1   0.5   0.5   0.5   1   0.5  

(12)

Algorithm  

1.  Start  each  page  at  a  rank  of  1  

2.  On  each  iteration,  have  page  p  contribute   rankp  /  |neighborsp|  to  its  neighbors  

3.  Set  each  page’s  rank  to  0.15  +  0.85  ×  contribs    

0.58   1.0  

1.85  

(13)

Algorithm  

1.  Start  each  page  at  a  rank  of  1  

2.  On  each  iteration,  have  page  p  contribute   rankp  /  |neighborsp|  to  its  neighbors  

3.  Set  each  page’s  rank  to  0.15  +  0.85  ×  contribs     0.58   0.29   0.29   0.5   1.85   0.58   1.0   1.85   0.58   0.5  

(14)

Algorithm  

1.  Start  each  page  at  a  rank  of  1  

2.  On  each  iteration,  have  page  p  contribute   rankp  /  |neighborsp|  to  its  neighbors  

3.  Set  each  page’s  rank  to  0.15  +  0.85  ×  contribs    

0.39   1.72  

1.31  

0.58  

(15)

Algorithm  

1.  Start  each  page  at  a  rank  of  1  

2.  On  each  iteration,  have  page  p  contribute   rankp  /  |neighborsp|  to  its  neighbors  

3.  Set  each  page’s  rank  to  0.15  +  0.85  ×  contribs    

0.46   1.37  

1.44  

0.73  

(16)

Spark  Program  

val links = // RDD of (url, neighbors) pairs

var ranks = // RDD of (url, rank) pairs

for (i <- 1 to ITERATIONS) {

val contribs = links.join(ranks).flatMap { case (url, (links, rank)) =>

links.map(dest => (dest, rank/links.size)) }

ranks = contribs.reduceByKey(_ + _)

.mapValues(0.15 + 0.85 * _) }

(17)
(18)

PageRank  Performance  

17 1   80   72   28   0   20   40   60  80   100  120   140   160   180   30   60   Ti me  p er  Ite ra ti on  (s )  

Number  of  machines  

Hadoop   Spark  

(19)

Other  Iterative  Algorithms  

Logistic  Regression   K-­‐Means  Clustering  

76   3   0   20   40   60   80   Ti me  p er  Ite ra ti on  (s )   Hadoop   Spark   106   33   0   25   50   75   100   125   Ti me  p er  Ite ra ti on  (s )   Hadoop   Spark  

(20)

Outline  

Setting  up  for  Spark  development   Example:  PageRank  

PageRank  in  Java  

Testing  and  debugging    

(21)

Differences  in  Java  API  

Implement  functions  by  extending  classes  

»  spark.api.java.function.Function,  Function2,  etc  

Use  special  Java  RDDs  in  spark.api.java

» Same  methods  as  Scala  RDDs,  but  take  Java  Functions

Special  PairFunction  and  JavaPairRDD  provide   operations  on  key-­‐value  pairs  

(22)

Examples  

import spark.api.java.*;

import spark.api.java.function.*;

JavaSparkContext sc = new JavaSparkContext(...);

JavaRDD<String> lines = ctx.textFile(“hdfs://...”);

JavaRDD<String> words = lines.flatMap(

new FlatMapFunction<String, String>() {

public Iterable<String> call(String s) {

return Arrays.asList(s.split(" "));

} } );

(23)

Examples  

import spark.api.java.*;

import spark.api.java.function.*;

JavaSparkContext sc = new JavaSparkContext(...); JavaRDD<String> lines = ctx.textFile(args[1], 1);

class Split extends FlatMapFunction<String, String> { public Iterable<String> call(String s) {

return Arrays.asList(s.split(" "));

} );

JavaRDD<String> words = lines.flatMap(new Split());

(24)

Key-­‐Value  Pairs  

import scala.Tuple2;

JavaPairRDD<String, Integer> ones = words.map(

new PairFunction<String, String, Integer>() {

public Tuple2<String, Integer> call(String s) { return new Tuple2(s, 1);

} } );

JavaPairRDD<String, Integer> counts = ones.reduceByKey(

new Function2<Integer, Integer, Integer>() {

public Integer call(Integer i1, Integer i2) { return i1 + i2;

} } );

(25)
(26)

Outline  

Setting  up  for  Spark  development   Example:  PageRank  

PageRank  in  Java  

Testing  and  debugging    

(27)

Developing  in  Local  Mode  

Just  pass  local  or  local[k]  as  master  URL  

Still  serializes  tasks  to  catch  marshaling  errors   Debug  in  any  Java/Scala  debugger  

(28)

Running  on  a  Cluster  

Set  up  Mesos  as  per  Spark  wiki  

» github.com/mesos/spark/wiki/Running-­‐spark-­‐on-­‐mesos    

Basically  requires  building  Mesos  and  creating   config  files  with  locations  of  slaves  

(29)

Running  on  EC2  

Easiest  way  to  launch  a  Spark  cluster  

git clone git://github.com/mesos/spark.git cd spark/ec2

./spark-ec2 -k keypair –i id_rsa.pem –s slaves \ [launch|stop|start|destroy] clusterName

(30)

Viewing  Logs  

Click  through  the  web  UI  at  master:8080

Or,  look  at  stdout  and  stdout  files  in  the  Mesos   “work”  directories  for  your  program,  such  as:  

/tmp/mesos/slaves/<SlaveID>/frameworks/

<FrameworkID>/executors/0/runs/0/stderr

FrameworkID  is  printed  when  Spark  connects,   SlaveID  is  printed  when  a  task  starts  

(31)

Common  Problems  

Exceptions  in  tasks:  will  be  reported  at  master  

17:57:00 INFO TaskSetManager: Lost TID 1 (task 0.0:1) 17:57:00 INFO TaskSetManager: Loss was due to

java.lang.ArithmeticException: / by zero

at BadJob$$anonfun$1.apply$mcII$sp(BadJob.scala:10) at BadJob$$anonfun$1.apply(BadJob.scala:10)

at ...

Fetch  failure:  couldn’t  communicate  with  a  node  

» Most  likely,  it  crashed  earlier  

(32)

Common  Problems  

NotSerializableException:  

» Set  sun.io.serialization.extendedDebugInfo=true  

to  get  a  detailed  trace  (in  SPARK_JAVA_OPTS)  

» Beware  of  closures  using  fields/methods  of  outer   object  (these  will  reference  the  whole  object)  

(33)

For  More  Help  

Join  the  Spark  Users  mailing  list:  

groups.google.com/group/spark-­‐users    

 

Come  to  the  Bay  Area  meetup:   www.meetup.com/spark-­‐users    

References

Related documents

In China, private citizens (generally the victim or the victim’s relatives) prosecute less serious crimes, while the state prosecutes more serious offences (Newman, 1999, p. From

The objective of this article is to make a state of the art on knowledge management and its bond with the innovating designs. After having presented some general information

In our experiment, both sexually reversed and non-reversed fish treated with GH had greater average length and weight gains than did fish not treated with the hormone. Use of the

Although the ma­ jority of OECD countries have engaged in some institutional reforms, the empirical evidence of their impact on efficiency is so far limited due to:

Developmental changes of small and medium enterprises in Poland in the face of globalization and integration with the European Union.. The globalization of the economy influences

The basic provision of a market for immigration would allow individuals who qualified for immigration to the United States under current laws and quotas, and who

In memory of Harold Taub, beloved husband of Paula Taub by: Karen &amp; Charles Rosen.. Honouring Maria Belenkova-Buford on her marriage by: Karen &amp;

The second approach demonstrates the practical applicability of fNIRS to real-time cognitive monitoring, and identifies appropriate sensor placement, signal processing and