
(1)
(2)

Building Big Data Pipelines using

OSS

Costin Leau

Staff Engineer VMware

@CostinL

(3)


Costin Leau

Speaker Bio

■ Spring committer since 2006

■ Spring Framework (JPA, @Bean, cache abstraction)

■ Spring OSGi/Dynamic Modules, OSGi Blueprint spec

■ Spring Data (GemFire, Redis, Hadoop)

(4)
(5)

Data Trends

(6)
(7)

Enterprise Data Trends

Unstructured data

No predefined model

Often doesn’t fit well in RDBMS

Pre-Aggregated Data

Computed during data collection

Counters

(8)

Cost Trends

Hardware cost halving every 18 months

Big Iron: $40k/CPU

(9)

The Value of Data

Value from Data Exceeds Hardware & Software costs

Value in connecting data sets

Grouping e-commerce users by user agent

(10)

Big Data

“Big data” refers to datasets whose size is beyond the ability of typical database software tools to capture, store, manage, and analyze

A subjective and moving target

(11)
(12)

A Holistic View of a Big Data System

ETL
Real-Time Streams
Unstructured Data (HDFS)
Real-Time Structured Database (HBase, GemFire, Cassandra)
Big SQL (Greenplum, Aster Data, etc.)
Batch Processing
Real-Time Processing (S4, Storm)
Analytics

(13)

Big Data problems == Integration problems

Real-world big data solutions require workflow across systems

Workflow for big data processing is an integration problem

• Share core components of a classic integration workflow

• Big data solutions need to integrate with existing data and apps

• Event-driven vs. batch workflows

No silver bullet

Michael Stonebraker: "One Size Fits All": An Idea Whose Time Has Come and Gone

(14)


Spring projects can provide the foundation for Big Data workflows

(15)
(16)

Hadoop as a Big Data Platform

Hadoop Distributed File System (HDFS)

Map Reduce Framework (MapRed)

(17)

Spring for Hadoop - Goals

Hadoop has a poor out of the box programming model

Applications are generally a collection of scripts calling command line apps

Spring simplifies developing Hadoop applications

By providing a familiar and consistent programming and configuration model

Across a wide range of use cases

HDFS usage

Data Analysis (MR/Pig/Hive/Cascading)

Workflow

Event Streams

Integration

(18)

Relationship with other Spring projects

Spring Framework

Web, Messaging Applications

Spring Data

Redis, MongoDB, Neo4j, Gemfire

Spring Integration

Event-driven applications, Enterprise Integration Patterns

Spring Batch

On and Off Hadoop workflows

Spring for

Apache Hadoop

Simplify Hadoop programming

(19)

Capabilities: Spring + Hadoop

Declarative configuration

Create, configure, and parameterize Hadoop connectivity and all job types

Environment profiles – easily move from dev to qa to prod

Developer productivity

Create well-formed applications, not spaghetti script applications

Simplify HDFS and FsShell API with support for JVM scripting

Runner classes for MR/Pig/Hive/Cascading for small workflows

(20)
(21)
(22)

Counting Words – M/R

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

(23)

Counting Words – Configuring M/R

Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

(24)

Running Hadoop Jars (WordCount 1.0)

Vanilla Hadoop

SHDP

<hdp:configuration />

<hdp:jar-runner id="wordcount" jar="hadoop-examples.jar">
    <hdp:arg value="wordcount"/>
    <hdp:arg value="/wc/input"/>
    <hdp:arg value="/wc/output"/>
</hdp:jar-runner>

(25)

Running Hadoop Tools (WordCount 2.0)

Vanilla Hadoop

bin/hadoop jar -conf myhadoop-site.xml -D ignoreCase=true wordcount.jar org.myorg.WordCount /wc/input /wc/output

SHDP

<hdp:configuration resources="myhadoop-site.xml"/>

<hdp:tool-runner id="wc" jar="wordcount.jar">
    <hdp:arg value="/wc/input"/>
    <hdp:arg value="/wc/output"/>
    ignoreCase=true
</hdp:tool-runner>

(26)

Configuring Hadoop

applicationContext.xml:

<context:property-placeholder location="hadoop-dev.properties"/>

<hdp:configuration>
    fs.default.name=${hd.fs}
</hdp:configuration>

<hdp:job id="word-count-job"
    input-path="${input.path}" output-path="${output.path}"
    jar="myjob.jar"
    mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
    reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

<hdp:job-runner id="runner" job-ref="word-count-job" run-at-startup="true"/>

hadoop-dev.properties:

input.path=/wc/input/
output.path=/wc/word/
hd.fs=hdfs://localhost:9000

(27)

Running a Streaming Job

<context:property-placeholder location="hadoop-${env}.properties"/>

<hdp:streaming id="wc"
    input-path="${input.path}" output-path="${output.path}"
    mapper="${cat}" reducer="${wc}"
    files="classpath:stopwords.txt"/>

bin/hadoop jar hadoop-streaming.jar \
    -input /wc/input -output /wc/output \
    -mapper /bin/cat -reducer /bin/wc \
    -files stopwords.txt

env=dev java -jar SpringLauncher.jar applicationContext.xml

hadoop-dev.properties:

input.path=/wc/input/
output.path=/wc/word/

(28)

Running a Streaming Job

<context:property-placeholder location="hadoop-${env}.properties"/>

<hdp:streaming id="wc"
    input-path="${input.path}" output-path="${output.path}"
    mapper="${cat}" reducer="${wc}"
    files="classpath:stopwords.txt"/>

bin/hadoop jar hadoop-streaming.jar \
    -input /wc/input -output /wc/output \
    -mapper /bin/cat -reducer /bin/wc \
    -files stopwords.txt

env=qa java -jar SpringLauncher.jar applicationContext.xml

hadoop-dev.properties:

input.path=/wc/input/
output.path=/wc/word/
hd.fs=hdfs://localhost:9000

hadoop-qa.properties:

input.path=/gutenberg/input/
output.path=/gutenberg/word/
hd.fs=hdfs://darwin:9000

(29)

Word Count – Injecting Jobs

Use Dependency Injection to obtain reference to Hadoop Job

Perform additional runtime configuration and submit

public class WordService {

    @Inject
    private Job mapReduceJob;

    public void processWords() {
        mapReduceJob.submit();
    }
}
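The slide's code only submits the injected job; as a minimal sketch (not from the deck) of what "additional runtime configuration" could look like before submission, assuming the standard Hadoop Job API and a caller that supplies the output path and reducer count:

import javax.inject.Inject;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordService {

    @Inject
    private Job mapReduceJob;

    // Hypothetical variant: tune the injected job at runtime, then submit it
    public void processWords(String outputDir, int reducers) throws Exception {
        // both setters are part of the standard org.apache.hadoop.mapreduce.Job API
        mapReduceJob.setNumReduceTasks(reducers);
        FileOutputFormat.setOutputPath(mapReduceJob, new Path(outputDir));
        mapReduceJob.submit();
    }
}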

(30)

HDFS and Hadoop Shell as APIs

Has all “bin/hadoop fs” commands through FsShell

mkdir, chmod, test, ...

class MyScript {

    @Autowired
    private FsShell fsh;

    @PostConstruct
    void init() {
        String outputDir = "/data/output";
        if (fsh.test(outputDir)) {
            fsh.rmr(outputDir);
        }
    }
}

(31)

HDFS and FsShell as APIs

Excellent for JVM scripting

// use the shell (made available under variable fsh)
if (!fsh.test(inputDir)) {
    fsh.mkdir(inputDir);
    fsh.copyFromLocal(sourceFile, inputDir);
    fsh.chmod(700, inputDir)
}
if (fsh.test(outputDir)) {
    fsh.rmr(outputDir)
}

init-files.groovy

(32)

HDFS and FsShell as APIs

<hdp:script id="init-script" language="groovy">
    <hdp:property name="inputDir" value="${input}"/>
    <hdp:property name="outputDir" value="${output}"/>
    <hdp:property name="sourceFile" value="${source}"/>

    // use the shell (made available under variable fsh)
    if (!fsh.test(inputDir)) {
        fsh.mkdir(inputDir);
        fsh.copyFromLocal(sourceFile, inputDir);
        fsh.chmod(700, inputDir)
    }
    if (fsh.test(outputDir)) {
        fsh.rmr(outputDir)
    }
</hdp:script>

appCtx.xml

(33)

Counting Words - Pig

input_lines = LOAD '/tmp/books' AS (line:chararray);

-- Extract words from each line and put them into a pig bag

words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;

-- filter out any words that are just white spaces
filtered_words = FILTER words BY word MATCHES '\\w+';

-- create a group for each word
word_groups = GROUP filtered_words BY word;

-- count the entries in each group
word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count, group AS word;

ordered_word_count = ORDER word_count BY count DESC;
STORE ordered_word_count INTO '/tmp/number-of-words';

(34)

Pig

Vanilla Pig

pig -x mapreduce wordcount.pig

SHDP

<pig-factory job-name="wc" properties-location="pig.properties">
    pig.exec.nocombiner=true
    <script location="wordcount.pig">
        <arguments>ignoreCase=TRUE</arguments>
    </script>
</pig-factory>

Creates a PigServer

Executes script on startup (optional)

(35)

PigRunner – A small pig workflow

@Scheduled(cron = "0 0 12 * * ?")
public void process() {
    pigRunner.call();
}

(36)
(37)

PigTemplate – Programmatic Use

public class PigPasswordRepository implements PasswordRepository {

    private PigTemplate pigTemplate;

    private String pigScript = "classpath:password-analysis.pig";

    public void processPasswordFile(String inputFile) {
        String outputDir = baseOutputDir + File.separator + counter.incrementAndGet();

        Properties scriptParameters = new Properties();
        scriptParameters.put("inputDir", inputFile);
        scriptParameters.put("outputDir", outputDir);

        pigTemplate.executeScript(pigScript, scriptParameters);
    }

    //...
}

(38)

Counting Words – Hive

-- import the file as lines
CREATE EXTERNAL TABLE lines (line STRING);

LOAD DATA INPATH 'books' OVERWRITE INTO TABLE lines;

-- create a virtual view that splits the lines
SELECT word, count(*) FROM lines
LATERAL VIEW explode(split(line, ' ')) lTable AS word
GROUP BY word;

(39)

Vanilla Hive

Command-line

(40)

Hive w/ SHDP

<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>

<bean id="hive-ds"

class="org.springframework.jdbc.datasource.SimpleDriverDataSource" c:driver-ref="hive-driver" c:url="${hive.url}"/>

<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate" c:data-source-ref="hive-ds"/>

(41)

Hive w/ SHDP

<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>

<bean id="hive-ds"

class="org.springframework.jdbc.datasource.SimpleDriverDataSource"

c:driver-ref="hive-driver" c:url="${hive.url}"/>

<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"

c:data-source-ref="hive-ds"/>

Create Hive JDBC Client and use with Spring JdbcTemplate

Reuse Spring’s Rich ResultSet to POJO Mapping Features

public long count() {
    return jdbcTemplate.queryForLong("select count(*) from " + tableName);
}

List<Password> result = jdbcTemplate.query("select * from passwords",
    new ResultSetExtractor<List<Password>>() {
        public List<Password> extractData(ResultSet rs) throws SQLException {
            List<Password> passwords = new ArrayList<Password>();
            // extract data from the result set into Password objects
            return passwords;
        }
    });
(42)

Vanilla Hive - Thrift

HiveClient is not thread-safe, throws checked exceptions

public long count() {
    HiveClient hiveClient = createHiveClient();
    try {
        hiveClient.execute("select count(*) from " + tableName);
        return Long.parseLong(hiveClient.fetchOne());
    // checked exceptions
    } catch (HiveServerException ex) {
        throw translateException(ex);
    } catch (org.apache.thrift.TException tex) {
        throw translateException(tex);
    } finally {
        try {
            hiveClient.shutdown();
        } catch (org.apache.thrift.TException tex) {
            logger.debug("Unexpected exception on shutting down HiveClient", tex);
        }
    }
}

protected HiveClient createHiveClient() {
    TSocket transport = new TSocket(host, port, timeout);
    HiveClient hive = new HiveClient(new TBinaryProtocol(transport));
    try {
        transport.open();
    } catch (TTransportException e) {
        throw translateException(e);
    }
    return hive;
}

(43)

SHDP – Hive

<hive-client-factory host="${hive.host}" port="${hive.port}"/>
<hive-template id="hiveTemplate"/>

Easy client configuration

Can create an embedded Hive server instance

Declarative Usage

<hive-server auto-startup="true" port="${hive.port}"/>

<hive-runner run-at-startup="true">
    <hdp:script>
        DROP TABLE IF EXISTS ${wc.table};
    </hdp:script>
    <hdp:script location="word-count.q"/>
</hive-runner>

(44)

SHDP - HiveTemplate (Thrift)

One-liners to execute queries

One-liners for executing scripts

@Repository
public class HiveTemplatePasswordRepository implements PasswordRepository {

    private @Value("${hive.table}") String tableName;

    private @Autowired HiveOperations hiveTemplate;

    @Override
    public Long count() {
        return hiveTemplate.queryForLong("select count(*) from " + tableName);
    }
}

Properties scriptParameters = new Properties();
scriptParameters.put("inputDir", inputFile);
scriptParameters.put("outputDir", outputDir);

(45)

Cascading – Counting Words

Scheme sourceScheme = new TextLine(new Fields("line"));
Tap source = new Hfs(sourceScheme, inputPath);

Scheme sinkScheme = new TextLine(new Fields("word", "count"));
Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);

Pipe assembly = new Pipe("wordcount");

String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator(new Fields("word"), regex);
assembly = new Each(assembly, new Fields("line"), function);

assembly = new GroupBy(assembly, new Fields("word"));
Aggregator count = new Count(new Fields("count"));

(46)

Cascading

Based on Spring’s type safe @Configuration

<bean class="wordcount.cascading.CascadingConfig"/>

<bean id="cascade"
    class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
    p:configuration-ref="hadoopConfiguration" p:tails-ref="countPipe"/>
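The CascadingConfig class referenced above is not shown in the deck; as a hedged sketch of what such a type-safe @Configuration might contain (class, method, and bean names are assumptions), it could expose the word-count assembly from the previous slide as the pipe that p:tails-ref picks up:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

@Configuration
public class CascadingConfig {

    @Bean
    public Pipe countPipe() {
        Pipe assembly = new Pipe("wordcount");

        // split each line into words (same regex as the previous slide)
        String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
        Function function = new RegexGenerator(new Fields("word"), regex);
        assembly = new Each(assembly, new Fields("line"), function);

        // group identical words and count each group
        assembly = new GroupBy(assembly, new Fields("word"));
        Aggregator count = new Count(new Fields("count"));
        assembly = new Every(assembly, count);

        return assembly;
    }
}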

(47)

HBase

<hdp:configuration/>

<hdp:hbase-configuration delete-connection="true"/>

<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"
    p:configuration-ref="hbaseConfiguration"/>

Bootstrap HBase configuration from the Hadoop configuration

Template usage

public List<User> findAll() {
    return hbaseTemplate.find(tableName, "cfInfo", new RowMapper<User>() {
        @Override
        public User mapRow(Result result, int rowNum) throws Exception {
            return new User(Bytes.toString(result.getValue(CF_INFO, qUser)),
                    Bytes.toString(result.getValue(CF_INFO, qEmail)),
                    Bytes.toString(result.getValue(CF_INFO, qPassword)));
        }
    });
}

(48)
(49)

On Hadoop Workflows

Workflow diagram: HDFS → Pig → MR → Hive → HDFS

Reuse the same infrastructure for Hadoop-based workflows

A step can be any Hadoop job type or HDFS operation

(50)

Capabilities: Spring + Hadoop + Batch

Spring Batch for File/DB/NoSQL driven applications

Collect: Process local files

Transform: Scripting or Java code to transform and enrich

RT Analysis: N/A

Ingest: (batch/aggregate) write to HDFS or split/filtering

Batch Analysis: Orchestrate Hadoop steps in a workflow

Distribute: Copy data out of HDFS to structured storage

JMX enabled along with REST interface for job control
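As a hedged illustration (not in the deck) of driving such a workflow programmatically, a Spring Batch job like the one configured on the next slide could be launched through the standard JobLauncher API; the bean and parameter names here are assumptions:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;

public class WorkflowLauncher {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job1;   // e.g. the <job id="job1"> defined on the next slide

    public void launch(String inputPath) throws Exception {
        // JobParameters make each run identifiable and restartable
        JobParameters params = new JobParametersBuilder()
                .addString("input.path", inputPath)
                .addLong("run.id", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(job1, params);
    }
}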

(51)

Spring Batch Configuration

<job id="job1">

<step id="import" next="wordcount">

<tasklet ref=“import-tasklet"/>

</step>

<step id=“wc" next="pig">

<tasklet ref="wordcount-tasklet"/>

</step>

<step id="pig">

<tasklet ref="pig-tasklet“></step> <split id="parallel" next="hdfs">

<flow><step id="mrStep">

<tasklet ref="mr-tasklet"/> </step></flow>

<flow><step id="hive">

<tasklet ref="hive-tasklet"/> </step></flow>

</split>

<step id="hdfs">

<tasklet ref="hdfs-tasklet"/></step> </job>

(52)

Spring Batch Configuration

Additional configuration behind the graph

(53)
(54)
(55)

Capabilities: Spring + Hadoop + EAI

Big data solutions need to integrate with existing data and apps

Share core components of a classic integration workflow

Spring Integration for event-driven applications

Collect: Single node or distributed data collection (tcp/JMS/Rabbit)

Transform: Scripting or Java code to transform and enrich

RT Analysis: Connectivity to multiple analysis techniques

Ingest: Write to HDFS, Split/Filter data stream to other stores

JMX enabled + control bus for starting/stopping individual components
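As a hedged sketch (not from the deck) of the "Ingest: write to HDFS" step, a plain POJO endpoint could write incoming payloads to HDFS using Hadoop's FileSystem API and be wired into a Spring Integration flow as a service activator; the class name and path handling are assumptions:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWritingHandler {

    private final FileSystem fs;
    private final Path targetDir;

    public HdfsWritingHandler(Configuration hadoopConfiguration, String targetPath) throws IOException {
        this.fs = FileSystem.get(hadoopConfiguration);
        this.targetDir = new Path(targetPath);
    }

    // Invoked for each message payload routed to this endpoint
    public void write(String payload) throws IOException {
        // one file per call for simplicity; a real flow would batch or roll files
        Path file = new Path(targetDir, "event-" + System.currentTimeMillis());
        FSDataOutputStream out = fs.create(file, false);
        try {
            out.write((payload + "\n").getBytes("UTF-8"));
        } finally {
            out.close();
        }
    }
}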

(56)

Spring Integration – Polling Log File

Poll a directory for files, files are rolled over every 10 min

Copy files to staging area

Copy files to HDFS

Use an aggregator to wait for “10 files in 20 minute interval” to launch MR job
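A hedged sketch (not in the deck) of how the "10 files" release condition could be expressed as a POJO release strategy for that aggregator; Spring Integration can invoke such a method on a referenced bean, and the class name and threshold are assumptions:

import java.io.File;
import java.util.List;

public class TenFilesReleaseStrategy {

    private final int threshold = 10;

    // Returns true once enough files have accumulated in the current group;
    // the 20-minute window would be handled by the aggregator's group timeout
    public boolean canRelease(List<File> files) {
        return files.size() >= threshold;
    }
}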

(57)

Spring Integration – Syslog to HDFS

Use tcp/udp Syslog adapter

Transformer categorizes messages

Route to specific channels based on category

One route leads to an HDFS write; filtered data is stored in Redis
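A hedged sketch (not in the deck) of the category-based routing step: Spring Integration allows a plain POJO method that returns a channel name, so the transformer's category could drive the route; the class, category values, and channel names are assumptions:

public class SyslogCategoryRouter {

    // Returns the name of the channel the message should travel on;
    // the categories and channel names are illustrative only
    public String route(String categorizedEvent) {
        if (categorizedEvent.startsWith("ERROR") || categorizedEvent.startsWith("CRIT")) {
            return "hdfsAndRedisChannel";   // archived to HDFS, key fields cached in Redis
        }
        return "hdfsOnlyChannel";           // everything else just lands in HDFS
    }
}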

(58)

Spring Integration – Multi-node Syslog

Spread log collection across multiple machines

Use TCP Adapters to forward events across machines

Can use other middleware

Reusable flows: break the flow at a channel boundary and insert an inbound/outbound adapter pair

(59)

Resources

Prepping for GA – feedback welcome

Project Page: springsource.org/spring-data/hadoop

Source Code: github.com/SpringSource/spring-hadoop

(60)

Q&A

