Introduction to Big Data Science
14th Period
Retrieving, Storing, and Querying Big Data
Contents
Retrieving Data from SNS
Introduction to Facebook APIs and Data Format
K-V Data Scheme on Hadoop
Storing and Querying Data on Hive
Distributed Objects
Objects that can communicate with objects in heterogeneous run-time environments
Standard protocol for distributed objects, e.g., JRMP
Robust
Reliable
Transparent
Distributed Objects Technology
Multi-Platform
Transparent access to distributed objects
Language neutral
Examples: RMI, CORBA, DCOM
Java Remote Method Invocation
(RMI)
Lets a program use objects in a remote run-time environment as if they were objects in the local run-time environment
Abstracts away the low-level network code of the distributed network, giving developers an environment where they can focus on their application logic
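The RMI idea above can be sketched with nothing but the JDK's java.rmi package: an object is exported, and the caller invokes it through a stub that looks like a local object while every call is serialized and sent over a socket (JRMP). The names `RmiSketch`, `Greeter`, and `GreeterImpl`, and the choice to advertise 127.0.0.1, are assumptions made for this single-JVM demo, not part of the original slides.

```java
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class RmiSketch {
    // Remote interface: callers only see this type, and every method may throw RemoteException.
    public interface Greeter extends Remote {
        String greet(String name) throws RemoteException;
    }

    // Server-side implementation; its code runs wherever the object is exported.
    public static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    // Exports the object, calls it through its RMI stub, then unexports it.
    public static String callThroughStub(String name) {
        // Assumption for this demo: advertise the loopback address so the stub connects locally.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        GreeterImpl impl = new GreeterImpl();
        try {
            // Port 0 means "any free port"; the returned stub implements Greeter.
            Greeter stub = (Greeter) UnicastRemoteObject.exportObject(impl, 0);
            try {
                // Looks like a local call, but arguments and result travel over a socket.
                return stub.greet(name);
            } finally {
                // Unexport so the RMI runtime stops listening and the JVM can exit.
                UnicastRemoteObject.unexportObject(impl, true);
            }
        } catch (RemoteException e) {
            throw new IllegalStateException("RMI call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callThroughStub("RMI"));
    }
}
```

In a real deployment the stub would normally be published in an RMI registry so that a client in another JVM can look it up by name; here caller and server share one JVM only to keep the sketch self-contained.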
CORBA Contributions
CORBA addresses two challenges of developing distributed systems:
Making distributed application development no more
difficult than developing centralized programs.
This is easier said than done because of:
Partial failures
Impact of latency
Load balancing
Event Ordering
Providing an infrastructure to integrate application
components into a distributed system
Big Data Science 6
APIs on the Web
Web service standard: recommended by the W3C, robust and fast, but not easy to use
Simple Object Access Protocol (SOAP): simple XML messages for remote procedure calls
Web Services Description Language (WSDL): specification of web service functions
Universal Description, Discovery, and Integration (UDDI)
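To show what "simple XML messages for remote procedure calls" means in practice, here is a minimal sketch using only the JDK's XML parser: the client wraps the call in a SOAP 1.1 envelope, and the server reads the first element inside `soap:Body` to find the operation being invoked. The service namespace `urn:example:weather` and the `GetTemperature` operation are hypothetical; a real service would describe them in its WSDL.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

public class SoapEnvelopeSketch {
    // SOAP 1.1 envelope namespace
    public static final String SOAP_ENV = "http://schemas.xmlsoap.org/soap/envelope/";
    // Hypothetical service namespace, for illustration only
    public static final String SERVICE_NS = "urn:example:weather";

    // Client side: an RPC-style request whose Body holds one element named after the operation.
    // Assumes 'city' contains no XML special characters (no escaping is done here).
    public static String buildRequest(String city) {
        return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<soap:Envelope xmlns:soap=\"" + SOAP_ENV + "\">"
            + "<soap:Body>"
            + "<w:GetTemperature xmlns:w=\"" + SERVICE_NS + "\">"
            + "<w:city>" + city + "</w:city>"
            + "</w:GetTemperature>"
            + "</soap:Body>"
            + "</soap:Envelope>";
    }

    // Server side: parse the envelope and return the local name of the invoked operation.
    public static String operationName(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Node body = doc.getElementsByTagNameNS(SOAP_ENV, "Body").item(0);
            for (Node n = body.getFirstChild(); n != null; n = n.getNextSibling()) {
                if (n.getNodeType() == Node.ELEMENT_NODE) {
                    return n.getLocalName();
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("not a parsable SOAP envelope", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(operationName(buildRequest("Seoul")));
    }
}
```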
APIs on the Web
RESTful Web API: not standardized by any authority, but easy to use
Representational state transfer (REST) is an architectural style
consisting of a coordinated set of constraints applied to components,
connectors, and data elements, within a distributed hypermedia system.
REST ignores the details of component implementation and protocol
syntax in order to focus on the roles of components, the constraints upon
their interaction with other components, and their interpretation of
significant data elements.
REST has been applied to describe desired web architecture, to identify
existing problems, to compare alternative solutions, and to ensure that
protocol extensions would not violate the core constraints that make the
Web successful. Fielding used REST to design HTTP 1.1 and Uniform
Resource Identifiers (URI).
The REST architectural style is also applied to the development of Web
services as an alternative to other distributed-computing specifications
such as SOAP.
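The contrast with SOAP is easiest to see in code: in the REST style each resource is named by a URI and manipulated with a standard HTTP method, with no envelope around the call. This sketch only builds requests with the JDK's java.net.http API (Java 11+); the host `api.example.com` and the `/users` paths are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RestRequestSketch {
    // Hypothetical REST service; resources are addressed by URI paths.
    public static final String BASE = "https://api.example.com";

    // Read a resource, e.g. GET /users/42
    public static HttpRequest read(String path) {
        return HttpRequest.newBuilder(URI.create(BASE + path)).GET().build();
    }

    // Create a resource in a collection, e.g. POST /users with a JSON representation
    public static HttpRequest create(String collectionPath, String json) {
        return HttpRequest.newBuilder(URI.create(BASE + collectionPath))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(json))
            .build();
    }

    // Delete a resource, e.g. DELETE /users/42
    public static HttpRequest delete(String path) {
        return HttpRequest.newBuilder(URI.create(BASE + path)).DELETE().build();
    }

    public static void main(String[] args) {
        HttpRequest r = read("/users/42");
        System.out.println(r.method() + " " + r.uri()); // GET https://api.example.com/users/42
    }
}
```

Sending is a separate step, for example `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; keeping request construction apart makes the URI-plus-verb design of REST visible on its own.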
Retrieving Data from SNS
Social Network Services (SNS) provide useful APIs for accessing their data.
Usually, these are offered as Web APIs, web programming libraries, or smartphone SDKs.
It is almost impossible for us to retrieve all of their data, but we can save the data we need for a specific purpose into long-term big data storage.
Web APIs for the Web and Several SNS
Twitter API
Google API
Facebook API: Graph API, Open Graph, Dialogs, Chat, Ads API, FQL, Localization and translation, Atlas API, Public Feed API, Keyword Insights API
Twitter API
REST API v1.1 Resources
Timelines
Collections of Tweets, ordered with the most recent first.
Tweets
The atomic building blocks of Twitter, 140-character status updates
with additional associated metadata. People tweet for a variety of
reasons about a multitude of topics.
Search
Find relevant Tweets based on queries performed by your users.
Streaming
Direct Messages
Short, non-public messages sent between two users. Access to
Direct Messages is governed by the Application Permission
Model.
Twitter API
Friends & Followers
Users follow their interests on Twitter through both one-way and
mutual following relationships.
Users
Users are at the center of everything Twitter: they follow, they
favorite, and tweet & retweet.
Suggested Users
Categorical organization of users that others may be interested to
follow.
Favorites
Users favorite tweets to give recognition to awesome tweets, to
curate the best of Twitter, to save for reading later, and a variety
of other reasons. Likewise, developers make use of "favs" in
many different ways.
Twitter API
Lists
Collections of tweets, culled from a curated list of Twitter users.
List timeline methods include tweets by all members of a list.
Saved Searches
Allows users to save references to search criteria for reuse later.
Places & Geo
Users tweet from all over the world. These methods allow you to
attach location data to tweets and discover tweets & locations.
Trends
With so many tweets from so many users, themes are bound to
arise from the zeitgeist. The Trends methods allow you to explore
what's trending on Twitter.
Spam Reporting
These methods are used to report user accounts as spam.
Facebook APIs
Graph API
The Graph API is a simple HTTP-based API that gives access to
the Facebook social graph, uniformly representing objects in the
graph and the connections between them. Most other APIs at
Facebook are based on the Graph API.
Open Graph
The Open Graph API allows apps to tell stories on Facebook
through a structured, strongly typed API.
Dialogs
Facebook offers a number of dialogs for Facebook Login, posting
to a person's timeline or sending requests.
Chat
You can integrate Facebook Chat into your Web-based, desktop,
or mobile instant messaging products. Your instant messaging
client connects to Facebook Chat via the Jabber XMPP service.
Facebook APIs
Ads API
The Ads API allows you to build your own app as a customized
alternative to the Facebook Ads Manager and Power Editor tools.
FQL
Facebook Query Language, or FQL, enables you to use a
SQL-style interface to query the data exposed by the Graph API. It
provides for some advanced features not available in the Graph
API such as using the results of one query in another.
Localization and translation
Facebook supports localization of apps and provides tools for
translating them.
Atlas API
The Atlas APIs provide programmatic access to the
Facebook APIs
Public Feed API
The Public Feed API lets you read the stream of public comments
as they are posted to Facebook.
Keyword Insights API
The Keyword Insights API exposes an analysis layer on top of all
Facebook posts that enables you to query aggregate, anonymous
insights about people mentioning a certain term.
Facebook Query APIs: FQL
[Figure: fields of the FQL comment table]
Facebook APIs Running Example
Example
Runs the query "SELECT uid2 FROM friend WHERE uid1=me()"
https://developers.facebook.com/tools/explorer?method=GET&path=fql%3Fq%3DSELECT+uid2+FROM+friend+WHERE+uid1%3Dme%28%29
Read
You can issue an HTTP GET request to /fql?q=query, where query
can be a single FQL query or a JSON-encoded dictionary of queries.
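The Read rule above can be sketched as URL construction: the FQL text (or a JSON object of named queries) is URL-encoded into the `q` parameter of the /fql endpoint. This is only an illustration of the URL shape, since Facebook has since retired FQL; the class and method names and the `TOKEN` placeholder are hypothetical, and the multiquery builder assumes the names and queries contain no double quotes or backslashes because it does no JSON escaping.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class FqlUrlSketch {
    public static final String FQL_ENDPOINT = "https://graph.facebook.com/fql?q=";

    private static String enc(String s) {
        // Form encoding: spaces become '+', '=' becomes %3D, '(' and ')' become %28 and %29
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    // Single query: /fql?q=<encoded FQL>&access_token=...
    public static String singleQueryUrl(String fql, String accessToken) {
        return FQL_ENDPOINT + enc(fql) + "&access_token=" + enc(accessToken);
    }

    // Multiquery: q is a JSON object mapping query names to FQL strings.
    public static String multiQueryUrl(Map<String, String> queries, String accessToken) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> e : queries.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
        }
        json.append('}');
        return FQL_ENDPOINT + enc(json.toString()) + "&access_token=" + enc(accessToken);
    }

    public static void main(String[] args) {
        System.out.println(singleQueryUrl("SELECT uid2 FROM friend WHERE uid1=me()", "TOKEN"));
    }
}
```

Encoded this way, the `q` value matches the one in the Graph API Explorer link for the running example.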
Query
Queries are of the form SELECT [fields] FROM [table] WHERE
[conditions]. Unlike SQL, the FQL FROM clause can contain only a
single table. You can use the IN keyword in SELECT or WHERE
clauses to do subqueries, but the subqueries cannot reference
variables in the outer query's scope. Your query must also be
indexable, meaning that it queries properties that are marked as
indexable in the FQL table reference documentation.
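The "single table in FROM" rule can be illustrated with a rough shape check. This is a hypothetical helper, not Facebook's parser: it only tests the SELECT [fields] FROM [table] WHERE [conditions] form with exactly one table name, and it does not check indexability or validate subqueries.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FqlShapeCheck {
    // SELECT <fields> FROM <one table name> WHERE <conditions>, case-insensitive
    private static final Pattern FQL_SHAPE = Pattern.compile(
        "(?is)^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)\\s+WHERE\\s+(.+)$");

    // Returns the single table named in FROM, or null if the query does not have the FQL shape.
    public static String tableOf(String fql) {
        Matcher m = FQL_SHAPE.matcher(fql);
        return m.matches() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        System.out.println(tableOf("SELECT uid2 FROM friend WHERE uid1=me()"));
    }
}
```

A subquery in the WHERE clause still passes, because only the outer FROM has to name a single table.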
FQL Example
<?php
$app_id = 'YOUR_APP_ID';
$app_secret = 'YOUR_APP_SECRET';
$my_url = 'POST_AUTH_URL';
$code = isset($_REQUEST["code"]) ? $_REQUEST["code"] : '';

// auth user: redirect to the OAuth dialog until Facebook returns a code
if (empty($code)) {
  $dialog_url = 'https://www.facebook.com/dialog/oauth?client_id=' . $app_id
    . '&redirect_uri=' . urlencode($my_url);
  echo("<script>top.location.href='" . $dialog_url . "'</script>");
  exit;
}

// get user access_token
$token_url = 'https://graph.facebook.com/oauth/access_token?client_id=' . $app_id
  . '&redirect_uri=' . urlencode($my_url)
  . '&client_secret=' . $app_secret . '&code=' . $code;
// response is of the format "access_token=AAAC..."; strip the "access_token=" prefix
$access_token = substr(file_get_contents($token_url), strlen('access_token='));

// run fql query
$fql_query_url = 'https://graph.facebook.com/'
  . 'fql?q=SELECT+uid2+FROM+friend+WHERE+uid1=me()'
  . '&access_token=' . $access_token;
$fql_query_result = file_get_contents($fql_query_url);
$fql_query_obj = json_decode($fql_query_result, true);

// display results of fql query
echo '<pre>';
print_r("query results:");
print_r($fql_query_obj);
echo '</pre>';

// run fql multiquery
$fql_multiquery_url = 'https://graph.facebook.com/'
  . 'fql?q={"all+friends":"SELECT+uid2+FROM+friend+WHERE+uid1=me()",'
  . '"my+name":"SELECT+name+FROM+user+WHERE+uid=me()"}'
  . '&access_token=' . $access_token;
$fql_multiquery_result = file_get_contents($fql_multiquery_url);
$fql_multiquery_obj = json_decode($fql_multiquery_result, true);

// display results of fql multiquery
echo '<pre>';
print_r("multi query results:");
print_r($fql_multiquery_obj);
echo '</pre>';
?>
Map-Reduce for Multiple Outputs
Parallel Execution of Map-Reduce Program
To produce several kinds of output (for example, departure delays and arrival delays), we can pass an option through GenericOptionsParser and run the job once per option, but rerunning a job over big data can degrade performance severely.
MultipleOutputs lets a single Map-Reduce job write several output data sets in parallel.
org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
Provides functions for creating multiple output data sets.
Defines multiple named outputs, each with its own output format, key type, and value type; a base output path can also be given per write.
The named outputs are written in addition to the output that the Map-Reduce program normally produces.
When a Map-Reduce job finishes, the Reduce stage creates output files named “part-r-nnnnn”.
If a programmer writes data to a named output “myfile” using MultipleOutputs, both “part-r-nnnnn” and “myfile-r-nnnnn” are created.
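The file naming described above can be mimicked in plain Java. Reduce-side files are named `<name>-r-<partition>` with the partition number zero-padded to five digits, so regular output lands in `part-r-00003` and a named output `myfile` in `myfile-r-00003` for partition 3. The class below is a hypothetical in-memory stand-in for one reduce task's files, not the Hadoop API; the tab between key and value mirrors the default text output format.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NamedOutputFiles {
    // "<name>-r-<5-digit partition>", as used for reduce-side output files
    public static String reduceFileName(String baseName, int partition) {
        return String.format("%s-r-%05d", baseName, partition);
    }

    // File name -> lines written by this (simulated) reduce task
    private final Map<String, List<String>> files = new LinkedHashMap<>();
    private final int partition;

    public NamedOutputFiles(int partition) {
        this.partition = partition;
    }

    // Regular output, like context.write(key, value)
    public void write(String key, String value) {
        writeNamed("part", key, value);
    }

    // Named output, like mos.write(name, key, value)
    public void writeNamed(String name, String key, String value) {
        files.computeIfAbsent(reduceFileName(name, partition), k -> new ArrayList<>())
             .add(key + "\t" + value);
    }

    public Map<String, List<String>> files() {
        return files;
    }

    public static void main(String[] args) {
        NamedOutputFiles out = new NamedOutputFiles(3);
        out.write("2008,1", "5");
        out.writeNamed("myfile", "2008,1", "2");
        System.out.println(out.files().keySet());
    }
}
```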
Mapper Implementation for MultipleOutputs
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// DelayCounters is an enum defined elsewhere in the project holding the counters used below.
public class DelayCountMapperWithMultipleOutputs extends
    Mapper<LongWritable, Text, Text, IntWritable> {
  // map output value
  private final static IntWritable outputValue = new IntWritable(1);
  // map output key
  private Text outputKey = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Skip the CSV header line, which starts at byte offset 0
    if (key.get() > 0) {
      String[] columns = value.toString().split(",");
      if (columns != null && columns.length > 0) {
        try {
          // Departure delay data output (column 15)
          if (!columns[15].equals("NA")) {
            int depDelayTime = Integer.parseInt(columns[15]);
            if (depDelayTime > 0) {
              // Output key: "D,<year>,<month>"
              outputKey.set("D," + columns[0] + "," + columns[1]);
              // Output data creation
              context.write(outputKey, outputValue);
            } else if (depDelayTime == 0) {
              context.getCounter(DelayCounters.scheduled_departure).increment(1);
            } else if (depDelayTime < 0) {
              // Counter name assumed, mirroring early_arrival below
              context.getCounter(DelayCounters.early_departure).increment(1);
            }
          } else {
            context.getCounter(DelayCounters.not_available_departure).increment(1);
          }

          // Arrival delay data output (column 14)
          if (!columns[14].equals("NA")) {
            int arrDelayTime = Integer.parseInt(columns[14]);
            if (arrDelayTime > 0) {
              // Output key: "A,<year>,<month>"
              outputKey.set("A," + columns[0] + "," + columns[1]);
              // Output data creation
              context.write(outputKey, outputValue);
            } else if (arrDelayTime == 0) {
              context.getCounter(DelayCounters.scheduled_arrival).increment(1);
            } else if (arrDelayTime < 0) {
              context.getCounter(DelayCounters.early_arrival).increment(1);
            }
          } else {
            context.getCounter(DelayCounters.not_available_arrival).increment(1);
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
  }
}
Reducer Implementation for MultipleOutputs
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// The named outputs "departure" and "arrival" must be registered in the job driver, e.g.
// MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);
public class DelayCountReducerWithMultipleOutputs extends
    Reducer<Text, IntWritable, Text, IntWritable> {
  private MultipleOutputs<Text, IntWritable> mos;
  // reduce output key
  private Text outputKey = new Text();
  // reduce output value
  private IntWritable result = new IntWritable();

  @Override
  public void setup(Context context) throws IOException, InterruptedException {
    mos = new MultipleOutputs<Text, IntWritable>(context);
  }

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Split "D,<year>,<month>" or "A,<year>,<month>" by comma
    String[] columns = key.toString().split(",");
    // Output key: "<year>,<month>"
    outputKey.set(columns[1] + "," + columns[2]);
    if (columns[0].equals("D")) {
      // Departure delay: delay count sum
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      // Output value setting
      result.set(sum);
      // Output data creation (named output assumed to be "departure", mirroring "arrival")
      mos.write("departure", outputKey, result);
    } else {
      // Arrival delay: delay count sum
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      // Output value setting
      result.set(sum);
      // Output data creation
      mos.write("arrival", outputKey, result);
    }
  }

  @Override
  public void cleanup(Context context) throws IOException, InterruptedException {
    // Close every named output so that its files are flushed
    mos.close();
  }
}
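The mapper and reducer above can be traced end to end without a cluster. This single-process sketch (no Hadoop dependency) follows the same data flow: map emits `("D,year,month", 1)` or `("A,year,month", 1)` for positive delays in columns 15 and 14, a `TreeMap` stands in for the shuffle and sort, and reduce sums each group and routes it to a "departure" or "arrival" output the way `mos.write` does. The class name is hypothetical, the "departure" name is assumed, and the rows in the test are made up; it illustrates the flow, not how Hadoop schedules the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DelayCountSimulation {
    // Map phase: one keyed pair per positive delay (column 15 = departure, 14 = arrival)
    public static List<Map.Entry<String, Integer>> map(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            String[] c = line.split(",");
            if (c.length < 16 || c[0].equals("Year")) {
                continue; // skip malformed rows and the CSV header
            }
            if (isPositive(c[15])) {
                pairs.add(Map.entry("D," + c[0] + "," + c[1], 1));
            }
            if (isPositive(c[14])) {
                pairs.add(Map.entry("A," + c[0] + "," + c[1], 1));
            }
        }
        return pairs;
    }

    private static boolean isPositive(String field) {
        return !field.equals("NA") && Integer.parseInt(field) > 0;
    }

    // Shuffle and sort: group values by key, keys in sorted order
    public static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    // Reduce phase: sum each group and route it to a named output by its key prefix
    public static Map<String, Map<String, Integer>> reduce(Map<String, List<Integer>> groups) {
        Map<String, Map<String, Integer>> namedOutputs = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            String[] k = g.getKey().split(",");
            int sum = 0;
            for (int v : g.getValue()) {
                sum += v;
            }
            String name = k[0].equals("D") ? "departure" : "arrival";
            namedOutputs.computeIfAbsent(name, n -> new TreeMap<>()).put(k[1] + "," + k[2], sum);
        }
        return namedOutputs;
    }

    public static Map<String, Map<String, Integer>> run(List<String> lines) {
        return reduce(shuffle(map(lines)));
    }
}
```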
Hive Programming
Hive
Provides a means of running MapReduce jobs through
a SQL-like scripting language, called HiveQL, that can
be applied to summarization, querying, and
analysis of large volumes of data.
Important differences from SQL
Table-generating functions
Lateral views
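Both differences can be pictured in plain Java. A table-generating function such as Hive's built-in `explode` turns one input value into zero or more rows, which an ordinary SQL expression cannot do; and because Hive does not allow other expressions next to a UDTF in the SELECT list, LATERAL VIEW is what joins each source row with the rows its UDTF produced. The sketch models `SELECT pageid, adid FROM pageAds LATERAL VIEW explode(adid_list) adTable AS adid;` over a hypothetical table `pageAds(pageid STRING, adid_list ARRAY<INT>)`; the Java names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class LateralViewSketch {
    // One row of the hypothetical table pageAds(pageid STRING, adid_list ARRAY<INT>)
    public static final class PageAdsRow {
        final String pageId;
        final List<Integer> adIdList;

        public PageAdsRow(String pageId, List<Integer> adIdList) {
            this.pageId = pageId;
            this.adIdList = adIdList;
        }
    }

    // Table-generating function: one input value in, one output row per array element out
    public static List<Integer> explode(List<Integer> array) {
        return new ArrayList<>(array);
    }

    // LATERAL VIEW: pair every source row with each row that explode() generated for it
    public static List<String> lateralViewExplode(List<PageAdsRow> table) {
        List<String> result = new ArrayList<>();
        for (PageAdsRow row : table) {
            for (Integer adId : explode(row.adIdList)) {
                result.add(row.pageId + "\t" + adId); // (pageid, adid)
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<PageAdsRow> pageAds = List.of(
            new PageAdsRow("front_page", List.of(1, 2, 3)),
            new PageAdsRow("contact_page", List.of(3, 4, 5)));
        lateralViewExplode(pageAds).forEach(System.out::println);
    }
}
```

Note that a row with an empty array contributes no output rows here, which matches a plain (non-OUTER) lateral view.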
Useful URLs
Hive: https://cwiki.apache.org/confluence/display/Hive/Home
Language Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual
Hive Programming
Workflow of Hive
Create Table
Load Data into HDFS/Hive
Query Data: use HiveQL to query data, including table-generating functions and user-defined operations via external programs (TRANSFORM)
HiveQL
DDL Operation
Creating Hive Tables
hive> CREATE TABLE pokes (foo INT, bar STRING);
hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY
(ds STRING);
Browsing through Tables
hive> SHOW TABLES;
hive> SHOW TABLES '.*s';
hive> DESCRIBE invites;
Altering and Dropping Tables
hive> ALTER TABLE events RENAME TO 3koobecaf;
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT);
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment');
hive> ALTER TABLE invites REPLACE COLUMNS (foo INT, bar STRING,
baz INT COMMENT 'baz replaces new_col2');
HiveQL
DML Operation
Loading data from flat files into Hive
hive> LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE
INTO TABLE pokes;
hive> LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE
INTO TABLE invites PARTITION (ds='2008-08-15');
hive> SELECT a.foo FROM invites a WHERE a.ds='2008-08-15';
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a;
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE
a.key < 100;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM
events a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM
profiles a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT COUNT(*) FROM
invites a WHERE a.ds='2008-08-15';
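The partitioned statements above rely on Hive storing each partition in its own directory under the table's directory, named `<column>=<value>`; LOAD DATA ... PARTITION (ds='2008-08-15') places the file there, and a query filtered on `a.ds='2008-08-15'` only needs to read that directory (partition pruning). The sketch assumes the default warehouse location `/user/hive/warehouse` (the `hive.metastore.warehouse.dir` setting, which installations may change) and uses a map as a stand-in for HDFS; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;

public class PartitionPathSketch {
    // Default hive.metastore.warehouse.dir; a given installation may override it.
    public static final String WAREHOUSE = "/user/hive/warehouse";

    // Directory of one partition, e.g. /user/hive/warehouse/invites/ds=2008-08-15
    public static String partitionDir(String table, String column, String value) {
        return WAREHOUSE + "/" + table.toLowerCase() + "/" + column + "=" + value;
    }

    // Partition pruning: a filter on the partition column reads only one directory.
    // 'storage' maps directory paths to the rows stored there (a stand-in for HDFS).
    public static List<String> scanPartition(Map<String, List<String>> storage,
                                             String table, String column, String value) {
        return storage.getOrDefault(partitionDir(table, column, value), List.of());
    }

    public static void main(String[] args) {
        System.out.println(partitionDir("invites", "ds", "2008-08-15"));
    }
}
```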
HiveQL
SQL Operation
GROUP BY, JOIN, STREAMING
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(*)
WHERE a.foo > 0 GROUP BY a.bar;
hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(*) FROM invites a
WHERE a.foo > 0 GROUP BY a.bar;
hive> FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar) INSERT OVERWRITE
TABLE events SELECT t1.bar, t1.foo, t2.foo;
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT
TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds >
'2008-08-09';
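The last statement streams rows through `/bin/cat`. By default Hive hands each row to the external program as one tab-separated line on standard input and splits each line of its standard output on tabs into the columns named by AS (here `oof, rab`). This sketch reproduces that contract with `ProcessBuilder`; it assumes a Unix-like system where `/bin/cat` exists, and the class name is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TransformSketch {
    // Streams rows through an external command, TRANSFORM-style:
    // rows go out as tab-separated lines, output lines come back split on tabs.
    public static List<String[]> transform(List<String[]> rows, String... command) {
        try {
            Process process = new ProcessBuilder(command).start();
            // Write stdin on a separate thread so a full stdout pipe cannot deadlock the reader.
            Thread writer = new Thread(() -> {
                try (Writer w = new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8)) {
                    for (String[] row : rows) {
                        w.write(String.join("\t", row));
                        w.write('\n');
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            List<String[]> out = new ArrayList<>();
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    out.add(line.split("\t", -1)); // keep empty trailing columns
                }
            }
            writer.join();
            process.waitFor();
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(new String[] {"1", "foo"}, new String[] {"2", "bar"});
        for (String[] r : transform(rows, "/bin/cat")) {
            System.out.println("oof=" + r[0] + " rab=" + r[1]);
        }
    }
}
```

Because `/bin/cat` echoes its input, the output columns equal the input columns; replacing it with another program changes the rows without any change on the Hive side, which is the point of TRANSFORM.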
Table example for Apache Weblog data
CREATE TABLE apachelog (
host STRING, identity STRING, user STRING, time STRING,
request STRING, status STRING, size STRING, referer STRING, agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\".*\") ([^ \"]*|\".*\"))?"
)
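RegexSerDe maps the capture groups of `input.regex` onto the table's columns in order, and a line that does not match the whole pattern yields no usable values. The pattern can be checked outside Hive with java.util.regex; the escaping is the same in a HiveQL double-quoted string and a Java string literal, so the literal below is the one used in the table definition. The class name is hypothetical, the test line is the sample combined-log line from the Apache httpd documentation, and returning null for a non-matching line is this sketch's simplification.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ApacheLogRegexSketch {
    // Same pattern as the apachelog "input.regex" property
    static final Pattern LOG = Pattern.compile(
        "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") "
        + "(-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\".*\") ([^ \"]*|\".*\"))?");

    // Column order of the apachelog table; group i + 1 fills COLUMNS[i]
    public static final String[] COLUMNS = {
        "host", "identity", "user", "time", "request", "status", "size", "referer", "agent"};

    // Returns one value per column, or null when the whole line does not match.
    public static String[] parse(String line) {
        Matcher m = LOG.matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] fields = new String[m.groupCount()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = m.group(i + 1); // referer/agent are null for common-format lines
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" "
            + "200 2326 \"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\"";
        String[] fields = parse(line);
        for (int i = 0; i < COLUMNS.length; i++) {
            System.out.println(COLUMNS[i] + " = " + fields[i]);
        }
    }
}
```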