Loading and Saving Data

Spark SQL supports a number of structured data sources out of the box, letting you get Row objects from them without any complicated loading process. These sources include Hive tables, JSON, and Parquet files. In addition, if you query these sources using SQL and select only a subset of the fields, Spark SQL can scan only the data for those fields, instead of scanning all the data as a naive SparkContext.hadoopFile call might.

Apart from these data sources, you can also convert regular RDDs in your program to SchemaRDDs by assigning them a schema. This makes it easy to write SQL queries even when your underlying data is Python or Java objects. Often, SQL queries are more concise when you’re computing many quantities at once (e.g., if you wanted to compute the average age, max age, and count of distinct user IDs in one pass). In addition, you can easily join these RDDs with SchemaRDDs from any other Spark SQL data source. In this section, we’ll cover the external sources as well as this way of using RDDs.
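To make the "many quantities at once" point concrete, here is a minimal sketch that computes the average age, max age, and count of distinct user IDs in a single query. It uses Python's built-in sqlite3 module as a stand-in for Spark SQL, purely so the example runs without a cluster; the users table, its columns, and the sample rows are hypothetical.

```python
import sqlite3

# Hypothetical sample data: (user_id, age) pairs.
# sqlite3 stands in for Spark SQL here; it is not part of Spark.
USERS = [(1, 30), (2, 40), (2, 40), (3, 20)]


def summarize(rows):
    """Compute average age, max age, and distinct user count in one query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE users (user_id INTEGER, age INTEGER)")
        conn.executemany("INSERT INTO users VALUES (?, ?)", rows)
        return conn.execute(
            "SELECT AVG(age), MAX(age), COUNT(DISTINCT user_id) FROM users"
        ).fetchone()
    finally:
        conn.close()


avg_age, max_age, distinct_users = summarize(USERS)
```

Expressing the same three aggregates with hand-written RDD transformations would typically take a custom accumulator or several passes, which is why the SQL form tends to be shorter.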

Apache Hive

When loading data from Hive, Spark SQL supports any Hive-supported storage formats (SerDes), including text files, RCFiles, ORC, Parquet, Avro, and Protocol Buffers.

To connect Spark SQL to an existing Hive installation, you need to provide a Hive configuration. You do so by copying your hive-site.xml file to Spark’s ./conf/ directory. If you just want to explore, a local Hive metastore will be used if no hive-site.xml is set, and we can easily load data into a Hive table to query later on.

Examples 9-15 through 9-17 illustrate querying a Hive table. Our example Hive table has two columns, key (which is an integer) and value (which is a string). We show how to create such a table later in this chapter.

Example 9-15. Hive load in Python

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT key, value FROM mytable")
keys = rows.map(lambda row: row[0])

Example 9-16. Hive load in Scala

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key, value FROM mytable")
val keys = rows.map(row => row.getInt(0))

Example 9-17. Hive load in Java

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;

HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT key, value FROM mytable");
// Extract the integer key column from each row
JavaRDD<Integer> keys = rows.toJavaRDD().map(new Function<Row, Integer>() {
  public Integer call(Row row) { return row.getInt(0); }
});

Parquet

Parquet is a popular column-oriented storage format that can store records with nested fields efficiently. It is often used with tools in the Hadoop ecosystem, and it supports all of the data types in Spark SQL. Spark SQL provides methods for reading data directly from, and writing data directly to, Parquet files.
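As a rough picture of what "column-oriented" means, the following toy sketch pivots a list of records into one list per field, so that reading a single field touches only that field's values. This is an illustration only and not Parquet's actual on-disk format; the function names and sample records are hypothetical.

```python
# Toy illustration of a column-oriented layout.
# This is NOT Parquet's file format; names and data are hypothetical.
RECORDS = [
    {"name": "holden", "favouriteAnimal": "panda"},
    {"name": "sparky", "favouriteAnimal": "bear"},
]


def to_columns(records):
    """Pivot a list of row dicts into a dict of per-field value lists."""
    fields = sorted({field for record in records for field in record})
    return {field: [record.get(field) for record in records] for field in fields}


def select(columns, wanted):
    """Return only the requested columns; the others are never read."""
    return {field: columns[field] for field in wanted}


columns = to_columns(RECORDS)
names = select(columns, ["name"])["name"]
```

A query that needs only name can skip the favouriteAnimal values entirely, which is the idea behind scanning just a subset of the fields.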

First, to load data, you can use HiveContext.parquetFile or SQLContext.parquetFile, as shown in Example 9-18.

Example 9-18. Parquet load in Python

# Load some data in from a Parquet file with fields name and favouriteAnimal
rows = hiveCtx.parquetFile(parquetFile)
names = rows.map(lambda row: row.name)
print("Everyone")
print(names.collect())

You can also register a Parquet file as a Spark SQL temp table and write queries against it. Example 9-19 continues from Example 9-18 where we loaded the data.

Example 9-19. Parquet query in Python

# Find the panda lovers
# registerTempTable() returns nothing, so there is no result to keep
rows.registerTempTable("people")
pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"")
print("Panda friends")
print(pandaFriends.map(lambda row: row.name).collect())

Finally, you can save the contents of a SchemaRDD to Parquet with saveAsParquetFile(), as shown in Example 9-20.

Example 9-20. Parquet file save in Python

pandaFriends.saveAsParquetFile("hdfs://...")

JSON

If you have a JSON file with records fitting the same schema, Spark SQL can infer the schema by scanning the file and let you access fields by name (Example 9-21). If you have ever found yourself staring at a huge directory of JSON records, Spark SQL’s schema inference is a very effective way to start working with the data without writing any special loading code.

To load our JSON data, all we need to do is call the jsonFile() function on our hiveCtx, as shown in Examples 9-22 through 9-24. If you are curious about what the inferred schema for your data is, you can call printSchema() on the resulting SchemaRDD (Example 9-25).

Example 9-21. Input records

{"name": "Holden"}

{"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}

Example 9-22. Loading JSON with Spark SQL in Python

input = hiveCtx.jsonFile(inputFile)

Example 9-23. Loading JSON with Spark SQL in Scala

val input = hiveCtx.jsonFile(inputFile)

Example 9-24. Loading JSON with Spark SQL in Java

SchemaRDD input = hiveCtx.jsonFile(jsonFile);

Example 9-25. Resulting schema from printSchema()

root
|-- knows: struct (nullable = true)
| |-- friends: array (nullable = true)
| | |-- element: string (containsNull = false)
|-- lovesPandas: boolean (nullable = true)
|-- name: string (nullable = true)
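To give a feel for how a schema like the one in Example 9-25 can be derived from the records in Example 9-21, here is a simplified sketch using only Python's json module. It is not Spark SQL's implementation: the type names, the infer_type and infer_schema helpers, and the dictionary representation of the schema are all assumptions made for illustration, and the sketch does not reconcile conflicting types across records, which a real implementation would need to handle.

```python
import json

# The two input records from Example 9-21, one JSON object per line.
LINES = [
    '{"name": "Holden"}',
    '{"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}',
]


def infer_type(value):
    """Map a decoded JSON value to a simplified type description."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        # Use the first element as representative; empty arrays stay untyped.
        return {"array": infer_type(value[0]) if value else "null"}
    if isinstance(value, dict):
        return {"struct": {k: infer_type(v) for k, v in sorted(value.items())}}
    return "null"


def infer_schema(lines):
    """Union the top-level fields seen across all records, sorted by name."""
    schema = {}
    for line in lines:
        for field, value in json.loads(line).items():
            schema.setdefault(field, infer_type(value))
    return dict(sorted(schema.items()))


schema = infer_schema(LINES)
```

Note that the first record contains only name, yet the merged result still includes lovesPandas and knows; fields missing from some records simply become nullable in the inferred schema.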

Example 9-26. Partial schema of tweets

root
|-- contributorsIDs: array (nullable = true)
| |-- element: string (containsNull = false)
|-- createdAt: string (nullable = true)
|-- currentUserRetweetId: integer (nullable = true)
|-- hashtagEntities: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- end: integer (nullable = true)
| | |-- start: integer (nullable = true)
| | |-- text: string (nullable = true)
|-- id: long (nullable = true)
|-- inReplyToScreenName: string (nullable = true)
|-- inReplyToStatusId: long (nullable = true)
|-- inReplyToUserId: long (nullable = true)
|-- isFavorited: boolean (nullable = true)
|-- isPossiblySensitive: boolean (nullable = true)
|-- isTruncated: boolean (nullable = true)
|-- mediaEntities: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- displayURL: string (nullable = true)
| | |-- end: integer (nullable = true)
| | |-- expandedURL: string (nullable = true)
| | |-- id: long (nullable = true)
| | |-- mediaURL: string (nullable = true)
| | |-- mediaURLHttps: string (nullable = true)
| | |-- sizes: struct (nullable = true)
| | | |-- 0: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | | |-- 1: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | | |-- 2: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | | |-- 3: struct (nullable = true)
| | | | |-- height: integer (nullable = true)
| | | | |-- resize: integer (nullable = true)
| | | | |-- width: integer (nullable = true)
| | |-- start: integer (nullable = true)
| | |-- type: string (nullable = true)
| | |-- url: string (nullable = true)
|-- retweetCount: integer (nullable = true)
...

As you look at these schemas, a natural question is how to access nested fields and array fields. Both in Python and when we register a table, we can access nested elements by using the . for each level of nesting (e.g., toplevel.nextlevel). You can access array elements in SQL by specifying the index with [element], as shown in Example 9-27.

Example 9-27. SQL query nested and array elements

select hashtagEntities[0].text from tweets LIMIT 1;
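The path syntax in Example 9-27 combines a dot for each level of nesting with a bracketed index for arrays. The sketch below resolves such a path against a decoded JSON record in plain Python, to show what the expression selects. The get_path helper and the sample tweet are hypothetical and are not part of Spark SQL.

```python
import json
import re

# Hypothetical tweet-like record shaped like part of the schema in Example 9-26.
TWEET = json.loads(
    '{"id": 1, "hashtagEntities": [{"start": 0, "end": 6, "text": "spark"}]}'
)

# A path token is either a field name or a bracketed array index such as [0].
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def get_path(record, path):
    """Resolve a path such as 'hashtagEntities[0].text' in nested dicts and lists."""
    value = record
    for name, index in _TOKEN.findall(path):
        value = value[name] if name else value[int(index)]
    return value


first_hashtag = get_path(TWEET, "hashtagEntities[0].text")
```

Here the path first selects the hashtagEntities array, then its element at index 0, then that element's text field.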

From RDDs

In addition to loading data, we can also create a SchemaRDD from an RDD. In Scala, RDDs with case classes are implicitly converted into SchemaRDDs.

For Python we create an RDD of Row objects and then call inferSchema(), as shown in Example 9-28.

Example 9-28. Creating a SchemaRDD using Row and named tuple in Python

from pyspark.sql import Row

happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")])
happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")
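The idea behind inferring a schema from named records can be sketched without Spark by using a named tuple in place of a Row and reading field names and value types from the first record. This is only an illustration under that assumption: infer_fields is a hypothetical helper, not the inferSchema() implementation, and the Python type names it reports are not Spark SQL types.

```python
from collections import namedtuple

# Hypothetical stand-in for a Row, with the same fields as Example 9-28.
HappyPerson = namedtuple("HappyPerson", ["name", "favouriteBeverage"])


def infer_fields(rows):
    """Return (field name, Python type name) pairs taken from the first row."""
    first = rows[0]
    return [(field, type(getattr(first, field)).__name__) for field in first._fields]


people = [HappyPerson(name="holden", favouriteBeverage="coffee")]
fields = infer_fields(people)
```

Because the records carry their own field names, no separate schema declaration is needed before registering the data as a table.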

With Scala, our old friend implicit conversions handle the inference of the schema for us (Example 9-29).

Example 9-29. Creating a SchemaRDD from case class in Scala

case class HappyPerson(handle: String, favouriteBeverage: String)
...
// Create a person and turn it into a SchemaRDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))
// Note: there is an implicit conversion
// that is equivalent to sqlCtx.createSchemaRDD(happyPeopleRDD)
happyPeopleRDD.registerTempTable("happy_people")

With Java, we can turn an RDD consisting of a serializable class with public getters and setters into a SchemaRDD by calling applySchema(), as Example 9-30 shows.

Example 9-30. Creating a SchemaRDD from a JavaBean in Java

class HappyPerson implements Serializable {
  private String name;
  private String favouriteBeverage;
  public HappyPerson() {}
  public HappyPerson(String n, String b) {
    name = n; favouriteBeverage = b;
  }
  // Public getters and setters make this class a JavaBean
  public String getName() { return name; }
  public void setName(String n) { name = n; }
  public String getFavouriteBeverage() { return favouriteBeverage; }
  public void setFavouriteBeverage(String b) { favouriteBeverage = b; }
};
...
ArrayList<HappyPerson> peopleList = new ArrayList<HappyPerson>();
peopleList.add(new HappyPerson("holden", "coffee"));
JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList);
SchemaRDD happyPeopleSchemaRDD = hiveCtx.applySchema(happyPeopleRDD,
  HappyPerson.class);
happyPeopleSchemaRDD.registerTempTable("happy_people");