
Cornell Tech

Spring 2015

Lecture 4

Data Science in the Wild


Apache Spark

What is Spark?

• Not a modified version of Hadoop
• Separate, fast, MapReduce-like engine
  - In-memory data storage for very fast iterative queries
  - General execution graphs and powerful optimizations
  - Up to 40x faster than Hadoop
• Compatible with Hadoop's storage APIs
  - Can read/write to any Hadoop-supported system, including HDFS, HBase, SequenceFiles, etc.

What is Shark?

• Port of Apache Hive to run on Spark
• Compatible with existing Hive data, metastores, and queries (HiveQL, UDFs, etc.)


Why a New Programming Model?

MapReduce greatly simplified big data analysis.

But as soon as it got popular, users wanted more:
• More complex, multi-stage applications (e.g. iterative graph algorithms and machine learning)
• More interactive ad-hoc queries

Both multi-stage and interactive apps require faster data sharing across parallel jobs.


Data Sharing in MapReduce

[Figure: in MapReduce, iterative jobs (iter. 1, iter. 2, ...) read their input from HDFS and write their output back to HDFS at every step, and each interactive query (query 1, query 2, query 3) performs its own HDFS read of the input to produce its result.]

Data Sharing in Spark

[Figure: in Spark, iterations (iter. 1, iter. 2, ...) share data through distributed memory, and the input goes through one-time processing into distributed memory, which then serves query 1, query 2, query 3, ...]

10-100× faster than network and disk

Spark Programming Model

• Key idea: resilient distributed datasets (RDDs)
  - Distributed collections of objects that can be cached in memory across cluster nodes
  - Manipulated through various parallel operators
  - Automatically rebuilt on failure
• Interface
  - Clean language-integrated API in Scala
  - Can be used interactively from Scala console

Example: Log Mining

Load error messages from a log into memory, then interactively search for various patterns

val lines = spark.textFile("hdfs://...")           // Base RDD
val errors = lines.filter(_.startsWith("ERROR"))   // Transformed RDD
val messages = errors.map(_.split('\t')(2))
val cachedMsgs = messages.cache()

cachedMsgs.filter(_.contains("foo")).count         // Action
cachedMsgs.filter(_.contains("bar")).count
. . .

[Figure: the Driver sends tasks to three Workers and collects their results; each Worker reads one block of the file (Block 1, Block 2, Block 3) and keeps its part of the cached data in memory (Cache 1, Cache 2, Cache 3).]

Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)

Result: scaled to 1 TB data in 5-7 sec (vs 170 sec for on-disk data)


Scaling Down

[Figure: bar chart of execution time (s) against % of working set in cache]

% of working set in cache    Execution time (s)
Cache disabled               69
25%                          58
50%                          41
75%                          30
Fully cached                 12

Fault Recovery

RDDs track lineage information that can be used to efficiently recompute lost data

msgs = textFile.filter(lambda s: s.startswith("ERROR")) \
               .map(lambda s: s.split("\t")[2])

[Figure: lineage chain. HDFS File -> filter (func = startsWith(…)) -> Filtered RDD -> map (func = split(...)) -> Mapped RDD]
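The lineage idea can be illustrated without a Spark cluster. What follows is a plain-Python sketch under stated assumptions, not Spark's implementation: the class `ToyRDD` and its fields are hypothetical names. Each dataset remembers its parent and the function that derives it, so a lost in-memory result can be rebuilt by replaying those steps.

```python
# Toy illustration of lineage-based recovery; ToyRDD is a hypothetical
# class for this sketch and is not part of Spark.

class ToyRDD:
    def __init__(self, source=None, parent=None, func=None):
        self.source = source      # base data (stands in for an HDFS file)
        self.parent = parent      # lineage: the dataset this one derives from
        self.func = func          # lineage: how to derive it from the parent
        self.cached = None        # materialized result, which may be lost

    def filter(self, pred):
        return ToyRDD(parent=self, func=lambda rows: [r for r in rows if pred(r)])

    def map(self, f):
        return ToyRDD(parent=self, func=lambda rows: [f(r) for r in rows])

    def collect(self):
        if self.cached is None:   # missing or lost: rebuild from lineage
            if self.parent is None:
                self.cached = list(self.source)
            else:
                self.cached = self.func(self.parent.collect())
        return self.cached


log = ToyRDD(source=["ERROR\tdisk\tfull", "INFO\tboot\tok", "ERROR\tnet\tdown"])
msgs = log.filter(lambda s: s.startswith("ERROR")).map(lambda s: s.split("\t")[2])

first = msgs.collect()
msgs.cached = None                # simulate losing the in-memory result
recovered = msgs.collect()        # recomputed by replaying the map step
```

Only the lost dataset is recomputed here; its parent is still cached, so the replay starts from the nearest surviving ancestor.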

SparkContext

Main entry point to Spark functionality

Available in shell as variable sc

Creating RDDs

# Turn a Python collection into an RDD

> sc.parallelize([1, 2, 3])

# Load text file from local FS, HDFS, or S3

> sc.textFile("file.txt")

> sc.textFile("directory/*.txt")

> sc.textFile("hdfs://namenode:9000/path/file")

# Use existing Hadoop InputFormat (Java/Scala only)

> sc.hadoopFile(keyClass, valClass, inputFmt, conf)

Basic Transformations

> nums = sc.parallelize([1, 2, 3])

# Pass each element through a function
> squares = nums.map(lambda x: x*x)   # => {1, 4, 9}

# Keep elements passing a predicate
> even = squares.filter(lambda x: x % 2 == 0)   # => {4}

# Map each element to zero or more others
# range(x) is a Range object (sequence of numbers 0, 1, …, x-1)
> nums.flatMap(lambda x: range(x))   # => {0, 0, 1, 0, 1, 2}
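For readers without a Spark shell at hand, the three transformations above have direct single-machine analogues. This is a plain-Python sketch of what each one computes, not of how Spark executes it; the variable names are chosen only for this illustration.

```python
from itertools import chain

nums = [1, 2, 3]

# map: apply a function to every element
squares = [x * x for x in nums]

# filter: keep the elements that satisfy a predicate
even = [x for x in squares if x % 2 == 0]

# flatMap: apply a function that returns a sequence, then flatten the results
flat = list(chain.from_iterable(range(x) for x in nums))

print(squares)  # [1, 4, 9]
print(even)     # [4]
print(flat)     # [0, 0, 1, 0, 1, 2]
```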


Basic Actions

> nums = sc.parallelize([1, 2, 3])

# Retrieve RDD contents as a local collection

> nums.collect() # => [1, 2, 3]

# Return first K elements

> nums.take(2) # => [1, 2]

# Count number of elements

> nums.count() # => 3

# Merge elements with an associative function

> nums.reduce(lambda x, y: x + y) # => 6

# Write elements to a text file

> nums.saveAsTextFile("hdfs://file.txt")
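The requirement that reduce takes an associative function follows from how a distributed reduce can be evaluated: each partition is reduced on its own, and the partial results are then merged. The sketch below is plain Python under that assumption; `partitioned_reduce` is a hypothetical helper, not a Spark call. It shows that addition gives the same answer however the data is split, while subtraction does not.

```python
from functools import reduce

def partitioned_reduce(partitions, f):
    """Reduce each partition separately, then merge the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

# The same four numbers, split across partitions in two different ways
split_a = [[1, 2], [3, 4]]
split_b = [[1], [2, 3, 4]]

add = lambda x, y: x + y
sub = lambda x, y: x - y

# Addition is associative: every split gives the same answer.
sum_a = partitioned_reduce(split_a, add)   # (1+2) + (3+4) = 10
sum_b = partitioned_reduce(split_b, add)   # 1 + (2+3+4) = 10

# Subtraction is not: the answer depends on where the data was split.
diff_a = partitioned_reduce(split_a, sub)  # (1-2) - (3-4) = 0
diff_b = partitioned_reduce(split_b, sub)  # 1 - ((2-3)-4) = 6
```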

Working with Key-Value Pairs

Spark’s “distributed reduce” transformations operate on RDDs of key-value pairs

Python: pair = (a, b)

pair[0] # => a

pair[1] # => b

Scala: val pair = (a, b)

pair._1 // => a

pair._2 // => b

Java: Tuple2 pair = new Tuple2(a, b);

pair._1 // => a

pair._2 // => b

Some Key-Value Operations

> pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])

> pets.reduceByKey(lambda x, y: x + y)   # => {(cat, 3), (dog, 1)}

> pets.groupByKey()   # => {(cat, [1, 2]), (dog, [1])}

> pets.sortByKey()   # => {(cat, 1), (cat, 2), (dog, 1)}

reduceByKey also automatically implements combiners on the map side

Example: Word Count

> lines = sc.textFile("hamlet.txt")
> counts = lines.flatMap(lambda line: line.split(" ")) \
               .map(lambda word: (word, 1)) \
               .reduceByKey(lambda x, y: x + y)

[Figure: data flow for two input lines]
"to be or", "not to be"
  -> flatMap:     "to" "be" "or" "not" "to" "be"
  -> map:         (to, 1) (be, 1) (or, 1) (not, 1) (to, 1) (be, 1)
  -> reduceByKey: (be, 2) (not, 1) (or, 1) (to, 2)

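The map-side combining that reduceByKey performs can be sketched in plain Python. This is an illustration under assumptions, not Spark's code: `combine_locally` and `reduce_by_key` are hypothetical helpers, and each inner list stands in for one partition. Values that share a key are merged inside each partition first, so fewer records have to be sent between machines.

```python
from collections import defaultdict

def combine_locally(partition, f):
    """Map side: merge values that share a key within one partition."""
    acc = {}
    for key, value in partition:
        acc[key] = f(acc[key], value) if key in acc else value
    return list(acc.items())

def reduce_by_key(partitions, f):
    """Combine inside each partition first, then merge across partitions."""
    shuffled = defaultdict(list)
    records_sent = 0
    for part in partitions:
        for key, value in combine_locally(part, f):
            shuffled[key].append(value)   # only combined records are sent
            records_sent += 1
    result = {}
    for key, values in shuffled.items():
        total = values[0]
        for v in values[1:]:
            total = f(total, v)
        result[key] = total
    return result, records_sent

# Two partitions of (word, 1) pairs; the first repeats "to" and "be"
lines = ["to be or not to be", "to be"]
partitions = [[(w, 1) for w in line.split(" ")] for line in lines]
records_before = sum(len(p) for p in partitions)

counts, records_sent = reduce_by_key(partitions, lambda x, y: x + y)
```

In this toy input, 8 pairs are produced by the map step but only 6 combined records cross the partition boundary.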

Other Key-Value Operations

> visits = sc.parallelize([("index.html", "1.2.3.4"),
                           ("about.html", "3.4.5.6"),
                           ("index.html", "1.3.3.1")])

> pageNames = sc.parallelize([("index.html", "Home"),
                              ("about.html", "About")])

> visits.join(pageNames)
# ("index.html", ("1.2.3.4", "Home"))
# ("index.html", ("1.3.3.1", "Home"))
# ("about.html", ("3.4.5.6", "About"))

> visits.cogroup(pageNames)
# ("index.html", (["1.2.3.4", "1.3.3.1"], ["Home"]))
# ("about.html", (["3.4.5.6"], ["About"]))
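The relationship between cogroup and join may be easier to see in a single-machine sketch. The functions below are hypothetical plain-Python stand-ins, not Spark's implementation: cogroup gathers the values from both datasets under each key, and an inner join is then every pairing of a left value with a right value for that key.

```python
from collections import defaultdict

def cogroup(left, right):
    """For each key, collect the values from both datasets."""
    groups = defaultdict(lambda: ([], []))
    for key, value in left:
        groups[key][0].append(value)
    for key, value in right:
        groups[key][1].append(value)
    return dict(groups)

def join(left, right):
    """Inner join: pair every left value with every right value per key."""
    return [(key, (lv, rv))
            for key, (lvals, rvals) in cogroup(left, right).items()
            for lv in lvals
            for rv in rvals]

visits = [("index.html", "1.2.3.4"),
          ("about.html", "3.4.5.6"),
          ("index.html", "1.3.3.1")]
page_names = [("index.html", "Home"), ("about.html", "About")]

grouped = cogroup(visits, page_names)
joined = sorted(join(visits, page_names))
```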

Setting the Level of Parallelism

All the pair RDD operations take an optional second parameter for number of tasks

> words.reduceByKey(lambda x, y: x + y, 5)

> words.groupByKey(5)

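One way to picture what the number-of-tasks parameter controls is that every key is assigned to one of that many buckets, and each bucket is handled by one task. The sketch below is plain Python and assumes assignment by hash modulo the task count; `partition_for` and `partition_pairs` are hypothetical helpers, and `zlib.crc32` is used only because it gives the same value on every run, not because Spark uses it.

```python
import zlib

def partition_for(key, num_tasks):
    """Assign a key to one of num_tasks tasks by hashing it."""
    # crc32 is an illustrative, run-to-run stable hash, not the one Spark uses.
    return zlib.crc32(key.encode("utf-8")) % num_tasks

def partition_pairs(pairs, num_tasks):
    """Split (key, value) pairs into num_tasks buckets."""
    buckets = [[] for _ in range(num_tasks)]
    for key, value in pairs:
        buckets[partition_for(key, num_tasks)].append((key, value))
    return buckets

words = [("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)]
buckets = partition_pairs(words, 5)
```

Because the bucket depends only on the key, all pairs for a given key land in the same bucket, which is what lets a single task compute the final value for that key.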

Using Local Variables

Any external variables you use in a closure will automatically be shipped to the cluster:

> query = sys.stdin.readline()
> pages.filter(lambda x: query in x).count()

Some caveats:
• Each task gets a new copy (updates are not sent back)
• Variable must be Serializable
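Both caveats can be demonstrated on one machine. In this plain-Python sketch, `pickle` stands in for whatever serialization ships the variable to the cluster, and `run_task` is a hypothetical function playing the role of a task on a worker; none of it is Spark code.

```python
import pickle

def run_task(serialized_state, record):
    """Simulate a task on a worker: it deserializes its own copy of the state."""
    state = pickle.loads(serialized_state)
    state["seen"] += 1                       # this update stays local to the task
    return state["query"] in record, state["seen"]

driver_state = {"query": "ERROR", "seen": 0}
shipped = pickle.dumps(driver_state)         # fails if the value is not serializable

records = ["ERROR disk", "INFO ok", "ERROR net"]
results = [run_task(shipped, r) for r in records]
```

Every task sees `seen` go from 0 to 1 in its private copy, and `driver_state["seen"]` is still 0 afterwards, which is the "updates are not sent back" behavior described above.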


Under The Hood: DAG Scheduler

• General task graphs
• Automatically pipelines functions
• Data locality aware
• Partitioning aware to avoid shuffles

[Figure: example DAG over RDDs A to F, connected by map, groupBy, filter, and join operators and divided into Stage 1, Stage 2, and Stage 3; the legend marks RDDs and cached partitions.]
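Pipelining here means that consecutive element-wise functions run back to back on each record instead of producing a full intermediate dataset between them. A rough plain-Python analogy, assuming nothing about Spark's internals, is chaining generators; `pipelined` and the `log` list are hypothetical names used only to record the order of calls.

```python
def pipelined(partition, log):
    """Run map then filter over one partition in a single pass, using generators."""
    def mapped():
        for x in partition:
            log.append(("map", x))
            yield x * x

    def filtered():
        for y in mapped():
            log.append(("filter", y))
            if y % 2 == 0:
                yield y

    return list(filtered())

log = []
result = pipelined([1, 2, 3], log)
```

The log alternates between map and filter calls, showing that each element passes through both functions before the next element is read.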

More RDD Operators

• map, filter, groupBy, sort, union, join, leftOuterJoin, rightOuterJoin
• reduce, count, fold, reduceByKey, groupByKey, cogroup, cross, zip
• sample, take, first, partitionBy, mapWith, pipe, save, ...

Language Support

Standalone Programs Python, Scala, & Java

Interactive ShellsPython & Scala

Performance

Java & Scala are faster due to static typing

…but Python is often fine

Python

lines = sc.textFile(...)
lines.filter(lambda s: "ERROR" in s).count()

Scala

val lines = sc.textFile(...)
lines.filter(x => x.contains("ERROR")).count()

Java

JavaRDD<String> lines = sc.textFile(...);
lines.filter(new Function<String, Boolean>() {
  public Boolean call(String s) {
    return s.contains("ERROR");
  }
}).count();

Interactive Shell

• Available in Python and Scala
• Runs as an application on an existing Spark cluster…
• OR can run locally


… or a Standalone Application

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "WordCount", sys.argv[0], None)
    lines = sc.textFile(sys.argv[1])
    counts = lines.flatMap(lambda s: s.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda x, y: x + y)
    counts.saveAsTextFile(sys.argv[2])

Create a SparkContext

Scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("url", "name", "sparkHome", Seq("app.jar"))

Java

import org.apache.spark.api.java.JavaSparkContext;

JavaSparkContext sc = new JavaSparkContext(
    "masterUrl", "name", "sparkHome", new String[] {"app.jar"});

Python

from pyspark import SparkContext

sc = SparkContext("masterUrl", "name", "sparkHome", ["library.py"])

Constructor arguments, in order:
• Cluster URL, or local / local[N]
• App name
• Spark install path on cluster
• List of JARs with app code (to ship)

Add Spark to Your Project

Scala / Java: add a Maven dependency on

groupId:    org.spark-project
artifactId: spark-core_2.10
version:    0.9.0

Administrative GUIs

http://<Standalone Master>:8080 (by default)


Software Components

• Spark runs as a library in your program (1 instance per app)
• Runs tasks locally or on cluster
  - Mesos, YARN or standalone mode
• Accesses storage systems via Hadoop InputFormat API
  - Can use HBase, HDFS, S3, …

[Figure: your application holds a SparkContext, which runs tasks on local threads or, through a cluster manager, on Workers that each run a Spark executor; the executors access HDFS or other storage.]

Fault Tolerance

RDDs track the series of transformations used to build them (their lineage) to recompute lost data

E.g.:

messages = textFile(...).filter(_.contains("error"))
                        .map(_.split('\t')(2))

[Figure: lineage chain. HadoopRDD (path = hdfs://…) -> FilteredRDD (func = _.contains(...)) -> MappedRDD (func = _.split(…))]

Example: Logistic Regression

val data = spark.textFile(...).map(readPoint).cache()

var w = Vector.random(D)   // Initial parameter vector

// Repeated MapReduce steps to do gradient descent
for (i <- 1 to ITERATIONS) {
  val gradient = data.map(p =>
    (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}

println("Final w: " + w)
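The update in the loop above can be tried on a single machine. The following plain-Python sketch uses the same per-point gradient expression; it is an illustration under assumptions rather than the lecture's code. The four data points are made up, the starting vector is fixed at zero instead of random so the run is repeatable, and `dot`, `loss`, and `gradient` are hypothetical helpers.

```python
import math

def dot(a, b):
    return sum(ai * bi for ai, bi in zip(a, b))

def loss(w, points):
    """Logistic loss: sum over points of log(1 + exp(-y * w.x))."""
    return sum(math.log(1 + math.exp(-y * dot(w, x))) for x, y in points)

def gradient(w, points):
    """Sum of per-point gradients, the same expression as in the Scala loop."""
    g = [0.0] * len(w)
    for x, y in points:                        # one term per point (the map step)
        scale = (1 / (1 + math.exp(-y * dot(w, x))) - 1) * y
        for j, xj in enumerate(x):             # accumulate the sum (the reduce step)
            g[j] += scale * xj
    return g

# Hypothetical toy data: (features x, label y in {-1, +1})
points = [((1.0, 0.5), 1), ((0.8, 1.0), 1),
          ((-1.0, -0.5), -1), ((-0.7, -1.0), -1)]

w = [0.0, 0.0]            # fixed start instead of a random vector, for repeatability
initial_loss = loss(w, points)
for _ in range(10):       # plays the role of ITERATIONS
    g = gradient(w, points)
    w = [wj - gj for wj, gj in zip(w, g)]      # w -= gradient
final_loss = loss(w, points)
```

Every iteration rereads the same `points`, which is why keeping the dataset cached in memory pays off for this kind of algorithm.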

Logistic Regression Performance

[Figure: running time (s) against number of iterations (1, 5, 10, 20, 30) for Hadoop and Spark]

• Hadoop: 127 s / iteration
• Spark: first iteration 174 s, further iterations 6 s
