
BIG DATA TECHNOLOGY (HADOOP)

Getting Started

In document Versatile Java - Deepak Mali 351 (Page 128-164)

Let's say we have a large bank with an insurance product. It holds data in various forms: customer data, website activity that reveals customers' interests and transaction patterns, competitors' pricing, and social media updates (from Facebook, Twitter and the like) gathered by market research firms. A big data platform acts as a decision support system, and the bank can generate an optimal price by considering all these factors. The more data there is, the better the predictions will be. The future of big data analytics is that of a digital nervous system in which any change in input produces a corresponding result.

Reacting swiftly, adapting to changes, learning from experience, and knowing which data is important and which is not are some of the important capabilities.

Currently, data from multiple sources is fed into a data warehouse (a database), and statistical algorithms are run on top of it to generate analytics that help in taking business decisions. Since the data is sampled, we don't get a clear picture of the entire data set. There is also a delay in decisions because the data has to be cleaned first. The data is too large and complex for traditional systems to give a good picture when decisions are taken, and their performance degrades as the data grows; hence the need for Big Data technologies.

The three important attributes of big data are velocity, variety and volume.

The larger the data set, the better the result of the data analytics algorithm. A simple algorithm on a large data set will often yield a better result than a sophisticated algorithm on a smaller data set. The more attributes there are, the better the result. The data should be written once and read many times. According to the sixth annual IDC (International Data Corporation) study, the volume of data will grow about 300 times between 2005 and 2020, from 130 exabytes to 40,000 exabytes, and 33% of that data will be useful, compared with the 25% used today.

Distributed computing is the core concept behind Hadoop.

<Difference between map reduce and rdbms>

As the data grew, scaling up the computing resources became expensive because of hardware cost, software licences and the risk of failure, and the data kept increasing. This is where Hadoop's concept of distributed programming comes to the rescue: the task is distributed across many processing units, or nodes.

Advantages of Hadoop

The cost of commodity hardware is low, and Hadoop itself is open-source software with no licence fee. If one node goes down, another node can process the data, so performance degrades but processing does not halt. With a Hadoop cluster, the processing power can increase about 10 times at roughly 1/10 of the processing cost and 1/10 of the time. Hadoop employs parallelism, which addresses the gap between disk seek time and transfer time. Hadoop maintains replicas, so the failure of one node does not affect data integrity. A single-server architecture is expensive and inefficient compared with a distributed architecture. In traditional distributed computing, a number of processors communicate via MPI (Message Passing Interface) and shared memory.

As the data size increases, bandwidth becomes a problem; Hadoop's architecture, however, is a little different. Hadoop has nodes that are like personal computers, each with its own hard disk and CPU. The master node schedules work on the nodes that already hold the data (data locality), and hence the bandwidth is used only for small update messages.

Hadoop coordinates work through a job tracker and task trackers, and it scales well.

Fig 5.1 HDFS Architecture

Hadoop Releases

Every release is numbered in the form x.y.z, where:

x = major release

y = minor release

z = point release

A change in the major release breaks backward compatibility but a change in point and minor release does not.
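As a small illustration of the numbering rule, the sketch below parses two x.y.z strings and treats them as compatible only when the major number matches. The class ReleaseVersion and its methods are hypothetical names invented for this example; they are not part of Hadoop.

```java
// Hypothetical helper (not a Hadoop API): models an x.y.z release number.
final class ReleaseVersion {
    final int major;
    final int minor;
    final int point;

    ReleaseVersion(int major, int minor, int point) {
        this.major = major;
        this.minor = minor;
        this.point = point;
    }

    // Parses a string such as "1.2.1" into its three components.
    static ReleaseVersion parse(String text) {
        String[] parts = text.trim().split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected x.y.z but got: " + text);
        }
        return new ReleaseVersion(
                Integer.parseInt(parts[0]),
                Integer.parseInt(parts[1]),
                Integer.parseInt(parts[2]));
    }

    // Under the rule stated above, only a major-release change breaks compatibility.
    boolean isCompatibleWith(ReleaseVersion other) {
        return this.major == other.major;
    }

    public static void main(String[] args) {
        // Same major release: minor and point differ.
        System.out.println(parse("1.0.3").isCompatibleWith(parse("1.2.1")));
        // Major release changed.
        System.out.println(parse("1.2.1").isCompatibleWith(parse("2.0.0")));
    }
}
```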

Apache Hadoop is best known for MapReduce and its distributed file system, HDFS, but it has a family of related projects collectively referred to as the Hadoop ecosystem. The other projects include Pig, Hive, Sqoop, Flume, ZooKeeper and HBase.

Fig 5.2 Features in different releases

HDFS

HDFS is a distributed file system. A cluster is a set of racks, and a rack is a set of computers, each of which is called a node. The nodes that store data are data nodes; these are the slave nodes. The name node is responsible for managing the files distributed across the cluster. A file is broken down into smaller chunks called blocks. Each block is replicated, by default three times, and the replicas are distributed across the cluster, while the name node keeps track of the complete file system and the block locations. The distribution is done so as to provide resilience: if a block is lost, or even a complete rack fails, the name node can still put the file together from the remaining replicas.

HDFS handles very large files and works with commodity hardware. It is best suited to a write-once, read-many-times access pattern. HDFS is not designed for low-latency reads of the kind an OLTP database serves, and it doesn't work well with a lot of small files.

Block size is the minimum amount of data that can be read from or written to a file system. The default HDFS block size is 64 MB. The idea behind a large block size is to keep the seek time at about 1% of the transfer time.
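The 1% target can be turned into a quick calculation. The sketch below assumes a seek time of 10 ms and a transfer rate of 100 MB/s; both figures are illustrative assumptions, not values from the text, and the class name BlockSizeEstimate is made up for the example.

```java
// Illustrative arithmetic only; the disk figures used in main are assumptions.
final class BlockSizeEstimate {

    // Smallest block size (in MB) for which the seek time is at most
    // the given fraction of the time needed to transfer the block.
    static double minBlockSizeMb(double seekTimeMs,
                                 double transferRateMbPerSec,
                                 double seekFraction) {
        double seekTimeSec = seekTimeMs / 1000.0;
        double transferTimeSec = seekTimeSec / seekFraction;
        return transferTimeSec * transferRateMbPerSec;
    }

    // Seek time as a fraction of transfer time for a given block size.
    static double seekOverhead(double seekTimeMs,
                               double transferRateMbPerSec,
                               double blockSizeMb) {
        double transferTimeSec = blockSizeMb / transferRateMbPerSec;
        return (seekTimeMs / 1000.0) / transferTimeSec;
    }

    public static void main(String[] args) {
        // Assumed disk: 10 ms seek, 100 MB/s transfer, 1% seek target.
        System.out.println(minBlockSizeMb(10, 100, 0.01) + " MB");
        // Overhead for the 64 MB default under the same assumptions.
        System.out.println(seekOverhead(10, 100, 64));
    }
}
```

Under these assumed figures the target block size comes out in the region of 100 MB, which is the same order of magnitude as the 64 MB default.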

Fig 5.3 Name Nodes and Data Nodes

HDFS Architecture

HDFS follows a master-slave architecture in which data nodes store the data and the name node manages the file system namespace for the worker nodes. The name node keeps track of the complete file system by managing two things: the namespace image and the edit log. The namespace image is the metadata about all files and directories, including the metadata of the blocks. The edit log is the log of activities performed by clients; it keeps growing as activity on HDFS continues. Together, these two give the complete file system image. As soon as a data node boots up and connects to the network, it reports its blocks to the name node, and the namespace image is updated accordingly. The file system image is maintained in the main memory of the name node. If the name node fails, the complete file system goes down, since the namespace image is lost. That is why the name node is termed a single point of failure (SPOF), and why it is advisable to spend more on the name node hardware.

To avoid complete failure, the name node is backed up to a remote NFS mount. There is also a secondary name node, which periodically merges the edit log into the namespace image through checkpoints so that the edit log does not grow without bound. The name node and the secondary name node are Java programs. If the name node fails, the administrator has to boot up a name node again; the machine running the secondary name node is often the best candidate. In that case the secondary name node takes the information from the NFS mount before starting to act as the name node. As a rule of thumb, the name node needs 1000 MB of main memory per million storage blocks.
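The rule of thumb lends itself to a back-of-the-envelope estimate. The sketch below assumes a hypothetical cluster of 200 data nodes with 4,000,000 MB of disk each, 64 MB blocks and a replication factor of 3; these cluster figures and the class name NameNodeMemory are assumptions chosen for illustration.

```java
// Back-of-the-envelope sizing; the cluster figures in main are assumptions.
final class NameNodeMemory {

    // Rule of thumb from the text: 1000 MB of memory per million blocks.
    static final long MB_PER_MILLION_BLOCKS = 1000;

    // Number of distinct blocks that fit in the raw storage, given that
    // every block is stored 'replication' times.
    static long estimateBlocks(long rawStorageMb, long blockSizeMb, int replication) {
        return rawStorageMb / (blockSizeMb * replication);
    }

    // Name node main memory (in MB) needed to track that many blocks.
    static long estimateMemoryMb(long blocks) {
        return blocks * MB_PER_MILLION_BLOCKS / 1_000_000;
    }

    public static void main(String[] args) {
        long rawStorageMb = 200L * 4_000_000L; // assumed: 200 nodes, 4,000,000 MB each
        long blocks = estimateBlocks(rawStorageMb, 64, 3);
        System.out.println(blocks + " blocks, about "
                + estimateMemoryMb(blocks) + " MB of name node memory");
    }
}
```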

Fig 5.4 HDFS Architecture and File System Image

The name node maintains the namespace image, which is metadata such as the block size, files, directories and permissions. The edit log records all the activities performed on the data nodes, and it keeps growing quickly. The two combined give the complete file and block information. As soon as a data node connects to the network, the namespace image is updated with the blocks that the data node contains. This information resides in the main memory of the name node. Because the file system becomes unavailable if the name node goes down, the name node is known as a single point of failure (SPOF); it should be resilient to hardware failures, and it is worth spending more on its hardware.

The name node is backed up to a remote NFS mount, and there is also a secondary name node. The secondary name node doesn't function like a name node: its only purpose is to merge the edit log into the namespace image, write the result to the file system, and so create checkpoints. Both are Java programs. If the name node fails, the Hadoop administrator has to boot up a new name node, and the secondary name node machine is often the best choice. In later releases the name node has been made more resilient. The name node should have enough main memory to manage the pool of data blocks in the cluster; 1000 MB per million storage blocks is the rule of thumb.

The HDFS client is a JVM that has to run on the node that interacts with HDFS. The client first communicates with the name node, and the name node makes certain checks, such as whether the file already exists and whether the client has the correct permissions. (The dfs.replication property is set to 3 by default.)

The name node returns the data nodes to which the blocks are to be copied. The client connects to the first data node and asks it to set up the pipeline for copying to the other data nodes. Each data node acknowledges the copy, and this goes on until the whole file is copied to HDFS.

The name node then sends a completion message. If a data node fails, the data is replicated on the other data nodes: the name node notices the under-replication and arranges for further replication.

When writing data, the first replica goes either to the node on which the client resides or to a node on a given rack, chosen so that the node is not overly loaded. The second node is chosen off that rack, and the third node is chosen on the same rack as the second; together they form the pipeline. Selection and replication happen behind the scenes. Node distance is related to the bandwidth.

HDFS Read/Write

The HDFS client is a JVM that has to run on the node that interacts with HDFS. The dfs.replication property holds the replication factor; in pseudo-distributed mode it is overridden and set to 1 in the configuration file hdfs-site.xml. The client tells the name node that it wants to write a file to HDFS. The name node performs various checks, such as whether the file already exists and whether the client has the correct permissions, and returns the list of data nodes to be copied to. The client connects to the first data node and asks it to set up the pipeline to the subsequent data nodes. The data nodes acknowledge each copy. The second data node is chosen off the rack of the first one, and the third is chosen on the same rack as the second. This forms the pipeline. The whole process of selection and replication happens behind the scenes.
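The placement rule for a three-node pipeline can be sketched in plain Java. This is a simplified model, not Hadoop's own placement code: the Node class, the method choosePipeline and the "first match wins" selection are assumptions made for the example (a real cluster would also weigh load and pick candidates at random).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of replica placement for one block.
final class ReplicaPlacementSketch {

    // A data node identified by its rack and host name.
    static final class Node {
        final String rack;
        final String host;

        Node(String rack, String host) {
            this.rack = rack;
            this.host = host;
        }

        @Override
        public String toString() {
            return rack + "/" + host;
        }
    }

    // Returns the write pipeline: the client's node, then a node on another
    // rack, then a different node on that same second rack.
    static List<Node> choosePipeline(Node client, List<Node> cluster) {
        List<Node> pipeline = new ArrayList<Node>();
        pipeline.add(client); // first replica: the node the client runs on

        Node second = null;
        for (Node candidate : cluster) {
            if (!candidate.rack.equals(client.rack)) {
                second = candidate; // first node found off the client's rack
                break;
            }
        }
        if (second == null) {
            return pipeline; // single-rack cluster: nothing more to choose here
        }
        pipeline.add(second);

        for (Node candidate : cluster) {
            boolean sameRack = candidate.rack.equals(second.rack);
            boolean otherHost = !candidate.host.equals(second.host);
            if (sameRack && otherHost) {
                pipeline.add(candidate); // third replica: same rack as the second
                break;
            }
        }
        return pipeline;
    }
}
```

Calling choosePipeline with a client on rack A and a cluster that also has two nodes on rack B yields one replica on rack A and two on rack B, so the loss of either rack still leaves a copy.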

Fig 5.5 HDFS Write

The distances are calculated in HDFS as follows. The idea of distance is related to bandwidth.

Block on the same rack and node – distance = 0

Block on the same rack but different node – distance = 2

Block on a different rack but the same data centre – distance = 4

Block in a different data centre – distance = 6
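These four values follow from counting hops up to the closest common ancestor in a data centre / rack / node tree. The sketch below assumes each node is written as a path such as /d1/r1/n1; that notation and the class name NetworkDistance are assumptions made for the example.

```java
// Hypothetical helper: distance between two nodes written as
// "/datacentre/rack/node" paths (the path notation is an assumption).
final class NetworkDistance {

    // Counts the hops from each node up to their closest common ancestor.
    static int distance(String pathA, String pathB) {
        String[] a = pathA.split("/");
        String[] b = pathB.split("/");
        int common = 0;
        // The leading empty segment (before the first "/") matches on both sides.
        while (common < a.length && common < b.length && a[common].equals(b[common])) {
            common++;
        }
        return (a.length - common) + (b.length - common);
    }

    public static void main(String[] args) {
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n1")); // same node
        System.out.println(distance("/d1/r1/n1", "/d1/r1/n2")); // same rack
        System.out.println(distance("/d1/r1/n1", "/d1/r2/n3")); // same data centre
        System.out.println(distance("/d1/r1/n1", "/d2/r3/n4")); // different data centre
    }
}
```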

The client sends the read request to the name node. The name node returns the locations of the first few blocks, with the data nodes for each block ordered from the closest to the farthest. The client then reads the blocks one by one. If a data node fails, the client makes a note of it, and that node is not picked for later reads.

Fig 5.6 HDFS Read

HDFS Federation and High Availability

Federation is added to balance the load on the name node as the cluster size increases.

A new name node can be added, and the file tree structure and the block pool can be divided amongst the name nodes. Thus each name node has to manage only the pool of blocks it is associated with and not the complete pool. Data nodes can be associated with multiple name nodes. Name nodes won't communicate with each other, and the failure of one won't affect the others.

Fig 5.7 HDFS Federation

High Availability

High availability concerns the time taken to return to a stable state after a name node failure. To address this, a second name node is always running on standby. The primary name node and the standby name node share the namespace and the edit log via a highly available NFS mount. In future, ZooKeeper would be used to manage the transition from the primary to the standby. Data nodes must send block reports to both name nodes. The standby node fences the primary when it takes over: it revokes the primary's shared access and disables its network port.

Map reduce

A split is a fixed-size chunk of data that acts as the input to a map task. Blocks belong to the HDFS world and splits belong to the MapReduce world. All map tasks run in parallel and produce output. Their results are merged and shuffled, and act as the input to the reduce tasks. The whole job execution is controlled by the job tracker and the task trackers. The name node in HDFS is analogous to the job tracker in MapReduce, and a data node in HDFS is analogous to a task tracker. Task trackers run the map and reduce tasks on the data nodes, and the job tracker schedules tasks for the task trackers. The job tracker and task trackers run as Java processes; they are not hardware. It is wiser to spend on the hardware that runs the task tracker.

Fig 5.8 Map Reduce Paradigm

Map tasks get inputs that are local to their data nodes. This is called data locality; without it, network transfer would add latency. Hence the optimal split size is equal to the block size. Map tasks write their output to the local disk and not to HDFS with replication, since it is only an intermediate result. The job tracker cleans up the map output only after the reduce job has completed successfully. All the map output is merged, sorted and partitioned. A reducer does not get its data locally; it is fetched over the network. The reducer's output is written to HDFS for reliability.
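Why the split size should match the block size can be seen with a little integer arithmetic: a split that is larger than a block, or not aligned with one, overlaps more than one block, and at most one of those blocks is guaranteed to be local to the map task. The sketch below works in whole megabytes; the class and method names are hypothetical.

```java
// Illustrative arithmetic on splits and blocks (sizes in MB).
final class SplitLocality {

    // Number of HDFS blocks overlapped by the split [startMb, startMb + lengthMb).
    static long blocksTouched(long startMb, long lengthMb, long blockSizeMb) {
        long firstBlock = startMb / blockSizeMb;
        long lastBlock = (startMb + lengthMb - 1) / blockSizeMb;
        return lastBlock - firstBlock + 1;
    }

    // Number of map tasks for a file: one per split, rounding up.
    static long mapTasks(long fileSizeMb, long splitSizeMb) {
        return (fileSizeMb + splitSizeMb - 1) / splitSizeMb;
    }

    public static void main(String[] args) {
        // Split equal to the 64 MB block: exactly one block per map task.
        System.out.println(blocksTouched(64, 64, 64));
        // 100 MB split on 64 MB blocks: the second split overlaps three blocks.
        System.out.println(blocksTouched(100, 100, 64));
        // A 1024 MB file with 64 MB splits.
        System.out.println(mapTasks(1024, 64));
    }
}
```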

Fig 5.10 Map Reduce Classes

Map Reduce Mechanism

The input to a map task is a split, and a split contains many records. The map function takes its input in the form of keys and values and produces its output in the form of keys and values as well. In the input, the key is unique to every record. For each record, map can produce one or more key-value pairs, and the output keys need not be unique. This is useful, because the values are sorted and grouped by key, and we want to make sense of the values that share a key. A given reducer must receive all the values for a particular key. This is managed by Hadoop, and the programmer need not manage anything here. The input to the reducer is a key together with the list of values associated with it; the order of the values is not important. The output of the reducer is sorted by key, since it receives its input in sorted order.

The output of the reducer depends on the inputs to the map and subsequently on the keys in the input to the map.

Take a word count program as an example. Let's assume the input to the map job is the string "to err is human to forgive is divine". The output from the reducer is then:

Divine 1

Err 1

Forgive 1

Human 1

Is 2

To 2

Brief algorithm for the same is given below:
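A minimal plain-Java sketch of that flow, assuming everything fits in memory on one machine (so no Hadoop classes are involved). The class WordCountSketch and its method names are illustrative and not taken from the book. It maps each word to a (word, 1) pair, groups the pairs by key in sorted order, and sums each group.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory imitation of map, shuffle/sort and reduce for word count.
final class WordCountSketch {

    // Map step: emit one (word, 1) pair per whitespace-separated token.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<Map.Entry<String, Integer>>();
        for (String token : line.toLowerCase().split("\\s+")) {
            if (!token.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<String, Integer>(token, 1));
            }
        }
        return pairs;
    }

    // Shuffle and sort step: group values by key; TreeMap keeps keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> pair : pairs) {
            List<Integer> values = grouped.get(pair.getKey());
            if (values == null) {
                values = new ArrayList<Integer>();
                grouped.put(pair.getKey(), values);
            }
            values.add(pair.getValue());
        }
        return grouped;
    }

    // Reduce step: sum the list of values for each key.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            counts.put(entry.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String input = "to err is human to forgive is divine";
        // prints {divine=1, err=1, forgive=1, human=1, is=2, to=2}
        System.out.println(reduce(shuffle(map(input))));
    }
}
```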

The same can be achieved using parallel maps and parallel reducers. With parallel reducers, all the values for the same key should go to the same reducer, and the keys should be distributed evenly. The shuffle and sort step is critical to any MapReduce solution. In Java, we have the Map class with the map logic, the Reduce class with the reduce logic, and the Driver class, which controls the configuration and decides how the job reads and writes data. The distribution of data and the shuffle and sort steps are all taken care of by Hadoop itself.
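One common way to guarantee that every value for a key reaches the same reducer is to derive the reducer index from the key's hash. The sketch below shows the idea in plain Java; the class HashPartitionSketch is a hypothetical stand-in written for this example, and how evenly the keys spread depends on the hash function and the data.

```java
// Hypothetical stand-in for a hash-based partitioning step.
final class HashPartitionSketch {

    // Maps a key to a reducer index in the range [0, numReducers).
    // Masking with Integer.MAX_VALUE clears the sign bit so the
    // remainder is never negative.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        String[] words = {"to", "err", "is", "human", "to", "forgive", "is", "divine"};
        for (String word : words) {
            // Repeated words ("to", "is") always land on the same reducer.
            System.out.println(word + " -> reducer " + partitionFor(word, 4));
        }
    }
}
```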

Fig 5.11 Map Reduce functioning

Let's take an example of the code base for writing the map and reduce functions.

import java.io.IOException;
import java.util.StringTokenizer;

The following are the import statements for the key and value data types. These are Hadoop data types. LongWritable is similar to Long in Java, Text is analogous to String in Java, and IntWritable is similar to int in Java.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

Every map class extends the Mapper class and overrides the map method.

LongWritable and Text are the input key and value data types, followed by the output key and value data types (Text and IntWritable).

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

We need to override the map function, which receives the input key, the input value and the context. The role of the context is to collect the output key and value pairs. We tokenize the string into words and write each one into the context with the word as the key and one as the value.

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            // convert everything to lower case
            word.set(itr.nextToken().toLowerCase());
            // emit the word only if it starts with an alphabetic character
            if (Character.isAlphabetic(word.toString().charAt(0))) {
                context.write(word, one);
            }
        }
    }
}

Let's now look at the reducer class.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

Every reduce class has to extend the Reducer class and override the reduce() method. The type parameters are the input key and value types followed by the output key and value types. The Iterable of values implies that a key points to multiple values. The context collects the output key and value pairs through its write method. The processing logic iterates over the values. The input key and value types of the reducer must match the output key and value types of the map function.

public class WordCountReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // add up the counts emitted for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Finally, the driver class is as below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
