Introduction to PySpark

Academic year: 2021

(1)

Intro to PySpark

(2)

What is PySpark?

• Python interface to Apache Spark
• Map/Reduce-style distributed computing
• Natively Scala
• Interfaces to Python and R are well maintained
• Uses Py4J for the Python <-> Java (JVM) interface

(3)

PySpark Basics

• Basic premise of distributed computing:
  • Data is big
  • The program that processes the data is relatively small
  • Send the program to where the data lives

(4)

PySpark Basics

• Driver: e.g. my laptop

• Cluster Manager: YARN, Mesos, etc.

(5)

PySpark Basics

• RDD: Resilient Distributed Dataset

• Interface to parallelized data in the cluster

• Map, filter, and reduce functions are sent from the driver and executed by workers on chunks (partitions) of the data in parallel
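As a rough mental model, an RDD behaves like a list split into partitions. The same functions run on every partition independently, and the partial results are combined at the end. The pure-Python sketch below illustrates only that idea and is not Spark itself. The helpers `make_partitions` and `run_on_partitions` are hypothetical, the map/filter/reduce functions are arbitrary examples, and a thread pool stands in for the workers.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def make_partitions(data, num_partitions):
    """Split a list into roughly equal contiguous chunks (hypothetical helper)."""
    size = max(1, -(-len(data) // num_partitions))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def process_partition(chunk):
    """What one 'worker' does with its own chunk: map, filter, then reduce."""
    squared = map(lambda x: x * x, chunk)
    evens = filter(lambda x: x % 2 == 0, squared)
    return reduce(lambda a, b: a + b, evens, 0)


def run_on_partitions(data, num_partitions=3):
    partitions = make_partitions(data, num_partitions)
    # Each partition is processed independently, standing in for parallel workers.
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_sums = list(pool.map(process_partition, partitions))
    # The "driver" combines the per-partition results.
    return sum(partial_sums)


print(run_on_partitions(list(range(10))))  # prints 120
```

The result is the same however many partitions are used, because adding up partial sums does not depend on how the data was split.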

(6)

PySpark: Hello World

• Classic Word Count problem

• How many times does each word appear in a given text?

• Approach: Each worker computes word counts

(7)

PySpark: Hello World

Input: The brown dog brown dog
Map: (The, 1) (brown, 1) (dog, 1) (brown, 1) (dog, 1)
Shuffle & Reduce: (The, 1) (dog, 2) (brown, 2)
Collect: (The, 1) (dog, 2) (brown, 2)
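The stages of the word-count flow can be imitated on a single machine with plain Python. This sketch involves no Spark, and its function names are hypothetical. It emits `(word, 1)` pairs, groups them by word the way the shuffle would, and sums each group. Spark's `reduceByKey` also pre-combines counts on each worker before the shuffle, which this sketch skips.

```python
from collections import defaultdict


def map_phase(text):
    """Map: emit a (word, 1) pair for every word."""
    return [(word, 1) for word in text.split()]


def shuffle_phase(pairs):
    """Shuffle: group all values that share a key (Spark moves them to one node)."""
    groups = defaultdict(list)
    for word, count in pairs:
        groups[word].append(count)
    return groups


def reduce_phase(groups):
    """Reduce: combine each key's values, here by adding them."""
    return [(word, sum(counts)) for word, counts in groups.items()]


pairs = map_phase("The brown dog brown dog")
result = reduce_phase(shuffle_phase(pairs))  # "collect": the results end up in one place
print(sorted(result))  # prints [('The', 1), ('brown', 2), ('dog', 2)]
```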

(8)

Demo

# example 1
# `sc` is the SparkContext that the pyspark shell creates for you
text = "the brown dog jumped over the other brown dog"
text_rdd = sc.parallelize(text.split(' '))
text_rdd.map(lambda word: (word, 1)) \
    .reduceByKey(lambda left, right: left + right) \
    .collect()

# example 2
import string

time_machine = sc.textFile('/user/jasonwhite/time_machine')
# lowercase each line, split it into words, strip non-letters, drop empty tokens
time_machine_tuples = time_machine.flatMap(lambda line: line.lower().split(' ')) \
    .map(lambda word: ''.join(ch for ch in word if ch in string.ascii_letters)) \
    .filter(lambda word: word != '') \
    .map(lambda word: (word, 1))
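The difference between `flatMap` and `map` in example 2 is easiest to see on a couple of in-memory lines. The sketch below mimics that pipeline in plain Python, with no Spark involved, and the two sample lines are made up for illustration. `flatMap` turns each line into many words and flattens them into one sequence, while `map` produces exactly one output per input.

```python
import string
from itertools import chain

lines = ["The brown dog, jumped!", "over the (other) -- brown dog"]

# map would keep one list of words per line (nested)
nested = [line.lower().split(' ') for line in lines]

# flatMap: each line yields a list of words, and the lists are flattened
words = chain.from_iterable(line.lower().split(' ') for line in lines)
# map: strip every character that is not an ASCII letter
cleaned = (''.join(ch for ch in word if ch in string.ascii_letters) for word in words)
# filter: drop tokens that became empty (e.g. "--")
non_empty = (word for word in cleaned if word != '')
# map: pair each word with a count of 1
tuples = [(word, 1) for word in non_empty]

print(tuples[:3])  # prints [('the', 1), ('brown', 1), ('dog', 1)]
```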

(9)

Monoids

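• Monoids are combinations of:
  • a set of data; and
  • an associative function with an identity element (Spark's reduce and reduceByKey also expect the function to be commutative)
• Very efficient in M/R (each worker can pre-combine its own values before anything is sent over the network), so strongly preferred
• Examples:
  • addition of integers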
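The associativity requirement can be checked in a few lines of plain Python. Spark may group values differently across partitions, so the function must give the same answer however the data is split. Because partial results can also arrive in any order, order should not matter either. The sketch below involves no Spark, and `combine_partitioned` is a hypothetical helper. It reduces each partition, then reduces the partial results, and compares the outcome with a single sequential reduce. Addition passes. Subtraction is neither associative nor commutative and fails.

```python
import operator
from functools import reduce


def combine_partitioned(values, func, partition_size):
    """Reduce each chunk, then reduce the partial results (mimics a distributed reduce)."""
    chunks = [values[i:i + partition_size] for i in range(0, len(values), partition_size)]
    partials = [reduce(func, chunk) for chunk in chunks]
    return reduce(func, partials)


values = [1, 2, 3, 4, 5, 6, 7]

# Associativity: every way of partitioning gives the same sum
sequential_sum = reduce(operator.add, values)
print(all(combine_partitioned(values, operator.add, size) == sequential_sum
          for size in range(1, len(values) + 1)))  # prints True

# Commutativity: combining the values in a different order gives the same sum
print(reduce(operator.add, reversed(values)) == sequential_sum)  # prints True

# Subtraction: the answer depends on how the data was split
sequential_diff = reduce(operator.sub, values)
print(combine_partitioned(values, operator.sub, 3), sequential_diff)  # prints -4 -26
```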

(10)

Demo

# example 3
dataset = sc.parallelize([
    {'id': 1, 'value': 1},
    {'id': 2, 'value': 2},
    {'id': 2, 'value': 6},
])

def add_tuples(left, right):
    # combine two (sum, count) pairs element-wise
    left_sum, left_count = left
    right_sum, right_count = right
    return (left_sum + right_sum, left_count + right_count)

# (sum, count) over all records; dividing sum by count gives the overall average
averages = dataset.map(lambda d: (d['value'], 1)) \
    .reduce(add_tuples)

# (sum, count) per id
averages_by_key = dataset.map(lambda d: (d['id'], (d['value'], 1))) \
    .reduceByKey(add_tuples)
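Example 3 carries `(sum, count)` pairs instead of averages because averaging is not associative. Averaging per-partition averages generally gives the wrong answer, while adding `(sum, count)` pairs does not. The plain-Python sketch below is an illustration, not Spark code. It reuses `add_tuples` on the same three records, splits them into two made-up partitions, and divides only at the very end.

```python
from collections import defaultdict
from functools import reduce


def add_tuples(left, right):
    left_sum, left_count = left
    right_sum, right_count = right
    return (left_sum + right_sum, left_count + right_count)


records = [{'id': 1, 'value': 1}, {'id': 2, 'value': 2}, {'id': 2, 'value': 6}]
partitions = [records[:2], records[2:]]  # two hypothetical partitions

# Overall average: reduce each partition to (sum, count), then combine the partials.
partials = [reduce(add_tuples, [(d['value'], 1) for d in part]) for part in partitions]
total, count = reduce(add_tuples, partials)
overall_average = total / count

# Per-key average: the same combine step, grouped by 'id' (what reduceByKey does).
per_key = defaultdict(lambda: (0, 0))  # (0, 0) is the identity for add_tuples
for part in partitions:
    for d in part:
        per_key[d['id']] = add_tuples(per_key[d['id']], (d['value'], 1))
averages_by_key = {key: s / c for key, (s, c) in per_key.items()}

print(overall_average, averages_by_key)  # prints 3.0 {1: 1.0, 2: 4.0}
```

Averaging the two partition averages instead (1.5 and 6.0) would give 3.75 rather than the correct 3.0.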

(11)

Demo

# example 4

from datetime import date

dataset = sc.parallelize([
    {'id': 1, 'group_id': 10, 'timestamp': date(1978, 3, 2)},
    {'id': 2, 'group_id': 10, 'timestamp': date(1984, 3, 24)},
    {'id': 3, 'group_id': 10, 'timestamp': date(1986, 5, 19)},
    {'id': 4, 'group_id': 11, 'timestamp': date(1956, 6, 5)},
    {'id': 5, 'group_id': 11, 'timestamp': date(1953, 2, 21)},
])

def calculate_age(d):
    # age in days (timedelta.days is an attribute)
    d['age'] = (date.today() - d['timestamp']).days
    return d

def calculate_group_stats(left, right):
    # merge two partial summaries of the same group
    earliest = min(left['earliest'], right['earliest'])
    latest = max(left['latest'], right['latest'])
    total_age = left['total_age'] + right['total_age']
    count = left['count'] + right['count']
    return {
        'earliest': earliest,
        'latest': latest,
        'total_age': total_age,
        'count': count,
    }

group_stats = dataset.map(calculate_age) \
    .map(lambda d: (d['group_id'], {'earliest': d['timestamp'], 'latest': d['timestamp'],
                                    'total_age': d['age'], 'count': 1})) \
    .reduceByKey(calculate_group_stats)

(12)

Joining RDDs

• Like many RDD operations, join works on (key, value) pairs
• Each side is shuffled so that records with the same key end up on the same node
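A shuffle join can be sketched in plain Python. Both sides are hash-partitioned on the key, so matching keys land in the same partition, and then each partition is joined locally. The partition count and the `partition_for` helper are assumptions for illustration. Python's built-in `hash` is applied only to integer keys, where it is deterministic. Like `join`, the sketch performs an inner join, shown here on the records from the join demo.

```python
from collections import defaultdict

NUM_PARTITIONS = 2  # assumed partition count


def partition_for(key):
    """Hash partitioner: the same key always maps to the same partition."""
    return hash(key) % NUM_PARTITIONS


def shuffle(pairs):
    """Send every (key, value) pair to the partition its key hashes to."""
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for key, value in pairs:
        partitions[partition_for(key)].append((key, value))
    return partitions


def shuffle_join(left_pairs, right_pairs):
    """Inner join: shuffle both sides, then join partition by partition."""
    results = []
    for left_part, right_part in zip(shuffle(left_pairs), shuffle(right_pairs)):
        right_by_key = defaultdict(list)
        for key, value in right_part:
            right_by_key[key].append(value)
        for key, left_value in left_part:
            for right_value in right_by_key.get(key, []):
                results.append((key, (left_value, right_value)))
    return results


left = [(d['id'], d) for d in [{'id': 1, 'field1': 'foo'}, {'id': 2, 'field1': 'bar'},
                                {'id': 2, 'field1': 'baz'}, {'id': 3, 'field1': 'foo'}]]
right = [(d['id'], d) for d in [{'id': 1, 'field2': 'abc'}, {'id': 2, 'field2': 'def'}]]

rows = shuffle_join(left, right)
for key, (l, r) in sorted(rows, key=lambda row: (row[0], row[1][0]['field1'])):
    print(key, l['field1'], r['field2'])
```

Key 3 has no partner on the right, so it disappears, just as with `join`.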

(13)

Joining RDDs

Left RDD: {'id': 1, 'field1': 'foo'} {'id': 2, 'field1': 'bar'}
Right RDD: {'id': 1, 'field2': 'baz'} {'id': 2, 'field2': 'baz'}

Keyed by id:
(1, {'id': 1, 'field1': 'foo'}) (2, {'id': 2, 'field1': 'bar'})
(1, {'id': 1, 'field2': 'baz'}) (2, {'id': 2, 'field2': 'baz'})

Joined:
(1, ({'id': 1, 'field1': 'foo'}, {'id': 1, 'field2': 'baz'}))
(2, ({'id': 2, 'field1': 'bar'}, {'id': 2, 'field2': 'baz'}))

(14)

Demo

# example 4

first_dataset = sc.parallelize([
    {'id': 1, 'field1': 'foo'},
    {'id': 2, 'field1': 'bar'},
    {'id': 2, 'field1': 'baz'},
    {'id': 3, 'field1': 'foo'},
])
# key each record by its id so join() can match on it
first_dataset = first_dataset.map(lambda d: (d['id'], d))

second_dataset = sc.parallelize([
    {'id': 1, 'field2': 'abc'},
    {'id': 2, 'field2': 'def'},
])
second_dataset = second_dataset.map(lambda d: (d['id'], d))

# inner join: (id, (first_record, second_record)); id 3 has no match and is dropped
output = first_dataset.join(second_dataset)

(15)

Key Skew

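• Achilles’ heel of M/R: key skew
• The shuffle phase sends all records with the same key to the same node
• If billions of rows are shuffled to the same node, that node becomes a bottleneck (and may run out of memory) while the others sit idle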
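The effect of skew becomes visible when you count how many records each partition receives. In this plain-Python sketch the key distribution is made up: one "hot" key accounts for most rows. `zlib.crc32` stands in for Spark's partitioner because Python's built-in string hash is randomized between runs. The partition count is also an assumption.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 4  # assumed partition count


def partition_for(key):
    """Deterministic hash partitioner for string keys."""
    return zlib.crc32(key.encode('utf-8')) % NUM_PARTITIONS


# Hypothetical skewed data: one key dominates.
keys = ['hot_user'] * 9_000 + [f'user_{i}' for i in range(1_000)]

rows_per_partition = Counter(partition_for(key) for key in keys)
busiest = max(rows_per_partition.values())

print(sorted(rows_per_partition.items()))
print(f"busiest partition holds {busiest / len(keys):.0%} of all rows")
```

Adding more partitions does not help here: every `hot_user` row still hashes to the same single partition.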

(16)
(17)

Joining RDDs w/ Skew

• When joining to a small RDD, an alternative is to “broadcast” it
• Instead of shuffling, the entire small RDD is sent to each worker
• Now each worker has all the data it needs to join locally
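Conceptually, a broadcast join turns the join into a dictionary lookup per record. The small side is collected into a dict, a copy goes to every worker, and the large side is never shuffled. The plain-Python sketch below involves no Spark, and the partitions of the large side are hypothetical. Each "worker" joins its own partition against the same lookup table.

```python
# Small side, collected into a dict (in PySpark: collectAsMap(), then sc.broadcast()).
small_lookup = {1: {'id': 1, 'field2': 'abc'}, 2: {'id': 2, 'field2': 'def'}}

# Large side stays where it is, split across hypothetical worker partitions.
large_partitions = [
    [(1, {'id': 1, 'field1': 'foo'}), (2, {'id': 2, 'field1': 'bar'})],
    [(2, {'id': 2, 'field1': 'baz'}), (3, {'id': 3, 'field1': 'foo'})],
]


def join_partition(partition, lookup):
    """Runs on one worker: records never leave the partition; only the lookup is shared."""
    for key, record in partition:
        if key in lookup:
            yield (key, (record, lookup[key]))


joined = [row for part in large_partitions for row in join_partition(part, small_lookup)]
print(len(joined))  # prints 3
```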

(18)

Demo

# example 5

first_dataset = sc.parallelize([
    {'id': 1, 'field1': 'foo'},
    {'id': 2, 'field1': 'bar'},
    {'id': 2, 'field1': 'baz'},
    {'id': 3, 'field1': 'foo'},
])
first_dataset = first_dataset.map(lambda d: (d['id'], d))

second_dataset = sc.parallelize([
    {'id': 1, 'field2': 'abc'},
    {'id': 2, 'field2': 'def'},
])
second_dataset = second_dataset.map(lambda d: (d['id'], d))

# collect the small RDD to the driver as a dict, then ship a read-only copy to every worker
second_dict = sc.broadcast(second_dataset.collectAsMap())

def join_records(pair):
    # unpack the (key, record) pair here (tuple parameters are Python 2 only)
    key, record = pair
    if key in second_dict.value:
        yield (key, (record, second_dict.value[key]))

# flatMap emits matches only (an inner join) without shuffling first_dataset
output = first_dataset.flatMap(join_records)

(19)

Ordering

• Row order isn’t guaranteed unless you explicitly sort the RDD
• But sometimes you need to process events in order!

(20)

Ordering

Input (unordered):
{'id': 1, 'value': 10} {'id': 2, 'value': 10} {'id': 3, 'value': 20} {'id': 1, 'value': 12} {'id': 1, 'value': 5} {'id': 2, 'value': 15}

Shuffle & Sort (grouped by id, sorted by value):
{'id': 1, 'value': 5} {'id': 1, 'value': 10} {'id': 1, 'value': 12}
{'id': 3, 'value': 20}
{'id': 2, 'value': 10} {'id': 2, 'value': 15}

Intervals between consecutive values per id:
{'id': 1, 'interval': 5} {'id': 1, 'interval': 2}
{'id': 2, 'interval': 5}
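The intervals above can be reproduced in plain Python. Group the records by `id`, sort each group by `value`, and take the differences between neighbours. This is only a sketch of the idea. In PySpark, one common approach is `groupByKey()` followed by sorting each group, which works when each key's values fit in memory on one worker.

```python
from collections import defaultdict

events = [
    {'id': 1, 'value': 10}, {'id': 2, 'value': 10}, {'id': 3, 'value': 20},
    {'id': 1, 'value': 12}, {'id': 1, 'value': 5}, {'id': 2, 'value': 15},
]

# "Shuffle": bring all values for the same id together.
values_by_id = defaultdict(list)
for event in events:
    values_by_id[event['id']].append(event['value'])

# "Sort", then compute the gap between each consecutive pair of values.
intervals = []
for key in sorted(values_by_id):
    ordered = sorted(values_by_id[key])
    for earlier, later in zip(ordered, ordered[1:]):
        intervals.append({'id': key, 'interval': later - earlier})

print(intervals)
# prints [{'id': 1, 'interval': 5}, {'id': 1, 'interval': 2}, {'id': 2, 'interval': 5}]
```

Id 3 has a single event, so it produces no interval.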
