• No results found

Hpot-Tech package com.hp.join;

In document Hadoop HP Tutorial (Page 112-131)

// == JobBuilder import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool;

publicclass JobBuilder {

privatefinal Class<?> driverClass; privatefinal Job job;

privatefinalintextraArgCount; privatefinal String extrArgsUsage;

private String[] extraArgs;

public JobBuilder(Class<?> driverClass) throws IOException { this(driverClass, 0, "");

}

public JobBuilder(Class<?> driverClass, int extraArgCount, String extrArgsUsage) throws IOException { this.driverClass = driverClass;

this.extraArgCount = extraArgCount; this.job = new Job();

this.job.setJarByClass(driverClass); this.extrArgsUsage = extrArgsUsage; }

// vv JobBuilder

publicstatic Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException {

if (args.length != 2) {

printUsage(tool, "<input> <output>"); returnnull;

Hpot-Tech

}

Job job = new Job(conf);

job.setJarByClass(tool.getClass());

FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job;

}

publicstaticvoid printUsage(Tool tool, String extraArgsUsage) { System.err.printf("Usage: %s [genericOptions] %s\n\n",

tool.getClass().getSimpleName(), extraArgsUsage);

GenericOptionsParser.printGenericCommandUsage(System.err); }

// ^^ JobBuilder

public JobBuilder withCommandLineArgs(String... args) throws IOException { Configuration conf = job.getConfiguration();

GenericOptionsParser parser = new GenericOptionsParser(conf, args); String[] otherArgs = parser.getRemainingArgs();

if (otherArgs.length < 2 && otherArgs.length > 3 + extraArgCount) {

System.err.printf("Usage: %s [genericOptions] [-overwrite] <input path> <output path> %s\n\n", driverClass.getSimpleName(), extrArgsUsage);

GenericOptionsParser.printGenericCommandUsage(System.err); System.exit(-1);

}

int index = 0;

boolean overwrite = false;

if (otherArgs[index].equals("-overwrite")) { overwrite = true;

index++; }

Path input = new Path(otherArgs[index++]); Path output = new Path(otherArgs[index++]);

if (index < otherArgs.length) {

extraArgs = new String[otherArgs.length - index];

System.arraycopy(otherArgs, index, extraArgs, 0, otherArgs.length - index); }

Hpot-Tech

output.getFileSystem(conf).delete(output, true); }

FileInputFormat.addInputPath(job, input); FileOutputFormat.setOutputPath(job, output); returnthis;

}

public Job build() { returnjob; }

public String[] getExtraArgs() { returnextraArgs;

} }

Hpot-Tech

package com.hp.join; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter;

publicclass JoinRecordMapper extends MapReduceBase implements Mapper<LongWritable, Text, TextPair, Text> { private NcdcRecordParser parser = new NcdcRecordParser();

publicvoid map(LongWritable key, Text value,

OutputCollector<TextPair, Text> output, Reporter reporter) throws IOException {

parser.parse(value);

output.collect(new TextPair(parser.getStationId(), "1"), value); }

Hpot-Tech

package com.hp.join; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.MultipleInputs; import org.apache.hadoop.util.*; @SuppressWarnings("deprecation")

publicclass JoinRecordWithStationName extends Configured implements Tool {

publicstaticclass KeyPartitioner implements Partitioner<TextPair, Text> { @Override

publicvoid configure(JobConf job) {}

@Override

publicint getPartition(TextPair key, Text value, int numPartitions) {

return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; }

}

@Override

publicint run(String[] args) throws Exception { if (args.length != 3) {

JobBuilder.printUsage(this, "<ncdc input> <station input> <output>"); return -1;

}

JobConf conf = new JobConf(getConf(), getClass()); conf.setJobName("Join record with station name");

Path ncdcInputPath = new Path(args[0]); Path stationInputPath = new Path(args[1]); Path outputPath = new Path(args[2]);

MultipleInputs.addInputPath(conf, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class); MultipleInputs.addInputPath(conf, stationInputPath, TextInputFormat.class, JoinStationMapper.class);

Hpot-Tech

FileOutputFormat.setOutputPath(conf, outputPath); conf.setPartitionerClass(KeyPartitioner.class); conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class); conf.setMapOutputKeyClass(TextPair.class); conf.setReducerClass(JoinReducer.class); conf.setOutputKeyClass(Text.class); JobClient.runJob(conf); return 0; }

publicstaticvoid main(String[] args) throws Exception { args = new String[3];

args[0] = "inputncdc"; args[1] = "inputstation";

args[2] = "output"+System.currentTimeMillis();

int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args); System.exit(exitCode);

} }

Hpot-Tech

package com.hp.join; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*;

publicclass JoinReducer extends MapReduceBase implements

Reducer<TextPair, Text, Text, Text> {

publicvoid reduce(TextPair key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

Text stationName = new Text(values.next()); while (values.hasNext()) {

Text record = values.next();

Text outValue = new Text(stationName.toString() + "\t" + record.toString()); output.collect(key.getFirst(), outValue);

} } }

Hpot-Tech

package com.hp.join;

import java.io.IOException;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapred.*;

publicclass JoinStationMapper extends MapReduceBase implements Mapper<LongWritable, Text, TextPair, Text> {

private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();

publicvoid map(LongWritable key, Text value,

OutputCollector<TextPair, Text> output, Reporter reporter) throws IOException {

if (parser.parse(value)) {

output.collect(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName()));

} } }

Hpot-Tech

package com.hp.join;

import java.math.*;

import org.apache.hadoop.io.Text;

publicclass MetOfficeRecordParser {

private String year;

private String airTemperatureString; privateintairTemperature;

privatebooleanairTemperatureValid;

publicvoid parse(String record) { if (record.length() < 18) { return; } year = record.substring(3, 7); if (isValidRecord(year)) { airTemperatureString = record.substring(13, 18); if (!airTemperatureString.trim().equals("---")) {

BigDecimal temp = new BigDecimal(airTemperatureString.trim()); temp = temp.multiply(new BigDecimal(BigInteger.TEN));

airTemperature = temp.intValueExact(); airTemperatureValid = true; } } }

privateboolean isValidRecord(String year) { try { Integer.parseInt(year); returntrue; } catch (NumberFormatException e) { returnfalse; } }

publicvoid parse(Text record) { parse(record.toString()); }

Hpot-Tech

public String getYear() { returnyear;

}

publicint getAirTemperature() { returnairTemperature;

}

public String getAirTemperatureString() { returnairTemperatureString;

}

publicboolean isValidTemperature() { returnairTemperatureValid;

}

Hpot-Tech

package com.hp.join;

import java.text.*;

import java.util.Date;

import org.apache.hadoop.io.Text;

publicclass NcdcRecordParser {

privatestaticfinalintMISSING_TEMPERATURE = 9999;

privatestaticfinal DateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmm");

private String stationId;

private String observationDateString; private String year;

private String airTemperatureString; privateintairTemperature;

privatebooleanairTemperatureMalformed; private String quality;

publicvoid parse(String record) {

stationId = record.substring(4, 10) + "-" + record.substring(10, 15); observationDateString = record.substring(15, 27);

year = record.substring(15, 19); airTemperatureMalformed = false;

// Remove leading plus sign as parseInt doesn't like them

if (record.charAt(87) == '+') {

airTemperatureString = record.substring(88, 92);

airTemperature = Integer.parseInt(airTemperatureString); } elseif (record.charAt(87) == '-') {

airTemperatureString = record.substring(87, 92);

airTemperature = Integer.parseInt(airTemperatureString); } else {

airTemperatureMalformed = true; }

airTemperature = Integer.parseInt(airTemperatureString); quality = record.substring(92, 93);

}

Hpot-Tech

publicvoid parse(Text record) { parse(record.toString()); }

publicboolean isValidTemperature() {

return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE

&& quality.matches("[01459]"); }

publicboolean isMalformedTemperature() { returnairTemperatureMalformed;

}

publicboolean isMissingTemperature() {

returnairTemperature == MISSING_TEMPERATURE; }

public String getStationId() { returnstationId;

}

public Date getObservationDate() { try {

System.out.println(observationDateString);

returnDATE_FORMAT.parse(observationDateString); } catch (ParseException e) {

thrownew IllegalArgumentException(e); }

}

public String getYear() { returnyear;

}

publicint getYearInt() { return Integer.parseInt(year); }

publicint getAirTemperature() { returnairTemperature;

Hpot-Tech

}

public String getAirTemperatureString() { returnairTemperatureString;

}

public String getQuality() { returnquality;

}

Hpot-Tech

package com.hp.join;

import java.io.*;

import java.util.*;

import org.apache.hadoop.io.IOUtils;

publicclass NcdcStationMetadata {

private Map<String, String> stationIdToName = new HashMap<String, String>();

publicvoid initialize(File file) throws IOException { BufferedReader in = null;

try {

in = new BufferedReader(new InputStreamReader(new FileInputStream(file))); NcdcStationMetadataParser parser = new NcdcStationMetadataParser(); String line;

while ((line = in.readLine()) != null) { if (parser.parse(line)) {

stationIdToName.put(parser.getStationId(), parser.getStationName()); } } } finally { IOUtils.closeStream(in); } }

public String getStationName(String stationId) { String stationName = stationIdToName.get(stationId); if (stationName == null || stationName.trim().length() == 0) { return stationId; // no match: fall back to ID

}

return stationName; }

public Map<String, String> getStationIdToNameMap() { return Collections.unmodifiableMap(stationIdToName); }

}

Hpot-Tech

package com.hp.join;

import org.apache.hadoop.io.Text;

publicclass NcdcStationMetadataParser {

private String stationId; private String stationName;

publicboolean parse(String record) { if (record.length() < 42) { // header

returnfalse; }

String usaf = record.substring(0, 6); String wban = record.substring(7, 12); stationId = usaf + "-" + wban;

stationName = record.substring(13, 42); try {

Integer.parseInt(usaf); // USAF identifiers are numeric

returntrue; } catch (NumberFormatException e) { returnfalse; } }

publicboolean parse(Text record) { return parse(record.toString()); }

public String getStationId() { returnstationId;

}

public String getStationName() { returnstationName;

} }

Hpot-Tech

package com.hp.join;

// cc TextPair A Writable implementation that stores a pair of Text objects

// cc TextPairComparator A RawComparator for comparing TextPair byte representations

// cc TextPairFirstComparator A custom RawComparator for comparing the first field of TextPair byte representations // vv TextPair

import java.io.*;

import org.apache.hadoop.io.*;

publicclass TextPair implements WritableComparable<TextPair> {

private Text first; private Text second;

public TextPair() {

set(new Text(), new Text()); }

public TextPair(String first, String second) { set(new Text(first), new Text(second)); }

public TextPair(Text first, Text second) { set(first, second);

}

publicvoid set(Text first, Text second) { this.first = first;

this.second = second; }

public Text getFirst() { returnfirst;

}

public Text getSecond() { returnsecond;

}

Hpot-Tech

publicvoid write(DataOutput out) throws IOException { first.write(out);

second.write(out); }

@Override

publicvoid readFields(DataInput in) throws IOException { first.readFields(in);

second.readFields(in); }

@Override

publicint hashCode() {

returnfirst.hashCode() * 163 + second.hashCode(); }

@Override

publicboolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o;

returnfirst.equals(tp.first) && second.equals(tp.second); }

returnfalse; }

@Override

public String toString() { returnfirst + "\t" + second; }

@Override

publicint compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) {

return cmp; }

returnsecond.compareTo(tp.second); }

// ^^ TextPair

Hpot-Tech

// vv TextPairComparator

publicstaticclass Comparator extends WritableComparator {

privatestaticfinal Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

public Comparator() { super(TextPair.class); }

@Override

publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

try {

int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) {

return cmp; }

returnTEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);

} catch (IOException e) {

thrownew IllegalArgumentException(e); }

} }

static {

WritableComparator.define(TextPair.class, new Comparator()); }

// ^^ TextPairComparator

// vv TextPairFirstComparator

publicstaticclass FirstComparator extends WritableComparator {

privatestaticfinal Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

public FirstComparator() { super(TextPair.class);

Hpot-Tech

}

@Override

publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

try {

int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); returnTEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) {

thrownew IllegalArgumentException(e); }

}

@Override

publicint compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) {

return ((TextPair) a).first.compareTo(((TextPair) b).first); }

returnsuper.compare(a, b); } } // ^^ TextPairFirstComparator // vv TextPair } // ^^ TextPair

Hpot-Tech

In document Hadoop HP Tutorial (Page 112-131)

Related documents