As an example of how Flume works, I build a single Flume agent to asynchronously take data from a CentOS Linux-based message file called /var/log/messages. The message file acts as the data source, and the events read from it are held in a single file-based channel called channel1. The data sink is on HDFS in a directory called /flume/messages.
In the Linux hadoop account I have created a number of files to run this example Flume job, display the resulting data on HDFS, and clean up after the job. These files make the job easier to run: there is minimal typing, and the job is easy to rerun because the previous results can be removed from HDFS first. You can use scripts like these if you desire.
[hadoop@hc1nn ~]$ cd $HOME/flume
[hadoop@hc1nn flume]$ ls
agent1.cfg flume_clean_hdfs.sh flume_exec_hdfs.sh flume_show_hdfs.sh
The file agent1.cfg is the Flume configuration file for the agent, while the Bash (.sh) files are for running the agent (flume_exec_hdfs.sh), showing the results on HDFS (flume_show_hdfs.sh), and cleaning up the data on HDFS (flume_clean_hdfs.sh). Examining each of these files in turn, we see that the show script just executes a Hadoop file system ls command against the directory /flume/messages, where the agent will write the data.
[hadoop@hc1nn flume]$ cat flume_show_hdfs.sh
#!/bin/bash
hdfs dfs -ls /flume/messages
The clean script executes a Hadoop file system remove command with a recursive switch:
[hadoop@hc1nn flume]$ cat flume_clean_hdfs.sh
#!/bin/bash
hdfs dfs -rm -r /flume/messages
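On a first run the directory /flume/messages does not exist yet, so the remove command reports an error. The following is a minimal sketch of a guarded variant, assuming the standard hdfs dfs -test -d check; the function name clean_flume_dir is hypothetical and not part of the original scripts.

```shell
#!/bin/bash
# Hypothetical helper: remove an HDFS directory only if it exists.
# The default path matches the example; the function name is illustrative.
clean_flume_dir() {
    local dir="${1:-/flume/messages}"
    # "hdfs dfs -test -d" exits 0 when the path is an existing directory
    if hdfs dfs -test -d "$dir"; then
        hdfs dfs -rm -r "$dir"
    else
        echo "nothing to clean: $dir does not exist"
    fi
}
```

Calling clean_flume_dir with no argument cleans the example directory; passing a path cleans a different one.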
The execution script, flume_exec_hdfs.sh, runs the Flume agent and needs nine lines:
[hadoop@hc1nn flume]$ cat flume_exec_hdfs.sh
1 #!/bin/bash
2
3 # run the bash agent
4
5 flume-ng agent \
6 --conf /etc/flume-ng/conf \
7 --conf-file agent1.cfg \
8 -Dflume.root.logger=DEBUG,INFO,console \
9 -name agent1
This execution script runs the Flume agent within a Linux Bash shell and is easily repeatable because you run a single script, rather than retyping these options each time you want to move log file content. Line 5 actually runs the agent, while lines 6 and 7 specify the configuration directory and agent configuration file. Line 8 specifies the log4j log configuration via a -D command line option to show DEBUG and INFO messages on the console. Finally, line 9 specifies the Flume agent name agent1.
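If you want to reuse the script for other agents, the options can be parameterized. The sketch below passes the same options as the original script; the function name run_agent and the variables AGENT_NAME, AGENT_CFG, and DRY_RUN are illustrative assumptions, not Flume settings.

```shell
#!/bin/bash
# Hypothetical wrapper around the flume-ng call from flume_exec_hdfs.sh.
# AGENT_NAME, AGENT_CFG and DRY_RUN are illustrative, not Flume settings.
run_agent() {
    local name="${AGENT_NAME:-agent1}"
    local cfg="${AGENT_CFG:-agent1.cfg}"
    # Same options as the original script, one per positional argument
    set -- flume-ng agent \
        --conf /etc/flume-ng/conf \
        --conf-file "$cfg" \
        -Dflume.root.logger=DEBUG,INFO,console \
        -name "$name"
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "$*"    # print the command instead of running it
    else
        "$@"         # run flume-ng with the assembled options
    fi
}
```

Running it with DRY_RUN=1 prints the command line, which is a cheap way to check the options before starting the agent.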
The Flume agent configuration file (agent1.cfg, in this case) must contain the agent’s source, sink, and channel.
Consider the contents of this example file:
[hadoop@hc1nn flume]$ cat agent1.cfg
1 #
2 # define agent src, channel and sink
3 #
4
5 agent1.sources = source1
6 agent1.channels = channel1
7 agent1.sinks = sink1
8
9 # ---
10 # define agent channel
11 # ---
12
13 agent1.channels.channel1.type = FILE
14 agent1.channels.channel1.capacity = 2000000
15 agent1.channels.channel1.checkpointInterval = 60000
16 agent1.channels.channel1.maxFileSize = 10737418240
17
18 # ---
19 # define agent source
20 # ---
21
22 agent1.sources.source1.type = exec
23 agent1.sources.source1.command = tail -F /var/log/messages
24 agent1.sources.source1.channels = channel1
As already defined in the agent execution script, the Flume agent name in this example is agent1. Lines 5 to 7 define the names of the source, channel, and sink.
5 agent1.sources = source1
6 agent1.channels = channel1
7 agent1.sinks = sink1
The channel (channel1) is described between lines 13 and 16. Line 13 specifies that the channel type will be a file. Line 14 indicates that the maximum capacity of the channel will be 2 million events. Line 15 sets the time between checkpoints, in milliseconds. Line 16 specifies the maximum channel file size in bytes.
13 agent1.channels.channel1.type = FILE
14 agent1.channels.channel1.capacity = 2000000
15 agent1.channels.channel1.checkpointInterval = 60000
16 agent1.channels.channel1.maxFileSize = 10737418240
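The raw values at lines 15 and 16 are easier to judge in larger units. As a small worked example, the following shell arithmetic converts them; the function names are illustrative only.

```shell
#!/bin/bash
# Convert the channel settings from the example into friendlier units.
# Function names are illustrative.
bytes_to_gib()  { echo $(( $1 / 1024 / 1024 / 1024 )); }
ms_to_seconds() { echo $(( $1 / 1000 )); }

echo "maxFileSize:        $(bytes_to_gib 10737418240) GiB"   # prints 10 GiB
echo "checkpointInterval: $(ms_to_seconds 60000) s"          # prints 60 s
```

So the channel checkpoints once a minute and each channel data file may grow to 10 GiB.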
Lines 22 to 24 of the configuration file show how the Flume data source source1 is defined.
22 agent1.sources.source1.type = exec
23 agent1.sources.source1.command = tail -F /var/log/messages
24 agent1.sources.source1.channels = channel1
In this example, I need to ensure that the Linux account I am using to run this Flume job can read the log file /var/log/messages. Therefore, I grant access using the root account as follows: I use the Linux su (switch user) command to change the user ID to root. Then I use the Linux chmod command to grant global read privileges while maintaining current access. The two Linux ls command listings show that extra access has been granted:
su -
ls -l /var/log/messages
-rw------- 1 root root 410520 Nov 22 09:20 /var/log/messages
chmod 644 /var/log/messages
ls -l /var/log/messages
-rw-r--r-- 1 root root 410520 Nov 22 09:25 /var/log/messages
exit
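Before starting the agent, it can be worth confirming from the account that runs the job that the file really is readable. A minimal sketch, assuming only the standard shell test [ -r ]; the function name check_readable is hypothetical.

```shell
#!/bin/bash
# Hypothetical pre-flight check: confirm the current account can read the
# file that the exec source will tail. The function name is illustrative.
check_readable() {
    local file="${1:-/var/log/messages}"
    if [ -r "$file" ]; then
        echo "OK: $(id -un) can read $file"
    else
        echo "ERROR: $(id -un) cannot read $file" >&2
        return 1
    fi
}
```

Run as the hadoop user, check_readable returns a nonzero status when the permissions still block access, so a launch script can stop before the agent starts.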
The source type is defined as “exec” in line 22, but Flume also supports sources of Avro, Thrift, Syslog, JMS, spooldir, twittersource, seq, http, and Netcat. You could also write custom sources to consume your own data types; see the Flume user guide at flume.apache.org for more information.
The executable command is specified at line 23 as tail -F /var/log/messages. Because tail -F follows the file by name, the agent receives new messages as they are appended and keeps following the file if it is rotated. Line 24 connects the source to the Flume agent channel, channel1.
Finally, lines 30 through 35 define the HDFS data sink:
30 agent1.sinks.sink1.type = hdfs
31 agent1.sinks.sink1.hdfs.path = hdfs://hc1nn/flume/messages
32 agent1.sinks.sink1.hdfs.rollInterval = 0
33 agent1.sinks.sink1.hdfs.rollSize = 1000000
34 agent1.sinks.sink1.hdfs.batchSize = 100
35 agent1.sinks.sink1.channels = channel1
In this example, the sink type is specified at line 30 to be HDFS, but it could also be a value like logger, avro, irc, hbase, or a custom sink (see the Flume user guide at flume.apache.org for further alternatives). Line 31 specifies the HDFS location as a URI, saving the data to /flume/messages.
Line 32 indicates that the files will not be rolled by time, owing to the value of 0, while the value at line 33 indicates that a file will be rolled once it reaches 1,000,000 bytes. Line 34 specifies a sink batch size of 100 events for writing to HDFS, and line 35 connects the channel to the sink.
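To see at a glance which roll settings a sink will use, you can read them straight from the configuration file. The sketch below assumes a plain properties file without the listing's line numbers; get_prop and show_roll_settings are hypothetical helper names. The Flume user guide also documents an hdfs.rollCount property (with a default of 10 events) that this example does not set, so the helper reports it as well.

```shell
#!/bin/bash
# Hypothetical helper: print the value of one key from a Flume properties file.
# Assumes plain "key = value" lines without the listing's line numbers.
get_prop() {
    local file="$1" key="$2"
    awk -F'=' -v k="$key" '
        /^[ \t]*#/ { next }                       # skip comment lines
        {
            name = $1
            gsub(/^[ \t]+|[ \t]+$/, "", name)     # trim the key
            if (name == k) {
                value = substr($0, index($0, "=") + 1)
                gsub(/^[ \t]+|[ \t]+$/, "", value) # trim the value
                print value
            }
        }' "$file"
}

# Report the roll settings of sink1 in agent1 (names taken from the example).
show_roll_settings() {
    local cfg="$1" key value
    for key in rollInterval rollSize rollCount; do
        value=$(get_prop "$cfg" "agent1.sinks.sink1.hdfs.$key")
        echo "$key = ${value:-(not set, Flume default applies)}"
    done
}
```

For example, show_roll_settings agent1.cfg lists the three roll properties and marks any that fall back to the Flume default.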
For this example, I encountered the following error owing to a misconfiguration of the channel name:
2014-07-26 14:45:10,177 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:589)] Could not configure source source1 due to: Failed to configure component!
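This error message indicated a configuration error. In this case, I traced it to the “s” on the end of the channels configuration item at line 24. When corrected, the line reads as follows:
24 agent1.sources.source1.channel = channel1
Be aware that the Flume user guide documents this property as channels (plural) for a source, which may feed several channels, and as channel (singular) for a sink, which drains exactly one. If the agent still reports a configuration failure, check each property name against the guide for your Flume release.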
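The warning itself does not say which property is at fault, so a quick consistency check on the file can save time. The following is a rough sketch, not a Flume tool: it lists any channel that a source or sink refers to but that is not declared in the agent's channels line. It accepts either spelling of the binding property, assumes a properties file without listing line numbers, and the function name undeclared_channels is hypothetical.

```shell
#!/bin/bash
# Hypothetical check: print source/sink channel bindings that name a channel
# not declared in "<agent>.channels". Accepts "channel" or "channels" as the
# binding key and does not judge which spelling Flume expects.
undeclared_channels() {
    local cfg="$1" agent="$2"
    awk -v a="$agent" '
        /^[ \t]*#/ { next }                              # skip comments
        {
            eq = index($0, "=")
            if (eq == 0) next                            # not a key = value line
            key = substr($0, 1, eq - 1)
            val = substr($0, eq + 1)
            gsub(/[ \t]/, "", key)
            if (key == (a ".channels")) {
                n = split(val, names, " ")               # declared channel names
                for (i = 1; i <= n; i++) declared[names[i]] = 1
            } else if (index(key, a ".sources.") == 1 || index(key, a ".sinks.") == 1) {
                if (key ~ /\.channels?$/) refs[key] = val
            }
        }
        END {
            for (key in refs) {
                n = split(refs[key], names, " ")
                for (i = 1; i <= n; i++)
                    if (!(names[i] in declared)) print key ": " names[i]
            }
        }' "$cfg"
}
```

An empty result from undeclared_channels agent1.cfg agent1 means every binding points at a declared channel; any line printed names the property and the unknown channel.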