// == JobBuilder import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool;
publicclass JobBuilder {
privatefinal Class<?> driverClass; privatefinal Job job;
privatefinalintextraArgCount; privatefinal String extrArgsUsage;
private String[] extraArgs;
public JobBuilder(Class<?> driverClass) throws IOException { this(driverClass, 0, "");
}
public JobBuilder(Class<?> driverClass, int extraArgCount, String extrArgsUsage) throws IOException { this.driverClass = driverClass;
this.extraArgCount = extraArgCount; this.job = new Job();
this.job.setJarByClass(driverClass); this.extrArgsUsage = extrArgsUsage; }
// vv JobBuilder
publicstatic Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException {
if (args.length != 2) {
printUsage(tool, "<input> <output>"); returnnull;
Hpot-Tech
}
Job job = new Job(conf);
job.setJarByClass(tool.getClass());
FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job;
}
publicstaticvoid printUsage(Tool tool, String extraArgsUsage) { System.err.printf("Usage: %s [genericOptions] %s\n\n",
tool.getClass().getSimpleName(), extraArgsUsage);
GenericOptionsParser.printGenericCommandUsage(System.err); }
// ^^ JobBuilder
public JobBuilder withCommandLineArgs(String... args) throws IOException { Configuration conf = job.getConfiguration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args); String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length < 2 && otherArgs.length > 3 + extraArgCount) {
System.err.printf("Usage: %s [genericOptions] [-overwrite] <input path> <output path> %s\n\n", driverClass.getSimpleName(), extrArgsUsage);
GenericOptionsParser.printGenericCommandUsage(System.err); System.exit(-1);
}
int index = 0;
boolean overwrite = false;
if (otherArgs[index].equals("-overwrite")) { overwrite = true;
index++; }
Path input = new Path(otherArgs[index++]); Path output = new Path(otherArgs[index++]);
if (index < otherArgs.length) {
extraArgs = new String[otherArgs.length - index];
System.arraycopy(otherArgs, index, extraArgs, 0, otherArgs.length - index); }
Hpot-Tech
output.getFileSystem(conf).delete(output, true); }
FileInputFormat.addInputPath(job, input); FileOutputFormat.setOutputPath(job, output); returnthis;
}
public Job build() { returnjob; }
public String[] getExtraArgs() { returnextraArgs;
} }
// Hpot-Tech
package com.hp.join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper for the weather-record side of the join.
 *
 * <p>Each input line is parsed with {@link NcdcRecordParser} and emitted unchanged
 * under a {@link TextPair} key of (station id, "1").
 */
public class JoinRecordMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, TextPair, Text> {

  private NcdcRecordParser parser = new NcdcRecordParser();

  /**
   * Emits the record keyed by its station id and the tag "1".
   *
   * @param key input key supplied by the input format (not used)
   * @param value one raw record line
   * @param output collector receiving the (station id, "1") key and the original line
   * @param reporter progress reporter (not used)
   * @throws IOException if the collector fails
   */
  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<TextPair, Text> output, Reporter reporter) throws IOException {
    parser.parse(value);
    output.collect(new TextPair(parser.getStationId(), "1"), value);
  }
}
// Hpot-Tech
package com.hp.join; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.MultipleInputs; import org.apache.hadoop.util.*; @SuppressWarnings("deprecation")publicclass JoinRecordWithStationName extends Configured implements Tool {
publicstaticclass KeyPartitioner implements Partitioner<TextPair, Text> { @Override
publicvoid configure(JobConf job) {}
@Override
publicint getPartition(TextPair key, Text value, int numPartitions) {
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; }
}
@Override
publicint run(String[] args) throws Exception { if (args.length != 3) {
JobBuilder.printUsage(this, "<ncdc input> <station input> <output>"); return -1;
}
JobConf conf = new JobConf(getConf(), getClass()); conf.setJobName("Join record with station name");
Path ncdcInputPath = new Path(args[0]); Path stationInputPath = new Path(args[1]); Path outputPath = new Path(args[2]);
MultipleInputs.addInputPath(conf, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class); MultipleInputs.addInputPath(conf, stationInputPath, TextInputFormat.class, JoinStationMapper.class);
Hpot-Tech
FileOutputFormat.setOutputPath(conf, outputPath); conf.setPartitionerClass(KeyPartitioner.class); conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class); conf.setMapOutputKeyClass(TextPair.class); conf.setReducerClass(JoinReducer.class); conf.setOutputKeyClass(Text.class); JobClient.runJob(conf); return 0; }publicstaticvoid main(String[] args) throws Exception { args = new String[3];
args[0] = "inputncdc"; args[1] = "inputstation";
args[2] = "output"+System.currentTimeMillis();
int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args); System.exit(exitCode);
} }
// Hpot-Tech
package com.hp.join;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

/**
 * Reducer that joins the values of one key group.
 *
 * <p>The first value of the group is taken as the station name; every following
 * value is emitted as {@code <station name> TAB <record>} under the first element
 * of the key.
 */
public class JoinReducer extends MapReduceBase implements
    Reducer<TextPair, Text, Text, Text> {

  /**
   * Emits one output pair per value after the first.
   *
   * @param key the group's key; only its first element is written out
   * @param values values of the group, the first of which is treated as the station name
   * @param output collector receiving (key first element, joined text)
   * @param reporter progress reporter (not used)
   * @throws IOException if the collector fails
   */
  @Override
  public void reduce(TextPair key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    // NOTE(review): relies on the station entry (tag "0") arriving before the
    // records (tag "1"); a group without a station entry would use its first
    // record as the name. Confirm the job's sort order guarantees this.
    // The value is copied; presumably the iterator reuses its Text instance - confirm.
    Text stationName = new Text(values.next());
    while (values.hasNext()) {
      Text record = values.next();
      Text outValue = new Text(stationName.toString() + "\t" + record.toString());
      output.collect(key.getFirst(), outValue);
    }
  }
}
// Hpot-Tech
package com.hp.join;import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
publicclass JoinStationMapper extends MapReduceBase implements Mapper<LongWritable, Text, TextPair, Text> {
private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
publicvoid map(LongWritable key, Text value,
OutputCollector<TextPair, Text> output, Reporter reporter) throws IOException {
if (parser.parse(value)) {
output.collect(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName()));
} } }
// Hpot-Tech
package com.hp.join;import java.math.*;
import org.apache.hadoop.io.Text;
publicclass MetOfficeRecordParser {
private String year;
private String airTemperatureString; privateintairTemperature;
privatebooleanairTemperatureValid;
publicvoid parse(String record) { if (record.length() < 18) { return; } year = record.substring(3, 7); if (isValidRecord(year)) { airTemperatureString = record.substring(13, 18); if (!airTemperatureString.trim().equals("---")) {
BigDecimal temp = new BigDecimal(airTemperatureString.trim()); temp = temp.multiply(new BigDecimal(BigInteger.TEN));
airTemperature = temp.intValueExact(); airTemperatureValid = true; } } }
privateboolean isValidRecord(String year) { try { Integer.parseInt(year); returntrue; } catch (NumberFormatException e) { returnfalse; } }
publicvoid parse(Text record) { parse(record.toString()); }
Hpot-Tech
public String getYear() { returnyear;
}
publicint getAirTemperature() { returnairTemperature;
}
public String getAirTemperatureString() { returnairTemperatureString;
}
publicboolean isValidTemperature() { returnairTemperatureValid;
}
// Hpot-Tech
package com.hp.join;import java.text.*;
import java.util.Date;
import org.apache.hadoop.io.Text;
publicclass NcdcRecordParser {
privatestaticfinalintMISSING_TEMPERATURE = 9999;
privatestaticfinal DateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmm");
private String stationId;
private String observationDateString; private String year;
private String airTemperatureString; privateintairTemperature;
privatebooleanairTemperatureMalformed; private String quality;
publicvoid parse(String record) {
stationId = record.substring(4, 10) + "-" + record.substring(10, 15); observationDateString = record.substring(15, 27);
year = record.substring(15, 19); airTemperatureMalformed = false;
// Remove leading plus sign as parseInt doesn't like them
if (record.charAt(87) == '+') {
airTemperatureString = record.substring(88, 92);
airTemperature = Integer.parseInt(airTemperatureString); } elseif (record.charAt(87) == '-') {
airTemperatureString = record.substring(87, 92);
airTemperature = Integer.parseInt(airTemperatureString); } else {
airTemperatureMalformed = true; }
airTemperature = Integer.parseInt(airTemperatureString); quality = record.substring(92, 93);
}
Hpot-Tech
publicvoid parse(Text record) { parse(record.toString()); }
publicboolean isValidTemperature() {
return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
&& quality.matches("[01459]"); }
publicboolean isMalformedTemperature() { returnairTemperatureMalformed;
}
publicboolean isMissingTemperature() {
returnairTemperature == MISSING_TEMPERATURE; }
public String getStationId() { returnstationId;
}
public Date getObservationDate() { try {
System.out.println(observationDateString);
returnDATE_FORMAT.parse(observationDateString); } catch (ParseException e) {
thrownew IllegalArgumentException(e); }
}
public String getYear() { returnyear;
}
publicint getYearInt() { return Integer.parseInt(year); }
publicint getAirTemperature() { returnairTemperature;
Hpot-Tech
}
public String getAirTemperatureString() { returnairTemperatureString;
}
public String getQuality() { returnquality;
}
// Hpot-Tech
package com.hp.join;import java.io.*;
import java.util.*;
import org.apache.hadoop.io.IOUtils;
publicclass NcdcStationMetadata {
private Map<String, String> stationIdToName = new HashMap<String, String>();
publicvoid initialize(File file) throws IOException { BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(new FileInputStream(file))); NcdcStationMetadataParser parser = new NcdcStationMetadataParser(); String line;
while ((line = in.readLine()) != null) { if (parser.parse(line)) {
stationIdToName.put(parser.getStationId(), parser.getStationName()); } } } finally { IOUtils.closeStream(in); } }
public String getStationName(String stationId) { String stationName = stationIdToName.get(stationId); if (stationName == null || stationName.trim().length() == 0) { return stationId; // no match: fall back to ID
}
return stationName; }
public Map<String, String> getStationIdToNameMap() { return Collections.unmodifiableMap(stationIdToName); }
}
// Hpot-Tech
package com.hp.join;import org.apache.hadoop.io.Text;
publicclass NcdcStationMetadataParser {
private String stationId; private String stationName;
publicboolean parse(String record) { if (record.length() < 42) { // header
returnfalse; }
String usaf = record.substring(0, 6); String wban = record.substring(7, 12); stationId = usaf + "-" + wban;
stationName = record.substring(13, 42); try {
Integer.parseInt(usaf); // USAF identifiers are numeric
returntrue; } catch (NumberFormatException e) { returnfalse; } }
publicboolean parse(Text record) { return parse(record.toString()); }
public String getStationId() { returnstationId;
}
public String getStationName() { returnstationName;
} }
// Hpot-Tech
package com.hp.join;// cc TextPair A Writable implementation that stores a pair of Text objects
// cc TextPairComparator A RawComparator for comparing TextPair byte representations
// cc TextPairFirstComparator A custom RawComparator for comparing the first field of TextPair byte representations // vv TextPair
import java.io.*;
import org.apache.hadoop.io.*;
publicclass TextPair implements WritableComparable<TextPair> {
private Text first; private Text second;
public TextPair() {
set(new Text(), new Text()); }
public TextPair(String first, String second) { set(new Text(first), new Text(second)); }
public TextPair(Text first, Text second) { set(first, second);
}
publicvoid set(Text first, Text second) { this.first = first;
this.second = second; }
public Text getFirst() { returnfirst;
}
public Text getSecond() { returnsecond;
}
Hpot-Tech
publicvoid write(DataOutput out) throws IOException { first.write(out);
second.write(out); }
@Override
publicvoid readFields(DataInput in) throws IOException { first.readFields(in);
second.readFields(in); }
@Override
publicint hashCode() {
returnfirst.hashCode() * 163 + second.hashCode(); }
@Override
publicboolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o;
returnfirst.equals(tp.first) && second.equals(tp.second); }
returnfalse; }
@Override
public String toString() { returnfirst + "\t" + second; }
@Override
publicint compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) {
return cmp; }
returnsecond.compareTo(tp.second); }
// ^^ TextPair
Hpot-Tech
// vv TextPairComparator
publicstaticclass Comparator extends WritableComparator {
privatestaticfinal Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public Comparator() { super(TextPair.class); }
@Override
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) {
return cmp; }
returnTEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
thrownew IllegalArgumentException(e); }
} }
static {
WritableComparator.define(TextPair.class, new Comparator()); }
// ^^ TextPairComparator
// vv TextPairFirstComparator
publicstaticclass FirstComparator extends WritableComparator {
privatestaticfinal Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
public FirstComparator() { super(TextPair.class);
Hpot-Tech
}
@Override
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); returnTEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) {
thrownew IllegalArgumentException(e); }
}
@Override
publicint compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) {
return ((TextPair) a).first.compareTo(((TextPair) b).first); }
returnsuper.compare(a, b); } } // ^^ TextPairFirstComparator // vv TextPair } // ^^ TextPair