Spark SQL supports a number of structured data sources out of the box, letting you get Row objects from them without any complicated loading process. These sources include Hive tables, JSON, and Parquet files. In addition, if you query these sources using SQL and select only a subset of the fields, Spark SQL can scan only the data for those fields, instead of scanning all the data as a naive SparkContext.hadoopFile might.
Apart from these data sources, you can also convert regular RDDs in your program to SchemaRDDs by assigning them a schema. This makes it easy to write SQL queries even when your underlying data is Python or Java objects. Often, SQL queries are more concise when you’re computing many quantities at once (e.g., if you wanted to compute the average age, max age, and count of distinct user IDs in one pass). In addition, you can easily join these RDDs with SchemaRDDs from any other Spark SQL data source. In this section, we’ll cover the external sources as well as this way of using RDDs.
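To make the "many quantities at once" point concrete, here is a minimal sketch of such a query. It uses Python's built-in sqlite3 module as a stand-in for Spark SQL so that it runs without a cluster; the users table and its sample rows are assumptions made up for illustration. With Spark SQL, the same SELECT string would be passed to the context's sql() method after registering the data as a temp table.

```python
import sqlite3

# Hypothetical sample data: (user_id, age) pairs, not from the original text.
users = [(1, 25), (2, 35), (2, 35), (3, 60)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id INTEGER, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)", users)

# A single query computes all three quantities in one statement.
avg_age, max_age, distinct_ids = conn.execute(
    "SELECT AVG(age), MAX(age), COUNT(DISTINCT user_id) FROM users"
).fetchone()
conn.close()

print(avg_age, max_age, distinct_ids)
```

Writing the same computation with plain RDD operations would typically need a custom aggregate or several separate passes, which is why the SQL form tends to be shorter.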
Apache Hive
When loading data from Hive, Spark SQL supports any Hive-supported storage formats (SerDes), including text files, RCFiles, ORC, Parquet, Avro, and Protocol Buffers.
To connect Spark SQL to an existing Hive installation, you need to provide a Hive configuration. You do so by copying your hive-site.xml file to Spark’s ./conf/ directory. If you just want to explore, a local Hive metastore will be used if no hive-site.xml is set, and we can easily load data into a Hive table to query later on.
Examples 9-15 through 9-17 illustrate querying a Hive table. Our example Hive table has two columns, key (which is an integer) and value (which is a string). We show how to create such a table later in this chapter.
Example 9-15. Hive load in Python
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT key, value FROM mytable")
keys = rows.map(lambda row: row[0])
Example 9-16. Hive load in Scala
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key, value FROM mytable")
val keys = rows.map(row => row.getInt(0))
Example 9-17. Hive load in Java
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT key, value FROM mytable");
JavaRDD<Integer> keys = rows.toJavaRDD().map(new Function<Row, Integer>() {
  public Integer call(Row row) { return row.getInt(0); }
});
Parquet
Parquet is a popular column-oriented storage format that can store records with nested fields efficiently. It is often used with tools in the Hadoop ecosystem, and it supports all of the data types in Spark SQL. Spark SQL provides methods for reading data directly from Parquet files and writing data to them.
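As a rough illustration of why a column-oriented layout helps when a query needs only some fields, the sketch below pivots row records into per-column lists and then reads only the requested columns. This is a conceptual toy, not Parquet's actual on-disk format; the helper names to_columns and select and the sample records are hypothetical, and the sketch assumes every record has the same fields.

```python
# Hypothetical records, not from the original text.
records = [
    {"name": "holden", "favouriteAnimal": "panda", "age": 30},
    {"name": "sparky", "favouriteAnimal": "bear", "age": 5},
]

def to_columns(rows):
    """Pivot a list of row dicts into a dict of column lists."""
    columns = {}
    for row in rows:
        for field, value in row.items():
            columns.setdefault(field, []).append(value)
    return columns

def select(columns, fields):
    """Read only the requested columns; the others are never touched."""
    return list(zip(*(columns[f] for f in fields)))

columns = to_columns(records)
names = select(columns, ["name"])
print(names)
```

In a row-oriented layout, answering the same query would mean reading every full record just to pull out one field.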
First, to load data, you can use HiveContext.parquetFile or SQLContext.parquetFile, as shown in Example 9-18.
Example 9-18. Parquet load in Python
# Load some data in from a Parquet file with fields name and favouriteAnimal
rows = hiveCtx.parquetFile(parquetFile)
names = rows.map(lambda row: row.name)
print "Everyone"
print names.collect()
You can also register a Parquet file as a Spark SQL temp table and write queries against it. Example 9-19 continues from Example 9-18 where we loaded the data.
Example 9-19. Parquet query in Python
# Find the panda lovers
tbl = rows.registerTempTable("people")
pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"")
print "Panda friends"
print pandaFriends.map(lambda row: row.name).collect()
Finally, you can save the contents of a SchemaRDD to Parquet with saveAsParquetFile(), as shown in Example 9-20.
Example 9-20. Parquet file save in Python
pandaFriends.saveAsParquetFile("hdfs://...")
JSON
If you have a JSON file with records fitting the same schema, Spark SQL can infer the schema by scanning the file and let you access fields by name (Example 9-21). If you have ever found yourself staring at a huge directory of JSON records, Spark SQL’s schema inference is a very effective way to start working with the data without writing any special loading code.
To load our JSON data, all we need to do is call the jsonFile() function on our hiveCtx, as shown in Examples 9-22 through 9-24. If you are curious about what the inferred schema for your data is, you can call printSchema on the resulting SchemaRDD (Example 9-25).
Example 9-21. Input records
{"name": "Holden"}
{"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}
Example 9-22. Loading JSON with Spark SQL in Python
input = hiveCtx.jsonFile(inputFile)
Example 9-23. Loading JSON with Spark SQL in Scala
val input = hiveCtx.jsonFile(inputFile)
Example 9-24. Loading JSON with Spark SQL in Java
SchemaRDD input = hiveCtx.jsonFile(jsonFile);
Example 9-25. Resulting schema from printSchema()
root
|-- knows: struct (nullable = true)
|    |-- friends: array (nullable = true)
|    |    |-- element: string (containsNull = false)
|-- lovesPandas: boolean (nullable = true)
|-- name: string (nullable = true)
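To give a feel for how a schema like the one in Example 9-25 can come out of scanning records, here is a simplified sketch in plain Python. It is not Spark SQL's actual inference algorithm: the functions infer_type and merge are hypothetical, an empty array is assumed to hold strings, and when two records disagree on a field's type the first one seen simply wins.

```python
import json

def infer_type(value):
    """Map a parsed JSON value to a simple type description."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        # Assumption: element type comes from the first element only.
        return ("array", infer_type(value[0]) if value else "string")
    if isinstance(value, dict):
        return {name: infer_type(v) for name, v in value.items()}
    return "null"

def merge(a, b):
    """Combine two struct schemas, keeping every field seen in either."""
    merged = dict(a)
    for name, typ in b.items():
        if isinstance(typ, dict) and isinstance(merged.get(name), dict):
            merged[name] = merge(merged[name], typ)
        else:
            merged.setdefault(name, typ)
    return merged

# The two input records from Example 9-21.
lines = [
    '{"name": "Holden"}',
    '{"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}',
]

schema = {}
for line in lines:
    schema = merge(schema, infer_type(json.loads(line)))
print(schema)
```

The merged result contains name, lovesPandas, and knows.friends even though the first record has only name, which mirrors why every field in the printed schema is marked nullable.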
Example 9-26. Partial schema of tweets
root
|-- contributorsIDs: array (nullable = true)
|    |-- element: string (containsNull = false)
|-- createdAt: string (nullable = true)
|-- currentUserRetweetId: integer (nullable = true)
|-- hashtagEntities: array (nullable = true)
|    |-- element: struct (containsNull = false)
|    |    |-- end: integer (nullable = true)
|    |    |-- start: integer (nullable = true)
|    |    |-- text: string (nullable = true)
|-- id: long (nullable = true)
|-- inReplyToScreenName: string (nullable = true)
|-- inReplyToStatusId: long (nullable = true)
|-- inReplyToUserId: long (nullable = true)
|-- isFavorited: boolean (nullable = true)
|-- isPossiblySensitive: boolean (nullable = true)
|-- isTruncated: boolean (nullable = true)
|-- mediaEntities: array (nullable = true)
|    |-- element: struct (containsNull = false)
|    |    |-- displayURL: string (nullable = true)
|    |    |-- end: integer (nullable = true)
|    |    |-- expandedURL: string (nullable = true)
|    |    |-- id: long (nullable = true)
|    |    |-- mediaURL: string (nullable = true)
|    |    |-- mediaURLHttps: string (nullable = true)
|    |    |-- sizes: struct (nullable = true)
|    |    |    |-- 0: struct (nullable = true)
|    |    |    |    |-- height: integer (nullable = true)
|    |    |    |    |-- resize: integer (nullable = true)
|    |    |    |    |-- width: integer (nullable = true)
|    |    |    |-- 1: struct (nullable = true)
|    |    |    |    |-- height: integer (nullable = true)
|    |    |    |    |-- resize: integer (nullable = true)
|    |    |    |    |-- width: integer (nullable = true)
|    |    |    |-- 2: struct (nullable = true)
|    |    |    |    |-- height: integer (nullable = true)
|    |    |    |    |-- resize: integer (nullable = true)
|    |    |    |    |-- width: integer (nullable = true)
|    |    |    |-- 3: struct (nullable = true)
|    |    |    |    |-- height: integer (nullable = true)
|    |    |    |    |-- resize: integer (nullable = true)
|    |    |    |    |-- width: integer (nullable = true)
|    |    |-- start: integer (nullable = true)
|    |    |-- type: string (nullable = true)
|    |    |-- url: string (nullable = true)
|-- retweetCount: integer (nullable = true)
...
As you look at these schemas, a natural question is how to access nested fields and array fields. Both in Python and when we register a table, we can access nested elements by using the . for each level of nesting (e.g., toplevel.nextlevel). You can access array elements in SQL by specifying the index with [element], as shown in Example 9-27.
Example 9-27. SQL query nested and array elements
select hashtagEntities[0].text from tweets LIMIT 1;
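The path in Example 9-27 reads left to right: take the hashtagEntities array, pick element 0, then take its text field. The sketch below mimics that resolution on an ordinary Python dictionary. The get_path helper and the sample tweet are hypothetical and exist only to show how the dot and bracket notation map onto nested data; this is not how Spark SQL evaluates the expression internally.

```python
import re

def get_path(record, path):
    """Resolve a path such as 'hashtagEntities[0].text' against nested dicts and lists."""
    value = record
    for part in path.split("."):
        match = re.fullmatch(r"(\w+)((?:\[\d+\])*)", part)
        if match is None:
            raise ValueError("bad path segment: %r" % part)
        # Step into the named field first...
        value = value[match.group(1)]
        # ...then apply any [index] lookups that follow it.
        for index in re.findall(r"\[(\d+)\]", match.group(2)):
            value = value[int(index)]
    return value

# Hypothetical record shaped like part of the tweet schema.
tweet = {"hashtagEntities": [{"text": "spark", "start": 0, "end": 6}], "id": 1}
first_tag = get_path(tweet, "hashtagEntities[0].text")
print(first_tag)
```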
From RDDs
In addition to loading data, we can also create a SchemaRDD from an RDD. In Scala, RDDs with case classes are implicitly converted into SchemaRDDs.
For Python we create an RDD of Row objects and then call inferSchema(), as shown in Example 9-28.
Example 9-28. Creating a SchemaRDD using Row and named tuple in Python
from pyspark.sql import Row
happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")])
happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")
With Scala, our old friend implicit conversions handles the inference of the schema for us (Example 9-29).
Example 9-29. Creating a SchemaRDD from case class in Scala
case class HappyPerson(handle: String, favouriteBeverage: String)
...
// Create a person and turn it into a Schema RDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))
// Note: there is an implicit conversion
// that is equivalent to sqlCtx.createSchemaRDD(happyPeopleRDD)
happyPeopleRDD.registerTempTable("happy_people")
With Java, we can turn an RDD consisting of a serializable class with public getters and setters into a schema RDD by calling applySchema(), as Example 9-30 shows.
Example 9-30. Creating a SchemaRDD from a JavaBean in Java
class HappyPerson implements Serializable {
  private String name;
  private String favouriteBeverage;
  public HappyPerson() {}
  public HappyPerson(String n, String b) {
    name = n; favouriteBeverage = b;
  }
  // Getter and setter pairs for each field
  public String getName() { return name; }
  public void setName(String n) { name = n; }
  public String getFavouriteBeverage() { return favouriteBeverage; }
  public void setFavouriteBeverage(String b) { favouriteBeverage = b; }
};
...
ArrayList<HappyPerson> peopleList = new ArrayList<HappyPerson>();
peopleList.add(new HappyPerson("holden", "coffee"));
JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList);
SchemaRDD happyPeopleSchemaRDD = hiveCtx.applySchema(happyPeopleRDD,
HappyPerson.class);
happyPeopleSchemaRDD.registerTempTable("happy_people");