
Time for action – generating shape summaries in MapReduce

In document Hadoop Beginner's Guide(2013) pdf (Page 179-184)

In this section we will write a mapper that takes as input the UFO sighting record we defined earlier. It will output the shape and a count of 1, and the reducer will take these shape and count records and produce a new structured Avro datafile containing the final counts for each UFO shape. Perform the following steps:

1. Copy the sightings.avro file to HDFS:

$ hadoop fs -mkdir avroin
$ hadoop fs -put sightings.avro avroin/sightings.avro

2. Create the following as AvroMR.java:

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapred.*;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;

// Plain class whose fields define the reduce output record
class UFORecord {
    UFORecord() { }

    public String shape;
    public long count;
}

public class AvroMR extends Configured implements Tool {

    // Create schema for map output
    public static final Schema PAIR_SCHEMA =
        Pair.getPairSchema(Schema.create(Schema.Type.STRING),
                           Schema.create(Schema.Type.LONG));

    // Create schema for reduce output
    public final static Schema OUTPUT_SCHEMA =
        ReflectData.get().getSchema(UFORecord.class);

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("UFO count");

        String[] otherArgs =
            new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: avro UFO counter <in> <out>");
            System.exit(2);
        }

        FileInputFormat.addInputPath(conf, new Path(otherArgs[0]));
        Path outputPath = new Path(otherArgs[1]);
        FileOutputFormat.setOutputPath(conf, outputPath);
        // Remove any output left over from a previous run
        outputPath.getFileSystem(conf).delete(outputPath);

        // The input schema is read from ufo.avsc, packaged in the job JAR
        Schema input_schema =
            Schema.parse(getClass().getResourceAsStream("ufo.avsc"));
        AvroJob.setInputSchema(conf, input_schema);
        AvroJob.setMapOutputSchema(conf,
            Pair.getPairSchema(Schema.create(Schema.Type.STRING),
                               Schema.create(Schema.Type.LONG)));
        AvroJob.setOutputSchema(conf, OUTPUT_SCHEMA);
        AvroJob.setMapperClass(conf, AvroRecordMapper.class);
        AvroJob.setReducerClass(conf, AvroRecordReducer.class);
        conf.setInputFormat(AvroInputFormat.class);

        JobClient.runJob(conf);
        return 0;
    }

    public static class AvroRecordMapper
        extends AvroMapper<GenericRecord, Pair<Utf8, Long>> {

        @Override
        public void map(GenericRecord in,
                        AvroCollector<Pair<Utf8, Long>> collector,
                        Reporter reporter) throws IOException {
            Pair<Utf8, Long> p = new Pair<Utf8, Long>(PAIR_SCHEMA);
            Utf8 shape = (Utf8) in.get("shape");
            // Skip sightings that have no shape recorded
            if (shape != null) {
                p.set(shape, 1L);
                collector.collect(p);
            }
        }
    }

    public static class AvroRecordReducer
        extends AvroReducer<Utf8, Long, GenericRecord> {

        @Override
        public void reduce(Utf8 key, Iterable<Long> values,
                           AvroCollector<GenericRecord> collector,
                           Reporter reporter) throws IOException {
            long sum = 0;
            for (Long val : values) {
                sum += val;
            }

            // Build the output record against the reflected UFORecord schema
            GenericRecord value = new GenericData.Record(OUTPUT_SCHEMA);
            value.put("shape", key);
            value.put("count", sum);

            collector.collect(value);
        }
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AvroMR(), args);
        System.exit(res);
    }
}

3. Compile and run the job:

$ javac AvroMR.java
$ jar -cvf avroufo.jar *.class ufo.avsc
$ hadoop jar ~/classes/avroufo.jar AvroMR avroin avroout

4. Examine the output directory:

$ hadoop fs -ls avroout
Found 3 items
-rw-r--r-- 1 … /user/hadoop/avroout/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 … /user/hadoop/avroout/_logs
-rw-r--r-- 1 … /user/hadoop/avroout/part-00000.avro

5. Copy the output file to the local filesystem:

$ hadoop fs -get /user/hadoop/avroout/part-00000.avro result.avro

What just happened?

We created the Job class and examined its various components. The actual logic within the Mapper and Reducer classes is relatively straightforward: the Mapper class just extracts the shape column and emits it with a count of 1; the reducer then counts the total number of entries for each shape. The interesting aspects are around the defined input and output types to the Mapper and Reducer classes and how the job is configured.

The Mapper class has an input type of GenericRecord and an output type of Pair. The Reducer class has a corresponding input type of Pair and output type of GenericRecord.

The GenericRecord class passed to the Mapper class wraps a datum that is the UFO sighting record represented in the input file. This is how the Mapper class is able to retrieve the shape field by name.

Recall that GenericRecords may or may not be explicitly created with a schema, and in either case the structure can be determined from the datafile. For the GenericRecord output by the Reducer class, we do pass a schema but use a new mechanism for its creation. In the preceding code, we created the additional UFORecord class and used Avro reflection to generate its schema dynamically at runtime. We were then able to use this schema to create a GenericRecord instance specialized to wrap that particular record type.
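To make the idea of deriving a schema from a class more concrete, here is a minimal sketch that uses only standard Java reflection. It is not Avro's implementation and does not call ReflectData; it simply walks the fields of a class shaped like UFORecord and pairs each field name with the Avro primitive type name it would correspond to. The class names UfoRecordSketch and ReflectSchemaSketch, and the helper methods, are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in with the same fields as the UFORecord class in the job.
class UfoRecordSketch {
    public String shape;
    public long count;
}

// Illustration only: a stdlib model of "inspect a class, derive field types".
class ReflectSchemaSketch {

    // Map a Java field type to an Avro primitive type name.
    // Only the two types used by the record are handled here.
    static String avroTypeName(Class<?> type) {
        if (type == String.class) return "string";
        if (type == long.class) return "long";
        throw new IllegalArgumentException("Unhandled type: " + type);
    }

    // Collect field name -> Avro type name for the instance fields of a class.
    // A TreeMap keeps the result in a predictable (alphabetical) order.
    static Map<String, String> describe(Class<?> recordClass) {
        Map<String, String> fields = new TreeMap<>();
        for (Field f : recordClass.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                continue; // skip anything that is not a plain instance field
            }
            fields.put(f.getName(), avroTypeName(f.getType()));
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(describe(UfoRecordSketch.class));
    }
}
```

In the real job, the single call ReflectData.get().getSchema(UFORecord.class) does this work and returns a full Avro Schema object rather than a simple map.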

Between the Mapper and Reducer classes we use the Avro Pair type to hold a key and value pair. This allows us to express the same logic for the Mapper and Reducer classes that we used in the original WordCount example back in Chapter 2, Getting Hadoop Up and Running; the Mapper class emits singleton counts for each value and the reducer sums these into an overall total for each shape.
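The emit-then-sum pattern can be tried out without a cluster. The following is a minimal sketch in plain Java, with no Hadoop or Avro classes, that models the data flow only: a map step that emits a (shape, 1) pair for every non-null shape, and a combined group-and-reduce step that sums the counts. The class name ShapeCountSketch and the sample shapes are made up for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the job's data flow; not how Hadoop executes it.
class ShapeCountSketch {

    // "Map" step: emit (shape, 1) for each record with a non-null shape,
    // mirroring the null check in AvroRecordMapper.
    static List<Map.Entry<String, Long>> map(List<String> shapes) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String shape : shapes) {
            if (shape != null) {
                pairs.add(new AbstractMap.SimpleEntry<>(shape, 1L));
            }
        }
        return pairs;
    }

    // "Shuffle" and "reduce" steps combined: group by shape and sum the counts.
    static Map<String, Long> reduce(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up input; null stands for a sighting with no shape recorded.
        List<String> shapes = Arrays.asList("disk", "light", null, "disk");
        System.out.println(reduce(map(shapes))); // prints {disk=2, light=1}
    }
}
```

In the actual job the framework performs the grouping between the two phases, so the reducer only ever sees one shape together with all of its counts.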

In addition to the Mapper and Reducer classes' input and output, there is some configuration unique to a job processing Avro data:

Schema input_schema =
    Schema.parse(getClass().getResourceAsStream("ufo.avsc"));
AvroJob.setInputSchema(conf, input_schema);
AvroJob.setMapOutputSchema(conf,
    Pair.getPairSchema(Schema.create(Schema.Type.STRING),
                       Schema.create(Schema.Type.LONG)));
AvroJob.setOutputSchema(conf, OUTPUT_SCHEMA);
AvroJob.setMapperClass(conf, AvroRecordMapper.class);
AvroJob.setReducerClass(conf, AvroRecordReducer.class);

These configuration elements demonstrate how central schema definition is to Avro; although a GenericRecord can be used without supplying a schema explicitly, the job must be told the expected input and output schema types. Avro will validate the input and output against the specified schemas, so there is a degree of data type safety. For the other elements, such as setting up the Mapper and Reducer classes, we simply set those on AvroJob instead of the more generic classes, and once done, the MapReduce framework will perform appropriately.

This example is also the first time we've explicitly implemented the Tool interface. When running the Hadoop command-line program, there is a series of arguments (such as -D) that are common across its subcommands. If a job class implements the Tool interface as mentioned in the previous section, it automatically gets access to any of these standard options passed on the command line. It's a useful mechanism that prevents lots of code duplication.
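As a rough illustration of what this buys us, the sketch below separates -D key=value options from the job's own arguments using only the standard library. It is a deliberately simplified stand-in, not Hadoop's GenericOptionsParser: it handles only the two-token "-D key=value" form and ignores every other standard option. The class name GenericOptionsSketch is hypothetical, and mapred.reduce.tasks is used purely as an example property.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of generic-option handling; not Hadoop's implementation.
class GenericOptionsSketch {
    // Properties collected from "-D key=value" pairs, in command-line order.
    final Map<String, String> properties = new LinkedHashMap<>();
    // Everything else, which the job itself interprets (here: <in> <out>).
    final List<String> remainingArgs = new ArrayList<>();

    GenericOptionsSketch(String[] args) {
        for (int i = 0; i < args.length; i++) {
            boolean isDefine = args[i].equals("-D")
                && i + 1 < args.length
                && args[i + 1].contains("=");
            if (isDefine) {
                // Consume the following token and split it at the first '='.
                String[] kv = args[++i].split("=", 2);
                properties.put(kv[0], kv[1]);
            } else {
                remainingArgs.add(args[i]);
            }
        }
    }

    public static void main(String[] args) {
        GenericOptionsSketch parsed = new GenericOptionsSketch(
            new String[] {"-D", "mapred.reduce.tasks=2", "avroin", "avroout"});
        System.out.println("properties: " + parsed.properties);
        System.out.println("remaining:  " + parsed.remainingArgs);
    }
}
```

In AvroMR the same separation is what lets run() check that exactly two arguments remain after the standard options have been consumed.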

Time for action – examining the output data with Ruby
