
Hardware Provisioning

The hardware resources you give to Spark will have a significant effect on the completion time of your application. The main parameters that affect cluster sizing are the amount of memory given to each executor, the number of cores for each executor, the total number of executors, and the number of local disks to use for scratch data.

In all deployment modes, executor memory is set with spark.executor.memory or the --executor-memory flag to spark-submit. The options for the number of executors and the cores per executor differ depending on deployment mode. In YARN you can set spark.executor.cores or the --executor-cores flag to control the cores per executor, and the --num-executors flag to determine the total count. In Mesos and Standalone mode, Spark will greedily acquire as many cores and executors as the scheduler offers. However, both Mesos and Standalone mode support setting spark.cores.max to limit the total number of cores across all executors for an application. Local disks are used for scratch storage during shuffle operations.
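To make the per-mode differences concrete, the sketch below assembles the spark-submit resource flags for each cluster manager. The helper name executor_args and the example values are hypothetical; the flags and the spark.cores.max property are the ones named above, and the sketch simply follows the per-mode rules described in this section rather than covering every option spark-submit accepts.

```python
def executor_args(mode, executor_memory, executor_cores=None,
                  num_executors=None, cores_max=None):
    """Build spark-submit resource flags (hypothetical helper).

    mode: "yarn", "mesos", or "standalone".
    """
    # Executor memory is set the same way in every deployment mode.
    args = ["--executor-memory", executor_memory]
    if mode == "yarn":
        # YARN: cores per executor and the executor count are explicit.
        if executor_cores is not None:
            args += ["--executor-cores", str(executor_cores)]
        if num_executors is not None:
            args += ["--num-executors", str(num_executors)]
    elif mode in ("mesos", "standalone"):
        # Mesos/Standalone acquire resources greedily; spark.cores.max
        # caps the total cores across all of the application's executors.
        if cores_max is not None:
            args += ["--conf", f"spark.cores.max={cores_max}"]
    else:
        raise ValueError(f"unknown mode: {mode}")
    return args


print(executor_args("yarn", "8g", executor_cores=4, num_executors=10))
print(executor_args("standalone", "4g", cores_max=32))
```

The returned list can be spliced into a spark-submit invocation ahead of the application JAR or script.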

Broadly speaking, Spark applications will benefit from having more memory and cores. Spark’s architecture allows for linear scaling; adding twice the resources will often make your application run twice as fast. An additional consideration when sizing a Spark application is whether you plan to cache intermediate datasets as part of your workload. If you do plan to use caching, the more of your cached data can fit in memory, the better the performance will be. The Spark storage UI will give details about what fraction of your cached data is in memory. One approach is to start by caching a subset of your data on a smaller cluster and extrapolate the total memory you will need to fit larger amounts of the data in memory.
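The extrapolation step above is simple arithmetic. This minimal sketch assumes the in-memory size of the cached data grows linearly with the amount of input cached; the function name and the 12 GB / 10% figures are hypothetical stand-ins for what you would read off the storage UI. The result is the total cache footprint across the cluster, not a per-executor heap size, so leave headroom for the rest of the job.

```python
GB = 1024 ** 3


def estimate_cache_memory(sample_cached_bytes, sample_fraction,
                          target_fraction=1.0):
    """Linearly extrapolate total cache size from a sampled run (hypothetical helper)."""
    if not 0 < sample_fraction <= 1:
        raise ValueError("sample_fraction must be in (0, 1]")
    # Assumption: cached size scales linearly with the fraction of data cached.
    return sample_cached_bytes * target_fraction / sample_fraction


# Hypothetical observation: caching 10% of the input used about 12 GB in memory.
needed = estimate_cache_memory(12 * GB, 0.10)
print(round(needed / GB, 1), "GB needed to cache the full dataset")
```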

In addition to memory and cores, Spark uses local disk volumes to store intermediate data required during shuffle operations along with RDD partitions that are spilled to disk. Using a larger number of local disks can help accelerate the performance of Spark applications. In YARN mode, the configuration for local disks is read directly from YARN, which provides its own mechanism for specifying scratch storage directories. In Standalone mode, you can set the SPARK_LOCAL_DIRS environment variable in spark-env.sh when deploying the Standalone cluster and Spark applications will inherit this config when they are launched. In Mesos mode, or if you are running in another mode and want to override the cluster’s default storage locations, you can set the spark.local.dir option. In all cases you specify the local directories using a single comma-separated list. It is common to have one local directory for each disk volume available to Spark. Writes will be evenly striped across all local directories provided. Larger numbers of disks will provide higher overall throughput.
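Because every mode takes the same comma-separated list, it can help to generate it from the disk mount points, one directory per volume. In this sketch the mount points and the "spark" subdirectory are hypothetical; the printed lines show where the value would go for Standalone (spark-env.sh) and for Mesos or an override (spark.local.dir). Under YARN the setting is taken from YARN itself, so neither line applies there.

```python
def local_dirs_value(mount_points, subdir="spark"):
    """Build the comma-separated value for SPARK_LOCAL_DIRS / spark.local.dir.

    Follows the common convention of one local directory per disk volume.
    The subdirectory name is a hypothetical choice.
    """
    dirs = [mp.rstrip("/") + "/" + subdir for mp in mount_points]
    return ",".join(dirs)


disks = ["/mnt/disk1", "/mnt/disk2/", "/mnt/disk3"]  # hypothetical mount points
value = local_dirs_value(disks)
print(f"export SPARK_LOCAL_DIRS={value}")  # line for spark-env.sh (Standalone)
print(f"--conf spark.local.dir={value}")   # spark-submit flag (Mesos or override)
```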

One caveat to the “more is better” guideline is when sizing memory for executors. Using very large heap sizes can cause garbage collection pauses to hurt the throughput of a Spark job. It can sometimes be beneficial to request smaller executors (say, 64 GB or less) to mitigate this issue. Mesos and YARN can, out of the box, support packing multiple, smaller executors onto the same physical host, so requesting smaller executors doesn’t mean your application will have fewer overall resources. In Spark’s Standalone mode, you need to launch multiple workers (determined using SPARK_WORKER_INSTANCES) for a single application to run more than one executor on a host. This limitation will likely be removed in a later version of Spark. In addition to using smaller executors, storing data in serialized form (see “Memory Management” on page 157) can also help alleviate garbage collection.
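A quick calculation shows why smaller executors need not mean fewer resources: several of them can be packed onto one host. This sketch assumes a hypothetical host with 240 GB of memory and 32 cores available to executors, applies the 64 GB heap guideline from the text as a hard cap, and ignores per-executor memory overhead that a real cluster manager would add.

```python
def executors_per_host(host_mem_gb, host_cores, exec_mem_gb, exec_cores,
                       max_heap_gb=64):
    """How many executors of a given size fit on one host (overhead ignored).

    Hypothetical helper; max_heap_gb encodes the 'say, 64 GB or less' guideline.
    """
    if exec_mem_gb > max_heap_gb:
        raise ValueError(
            f"executor heap {exec_mem_gb} GB exceeds {max_heap_gb} GB guideline")
    # Each executor needs both its memory and its cores; the tighter limit wins.
    return min(host_mem_gb // exec_mem_gb, host_cores // exec_cores)


# Hypothetical host: 240 GB and 32 cores available to executors.
for mem, cores in [(48, 8), (30, 4)]:
    n = executors_per_host(240, 32, mem, cores)
    print(f"{n} executors x {mem} GB = {n * mem} GB, {n * cores} cores")
```

Here the 30 GB, 4-core layout uses all of the host's memory and cores, while the 48 GB, 8-core layout runs out of cores first and leaves memory idle.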

Conclusion

If you’ve made it through this chapter, you are well poised to tackle a production Spark use case. We covered Spark’s configuration management, instrumentation and metrics in Spark’s UI, and common tuning techniques for production workloads. To dive deeper into tuning Spark, visit the tuning guide in the official documentation.

CHAPTER 9

Spark SQL

This chapter introduces Spark SQL, Spark’s interface for working with structured and semistructured data. Structured data is any data that has a schema—that is, a known set of fields for each record. When you have this type of data, Spark SQL makes it both easier and more efficient to load and query. In particular, Spark SQL provides three main capabilities (illustrated in Figure 9-1):

1. It can load data from a variety of structured sources (e.g., JSON, Hive, and Parquet).

2. It lets you query the data using SQL, both inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC), such as business intelligence tools like Tableau.

3. When used within a Spark program, Spark SQL provides rich integration between SQL and regular Python/Java/Scala code, including the ability to join RDDs and SQL tables, expose custom functions in SQL, and more. Many jobs are easier to write using this combination.

To implement these capabilities, Spark SQL provides a special type of RDD called SchemaRDD. A SchemaRDD is an RDD of Row objects, each representing a record. A SchemaRDD also knows the schema (i.e., data fields) of its rows. While SchemaRDDs look like regular RDDs, internally they store data in a more efficient manner, taking advantage of their schema. In addition, they provide new operations not available on RDDs, such as the ability to run SQL queries. SchemaRDDs can be created from external data sources, from the results of queries, or from regular RDDs.
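To illustrate what it means for a dataset to "know the schema" of its rows, the plain-Python sketch below scans a few JSON records and derives the set of fields, the value types seen for each, and whether a field can be missing. This is only a conceptual illustration using the standard library; it is not Spark SQL's API or its inference algorithm, and the tweet-like records and the infer_schema name are hypothetical.

```python
import json


def infer_schema(json_lines):
    """Map each field name to (observed type names, nullable).

    Hypothetical helper: a field absent or null in any record is nullable.
    """
    records = [json.loads(line) for line in json_lines if line.strip()]
    fields = {}
    for rec in records:
        for name, value in rec.items():
            fields.setdefault(name, set()).add(type(value).__name__)
    schema = {}
    for name, types in sorted(fields.items()):
        nullable = any(rec.get(name) is None for rec in records)
        schema[name] = (sorted(types - {"NoneType"}), nullable)
    return schema


tweets = [  # hypothetical tweet-like records
    '{"text": "hello", "retweetCount": 3}',
    '{"text": "spark", "retweetCount": 0, "lang": "en"}',
]
for field, (types, nullable) in infer_schema(tweets).items():
    print(field, types, "nullable" if nullable else "required")
```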

Figure 9-1. Spark SQL usage

In this chapter, we’ll start by showing how to use SchemaRDDs inside regular Spark programs, to load and query structured data. We’ll then describe the Spark SQL JDBC server, which lets you run Spark SQL on a shared server and connect either SQL shells or visualization tools like Tableau to it. Finally, we’ll discuss some advanced features. Spark SQL is a newer component of Spark and it will evolve substantially in Spark 1.3 and future versions, so consult the most recent documentation for the latest information on Spark SQL and SchemaRDDs.

As we move through this chapter, we’ll use Spark SQL to explore a JSON file with tweets. If you don’t have any tweets on hand, you can use the Databricks reference application to download some, or you can use files/testweet.json in the book’s Git repo.