Adaptive Load Balancing Method Enabling Auto-Specifying
Threshold of Node Load Status for Apache Flume
UnGyu Han and Jinho Ahn
Dept. of Comp. Scie., Kyonggi Univ., Iuidong, Yeongtong, Suwon 443-760
Gyeonggi, Republic of Korea
{nick_hug,jhahn}@kgu.ac.kr
Abstract
Previous load balancing methods for Flume depend entirely on a user-specified threshold and therefore cannot adapt to changes in the performance of the entire log processing system at runtime. Furthermore, their task-transferring algorithm aggravates the performance degradation of an overloaded node, because the excess data must be transferred to another node by the overloaded node itself. In this paper, we propose a new load balancing method for Apache Flume that automatically and dynamically modifies the threshold of node load status in accordance with the runtime performance of the system. This feature is realized by monitoring, in a decentralized manner, both the rate of increase of incoming log information in the queue of each collector agent and the queue's occupancy rate, at the request of an overloaded or under-loaded collection node. The proposed method considerably reduces the additional overhead incurred by task migration and makes the load of the entire system as fair as possible by selecting the optimal task migration destination according to the current load-state values of the collector agents, unlike the previous round-robin and random methods.
Keywords: Data intensive processing, Data collection, Apache Flume, Agent, Load balancing
1. Introduction
As the data generated on the Internet increases explosively day by day, the web log data that users leave behind daily are rapidly emerging as valuable assets for web service companies. Big data includes both structured and unstructured data, and most unstructured data is video and audio data generated by today's web and mobile applications. It presents an opportunity to create unprecedented business advantage and better service delivery. It also requires new infrastructures and new ways of thinking about how business and the IT industry work. However, this unstructured data requires a new kind of repository, because storing it in relational databases such as MySQL or Oracle is not suitable for efficient analysis. For this purpose, HDFS (Hadoop Distributed File System) has been the most frequently used of the many candidate storage systems [1]. However, the data users leave behind cannot be collected automatically by using HDFS alone. HDFS does not handle data collection; it is only in charge of the reliable storage of unstructured data. Thus Flume, one of the most promising log data aggregator solutions, is used to play this role of helping to build large-scale HDFSs in an
transferring job.
In this paper, we propose a load management method that works together with Flume to compensate for the disadvantages of Flume mentioned above. Unlike the previous method, the proposed method automatically provides the system with more accurate thresholds by dynamically configuring them according to the current system performance. Based on these thresholds, it reconnects some of the log producing nodes attached to each highly loaded node to under-loaded nodes in order to minimize the number of unevenly loaded collection nodes. It performs dynamic load balancing so that the system can handle as much incoming log workload as possible with the same set of collection agents, compared with the existing method.
The remainder of this paper is organized as follows. Sections 2 and 3 describe the theoretical background related to Apache Flume and load balancing methods, and the problems of the previous methods used for Flume. Sections 4 and 5 introduce our load balancing method and present performance evaluation results that demonstrate its effectiveness. Section 6 summarizes this paper.
2. Related Work
2-1. Apache Flume
Flume is a distributed log data collector that collects online log data, especially unstructured data, into various kinds of storage such as HDFS and HBase. Its versions are divided into FlumeOG and FlumeNG. Both consist of the same logical nodes but differ from each other in their internal structure. In this paper, our load balancing method is constructed only for FlumeNG (hereafter simply Flume).
Each logical node, the primary unit in Flume, is called an agent. The system structure of Flume is shown in Figure 1. A Flume agent is divided into a source, a channel and a sink. Data enters through the source and goes out through the channel to the sink. Incoming data is placed in the channel after a serialization process inside the source component. The channel stores the data while the sink component is processing other data, and delivers it to the sink component when the sink requests the next available data. Finally, the sink component converts the data to the final storage format and sends it to the final storage [2].
Figure 1. Flume Architecture
Each component is used to set the type of input and output data. Although a Flume system can be configured with a single agent, a multiple-agent configuration is generally preferred. The general system structure using Flume is shown in Figure 2. Agents A-F and collector agents A-C are all Flume agents, but each of agents A-F is a data generating node responsible for fetching data from its sources, whereas each collector agent is a collection node that collects the aggregated data and stores it in the final storage.
Figure 2. Flume Basic System Configuration
2-2. Load Balancing
handled at a collection agent within a user-specified threshold time, it determines the agent to be overloaded and resets the connections of several data generating agents to other collection nodes [8].
3. Problem of the Flume
In this section, we identify in detail the problems of the existing method in the current Flume. A collection node may face a performance problem when its data processing capability is unexpectedly exceeded by an enormous amount of log workload suddenly arriving from the data generating agents connected to it. Conversely, if the amount of data transmitted to the collection node is too small compared with its data processing capacity, the node may become under-utilized, even remaining almost idle.
Figure 3 shows an example in which overloaded and under-loaded nodes occur. In this case, the existing method forces the overloaded collection node to transmit some of the excess data it keeps in its queue to another node chosen in either a round-robin or a random way.
This problem of the existing method results fundamentally from not considering the load condition of the collection node that takes over the task. Therefore, when the method attempts to balance the load of the entire system, its performance may vary significantly depending on the state of the receiving collection node. Another problem is that the method depends entirely on the user-specified threshold. If the value is set too high, the method is invoked infrequently even if there are several overloaded nodes. If it is set too low, the opposite behavior may occur. Therefore, Flume requires an effective load balancing method that can adapt to the dynamic characteristics of the incoming workload.
Figure 3. An Example of Overloaded and Under-loaded Node Occurrence in the Flume
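The difference between a load-blind and a load-aware choice of destination can be illustrated with a minimal sketch. The collector names and occupancy values are hypothetical, and `least_occupied` is only one simple way of taking the receiver's load into account; it is not taken from Flume or from the method proposed in this paper.

```python
import itertools

def round_robin_picker(collectors):
    """Return a function that yields destinations in a fixed rotation."""
    rotation = itertools.cycle(collectors)
    return lambda: next(rotation)

def least_occupied(occupancy):
    """Pick the collector whose channel occupancy rate is lowest."""
    return min(occupancy, key=occupancy.get)

# Hypothetical occupancy rates of the candidate receivers
# (fraction of channel capacity currently in use).
occupancy = {"collector_A": 0.90, "collector_B": 0.20, "collector_C": 0.55}

pick_next = round_robin_picker(list(occupancy))
blind_choice = pick_next()                   # ignores the receiver's load
informed_choice = least_occupied(occupancy)  # uses the receiver's load

print(blind_choice, informed_choice)
```

In this example the round-robin choice lands on a receiver that is already 90% full, while the load-aware choice lands on the receiver with the most free channel space.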
4. Proposed Load Balancing System
When a collection node is determined to be overloaded, our proposed load balancing method enables some of the data generating nodes connected to it to be reconnected to the most suitable low-loaded node. Here, whether a node is overloaded or under-loaded is determined based on both the rate of increase of incoming log information in the queue of each collector agent and its occupancy rate. The method can also automatically change the threshold value based on the performance and load of each collection node.
Figure 4 shows the process for determining the load state of a collection node. First, the method checks the amount of log data held in the node's channel. If the data occupies more than 60% of the channel capacity, the node may be overloaded; in that case, if the current threshold value for the node is greater than 1, the node is considered overloaded, and otherwise it is considered normally loaded. If the data occupies less than 40% of the channel capacity, the node may be under-loaded; in that case, if the current threshold value for the node is less than 0.5, the node is considered under-loaded, and otherwise it is considered normally loaded.
Once a node is marked overloaded, the method keeps it in that status even when its channel occupancy rate falls below 60%. Only when the occupancy rate falls below 50% is the node load status changed to normal. Two occupancy levels are used to delay the node status transition in order to avoid the ping-pong effect resulting from repeated re-establishment of agent connections. Similarly, if the node is marked under-loaded, it cannot change its load status to normal even though
    if threshold < 0.5
        node.stat = under-loaded
    else
        node.stat = normal
else
    node.stat = normal
Figure 4. Algorithm of Node Load Decision Process
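The decision rules described for Figure 4 can be sketched as follows. This is a minimal illustration rather than the authors' implementation. The `threshold` value is passed in as given, since its computation is not specified here. The 50% release level for under-loaded nodes is an assumption made by symmetry with the overloaded case, and all names are hypothetical.

```python
from enum import Enum

class LoadState(Enum):
    UNDER_LOADED = "under-loaded"
    NORMAL = "normal"
    OVERLOADED = "overloaded"

HIGH_OCCUPANCY = 0.60       # above this the node may be overloaded
LOW_OCCUPANCY = 0.40        # below this the node may be under-loaded
RELEASE_OCCUPANCY = 0.50    # level at which a marked node returns to normal
OVERLOAD_THRESHOLD = 1.0
UNDERLOAD_THRESHOLD = 0.5

def decide_load_state(previous, occupancy, threshold):
    """Return the new load state of a collection node.

    previous  -- the node's current LoadState
    occupancy -- channel occupancy rate in [0, 1]
    threshold -- the node's current threshold value (taken as given)
    """
    # Hysteresis: an overloaded node stays overloaded until its
    # occupancy falls below 50%, which avoids the ping-pong effect.
    if previous is LoadState.OVERLOADED:
        if occupancy >= RELEASE_OCCUPANCY:
            return LoadState.OVERLOADED
        return LoadState.NORMAL
    # ASSUMPTION: the under-loaded case mirrors the overloaded one and
    # is released once occupancy rises above 50%.
    if previous is LoadState.UNDER_LOADED:
        if occupancy <= RELEASE_OCCUPANCY:
            return LoadState.UNDER_LOADED
        return LoadState.NORMAL
    # Node is currently normal: apply the two-step check.
    if occupancy > HIGH_OCCUPANCY and threshold > OVERLOAD_THRESHOLD:
        return LoadState.OVERLOADED
    if occupancy < LOW_OCCUPANCY and threshold < UNDERLOAD_THRESHOLD:
        return LoadState.UNDER_LOADED
    return LoadState.NORMAL
```

With these rules, a node marked overloaded at 70% occupancy remains overloaded at 55% and returns to normal only once occupancy drops below 50%, so small fluctuations around the 60% level do not trigger repeated reconnections.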
Figure 5 shows the load balancing process for an overloaded node. When a collection node is determined to be overloaded, it first receives, from the other collection nodes, their node load status information together with information about their connected data generating agents. The node load status information of each collection node consists of three elements: its identifier, its host location information and its available channel occupancy rate. The information about each data generating node is composed of its agent identifier, its host location information, the identifier of the collection node it is connected to and its data generating rate. Based on the information received from the other nodes, if there are suitable collection nodes capable of accommodating the excess data generating agents that the overloaded node is trying to transfer, the method takes the next step and changes the destinations of those data generating agents to these collection nodes according to their current channel capacity.
Algorithm 2 Load balancing process for overloaded node
if node.stat == overloaded
    node_list[] = list of the other collector nodes
    i = node_list.length
    k = 0
    while (k < i && node.stat == overloaded)
        if node_list[k].stat == under-loaded or normal
            host = node_list[k].host
            transferable_throughput = node_list[k].available_throughput
            agents = node.agents_list
            source_nodes = Find_Transferred_Nodes(transferable_throughput, agents)
            Change_SourcesToCollector_Mapping(host, source_nodes)
        k++
    if node.stat == overloaded
        min_agents = get_minimum_transferred_agents(agents.total_throughput)
        reserved_for_changed_nodes(min_agents)
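The main loop of Algorithm 2 can be illustrated with a small runnable sketch. Several details are assumptions made for illustration only: the greedy largest-rate-first selection inside `find_transferred_agents`, the ordering of candidate collectors by available throughput, and the `excess_rate` parameter, which stands in for the condition that the node is still overloaded. The final reservation step of the algorithm is omitted.

```python
from dataclasses import dataclass, field

@dataclass
class Agent:
    agent_id: str
    rate: float                    # data generating rate (unit assumed)

@dataclass
class Collector:
    node_id: str
    state: str                     # "overloaded", "normal" or "under-loaded"
    available_throughput: float
    agents: list = field(default_factory=list)

def find_transferred_agents(capacity, agents):
    """Greedily choose agents whose combined rate fits within capacity."""
    chosen, remaining = [], capacity
    for agent in sorted(agents, key=lambda a: a.rate, reverse=True):
        if agent.rate <= remaining:
            chosen.append(agent)
            remaining -= agent.rate
    return chosen

def rebalance_overloaded(node, others, excess_rate):
    """Move agents off `node` until `excess_rate` has been shed.

    Returns a list of (agent_id, destination_node_id) pairs.
    """
    moves = []
    # ASSUMPTION: try the collectors with the most spare throughput first.
    candidates = sorted(
        (c for c in others if c.state in ("under-loaded", "normal")),
        key=lambda c: c.available_throughput,
        reverse=True,
    )
    for target in candidates:
        for agent in find_transferred_agents(target.available_throughput,
                                             node.agents):
            if excess_rate <= 0:
                break
            node.agents.remove(agent)
            target.agents.append(agent)
            target.available_throughput -= agent.rate
            excess_rate -= agent.rate
            moves.append((agent.agent_id, target.node_id))
        if excess_rate <= 0:
            break
    return moves

# Hypothetical scenario: c1 must shed 60 units of incoming rate.
c1 = Collector("c1", "overloaded", 0.0,
               [Agent("a1", 50.0), Agent("a2", 20.0), Agent("a3", 10.0)])
c2 = Collector("c2", "normal", 25.0)
c3 = Collector("c3", "under-loaded", 60.0)
c4 = Collector("c4", "overloaded", 100.0)
moves = rebalance_overloaded(c1, [c2, c3, c4], excess_rate=60.0)
print(moves)
```

Only agent-to-collector mappings change in this sketch; no queued data is copied by the overloaded node.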
Figure 6 shows the load balancing process for an under-loaded node. When a collection node is determined to be under-loaded, it broadcasts a request containing its node load status information to the group of collection nodes, asking for at least one additional data generating agent to be allocated to it. The under-loaded collection node waits until it receives, from the other group members, their node load status information and information about their connected data generating agents. From the received status information, it retrieves a list of relatively heavily loaded collection nodes that cannot handle their excess data generating agents, which the under-loaded node can take over. If there is at least one collection node that has to relieve its burden of excess incoming data workload, the method takes the next step and changes the destinations of data generating agents from those collection nodes to the under-loaded node until its channel occupancy rate rises to 50%.
Algorithm 3 Load balancing process for under-loaded node
if node.stat == under-loaded
    agents[] = list of recommended agent nodes
    i = agents.length
    k = 0
    while (k < i && node.stat == under-loaded)
        isConnectable = Find_Optimal_Nodes(node.avail_throughput, agents[k])
        if isConnectable == true
            change_to_under-loaded_node(agents[k].collector)
        k++
Figure 6. Algorithm of Load Balancing Process for Under-loaded Node
Even when multiple load transfer requests occur simultaneously, our proposed method never makes ambiguous decisions, for the following reason. When any overloaded or under-loaded collection node requests a workload transfer, the request must first go to the leader of the group of collection nodes to obtain a sequence number. Afterwards, the requesting node broadcasts its request to the group of collection nodes.
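One way such leader-issued sequence numbers can remove ambiguity is sketched below. This is an assumption about how receivers might use the numbers, not a description of the authors' implementation: each collection node applies broadcast requests strictly in sequence order and buffers any request that arrives early. The class and method names are hypothetical.

```python
import itertools

class Leader:
    """Issues strictly increasing sequence numbers to requesting nodes."""
    def __init__(self):
        self._counter = itertools.count(1)

    def next_sequence(self):
        return next(self._counter)

class CollectorInbox:
    """Applies broadcast requests in sequence order, buffering early ones."""
    def __init__(self):
        self._expected = 1
        self._pending = {}
        self.applied = []

    def receive(self, seq, request):
        self._pending[seq] = request
        # Apply every request that is now contiguous with what was applied.
        while self._expected in self._pending:
            self.applied.append(self._pending.pop(self._expected))
            self._expected += 1

leader = Leader()
seq_a = leader.next_sequence()   # obtained by an overloaded node
seq_b = leader.next_sequence()   # obtained by an under-loaded node

inbox = CollectorInbox()
inbox.receive(seq_b, "request from under-loaded node")   # arrives first
inbox.receive(seq_a, "request from overloaded node")     # arrives second
print(inbox.applied)
```

Because every collection node applies the requests in the same order regardless of arrival order, all nodes reach the same decision about which transfer is handled first.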
5. Performance Evaluation
5-1. Experimental Environment
Table 1. Server Specification Used for Experiment
OS                    Ubuntu Server 13.10
CPU                   Intel Xeon 3.10GHz
RAM                   DDR3 8GB
Development language  Java
In this section, we analyse the performance of the proposed load balancing method compared with that of the Flume default. The experimental environment is shown in Table 1. The Flume cluster configuration for the experiment is made in 20
axis represents the load test type number and the vertical axis, the total elapsed time. The time unit of the vertical axis is minutes. T1 is the randomized load test type and T2 is the concentrated load test type. For T1 and T2, the proposed method reduces the total time by up to 20% and 30%, respectively, compared with the Flume default setting.
Figure 8 shows the data arrival time of the two methods as the total system load stress level ranges from 40% to 100%. The horizontal axis represents the total load stress level and the vertical axis, the data processing time in milliseconds. The experimental results show that as the load stress level increases, the performance gap between the two methods widens significantly, up to 30%.
From these results, we can see that our method distributes the unbalanced workload coming from data generating agent nodes over a group of collection nodes more evenly than the Flume default setting, and that if overloaded and under-loaded collection nodes coexist, this reduction rate may become higher.
Figure 8. Experimental Results of the Third Test Type
6. Conclusions
This paper observed that the existing Flume system has the problem that its load balancing method depends completely on the threshold value the user specifies in the system configuration before execution. To address this problem, we presented a new load balancing method to improve the performance of the Apache Flume data collector. The proposed method solves the problem by automatically changing the threshold value based on the performance and load of each node. The method can therefore select the most suitable collection node based on both the rate of increase of incoming log information in the queue of each collector agent and its occupancy rate, at the request of an overloaded or under-loaded collection node. With these desirable features, our method distributes the unbalanced workload coming from data generating agent nodes over a group of collection nodes more evenly than the Flume default setting, and if overloaded and under-loaded collection nodes coexist, its effectiveness may increase further. Simulation results show that our method can obtain performance gains of up to 30% over the Flume default in terms of total data processing time.
Acknowledgements
This work was supported by the Gyeonggi Regional Research Center (GRRC) and Contents Convergence Software (CCS) research center in Korea (Project No.: GRRC Kyonggi 2014-B04). This paper is a revised and expanded version of a paper entitled “Dynamic Load Balancing Method for Apache Flume Log Processing” presented at the Int’l Conference on Information Science and Technology, held at HNA Grand Hotel Mingguang Haikou, Hainan, China, on December 20-23, 2014.
Authors
Un-Gyu Han received his B.S. degree in Computer Science from Kyonggi University, Korea, in 2013. He has been an M.S. student in the Department of Computer Science, Kyonggi University, since 2013. His research interests include distributed computing, big data solutions, P2P networks and social networks.
Jinho Ahn received his B.S., M.S. and Ph.D. degrees in Computer Science and Engineering from Korea University, Korea, in 1997, 1999 and 2003, respectively. Since 2003, he has been a full professor in Department of Computer Science, Kyonggi University. He has published more than 80 papers in refereed journals and conference proceedings and served as program or organizing committee member or session chair in several domestic/international conferences and editor-in-chief of journal of Korean Institute of Information Technology and editorial board member of journal of Korean Society for Internet Information. His research interests include distributed computing, fault-tolerance, sensor networks and mobile agent systems.