
Continuing the example of filter-join-filter, let’s see an implementation in Hive.

First, we need to create tables for our data sets, as shown in the following code. We use external tables so that if we were to drop a table, only its metadata (the table name, column names, types, and so on, held in the metastore) would be deleted. The underlying data in HDFS remains intact:

CREATE EXTERNAL TABLE foo(fooId BIGINT, fooVal INT, fooBarId BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'foo';

CREATE EXTERNAL TABLE bar(barId BIGINT, barVal INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'bar';

You’ll notice this syntax is a little different from that of a typical RDBMS, but any user familiar with SQL should be able to get the idea of what is going on. In short, we are making two tables named foo and bar. Their data is stored in text files with “|” as a column delimiter, and the files containing the data are located in the foo and bar directories, respectively, in HDFS.
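To make the file layout concrete, here is a minimal Python sketch of how one line of such a file maps onto the columns of foo. The sample rows and the parse_foo helper are hypothetical; the text does not show the actual contents of the data files.

```python
# Hypothetical sample lines; the real contents of the foo files are not shown in the text.
FOO_LINES = ["1|100|10", "2|700|20"]


def parse_foo(line):
    """Split one '|'-delimited line into (fooId, fooVal, fooBarId) as integers."""
    foo_id, foo_val, foo_bar_id = line.rstrip("\n").split("|")
    return int(foo_id), int(foo_val), int(foo_bar_id)


# Each text line becomes one row with the column order declared in CREATE EXTERNAL TABLE foo.
foo_rows = [parse_foo(line) for line in FOO_LINES]
```

Hive does this mapping for us when it reads the table; the sketch only illustrates what the delimiter and column order in the table definition mean for the underlying files.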

Another thing to note is that we’re also defining the storage format in these statements. In this case we’re simply storing the data as delimited text files, but in a production application we’ll likely use an optimized binary format, such as Parquet, for storage. We’ll see a concrete example of this in Chapter 8. This is in contrast to most traditional data stores, such as an RDBMS, in which data is automatically converted into a proprietary, optimized format for that database.

Once the tables are created, we can optionally (though it is always recommended) run the ANALYZE TABLE command to compute statistics. This allows Hive to select join strategies and execution plans better suited to the data, based on its distribution. The commands for computing statistics in Hive for the preceding tables would look like:

ANALYZE TABLE foo COMPUTE STATISTICS;

ANALYZE TABLE bar COMPUTE STATISTICS;

The automatic population of Hive statistics is controlled by a property named hive.stats.autogather, which is set to true by default. However, the statistics are only computed automatically if you are inserting data via a Hive insert statement, such as INSERT OVERWRITE TABLE. If you are moving data into HDFS outside of Hive, or using something like Flume to stream data into your Hive table’s HDFS location, you will have to run an ANALYZE TABLE table_name COMPUTE STATISTICS command explicitly to update the statistics. This will run map-only jobs that read through the data and compute various statistics about it (min, max, etc.), which can then be used by the Hive query planner for later queries on these tables.
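As a rough illustration of the kind of work such a job does, the following Python sketch computes a row count, minimum, and maximum for a column in a single pass over the data. It is not Hive's implementation; the column_stats function and the set of statistics it returns are assumptions made for illustration only.

```python
def column_stats(values):
    """Single pass over a column's values: row count, minimum, and maximum."""
    count, lo, hi = 0, None, None
    for v in values:
        count += 1
        if lo is None or v < lo:
            lo = v
        if hi is None or v > hi:
            hi = v
    return {"count": count, "min": lo, "max": hi}


# Hypothetical fooVal values.
foo_val_stats = column_stats([100, 700, 400])
```

Because each value is examined once and no grouping or shuffling is required, this kind of computation fits naturally into a map-only pass over the data.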

Once the statistics have been computed, we are ready to go. The following code shows how we would execute the filter-join-filter query in Hive:

SELECT * FROM
  foo f JOIN bar b ON (f.fooBarId = b.barId)
WHERE
  f.fooVal < 500 AND
  f.fooVal + b.barVal < 1000;

As you’ll notice, this is quite simple, especially for someone who is already familiar with SQL. There is no need to learn MapReduce, Pig, Crunch, Cascading, and so on.
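To see what the query returns on a small data set, we can run the same SQL against an in-memory SQLite database from Python. This is only a stand-in: SQLite is not Hive, the column types are simplified to INTEGER, and the sample rows are hypothetical.

```python
import sqlite3

# In-memory SQLite database standing in for Hive; the rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE foo(fooId INTEGER, fooVal INTEGER, fooBarId INTEGER);
CREATE TABLE bar(barId INTEGER, barVal INTEGER);
INSERT INTO foo VALUES (1, 100, 10), (2, 700, 10), (3, 400, 20);
INSERT INTO bar VALUES (10, 200), (20, 800);
""")

# The filter-join-filter query from the text, unchanged apart from layout.
rows = conn.execute("""
SELECT * FROM
  foo f JOIN bar b ON (f.fooBarId = b.barId)
WHERE
  f.fooVal < 500 AND
  f.fooVal + b.barVal < 1000
""").fetchall()

print(rows)
```

With these rows, foo row 2 is dropped by the first filter (fooVal is not below 500), and foo row 3 joins to a bar row but is dropped by the second filter (400 + 800 is not below 1000), so only foo row 1 joined with bar row 10 remains.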

A word of caution here is that Hive is not the best at optimization, and sometimes you may have to set a few configuration properties yourself to get an optimized plan. For example, Hive supports various kinds of distributed joins: map join (also known as hash join), bucketed join, sorted bucketed merge join, and regular join. If your data sets meet certain prerequisites, some joins may lead to much better performance than others. However, older versions of Hive didn’t select the right join automatically, so the Hive compiler would rely on hints supplied by the query author to choose the right join for the query in question. Newer versions of Hive select the right join automatically, and as the project evolves more and more of these optimizations are being done automatically.

Also note that even though SQL is great for querying, it’s not the best language for expressing all forms of processing. For every processing problem that you are expressing in SQL, you should ask whether SQL is a good fit, or if you’re shoehorning the problem into it. Problems that use simple filtering and aggregation are a good fit for SQL. For example, if you need to find the user who was most active on Twitter in the past month, that is fairly easy to do in SQL (assuming you have access to the Twitter data set); you need to count Twitter activity for each user and then find the user with the highest count. On the other hand, machine learning, text processing, and graph algorithms are typically a poor fit for SQL. If you need to choose which advertisement to show to each Twitter user based on that user’s interests and friends, it is unlikely that SQL is the right tool.
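The "most active user" problem is just a count per user followed by a maximum, which is why it maps so easily onto a GROUP BY and an ORDER BY in SQL. A minimal Python sketch of the same two steps follows; the most_active_user function and the sample user IDs are hypothetical.

```python
from collections import Counter


def most_active_user(activity_user_ids):
    """Count one activity per user ID, then return the (user, count) pair with the highest count."""
    counts = Counter(activity_user_ids)   # step 1: count activity per user
    return counts.most_common(1)[0]       # step 2: pick the highest count


# Hypothetical activity log: one user ID per tweet in the past month.
top_user = most_active_user(["ann", "bob", "ann", "cat", "ann", "bob"])
```

Choosing an advertisement based on a user's interests and friends has no comparably short formulation, which is the contrast the paragraph above draws.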

Similar to Pig, Hive is an abstraction over MapReduce, unless you’re using one of the newer execution engines noted earlier. This means that Hive also hides all the MapReduce work behind the scenes. You should still make a habit of reviewing what Hive is doing under the hood to ensure it’s doing what you intend. You can do this by simply adding the word EXPLAIN before the query command. The next example shows what the explain plan looks like for our filter-join-filter query.

As you’ll see in the explain plan, the SQL query is mapped to three stages in Hive. Notice that Hive has chosen to do a map join (instead of the less performant regular join). Stage 4 populates a hash table equivalent to the query SELECT * FROM foo f WHERE f.fooVal < 500. This hash table is held in memory in all nodes of the cluster doing the join. Then, as shown in stage 3, the values from the bar table are simply read and joined against the in-memory hash table containing prefiltered values from the foo table:

EXPLAIN SELECT *
> FROM foo f JOIN bar b ON (f.fooBarId = b.barId)
> WHERE f.fooVal < 500 AND
> f.fooVal + b.barVal < 1000
> ;

OK
ABSTRACT SYNTAX TREE:

STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        f
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        f
          TableScan
            alias: f
            Filter Operator
              predicate:
                  expr: (fooval < 500)
                  type: boolean
              HashTable Sink Operator
                condition expressions:
                  0 {fooid} {fooval} {foobarid}
                  1 {barid} {barval}
                handleSkewJoin: false
                keys:
                  0 [Column[foobarid]]
                  1 [Column[barid]]
                Position of Big Table: 1

  Stage: Stage-3
    Map Reduce
      Alias -> Map Operator Tree:
        b
          TableScan
            alias: b
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              condition expressions:
                0 {fooid} {fooval} {foobarid}
                1 {barid} {barval}
              handleSkewJoin: false
              keys:
                0 [Column[foobarid]]
                1 [Column[barid]]
              outputColumnNames: _col0, _col1, _col2, _col5, _col6
              Position of Big Table: 1
              Filter Operator
                predicate:
                    expr: ((_col1 < 500) and ((_col1 + _col6) < 1000))
                    type: boolean
                Select Operator
                  expressions:
                        expr: _col0
                        type: bigint
                        expr: _col1
                        type: int
                        expr: _col2
                        type: bigint
                        expr: _col5
                        type: bigint
                        expr: _col6
                        type: int
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
                  File Output Operator
                    compressed: false
                    GlobalTableId: 0
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format:\
                        org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
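The two working stages of the plan above can be mimicked in a few lines of Python: a build phase that loads the prefiltered foo rows into an in-memory hash table keyed on fooBarId, and a probe phase that streams the bar rows against it. This is a single-process sketch of the general hash join technique, not Hive's code; the map_join function and the sample rows are hypothetical.

```python
from collections import defaultdict


def map_join(foo_rows, bar_rows):
    """Hash join of foo (small side) and bar (big side) for the filter-join-filter query."""
    # Build phase (the local work stage): hash table of foo rows with fooVal < 500,
    # keyed on the join column fooBarId.
    hash_table = defaultdict(list)
    for foo_id, foo_val, foo_bar_id in foo_rows:
        if foo_val < 500:
            hash_table[foo_bar_id].append((foo_id, foo_val, foo_bar_id))

    # Probe phase (the map join stage): stream bar rows, look up matches by barId,
    # then apply the filter that needs columns from both tables.
    joined = []
    for bar_id, bar_val in bar_rows:
        for foo_id, foo_val, foo_bar_id in hash_table.get(bar_id, []):
            if foo_val + bar_val < 1000:
                joined.append((foo_id, foo_val, foo_bar_id, bar_id, bar_val))
    return joined


# Hypothetical rows: (fooId, fooVal, fooBarId) and (barId, barVal).
foo_rows = [(1, 100, 10), (2, 700, 10), (3, 400, 20)]
bar_rows = [(10, 200), (20, 800)]
result = map_join(foo_rows, bar_rows)
```

The point of the design is that only the smaller, prefiltered table has to fit in memory, while the larger table is read once and never shuffled across the network.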