Apache Spark
Document Analysis Course (Fall 2015 - Scott Sanner) Zahra Iman
Some slides from Matei Zaharia (UC Berkeley / MIT) & Harold Liu
Context
•Reminder
•Spark introduction
•DataFrame
•I/O in Spark
•Performance tuning
•Computational Model
Reminder
•SparkConf
•JavaSparkContext
•RDD: Resilient Distributed Datasets
•Represents the data coming into your system as objects
•Relies on lineage to recover in case of failure
•Transformations
•What you do to an RDD to get another RDD (open file, filter)
•Actions
•Ask for an answer that the system needs to provide (count, …)
•Lazy evaluation
•Transformations are computed only when an action actually needs their result
What is Spark?
•Fast and expressive cluster computing system compatible with Apache Hadoop
•Improves efficiency through:
» General execution graphs
» In-memory storage
•Up to 10× faster on disk, 100× in memory
•Improves usability through:
» Rich APIs in Scala, Java, Python
» Interactive shell
•2-5× less code
A Growing Stack
•Built on top of Spark:
» Spark Streaming: real-time
» Shark: SQL
» GraphX: graph
» MLbase: machine learning
» …
Why a New Programming Model?
• Easy to use
• Compose well for large applications (Implementation)
• Higher-level computational model
• Fast data sharing and DAGs make the engine more efficient and programs much simpler for end users
• Spark’s goal was to generalize MapReduce to support new apps within the same engine
A Brief History: RDD
•An RDD is an immutable, partitioned, logical collection of records
•Spark enabled distributed data processing through functional transformations on distributed collections of data (RDDs)
•Transformations (define a new RDD): map, filter, sample, union, groupByKey, reduceByKey, join, cache, …
•Parallel operations (actions; return a result to the driver): reduce, collect, count, save, lookupKey, …
RDD Essentials
•Transformations create a new dataset from an existing one
•All transformations in Spark are lazy
•They do not compute their results right away
•They only remember the transformations applied to some base dataset
•This lets Spark optimize the required calculations
•It also lets Spark recover lost data partitions
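The lazy behavior described above can be sketched without a cluster. The example below is only an analogy that uses the standard java.util.stream API, not Spark's API: intermediate operations (filter, map) play the role of transformations and the terminal collect plays the role of an action. The class name LazyPipelineSketch and the log list are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream, not Spark. Names here are hypothetical.
final class LazyPipelineSketch {
    // Records every element-level step that really ran, so laziness is observable.
    static final List<String> log = new ArrayList<>();

    // "Transformations": only describe the pipeline; nothing is computed here.
    static Stream<Integer> buildPipeline(List<Integer> base) {
        return base.stream()
                .filter(x -> { log.add("filter " + x); return x % 2 == 0; })
                .map(x -> { log.add("map " + x); return x * 10; });
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = buildPipeline(Arrays.asList(1, 2, 3, 4));
        System.out.println("steps run after building the pipeline: " + log.size());

        // "Action": forces the recorded steps to execute.
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println("result: " + result);
        System.out.println("steps run after the action: " + log.size());
    }
}
```

The log stays empty until collect is called, which mirrors the way a transformation on an RDD is only remembered until an action asks for a result. Unlike this single-machine analogy, Spark also keeps the recorded transformations (the lineage) so that lost partitions can be recomputed.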
DataFrame
•A distributed collection of data organized into named columns
•Conceptually equivalent to a table in a relational database or a data frame in R/Python
•Under the hood, a DataFrame contains an RDD of Row objects plus schema information describing the column types
•Can incorporate SQL while working with DataFrames, using Spark SQL
•Can be constructed from a wide array of sources:
•structured data files
•tables in Hive
•external databases
•existing RDDs
RDD vs DataFrame
•New DataFrame API goal: enable wider audiences beyond “Big Data” engineers to leverage the power of distributed processing
•A DataFrame can still be operated on with existing RDD transformations like map(), but it also provides additional capabilities:
•Register a DataFrame as a temporary table to query it
•Functions that behave like their SQL counterparts, such as select()
•Cache tables
•SQL queries run through SQLContext return DataFrames
•A DataFrame allows Spark to run certain optimizations on the finalized query, because its tabular format carries additional metadata
•JSON data, Parquet data, and HiveQL data can be processed together by loading each of them into a DataFrame
DataFrame Example
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
DataFrame Operations
// Print the schema in a tree format
df.printSchema();
// Select only the "name" column
df.select("name").show();
// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// Count people by age
df.groupBy("age").count().show();
Running SQL Queries Programmatically
SQLContext sqlContext = ...; // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table");
JavaRDD<Person> people = …;
// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
DataFrame Supported Operators
map filter groupBy sort union join leftOuterJoin rightOuterJoin
reduce count fold reduceByKey groupByKey cogroup cross zip
sample take first partitionBy save ...
I/O Process in Spark
•Reminder: Write as text file in one partition?
•By default, Spark creates one partition for each block of the file
•Set the number of partitions to n times the number of cores in the cluster
•Then all partitions are processed in parallel and resources are used evenly
•What if data does not fit in memory to write in one partition?
•Use multiple partitions
•Different formats of input/output files
•Parquet Files
•CSV Files
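The two partition-count rules from the list above (one partition per file block, and n partitions per core) come down to simple arithmetic, sketched below in plain Java. The class and method names are hypothetical, and the 128 MiB block size used in main is an assumption chosen for illustration; the actual block size depends on how the file system is configured.

```java
// Hypothetical helper for illustration; not part of Spark.
final class PartitionSizingSketch {
    // One partition per block: ceil(fileBytes / blockBytes).
    static long defaultPartitions(long fileBytes, long blockBytes) {
        if (fileBytes <= 0 || blockBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        return (fileBytes + blockBytes - 1) / blockBytes;
    }

    // Rule of thumb from the slide: n partitions for every core in the cluster.
    static int targetPartitions(int totalCores, int partitionsPerCore) {
        if (totalCores <= 0 || partitionsPerCore <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        return Math.multiplyExact(totalCores, partitionsPerCore);
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024 * 1024;
        long assumedBlock = 128L * 1024 * 1024; // assumption: 128 MiB blocks
        System.out.println("partitions from blocks: " + defaultPartitions(oneGiB, assumedBlock));
        System.out.println("target for 16 cores, n = 3: " + targetPartitions(16, 3));
    }
}
```

When the block-based count is far below the core-based target, some cores sit idle; when it is far above, each task does very little work. Comparing the two numbers is one way to decide whether repartitioning is worth considering.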
Parquet Files
•A columnar format supported by many other data processing systems
•Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data
•Loading & Writing data
// sqlContext from the previous example is used in this example.
DataFrame schemaPeople = ...; // The DataFrame from the previous example.
// DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write().parquet("people.parquet");
// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
Performance Tuning
•Partitions
•Fragmentation enables Spark to execute in parallel
•The level of fragmentation is a function of the number of partitions in your RDD
•Caching Data In Memory
•Spark SQL can cache tables using an in-memory columnar format
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
// Cache the DataFrame in memory
schemaPeople.cache();
// Or cache a registered table by name
sqlContext.cacheTable("tableName");
•Serialization (done transparently by Spark)
•Avoids writing data back and forth
•Translates objects into an ideally compressed format for transfer over the network => Kryo serialization
•Other Configuration Options
•See the Spark documentation
Example Config File
vi spark/config/spark-defaults.conf
•spark.eventLog.enabled true
•spark.serializer org.apache.spark.serializer.KryoSerializer
•spark.shuffle.consolidateFiles true
•spark.kryo.referenceTracking false
•spark.driver.extraJavaOptions "-XX:+UseCompressedOops"
•spark.executor.extraJavaOptions "-XX:+UseCompressedOops"
•spark.default.parallelism 48
•spark.driver.memory 2560M
Spark vs. MapReduce Comparison - The Bottom Line
•Hadoop MapReduce is meant for data that does not fit in memory, whereas Apache Spark performs better on data that fits in memory, particularly on dedicated clusters.
•The Hadoop processing model is on-disk (disk-based parallelization), while Spark can be in-memory or on-disk
•Apache Spark uses a DAG (Directed Acyclic Graph) execution engine
•A conventional program does not work in a distributed system because the data is split across nodes; expressing the computation as a DAG is a programming style for distributed systems
•The DAG scheduler divides operators into stages of tasks.
•A stage consists of tasks based on partitions of the input data.
•The DAG scheduler pipelines operators together.
•The final result of a DAG scheduler is a set of stages.
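A toy model in plain Java can make the stage-splitting idea above concrete. It is a deliberately simplified sketch, not Spark's scheduler: it assumes that an operator requiring a shuffle (such as reduceByKey) starts a new stage, and that all other consecutive operators are pipelined into the current stage. The class StageSplitSketch and its Op type are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model for illustration; this is not Spark's DAG scheduler.
final class StageSplitSketch {
    // Hypothetical operator description: a name and whether it needs a shuffle.
    static final class Op {
        final String name;
        final boolean needsShuffle;

        Op(String name, boolean needsShuffle) {
            this.name = name;
            this.needsShuffle = needsShuffle;
        }
    }

    // Pipelines consecutive operators together; a shuffle closes the current
    // stage and opens a new one (simplifying assumption of this sketch).
    static List<List<String>> splitIntoStages(List<Op> ops) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Op op : ops) {
            if (op.needsShuffle && !current.isEmpty()) {
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(op.name);
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }

    public static void main(String[] args) {
        List<Op> job = Arrays.asList(
                new Op("map", false),
                new Op("filter", false),
                new Op("reduceByKey", true),
                new Op("map", false));
        List<List<String>> stages = splitIntoStages(job);
        for (int i = 0; i < stages.size(); i++) {
            System.out.println("stage " + i + ": " + stages.get(i));
        }
    }
}
```

In this model each stage would then be run as one task per partition of its input, which matches the statement that a stage consists of tasks based on partitions of the input data.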
[Figure: DAG Example]
Hadoop MapReduce vs. Tez vs. Spark
Criteria | Hadoop MapReduce | Tez | Spark
License | Open Source, Apache 2.0, version 2.x | Open Source, Apache 2.0, version 0.x | Open Source, Apache 2.0, version 1.x
Processing Model | On-Disk (disk-based parallelization), Batch | On-Disk, Batch, Interactive | In-Memory, On-Disk, Batch, Interactive, Streaming (Near Real-Time)
Language written in | Java | Java | Scala
API | [Java, Python, Scala], User-Facing | Java, [ISV/Engine/Tool builder] | [Scala, Java, Python], User-Facing
Libraries | None, separate tools | None | [Spark Core, Spark Streaming, Spark SQL, MLlib, GraphX]
Hadoop Vs. Spark
Hadoop MapReduce vs. Tez vs. Spark (continued)
Criteria | Hadoop MapReduce | Tez | Spark
Installation | Bound to Hadoop | Bound to Hadoop | Isn’t bound to Hadoop
Ease of Use | Difficult to program, needs abstractions; no interactive mode except Hive | Difficult to program; no interactive mode except Hive | Easy to program, no need of abstractions; interactive mode
Compatibility | to data types and data sources is same | to data types and data sources is same | to data types and data sources is same
YARN integration | YARN application | Ground-up YARN application | Spark is moving towards YARN
Hadoop Vs. Spark Conclusion
•Why did we need Spark after Hadoop?
•Handles batch, interactive, and real-time processing within a single framework
•Easier to code
•Programming at a higher level of abstraction
•More general: map/reduce is just one set of supported constructs
•Important Spark data structures and I/O file formats
•DataFrames
•Parquet Files
•Performance Tuning of Spark
•Change the default configurations in Spark’s default config file
•Computational model of Spark
•Hadoop for very big datasets, Spark for when data fits in memory
•Spark community: 1000+ meetup members, 80+ contributors, 24 companies contributing