Data serialization— working with text
    date: REQUIRED BINARY O:UTF8 R:0 D:0
    open: REQUIRED DOUBLE R:0 D:0
    high: REQUIRED DOUBLE R:0 D:0
    low: REQUIRED DOUBLE R:0 D:0
    close: REQUIRED DOUBLE R:0 D:0
    volume: REQUIRED INT32 R:0 D:0
    adjClose: REQUIRED DOUBLE R:0 D:0

    row group 1: RC:45 TS:2376
    ---
    symbol: BINARY SNAPPY DO:0 FPO:4 SZ:85/84/0.99 VC:45 ENC:PD ...
    date: BINARY SNAPPY DO:0 FPO:89 SZ:127/198/1.56 VC:45 ENC ...
    open: DOUBLE SNAPPY DO:0 FPO:216 SZ:301/379/1.26 VC:45 EN ...
    high: DOUBLE SNAPPY DO:0 FPO:517 SZ:297/379/1.28 VC:45 EN ...
    low: DOUBLE SNAPPY DO:0 FPO:814 SZ:292/379/1.30 VC:45 EN ...
    close: DOUBLE SNAPPY DO:0 FPO:1106 SZ:299/379/1.27 VC:45 E ...
    volume: INT32 SNAPPY DO:0 FPO:1405 SZ:203/199/0.98 VC:45 EN ...
    adjClose: DOUBLE SNAPPY DO:0 FPO:1608 SZ:298/379/1.27 VC:45 E ...
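The three numbers in each SZ field of the column chunk lines above look like compressed size, uncompressed size, and the ratio between them. The following minimal sketch checks that reading against the dump. It assumes the ratio is uncompressed divided by compressed, rounded to two decimal places; the SzField class and its ratio method are illustrative names and are not part of the Parquet tools.

```java
/** Illustrative helper for reading the SZ field; not part of the Parquet tools. */
final class SzField {

    /**
     * Parses a field such as "SZ:127/198/1.56" and recomputes the ratio
     * from the first two numbers (assumed: compressed/uncompressed sizes).
     */
    static double ratio(String szField) {
        String[] parts = szField.substring("SZ:".length()).split("/");
        double compressed = Double.parseDouble(parts[0]);
        double uncompressed = Double.parseDouble(parts[1]);
        // Round to two decimal places, matching the precision in the dump.
        return Math.round(uncompressed / compressed * 100.0) / 100.0;
    }

    public static void main(String[] args) {
        // The "date" column chunk from the dump above.
        System.out.println(ratio("SZ:127/198/1.56"));
    }
}
```

Under that assumption the recomputed ratios agree with the third number in each SZ field of the dump. A ratio below 1, as for the symbol and volume columns, means the compressed chunk is slightly larger than the uncompressed one.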
Next let’s look at how you can write and read Parquet files.
TECHNIQUE 21 Reading and writing Avro data in Parquet with Java
One of the first things you’ll want to do when working with a new file format is to understand how a standalone Java application can read and write data. This technique shows how you can write Avro data into a Parquet file and read it back out.
■Problem
You want to read and write Parquet data directly from your Java code outside of Hadoop using an Avro object model.
■Solution
Use the AvroParquetWriter and AvroParquetReader classes.
■Discussion
Parquet, a columnar storage format for Hadoop, has support for Avro, which allows you to work with your data using Avro classes, and to efficiently encode the data using Parquet’s file format so that you can take advantage of the columnar layout of your data. It sounds odd to mix data formats like this, so let’s investigate why you’d want to do this and how it works.
Parquet is a storage format, and it has a formal programming language–agnostic specification. You could use Parquet directly without any other supporting data format such as Avro, but Parquet is at heart a simple data format and doesn’t support complex types such as maps or unions. This is where Avro comes into play, as it supports these richer types as well as features such as code generation and schema evolution. As a result, marrying Parquet and a rich data format such as Avro creates a perfect match of sophisticated schema capabilities coupled with efficient data encoding.
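To make the columnar idea concrete, here is a minimal plain-Java sketch that pivots row-oriented records into per-field columns and then computes an average by touching a single column. It is only an illustration of the layout, not Parquet's actual file format; StockRow is a simplified, hypothetical stand-in for the Avro Stock class, and the sample values are taken from the stock data used in this chapter.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: contrasts row and column layouts; not Parquet's real format. */
final class ColumnarSketch {

    /** Row-oriented record, a simplified stand-in for the Avro Stock class. */
    static final class StockRow {
        final String symbol;
        final double open;
        final int volume;

        StockRow(String symbol, double open, int volume) {
            this.symbol = symbol;
            this.open = open;
            this.volume = volume;
        }
    }

    /** Column-oriented layout: one list per field, aligned by row index. */
    static final class StockColumns {
        final List<String> symbol = new ArrayList<>();
        final List<Double> open = new ArrayList<>();
        final List<Integer> volume = new ArrayList<>();
    }

    /** Pivots rows into columns, which is what a columnar writer does conceptually. */
    static StockColumns toColumns(List<StockRow> rows) {
        StockColumns cols = new StockColumns();
        for (StockRow row : rows) {
            cols.symbol.add(row.symbol);
            cols.open.add(row.open);
            cols.volume.add(row.volume);
        }
        return cols;
    }

    /** Reads only the "open" column; the other columns are never touched. */
    static double averageOpen(StockColumns cols) {
        double sum = 0;
        for (double value : cols.open) {
            sum += value;
        }
        return cols.open.isEmpty() ? Double.NaN : sum / cols.open.size();
    }

    public static void main(String[] args) {
        List<StockRow> rows = new ArrayList<>();
        rows.add(new StockRow("AAPL", 85.88, 26643400));
        rows.add(new StockRow("AAPL", 199.27, 38542100));
        rows.add(new StockRow("AAPL", 86.29, 44225700));
        System.out.println(averageOpen(toColumns(rows)));
    }
}
```

The application keeps working with row objects, while the storage layer is free to group values by field. That split is the division of labor between Avro and Parquet described above.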
For this technique, we’ll continue to use the Avro Stock schema. First, let’s look at how you can write a Parquet file using these Stock objects.44
44 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/ParquetAvroStockWriter.java.
    AvroParquetWriter<Stock> writer = new AvroParquetWriter<Stock>(
        outputPath,
        Stock.SCHEMA$,
        CompressionCodecName.SNAPPY,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        true);

    for (Stock stock : AvroStockUtils.fromCsvFile(inputFile)) {
      writer.write(stock);
    }

    writer.close();
The following command generates a Parquet file by executing the preceding code:
    $ hip hip.ch3.parquet.ParquetAvroStockWriter \
      --input test-data/stocks.txt \
      --output stocks.parquet
The previous technique showed you how to use the Parquet tools to dump the file to standard output. But what if you wanted to read the file in Java?45
    ParquetReader<Stock> reader = new AvroParquetReader<Stock>(inputFile);

    Stock stock;
    while ((stock = reader.read()) != null) {
      System.out.println(stock);
    }

    reader.close();
The following command shows the output of the preceding code:
    $ hip hip.ch3.parquet.ParquetAvroStockReader \
      --input stocks.parquet
    AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400,90.75
    AAPL,2008-01-02,199.27,200.26,192.55,194.84,38542100,194.84
    AAPL,2007-01-03,86.29,86.58,81.9,83.8,44225700,83.8
    ...
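The reader loop in this technique calls close only when the loop finishes normally. One way to guarantee the close even when reading throws an exception is try-with-resources. The sketch below shows the read-until-null idiom in plain Java; RecordSource and ListSource are hypothetical stand-ins introduced for illustration and are not Parquet classes.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative read-until-null loop; RecordSource is a hypothetical stand-in. */
final class ReadLoopSketch {

    /** A record source whose read() returns null once the input is exhausted. */
    interface RecordSource<T> extends Closeable {
        T read();

        @Override
        void close(); // narrowed for this sketch: no checked exception
    }

    /** In-memory source used to exercise the loop. */
    static final class ListSource<T> implements RecordSource<T> {
        private final Iterator<T> iterator;
        boolean closed = false;

        ListSource(List<T> items) {
            this.iterator = items.iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Reads every record; the source is closed even if read() throws. */
    static <T> List<T> drain(RecordSource<T> source) {
        List<T> records = new ArrayList<>();
        try (RecordSource<T> s = source) {
            T record;
            while ((record = s.read()) != null) {
                records.add(record);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        input.add("first");
        input.add("second");
        System.out.println(drain(new ListSource<>(input)));
    }
}
```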
TECHNIQUE 22 Parquet and MapReduce
This technique examines how you can work with Parquet files in MapReduce, using Parquet as both a data source and a data sink.
45 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/ParquetAvroStockReader.java.
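Annotations for the AvroParquetWriter listing in technique 21:
■ new AvroParquetWriter<Stock>(...): create a writer to write out Avro records.
■ outputPath: the output file.
■ Stock.SCHEMA$: the Avro schema.
■ CompressionCodecName.SNAPPY: use Snappy to compress columns; only Snappy, gzip, and LZO are supported.
■ ParquetWriter.DEFAULT_BLOCK_SIZE: the amount of memory used to buffer writes; the default is 128 MB.
■ ParquetWriter.DEFAULT_PAGE_SIZE: the page size; the default is 1 MB.
■ true: enable dictionary encoding.
■ writer.write(stock): write out Stock instances.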
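The final true argument in the writer call in technique 21 enables dictionary encoding. As a rough illustration of the idea, the following sketch replaces each value in a column with a small integer that indexes a list of distinct values, which pays off for columns with many repeats such as the stock symbol. This is a simplified model under that assumption and does not reproduce Parquet's actual on-disk encoding; DictionarySketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified illustration of dictionary encoding; not Parquet's on-disk encoding. */
final class DictionarySketch {
    /** Distinct values in first-seen order. */
    final List<String> dictionary = new ArrayList<>();
    /** One dictionary index per row of the original column. */
    final List<Integer> ids = new ArrayList<>();

    /** Encodes a column of strings as a dictionary plus a list of indexes. */
    static DictionarySketch encode(List<String> column) {
        DictionarySketch encoded = new DictionarySketch();
        Map<String, Integer> index = new HashMap<>();
        for (String value : column) {
            Integer id = index.get(value);
            if (id == null) {
                // First time this value is seen: assign the next index.
                id = encoded.dictionary.size();
                index.put(value, id);
                encoded.dictionary.add(value);
            }
            encoded.ids.add(id);
        }
        return encoded;
    }

    /** Rebuilds the original column from the dictionary and the indexes. */
    List<String> decode() {
        List<String> column = new ArrayList<>();
        for (int id : ids) {
            column.add(dictionary.get(id));
        }
        return column;
    }

    public static void main(String[] args) {
        DictionarySketch encoded = encode(Arrays.asList("AAPL", "AAPL", "CSCO", "AAPL"));
        System.out.println(encoded.dictionary + " " + encoded.ids);
    }
}
```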
■Problem
You want to work with Avro data serialized as Parquet in MapReduce.
■Solution
Use the AvroParquetInputFormat and AvroParquetOutputFormat classes.
■Discussion
The Avro subproject in Parquet comes with MapReduce input and output formats to let you read and write your Avro data using Parquet as the storage format. The following example calculates the average stock price for each symbol:46
    public int run(final String[] args) throws Exception {
      Path inputPath = new Path(args[0]);
      Path outputPath = new Path(args[1]);

      Configuration conf = super.getConf();
      Job job = new Job(conf);
      job.setJarByClass(AvroParquetMapReduce.class);

      // Set the Avro Parquet input format.
      job.setInputFormatClass(AvroParquetInputFormat.class);
      AvroParquetInputFormat.setInputPaths(job, inputPath);

      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(DoubleWritable.class);

      // Set the Avro Parquet output format.
      job.setOutputFormatClass(AvroParquetOutputFormat.class);
      FileOutputFormat.setOutputPath(job, outputPath);

      // Specify the Avro schema for the job outputs.
      AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$);

      return job.waitForCompletion(true) ? 0 : 1;
    }

    // Parquet doesn't supply an input key, just the value as an Avro object.
    public static class Map
        extends Mapper<Void, Stock, Text, DoubleWritable> {
      @Override
      public void map(Void key, Stock value, Context context)
          throws IOException, InterruptedException {
        context.write(new Text(value.getSymbol().toString()),
            new DoubleWritable(value.getOpen()));
      }
    }

    // Similarly, the output format ignores the key and only uses the Avro value.
    public static class Reduce
        extends Reducer<Text, DoubleWritable, Void, StockAvg> {
      @Override
      protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
          throws IOException, InterruptedException {
        Mean mean = new Mean();
        for (DoubleWritable val : values) {
          mean.increment(val.get());
        }
        StockAvg avg = new StockAvg();
        avg.setSymbol(key.toString());
        avg.setAvg(mean.getResult());
        context.write(null, avg);
      }
    }

46 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/AvroParquetMapReduce.java.
Working with Avro in Parquet is very simple, and arguably easier than working with Avro-serialized data.47 You can run the example:
    $ hip hip.ch3.parquet.AvroParquetMapReduce \
      --input stocks.parquet \
      --output output
Parquet comes with some tools to help you work with Parquet files, and one of them allows you to dump the contents to standard output:
    $ hadoop fs -ls output
    output/_SUCCESS
    output/_metadata
    output/part-r-00000.parquet

    $ hip --nolib parquet.tools.Main cat output/part-r-00000.parquet
    symbol = AAPL
    avg = 68.631

    symbol = CSCO
    avg = 31.148000000000003

    symbol = GOOG
    avg = 417.47799999999995

    symbol = MSFT
    avg = 44.63100000000001

    symbol = YHOO
    avg = 69.333
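The per-symbol averaging that the job performs can be mimicked in plain Java, which is handy for checking the logic without a cluster. The sketch below groups open prices by symbol and averages each group. RunningMean is a hypothetical stand-in for the Mean helper used in the reducer, written here with an incremental update; the CSCO value in main is made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java imitation of the map and reduce steps; not the MapReduce job itself. */
final class StockAverageSketch {

    /** Hypothetical stand-in for the Mean helper used in the reducer. */
    static final class RunningMean {
        private long count;
        private double mean;

        void increment(double value) {
            count++;
            // Incremental update: shift the mean toward the new value.
            mean += (value - mean) / count;
        }

        double getResult() {
            return count == 0 ? Double.NaN : mean;
        }
    }

    /**
     * The "map" step pairs symbols[i] with opens[i]; the "reduce" step
     * averages the values that share a symbol. Keys come out sorted.
     */
    static Map<String, Double> averageOpenBySymbol(String[] symbols, double[] opens) {
        if (symbols.length != opens.length) {
            throw new IllegalArgumentException("symbols and opens must have the same length");
        }
        Map<String, RunningMean> groups = new TreeMap<>();
        for (int i = 0; i < symbols.length; i++) {
            groups.computeIfAbsent(symbols[i], s -> new RunningMean()).increment(opens[i]);
        }
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, RunningMean> entry : groups.entrySet()) {
            averages.put(entry.getKey(), entry.getValue().getResult());
        }
        return averages;
    }

    public static void main(String[] args) {
        String[] symbols = {"AAPL", "CSCO", "AAPL"};
        double[] opens = {85.88, 10.0, 199.27}; // CSCO value is illustrative
        System.out.println(averageOpenBySymbol(symbols, opens));
    }
}
```

Long decimal tails such as 31.148000000000003 in the job output are ordinary floating-point rounding in the averaged doubles.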
You may have noticed that there’s an additional file in the output directory called _metadata. When the Parquet OutputCommitter runs upon job completion, it reads the footer of all the output files (which contains the file metadata) and generates this summarized metadata file. This file is used by subsequent MapReduce (or Pig/Hive) jobs to reduce job startup times.48
47 The input and output formats supplied with Avro to support Avro’s storage format wrap the Avro objects, requiring a level of indirection.
48Calculating the input splits can take a long time when there are a large number of input files that need to
■Summary
In this technique, you saw how to use code-generated Avro object files with Parquet. If you don’t want to work with Avro object files, you have a few options that allow you to work with Avro data generically using Avro’s GenericData class:
■ If you wrote the Avro data using GenericData objects, then that’s the format in which Avro will supply them to your mappers.
■ Excluding the JAR containing your Avro-generated code will also result in GenericData objects being fed to your mappers.
■ You can trick Avro by mutating the input schema so that Avro can’t load the specific class, forcing it to supply the GenericData instance instead.
The following code shows how you would perform the third option. You’re essentially taking the original schema and duplicating it, but in the process you’re supplying a different class name, which Avro won’t be able to load (see "foobar" in the first line):49
    Schema schema = Schema.createRecord("foobar",
        Stock.SCHEMA$.getDoc(), Stock.SCHEMA$.getNamespace(), false);

    List<Schema.Field> fields = Lists.newArrayList();
    for (Schema.Field field : Stock.SCHEMA$.getFields()) {
      fields.add(new Schema.Field(field.name(), field.schema(),
          field.doc(), field.defaultValue(), field.order()));
    }
    schema.setFields(fields);

    AvroParquetInputFormat.setAvroReadSchema(job, schema);
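The renaming trick works because a lookup by class name fails. The sketch below shows that fallback pattern in plain Java. It assumes the reader derives a fully qualified class name from the schema's namespace and record name and falls back to a generic representation when no such class can be loaded. It is not Avro's actual code, and ClassLookupSketch, its methods, and the com.example namespace are hypothetical.

```java
/** Illustrates a load-by-name fallback; an assumption-based sketch, not Avro's code. */
final class ClassLookupSketch {

    /** Returns the class for a fully qualified name, or null if it cannot be loaded. */
    static Class<?> tryLoad(String fullName) {
        try {
            return Class.forName(fullName);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    /**
     * Describes which representation a reader following this pattern would pick:
     * the named class when it loads, otherwise a generic record.
     */
    static String representationFor(String namespace, String recordName) {
        String fullName = (namespace == null || namespace.isEmpty())
            ? recordName
            : namespace + "." + recordName;
        return tryLoad(fullName) != null ? "specific:" + fullName : "generic";
    }

    public static void main(String[] args) {
        // A class that exists on every classpath resolves to itself.
        System.out.println(representationFor("java.lang", "String"));
        // "foobar" has no matching class, so the lookup falls back.
        System.out.println(representationFor("com.example", "foobar"));
    }
}
```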
What if you want to work with the Parquet data natively? Parquet comes with an example object model that allows you to work with any Parquet data, irrespective of the object model that was used to write the data. It uses a Group class to represent records, and provides some basic getters and setters to retrieve fields.
The following code once again shows how to calculate the stock averages. The input is the Avro/Parquet data, and the output is a brand new Parquet schema:50
    private final static String writeSchema = "message stockavg {\n" +
        "required binary symbol;\n" +
        "required double avg;\n" +
        "}";

    public void run(Path inputPath, Path outputPath) throws IOException {
      Configuration conf = super.getConf();
      Job job = new Job(conf);
      job.setJarByClass(ExampleParquetMapReduce.class);

      job.setInputFormatClass(ExampleInputFormat.class);
      FileInputFormat.setInputPaths(job, inputPath);

      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(DoubleWritable.class);

      job.setOutputFormatClass(ExampleOutputFormat.class);
      FileOutputFormat.setOutputPath(job, outputPath);
      ExampleOutputFormat.setSchema(
          job, MessageTypeParser.parseMessageType(writeSchema));
    }

    public static class Map
        extends Mapper<Void, Group, Text, DoubleWritable> {
      @Override
      public void map(Void key, Group value, Context context)
          throws IOException, InterruptedException {
        // Field 2 is the open price; it's read as a string and parsed
        // back into a double.
        context.write(new Text(value.getString("symbol", 0)),
            new DoubleWritable(Double.valueOf(
                value.getValueToString(2, 0))));
      }
    }

    public static class Reduce
        extends Reducer<Text, DoubleWritable, Void, Group> {
      private SimpleGroupFactory factory;

      @Override
      protected void setup(Context context) {
        // Build groups against the output schema stored in the job configuration.
        factory = new SimpleGroupFactory(
            GroupWriteSupport.getSchema(
                ContextUtil.getConfiguration(context)));
      }

      @Override
      protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
          throws IOException, InterruptedException {
        Mean mean = new Mean();
        for (DoubleWritable val : values) {
          mean.increment(val.get());
        }
        Group group = factory.newGroup()
            .append("symbol", key.toString())
            .append("avg", mean.getResult());
        context.write(null, group);
      }
    }

49 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/AvroGenericParquetMapReduce.java.
50 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/ExampleParquetMapReduce.java.
The example object model is pretty basic and is currently missing some functionality. For example, there are no getters for double types, which is why the preceding code accesses the stock value using the getValueToString method. But there’s work afoot to provide better object models, including a POJO adapter.51
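Going through a string to read a double looks lossy, but it need not be. The sketch below assumes that getValueToString renders the value the way Java's String.valueOf(double) does, which is an assumption about the example object model rather than something stated here. Under that assumption the round trip returns exactly the original double, because Java's string form of a double carries enough digits to identify it uniquely.

```java
/** Shows that double -> String -> double is lossless with Java's own formatting. */
final class DoubleRoundTrip {

    /** Renders the value as text and parses it back, as the mapper effectively does. */
    static double viaString(double value) {
        String text = String.valueOf(value); // assumed to match getValueToString's output
        return Double.parseDouble(text);     // parseDouble avoids boxing a Double
    }

    public static void main(String[] args) {
        double open = 85.88;
        System.out.println(viaString(open) == open);
    }
}
```

Double.parseDouble returns a primitive, so it also sidesteps the boxing and unboxing that Double.valueOf incurs in the mapper.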
51 See the GitHub ticket number 325 titled “Pojo Support for Parquet” at https://github.com/Parquet/parquet-mr/pull/325.