Building Big Data Pipelines using OSS
Costin Leau
Staff Engineer, VMware (@CostinL)
Costin Leau
Speaker Bio
■ Spring committer since 2006
■ Spring Framework (JPA, @Bean, cache abstraction)
■ Spring OSGi/Dynamic Modules, OSGi Blueprint spec
■ Spring Data (GemFire, Redis, Hadoop)
Data Trends
Enterprise Data Trends
■ Unstructured data
• No predefined model
• Often doesn’t fit well in RDBMS
■ Pre-Aggregated Data
• Computed during data collection
• Counters

Cost Trends
Hardware cost halving every 18 months
Big Iron: $40k/CPU
The Value of Data
Value from Data Exceeds Hardware & Software costs
Value in connecting data sets
• Grouping e-commerce users by user agent
Big Data
“Big data” refers to datasets whose size is beyond the ability of typical database software tools to capture, store, manage, and analyze
A subjective and moving target

A Holistic View of a Big Data System
■ ETL
■ Real-Time Streams
■ Unstructured Data (HDFS)
■ Real-Time Structured Database (HBase, GemFire, Cassandra)
■ Big SQL (Greenplum, Aster Data, etc.)
■ Batch Processing
■ Real-Time Processing (S4, Storm)
■ Analytics

Big Data Problems == Integration Problems
Real-world big data solutions require workflows across systems
Workflow for big data processing is an integration problem
• Shares core components of a classic integration workflow
• Big data solutions need to integrate with existing data and apps
• Event-driven vs. batch workflows
No silver bullet (Michael Stonebraker: "One Size Fits All": An Idea Whose Time Has Come and Gone)
Spring projects can provide the foundation for Big Data workflows
Hadoop as a Big Data Platform
■ Hadoop Distributed File System (HDFS)
■ MapReduce Framework (MapRed)
Spring for Hadoop - Goals
Hadoop has a poor out-of-the-box programming model
Applications are generally a collection of scripts calling command-line apps
Spring simplifies developing Hadoop applications
By providing a familiar and consistent programming and configuration model
Across a wide range of use cases:
• HDFS usage
• Data Analysis (MR/Pig/Hive/Cascading)
• Workflow
• Event Streams
• Integration

Relationship with other Spring projects
■ Spring Framework: web and messaging applications
■ Spring Data: Redis, MongoDB, Neo4j, GemFire
■ Spring Integration: event-driven applications, Enterprise Integration Patterns
■ Spring Batch: on- and off-Hadoop workflows
Spring for Apache Hadoop
Simplify Hadoop programming
Capabilities: Spring + Hadoop
Declarative configuration
Create, configure, and parameterize Hadoop connectivity and all job types
Environment profiles – easily move from dev to qa to prod
Developer productivity
Create well-formed applications, not spaghetti script applications
Simplify HDFS and FsShell API with support for JVM scripting
Runner classes for MR/Pig/Hive/Cascading for small workflows

Counting Words – M/R
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
Counting Words – Configuring M/R
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
Running Hadoop Jars (WordCount 1.0)
Vanilla Hadoop
bin/hadoop jar hadoop-examples.jar wordcount /wc/input /wc/output

SHDP
<hdp:configuration />
<hdp:jar-runner id="wordcount" jar="hadoop-examples.jar">
    <hdp:arg value="wordcount"/>
    <hdp:arg value="/wc/input"/>
    <hdp:arg value="/wc/output"/>
</hdp:jar-runner>
Running Hadoop Tools (WordCount 2.0)
Vanilla Hadoop
bin/hadoop jar -conf myhadoop-site.xml -D ignoreCase=true wordcount.jar org.myorg.WordCount /wc/input /wc/output

SHDP
<hdp:configuration resources="myhadoop-site.xml"/>
<hdp:tool-runner id="wc" jar="wordcount.jar">
    <hdp:arg value="/wc/input"/>
    <hdp:arg value="/wc/output"/>
    ignoreCase=true
</hdp:tool-runner>
Configuring Hadoop
applicationContext.xml:

<context:property-placeholder location="hadoop-dev.properties"/>

<hdp:configuration>
    fs.default.name=${hd.fs}
</hdp:configuration>

<hdp:job id="word-count-job"
    input-path="${input.path}" output-path="${output.path}"
    jar="myjob.jar"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

<hdp:job-runner id="runner" job-ref="word-count-job" run-at-startup="true"/>

hadoop-dev.properties:

input.path=/wc/input/
output.path=/wc/word/
hd.fs=hdfs://localhost:9000
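Because the job-runner is declared with run-at-startup="true", simply bootstrapping the application context runs the job. A minimal launcher sketch (the class name and context file name are illustrative, standing in for SpringLauncher):

import org.springframework.context.support.ClassPathXmlApplicationContext;

// Bootstrapping the context triggers the job-runner defined in applicationContext.xml
public class WordCountLauncher {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("applicationContext.xml");
        // release Hadoop resources when the JVM exits
        ctx.registerShutdownHook();
    }
}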
Running a Streaming Job
<context:property-placeholder location="hadoop-${env}.properties"/>

<hdp:streaming id="wc"
    input-path="${input}" output-path="${output}"
    mapper="${cat}" reducer="${wc}"
    files="classpath:stopwords.txt">
</hdp:streaming>

bin/hadoop jar hadoop-streaming.jar \
    -input /wc/input -output /wc/output \
    -mapper /bin/cat -reducer /bin/wc \
    -files stopwords.txt

env=dev java -jar SpringLauncher.jar applicationContext.xml

hadoop-dev.properties:
input.path=/wc/input/
output.path=/wc/word/
Running a Streaming Job
<context:property-placeholder location="hadoop-${env}.properties"/>

<hdp:streaming id="wc"
    input-path="${input}" output-path="${output}"
    mapper="${cat}" reducer="${wc}"
    files="classpath:stopwords.txt">
</hdp:streaming>

bin/hadoop jar hadoop-streaming.jar \
    -input /wc/input -output /wc/output \
    -mapper /bin/cat -reducer /bin/wc \
    -files stopwords.txt

env=qa java -jar SpringLauncher.jar applicationContext.xml

hadoop-dev.properties:
input.path=/wc/input/
output.path=/wc/word/
hd.fs=hdfs://localhost:9000

hadoop-qa.properties:
input.path=/gutenberg/input/
output.path=/gutenberg/word/
hd.fs=hdfs://darwin:9000
Word Count – Injecting Jobs
Use Dependency Injection to obtain reference to Hadoop Job
Perform additional runtime configuration and submit

public class WordService {

    @Inject
    private Job mapReduceJob;

    public void processWords() throws Exception {
        mapReduceJob.submit();
    }
}
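A slightly fuller sketch of the "additional runtime configuration" step before submitting; the reducer count and output-path handling are illustrative, not part of the original sample:

import javax.inject.Inject;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordService {

    @Inject
    private Job mapReduceJob;

    public void processWords(String outputDir) throws Exception {
        // adjust the injected job at runtime before submitting it (illustrative settings)
        mapReduceJob.getConfiguration().set("mapred.reduce.tasks", "2");
        FileOutputFormat.setOutputPath(mapReduceJob, new Path(outputDir));

        // submit() is asynchronous; waitForCompletion(true) would block instead
        mapReduceJob.submit();
    }
}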
HDFS and Hadoop Shell as APIs
Has all “bin/hadoop fs” commands through FsShell
mkdir, chmod, test, ...

class MyScript {

    @Autowired
    FsShell fsh;

    @PostConstruct
    void init() {
        String outputDir = "/data/output";
        if (fsh.test(outputDir)) {
            fsh.rmr(outputDir);
        }
    }
}
HDFS and FsShell as APIs
Excellent for JVM scripting

// use the shell (made available under variable fsh)
if (!fsh.test(inputDir)) {
    fsh.mkdir(inputDir)
    fsh.copyFromLocal(sourceFile, inputDir)
    fsh.chmod(700, inputDir)
}
if (fsh.test(outputDir)) {
    fsh.rmr(outputDir)
}

init-files.groovy
HDFS and FsShell as APIs
<hdp:script id="init-script" language="groovy">
    <hdp:property name="inputDir" value="${input}"/>
    <hdp:property name="outputDir" value="${output}"/>
    <hdp:property name="sourceFile" value="${source}"/>

    // use the shell (made available under variable fsh)
    if (!fsh.test(inputDir)) {
        fsh.mkdir(inputDir)
        fsh.copyFromLocal(sourceFile, inputDir)
        fsh.chmod(700, inputDir)
    }
    if (fsh.test(outputDir)) {
        fsh.rmr(outputDir)
    }
</hdp:script>

appCtx.xml
Counting Words - Pig
input_lines = LOAD '/tmp/books' AS (line:chararray);

-- Extract words from each line and put them into a pig bag
words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;

-- filter out any words that are just white spaces
filtered_words = FILTER words BY word MATCHES '\\w+';

-- create a group for each word
word_groups = GROUP filtered_words BY word;

-- count the entries in each group
word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count, group AS word;

ordered_word_count = ORDER word_count BY count DESC;
STORE ordered_word_count INTO '/tmp/number-of-words';
Pig
Vanilla Pig
pig -x mapreduce wordcount.pig

SHDP
<pig-factory job-name="wc" properties-location="pig.properties">
    pig.exec.nocombiner=true
    <script location="wordcount.pig">
        <arguments>ignoreCase=TRUE</arguments>
    </script>
</pig-factory>

Creates a PigServer
Executes the script on startup (optional)
PigRunner – A small pig workflow
@Scheduled(cron = "0 0 12 * * ?")
public void process() {
    pigRunner.call();
}
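Wired up as a bean, the snippet above might look like the sketch below; the class name is illustrative, and it assumes a PigRunner bean is defined in the context and that task scheduling is enabled:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.pig.PigRunner;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Illustrative component: runs the configured Pig workflow every day at noon
@Component
public class DailyPigWorkflow {

    @Autowired
    private PigRunner pigRunner;

    @Scheduled(cron = "0 0 12 * * ?")
    public void process() throws Exception {
        pigRunner.call(); // executes the configured Pig scripts
    }
}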
PigTemplate – Programmatic Use
public class PigPasswordRepository implements PasswordRepository {
private PigTemplate pigTemplate;
private String pigScript = "classpath:password-analysis.pig";
public void processPasswordFile(String inputFile) {
String outputDir = baseOutputDir + File.separator +
counter.incrementAndGet();
Properties scriptParameters = new Properties();
scriptParameters.put("inputDir", inputFile);
scriptParameters.put("outputDir", outputDir);
pigTemplate.executeScript(pigScript, scriptParameters); }
//... }
Counting Words – Hive
-- import the file as lines
CREATE EXTERNAL TABLE lines(line string);
LOAD DATA INPATH 'books' OVERWRITE INTO TABLE lines;

-- create a virtual view that splits the lines
SELECT word, count(*) FROM lines
LATERAL VIEW explode(split(line, ' ')) lTable AS word GROUP BY word;
Vanilla Hive: run from the command line
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>
<bean id="hive-ds"
class="org.springframework.jdbc.datasource.SimpleDriverDataSource" c:driver-ref="hive-driver" c:url="${hive.url}"/>
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate" c:data-source-ref="hive-ds"/>
Hive w/ SHDP
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>
<bean id="hive-ds"
class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
c:driver-ref="hive-driver" c:url="${hive.url}"/>
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"
c:data-source-ref="hive-ds"/>
Create Hive JDBC Client and use with Spring JdbcTemplate
Reuse Spring’s Rich ResultSet to POJO Mapping Features
public long count() {
return jdbcTemplate.queryForLong("select count(*) from " + tableName);
}
List<Password> result = jdbcTemplate.query("select * from passwords",
    new ResultSetExtractor<List<Password>>() {
        public List<Password> extractData(ResultSet rs) throws SQLException {
            // extract data from result set
        }
    });
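For row-by-row mapping, Spring's RowMapper (org.springframework.jdbc.core.RowMapper) works against Hive exactly as against any JDBC database; the Password constructor and column names below are assumptions about the sample schema:

List<Password> result = jdbcTemplate.query("select * from passwords",
    new RowMapper<Password>() {
        public Password mapRow(ResultSet rs, int rowNum) throws SQLException {
            // column names are illustrative
            return new Password(rs.getString("user"), rs.getString("password"));
        }
    });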
Vanilla Hive - Thrift
HiveClient is not thread-safe and throws checked exceptions

public long count() {
    HiveClient hiveClient = createHiveClient();
    try {
        hiveClient.execute("select count(*) from " + tableName);
        return Long.parseLong(hiveClient.fetchOne());
    // checked exceptions
    } catch (HiveServerException ex) {
        throw translateException(ex);
    } catch (org.apache.thrift.TException tex) {
        throw translateException(tex);
    } finally {
        try {
            hiveClient.shutdown();
        } catch (org.apache.thrift.TException tex) {
            logger.debug("Unexpected exception on shutting down HiveClient", tex);
        }
    }
}

protected HiveClient createHiveClient() {
    TSocket transport = new TSocket(host, port, timeout);
    HiveClient hive = new HiveClient(new TBinaryProtocol(transport));
    try {
        transport.open();
    } catch (TTransportException e) {
        throw translateException(e);
    }
    return hive;
}
SHDP – Hive
<hive-client-factory host="${hive.host}" port="${hive.port}"/>
<hive-template id="hiveTemplate"/>

Easy client configuration
Can create an embedded Hive server instance
Declarative Usage
<hive-server auto-startup="true" port="${hive.port}"/>
<hive-runner run-at-startup="true">
    <hdp:script>
        DROP TABLE IF EXISTS ${wc.table};
    </hdp:script>
    <hdp:script location="word-count.q"/>
</hive-runner>
SHDP - HiveTemplate (Thrift)
One-liners to execute queries
One-liners for executing scripts
@Repository
public class HiveTemplatePasswordRepository implements PasswordRepository {
private @Value("${hive.table}") String tableName;
private @Autowired HiveOperations hiveTemplate;
@Override
public Long count() {
return hiveTemplate.queryForLong("select count(*) from " + tableName);
} }
Properties scriptParameters = new Properties();
scriptParameters.put("inputDir", inputFile);
scriptParameters.put("outputDir", outputDir);
Cascading – Counting Words
Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new Hfs(sourceScheme, inputPath);

Scheme sinkScheme = new TextLine(new Fields("word", "count"));
Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);

Pipe assembly = new Pipe("wordcount");

String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator(new Fields("word"), regex);
assembly = new Each(assembly, new Fields("line"), function);

assembly = new GroupBy(assembly, new Fields("word"));
Aggregator count = new Count(new Fields("count"));
Cascading
Based on Spring's type-safe @Configuration

<bean class="wordcount.cascading.CascadingConfig"/>

<bean id="cascade"
      class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
      p:configuration-ref="hadoopConfiguration" p:tails-ref="countPipe"/>
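A sketch of what the CascadingConfig class and the countPipe bean referenced above might look like; it simply wraps the assembly from the previous slide in a @Bean method (the final Every step, needed to apply the Count aggregator, is added here as an assumption):

import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: names mirror the XML above, the actual wiring may differ
@Configuration
public class CascadingConfig {

    @Bean
    public Pipe countPipe() {
        Pipe assembly = new Pipe("wordcount");

        // split each line into words
        String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
        Function function = new RegexGenerator(new Fields("word"), regex);
        assembly = new Each(assembly, new Fields("line"), function);

        // group by word and count each group
        assembly = new GroupBy(assembly, new Fields("word"));
        Aggregator count = new Count(new Fields("count"));
        assembly = new Every(assembly, count);

        return assembly;
    }
}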
HBase
<hdp:configuration/>
<hdp:hbase-configuration delete-connection="true"/>

<bean id="hbaseTemplate"
      class="org.springframework.data.hadoop.hbase.HbaseTemplate"
      p:configuration-ref="hbaseConfiguration"/>
Bootstrap HBase configuration from the Hadoop configuration
Template usage
public List<User> findAll() {
    return hbaseTemplate.find(tableName, "cfInfo", new RowMapper<User>() {
        @Override
        public User mapRow(Result result, int rowNum) throws Exception {
            return new User(Bytes.toString(result.getValue(CF_INFO, qUser)),
                    Bytes.toString(result.getValue(CF_INFO, qEmail)),
                    Bytes.toString(result.getValue(CF_INFO, qPassword)));
        }
    });
}
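Writes can go through the same template using a callback; a sketch that continues the repository above, assuming the TableCallback-based execute method, the CF_INFO/qUser/qEmail/qPassword constants from findAll(), and simple getters on User:

public User save(final String userName, final String email, final String password) {
    return hbaseTemplate.execute(tableName, new TableCallback<User>() {
        public User doInTable(HTableInterface table) throws Throwable {
            User user = new User(userName, email, password);
            // one Put per row, keyed by the user name (illustrative row-key choice)
            Put p = new Put(Bytes.toBytes(user.getName()));
            p.add(CF_INFO, qUser, Bytes.toBytes(user.getName()));
            p.add(CF_INFO, qEmail, Bytes.toBytes(user.getEmail()));
            p.add(CF_INFO, qPassword, Bytes.toBytes(user.getPassword()));
            table.put(p);
            return user;
        }
    });
}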
On Hadoop Workflows
Example workflow: HDFS → Pig → MR → Hive → HDFS
Reuse the same infrastructure for Hadoop-based workflows
A step can be any Hadoop job type or HDFS operation

Capabilities: Spring + Hadoop + Batch
Spring Batch for File/DB/NoSQL driven applications
Collect: Process local files
Transform: Scripting or Java code to transform and enrich
RT Analysis: N/A
Ingest: (batch/aggregate) write to HDFS or split/filtering
Batch Analysis: Orchestrate Hadoop steps in a workflow
Distribute: Copy data out of HDFS to structured storage
JMX enabled along with REST interface for job control

Spring Batch Configuration
<job id="job1">
    <step id="import" next="wordcount">
        <tasklet ref="import-tasklet"/>
    </step>
    <step id="wordcount" next="pig">
        <tasklet ref="wordcount-tasklet"/>
    </step>
    <step id="pig" next="parallel">
        <tasklet ref="pig-tasklet"/>
    </step>
    <split id="parallel" next="hdfs">
        <flow>
            <step id="mrStep">
                <tasklet ref="mr-tasklet"/>
            </step>
        </flow>
        <flow>
            <step id="hive">
                <tasklet ref="hive-tasklet"/>
            </step>
        </flow>
    </split>
    <step id="hdfs">
        <tasklet ref="hdfs-tasklet"/>
    </step>
</job>