Intro to PySpark
What is PySpark?
•
Python interface to Apache Spark
•
Map/Reduce style distributed computing
•
Natively Scala
•
Interfaces to Python, R are well-maintained
•Uses Py4J for Java <-> Scala interface
PySpark Basics
• Distributed Computing basic premise:
• Data is big
• Program to process data is relatively small
• Send program to where the data lives
PySpark Basics
• Driver: e.g. my laptop
• Cluster Manager: YARN, Mesos, etc
PySpark Basics
• RDD: Resilient Distributed Dataset
• Interface to parallelized data in the cluster
• Map, Filter, Reduce functions sent from driver,
executed by workers on chunks of data in parallel
PySpark: Hello World
• Classic Word Count problem
• How many times does each word appear in a
given text?
• Approach: Each worker computes word counts
PySpark: Hello World
The brown dog brown dog (The, 1) (brown, 1) (dog, 1) (brown, 1) (dog, 1) (The, 1) (dog, 2) (brown, 2) (The, 1) (dog, 2) (brown, 2) Map Reduce Shuffle CollectDemo
# example 1
text = "the brown dog jumped over the other brown dog" text_rdd = sc.parallelize(text.split(' '))
text_rdd.map(lambda word: (word, 1)) \
.reduceByKey(lambda left, right: left + right).collect() # example 2
import string
time_machine = sc.textFile('/user/jasonwhite/time_machine')
time_machine_tuples = time_machine.flatMap(lambda line: line.lower().split(' ')) \ .map(lambda word: ''.join(ch for ch in word if ch in string.letters)) \
.filter(lambda word: word != '') \ .map(lambda word: (word, 1))
Monoids
• Monoids are combinations of: • set of data; and
• associative, commutative functions • Very efficient in M/R, strongly preferred • Examples:
• addition of integers
Demo
# example 3 dataset = sc.parallelize([ {'id': 1, 'value': 1}, {'id': 2, 'value': 2}, {'id': 2, 'value': 6} ])def add_tuples(left, right):
left_sum, left_count = left
right_sum, right_count = right
return (left_sum + right_sum, left_count + right_count) averages = dataset.map(lambda d: (d['value'], 1)) \
.reduce(add_tuples)
averages_by_key = dataset.map(lambda d: (d['id'], (d['value'], 1))) \ .reduceByKey(add_tuples) \
Demo
# example 4
from datetime import date dataset = sc.parallelize([
{'id': 1, 'group_id': 10, 'timestamp': date(1978, 3, 2)}, {'id': 2, 'group_id': 10, 'timestamp': date(1984, 3, 24)}, {'id': 3, 'group_id': 10, 'timestamp': date(1986, 5, 19)}, {'id': 4, 'group_id': 11, 'timestamp': date(1956, 6, 5)}, {'id': 5, 'group_id': 11, 'timestamp': date(1953, 2, 21)}, ])
def calculate_age(d):
d['age'] = (date.today() - d['timestamp']).days() return d
def calculate_group_stats(left, right):
earliest = min(left['earliest'], right['earliest']) latest = max(left['latest'], right['latest'])
total_age = left['total_age'] + right['total_age'] count = left['count'] + right['count']
return {
'earliest': earliest, 'latest': latest,
'total_age': total_age,
'count': left['count'] + right[‘count'] }
group_stats = dataset.map(calculate_age) \
.map(lambda d: (d['group_id'], {'earliest': d['timestamp'], 'latest': d['timestamp'], 'total_age': d['age'], 'count': 1})) \
Joining RDDs
• Like many RDD operations, works on (k, v) pairs
• Each side shuffled using common keys
Joining RDDs
{‘id’: 1, ‘field1’: ‘foo’} {‘id’: 2, ‘field1’: ‘bar’}
{‘id’: 1, ‘field2’: ‘baz’} {‘id’: 2, ‘field2’: ‘baz’}
(1, {‘id’: 1, ‘field1’: ‘foo’}) (2, {‘id’: 2, ‘field1’: ‘bar’})
(1, {‘id’: 1, ‘field2’: ‘baz’}) (2, {‘id’: 2, ‘field2’: ‘baz’})
(1, ({‘id’: 1, ‘field1’: ‘foo’}, {‘id’: 1, ‘field2’: ‘baz’}))
(2, ({‘id’: 2, ‘field1’: ‘bar’}, {‘id’: 2, ‘field2’: ‘baz’}))
Demo
# example 4
first_dataset = sc.parallelize([ {'id': 1, 'field1': 'foo'},
{'id': 2, 'field1': 'bar'}, {'id': 2, 'field1': 'baz'}, {'id': 3, 'field1': 'foo'} ])
first_dataset = first_dataset.map(lambda d: (d['id'], d)) second_dataset = sc.parallelize([
{'id': 1, 'field2': 'abc'}, {'id': 2, 'field2': 'def'} ])
second_dataset = second_dataset.map(lambda d: (d['id'], d)) output = first_dataset.join(second_dataset)
Key Skew
• Achilles’ heel of M/R: key skew
• Shuffle phase distributes like keys to like nodes
• If billions of rows are shuffled to the same node,
Joining RDDs w/ Skew
• When joining to small RDD, an alternative is to
“broadcast” the RDD
• Instead of shuffling, entire RDD is sent to each
worker
• Now each worker has all data needed
Demo
# example 5
first_dataset = sc.parallelize([ {'id': 1, 'field1': 'foo'},
{'id': 2, 'field1': 'bar'}, {'id': 2, 'field1': 'baz'}, {'id': 3, 'field1': 'foo'} ])
first_dataset = first_dataset.map(lambda d: (d['id'], d)) second_dataset = sc.parallelize([
{'id': 1, 'field2': 'abc'}, {'id': 2, 'field2': 'def'} ])
second_dataset = second_dataset.map(lambda d: (d['id'], d)) second_dict = sc.broadcast(second_dataset.collectAsMap()) def join_records((key, record)):
if key in second_dict.value.keys():
yield (key, (record, second_dict.value[key])) output = first_dataset.flatMap(join_records)
Ordering
• Row order isn’t guaranteed unless you explicitly
sort the RDD
• But: sometimes you need to process events in
order!
Ordering
{‘id’: 1, ‘value’: 10} {‘id’: 2, ‘value’: 10} {‘id’: 3, ‘value’: 20} {‘id’: 1, ‘value’: 12} {‘id’: 1, ‘value’: 5} {‘id’: 2, ‘value’: 15} {‘id’: 1, ‘value’: 5} {‘id’: 1, ‘value’: 10} {‘id’: 1, ‘value’: 12} {‘id’: 3, ‘value’: 20 {‘id’: 2, ‘value’: 10} {‘id’: 2, ‘value’: 15}Shuffle & Sort
{‘id’: 1, ‘interval’: 5}
{‘id’: 1, ‘interval’: 2} {‘id’: 2, ‘interval’: 5}