

Data serialization— working with text

TECHNIQUE 16 Using Avro key/value pairs in MapReduce

MapReduce’s native data model is key/value pairs, whereas Avro’s, as I mentioned earlier, is record-based. Avro doesn’t natively support key/value data, but it ships helper classes that model key/value data and let you use it natively in MapReduce.

■ Problem

You want to use Avro as a data format and container, but you want to model your data using key/value pairs in Avro and use them as native key/value pairs in MapReduce.

■ Solution

Use the AvroKeyValue, AvroKey, and AvroValue classes to work with Avro key/value data.

■ Discussion

Avro has an AvroKeyValue class that encapsulates a generic Avro record containing two fields named key and value. AvroKeyValue serves as a helper class so that you can easily read and write key/value data. You define the types of these two fields.
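To make the wrapper idea concrete, here is a minimal plain-Java sketch of a typed view over a two-field record. It deliberately avoids the Avro library: KeyValueView and newRecord are hypothetical stand-ins invented for illustration, and a Map plays the role of the generic record. Treat it as a rough model of the pattern, not as Avro's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class KeyValueViewSketch {

    /** Hypothetical stand-in for a generic record with "key" and "value" fields. */
    public static Map<String, Object> newRecord() {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("key", null);
        record.put("value", null);
        return record;
    }

    /** Hypothetical typed view over a two-field record (not Avro's class). */
    public static final class KeyValueView<K, V> {
        private final Map<String, Object> record;

        public KeyValueView(Map<String, Object> record) {
            this.record = record;
        }

        public void setKey(K key) {
            record.put("key", key);
        }

        public void setValue(V value) {
            record.put("value", value);
        }

        @SuppressWarnings("unchecked")
        public K getKey() {
            return (K) record.get("key");
        }

        @SuppressWarnings("unchecked")
        public V getValue() {
            return (V) record.get("value");
        }

        /** Returns the underlying record, which is what a writer would serialize. */
        public Map<String, Object> get() {
            return record;
        }
    }

    public static void main(String[] args) {
        // The symbol and date are taken from the sample data shown in this technique.
        KeyValueView<String, String> kv = new KeyValueView<>(newRecord());
        kv.setKey("AAPL");
        kv.setValue("2009-01-02");
        System.out.println(kv.get());
    }
}
```

The point of the design is that the view holds no data of its own: setters write through to the wrapped record, so handing the record to a writer always reflects the latest key and value.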

In this technique you’ll repeat the average stock MapReduce job, but this time using Avro’s key/value framework. You’ll first need to generate the input data for your job. In this case, you’ll put the stock symbol in the key and the Stock object in the value:

public static void writeToAvro(File inputFile,
                               OutputStream outputStream) throws IOException {

  // Use the helper class to generate the schema.
  Schema schema = AvroKeyValue.getSchema(
      Schema.create(Schema.Type.STRING), Stock.SCHEMA$);

  // Create a generic writer.
  DataFileWriter<GenericRecord> writer =
      new DataFileWriter<GenericRecord>(
          new GenericDatumWriter<GenericRecord>());

  // Set the compression codec.
  writer.setCodec(CodecFactory.snappyCodec());

  // Configure the writer with the output stream and schema.
  writer.create(schema, outputStream);

  for (Stock stock : StockUtils.fromCsvFile(inputFile)) {
    // Create a wrapper object and set the key and value.
    AvroKeyValue<CharSequence, Stock> record =
        new AvroKeyValue<CharSequence, Stock>(
            new GenericData.Record(schema));
    record.setKey(stock.getSymbol());
    record.setValue(stock);

    // Write the encapsulated generic object to the output stream.
    writer.append(record.get());
  }

  IOUtils.closeStream(writer);
  IOUtils.closeStream(outputStream);
}

GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroKeyValueFileWrite.java

Go ahead and generate a file in HDFS containing the stock data in key/value format:

$ hip hip.ch3.avro.AvroKeyValueFileWrite \
    --input test-data/stocks.txt \
    --output stocks.kv.avro

If you’re curious to know the Avro schema definition of the file you just generated, use the tip highlighted in technique 12 to extract the schema from the file. In addition, you can use the AvroDump utility to show the contents of the file:

# the "getschema" tool only works with data in the local filesystem,
# so first copy the stocks file from HDFS to local disk
$ hadoop fs -get stocks.kv.avro .

$ java -jar $HIP_HOME/lib/avro-tools-1.7.4.jar getschema stocks.kv.avro
{
  "type" : "record",
  "name" : "KeyValuePair",
  "namespace" : "org.apache.avro.mapreduce",
  "doc" : "A key/value pair",
  "fields" : [ {
    "name" : "key",
    "type" : "string",
    "doc" : "The key"
  }, {
    "name" : "value",
    "type" : {
      "type" : "record",
      "name" : "Stock",
      "namespace" : "hip.ch3.avro.gen",
      "fields" : [
        { "name" : "symbol", "type" : "string" },
        { "name" : "date", "type" : "string" },
        { "name" : "open", "type" : "double" },
        { "name" : "high", "type" : "double" },
        { "name" : "low", "type" : "double" },
        { "name" : "close", "type" : "double" },
        { "name" : "volume", "type" : "int" },
        { "name" : "adjClose", "type" : "double" }
      ]
    },
    "doc" : "The value"
  } ]
}

$ hip hip.util.AvroDump --file stocks.kv.avro

{"key": "AAPL", "value": {"symbol": "AAPL", "date": "2009-01-02", ...
{"key": "AAPL", "value": {"symbol": "AAPL", "date": "2008-01-02", ...
{"key": "AAPL", "value": {"symbol": "AAPL", "date": "2007-01-03", ...

Now for some MapReduce code. You’ll define your mapper, reducer, and driver in one shot:

public int run(final String[] args) throws Exception {
  ....
  // Read Avro key/value input and declare the key and value schemas.
  job.setInputFormatClass(AvroKeyValueInputFormat.class);
  AvroJob.setInputKeySchema(job, Schema.create(Schema.Type.STRING));
  AvroJob.setInputValueSchema(job, Stock.SCHEMA$);

  // The map output uses plain Writable types.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(DoubleWritable.class);

  // The job output is an Avro key/value file; only the value schema is set.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(AvroValue.class);
  job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
  AvroJob.setOutputValueSchema(job, StockAvg.SCHEMA$);
  ...
}

public static class Map extends
    Mapper<AvroKey<CharSequence>, AvroValue<Stock>, Text, DoubleWritable> {

  @Override
  public void map(AvroKey<CharSequence> key,
                  AvroValue<Stock> value,
                  Context context)
      throws IOException, InterruptedException {
    // Emit the stock symbol and the opening price.
    context.write(new Text(key.toString()),
        new DoubleWritable(value.datum().getOpen()));
  }
}

public static class Reduce extends
    Reducer<Text, DoubleWritable, Text, AvroValue<StockAvg>> {

  @Override
  protected void reduce(Text key,
                        Iterable<DoubleWritable> values,
                        Context context)
      throws IOException, InterruptedException {
    double total = 0.0;
    double count = 0;
    for (DoubleWritable val : values) {
      total += val.get();
      count++;
    }

    // Wrap the per-symbol average in an Avro record.
    StockAvg avg = new StockAvg();
    avg.setSymbol(key.toString());
    avg.setAvg(total / count);

    context.write(key, new AvroValue<StockAvg>(avg));
  }
}

GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroKeyValueMapReduce.java

As you can see, the AvroKey and AvroValue wrappers are used to supply input data in the mapper, as well as output data in the reducer. The neat thing here is that Avro’s key/value output format also accepts basic Hadoop Writable objects, such as the Text key used here, and automatically converts them into their Avro counterparts, which is why you don’t need to tell Avro the schema type of the output key.
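Stripped of the Avro and Hadoop wrappers, the job computes a per-symbol average of opening prices. The following plain-Java sketch mirrors that logic under simplifying assumptions: StockAverageSketch, averageBySymbol, and average are hypothetical names, the shuffle is simulated with an in-memory map, and the prices in main are made-up placeholders, not values from stocks.txt.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class StockAverageSketch {

    /** Mirrors the reducer body: sums the values and divides by their count. */
    public static double average(Iterable<Double> values) {
        double total = 0.0;
        double count = 0;
        for (double v : values) {
            total += v;
            count++;
        }
        return total / count;
    }

    /** Simulates shuffle plus reduce: groups (symbol, open) pairs, then averages each group. */
    public static Map<String, Double> averageBySymbol(
            List<Map.Entry<String, Double>> pairs) {
        // Group the opening prices by symbol, as the shuffle phase would.
        Map<String, List<Double>> grouped = new TreeMap<>();
        for (Map.Entry<String, Double> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        // Reduce each group to a single average.
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, List<Double>> group : grouped.entrySet()) {
            averages.put(group.getKey(), average(group.getValue()));
        }
        return averages;
    }

    public static void main(String[] args) {
        // Placeholder prices chosen for easy arithmetic; not real stock data.
        List<Map.Entry<String, Double>> pairs = new ArrayList<>();
        pairs.add(new AbstractMap.SimpleEntry<>("AAPL", 10.0));
        pairs.add(new AbstractMap.SimpleEntry<>("AAPL", 20.0));
        pairs.add(new AbstractMap.SimpleEntry<>("CSCO", 30.0));
        System.out.println(averageBySymbol(pairs));
    }
}
```

In the real job the grouping is done by the MapReduce framework, so the reducer only ever sees the values for a single key at a time.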

You can run the MapReduce job with the following command:

$ hip hip.ch3.avro.AvroKeyValueMapReduce \
    --input stocks.kv.avro \
    --output output

And again, you can view the output with the AvroDump tool:

$ hip hip.util.AvroDump --file output/part*

{"key": "AAPL", "value": {"symbol": "AAPL", "avg": 68.631}}
{"key": "CSCO", "value": {"symbol": "CSCO", "avg": 31.148}}
{"key": "GOOG", "value": {"symbol": "GOOG", "avg": 417.478}}
{"key": "MSFT", "value": {"symbol": "MSFT", "avg": 44.631}}
{"key": "YHOO", "value": {"symbol": "YHOO", "avg": 69.333}}

■ Summary

This concludes our coverage of the three Avro approaches for working with your data in MapReduce. Each of the methods is suited to a particular task, and you can select whichever one works best for your needs.

Let’s wrap up our Avro and MapReduce coverage by looking at how you can customize the ordering characteristics of Avro data in MapReduce.