Big Data Analysis in a Message Oriented Framework
Arcot Rajasekar
1, Howard Lander
2!1 School of Information and Library Science, 2 The Renaissance Computing Institute, The University of North Carolina at Chapel Hill
The DataBridge System ABSTRACT
The!rapid!increase!in!the!amount!and!diversity! of! scien4fic! data! implies! a! corresponding! increase! in! the! poten4al! of! this! data.! The! volume,! velocity! and! variety! of! these! datasets! present! new! challenges! in! discovering! and! analyzing! data! relevant! to! a! par4cular! line! of! research.! We! propose! a! message=oriented! architecture!(MOA)!as!an!appropriate!paradigm! for! big! data! analy4cs.! MOAs! provide! loosely! coupled! interfaces! between! agents! and! analy4cal! services! and! are! independent! of! the! number! of! producers! and! consumers! of! the! data.! Subscribe! and! no4fy! models! of! communica4ons! advocated! by! the! MOA! are! well!suited!for!some!big!data!systems.!We!apply! the! MOA! paradigm! to! the! design! of! the! NSF= funded! DataBridge! project.! DataBridge! focuses! on!analyzing!and!linking!the!massive!numbers!of! small!to!moderately!sized!datasets!produced!in! the! long! tail! of! science.! AKer! considera4on! of! both!the!Data!Distribu4on!Service!(DDS)!and!the! Advanced! Message! Queuing! Protocol! (AMQP)! the! DataBridge! team! selected! the! AMQP! architecture!for!it’s!MOA.!
Why we chose AMQP
ACKNOWLEDGEMENT
This!work!is!supported!by:!
NSF! OCI=1247652,! OCI=1247602,! OCI=1247663!! “BIGDATA:!DataBridge!=!A!Sociometric!System! for!Long!tail!Science!Data!Collec4ons”!! !!
H!
AMQP vs. DDS Design Considerations
• AMQP! uses! TCP! for! transport,! providing! reliability! at! the! cost! of! performance.! DDS! supports! both! TCP! and! UDP! allowing! large! numbers!of!consumers.!!
• The! AMQP! standard! includes! rou4ng:! DDS! does! not.! Rou4ng! simplifies! load! balancing! and!allows!paaern!based!message!selec4on.!
• DDS! includes! a! number! of! different! QoS! aaributes.! ! This! provides! control! of! parameters!such!as!latency!and!redundancy! at!the!cost!of!complexity.!!
• AMQP! components! use! brokers! located! at! known! addresses.! ! This! is! simple! but! inflexible.! DDS! components! specify! the! messages!they!provide!or!require:!matching! is!done!by!the!middleware.!!
• The!DDS!standard!includes!the!API!while!the! AMQP! standard! does! not.! Switching! between! various! DDS! implementa4ons! is! s i m p l e r! t h a n! r e p l a c i n g! A M Q P! implementa4ons.!! DataBridge!Components! DataBridge!Example!Data!Flow! D a t a B r i d g e! u s e s! t h e! R a b b i t M Q! i m p l e m e n t a 4 o n! o f! A M Q P .! S e v e r a l! considera4ons!mo4vated!this!choice.!!
• We! don’t! foresee! the! DataBridge! having! a! large! number! of! components! so! we! prefer! the!reliability!of!the!TCP!protocol.!
• Paaern/topic! based! message! selec4on! is! part!of!the!DataBridge!design!so!the!rou4ng! capabili4es!of!AMQP!systems!are!cri4cal.!
• The!DataBridge!is!not!a!real!4me!system,!so! the! QoS! func4onality! provided! by! DDS! is! not! worth! the! increased! complexity! of! configura4on.!!
• The! simplicity! of! the! AMQP! broker=based! component! connec4on! system! is! sufficient! for!the!current!DataBridge!system.!
• We! will! provide! a! communica4on! class! library!for!system!components!to!send!and! receive! messages.! ! This! will! allow! us! to! change! the! AMQP! implementa4on! without! effec4ng! the! components.! We! would! provide!such!a!library!even!if!we!used!a!DDS! based!system.!
! !
Big Data Analysis in a Message Oriented Framework
Arcot Rajasekar1, Howard Lander2
The University of North Carolina at Chapel Hill Chapel Hill, North Carolina USA
rajasekar1 , howard_lander2 @unc.edu
Abstract
The rapid increase in the amount and diversity of data collected in multiple scientific domains implies a corresponding increase in the potential of data to empower important new collaborative research. However, the sheer volume, velocity and variety of these datasets present new challenges in discovering and analyzing data relevant to a particular line of research. Because of the modularity and extensibility required for a Big Data system, we propose a message-oriented architecture (MOA) as an appropriate paradigm for big data analytics. MOAs provide loosely coupled interfaces between agents and analytical services and are independent of the number of producers and consumers of the data. Subscribe and notify models of communications advocated by the MOA are well suited for the analytical requirements of certain types of big data. Specifically, we discuss our application of the MOA paradigm to the NSF-funded DataBridge project. DataBridge focuses on analyzing and linking the massive numbers of small to moderately sized datasets produced in the scientific enterprise, a region sometimes called the long tail of science. Realizing the potential of data in the long tail of science requires tools that will enable important new multidisciplinary collaborative research at scales ranging from small teams focused on relatively simple issues to large collaborations investigating grand challenge problems. After consideration of both the Data Distribution Service (DDS) and the Advanced Message Queuing Protocol (AMQP) the DataBridge selected the AMQP architecture as a backbone for loosely coupling the various services and agents that
form the modules of the system.
Keywords: Messaging; Architecture; BigData, AMQP; DDS;
1. Introduction
Big Data is characterized by high Volume, Velocity, Variety, Veracity and Value (5Vs). This characterization holds even when dealing with the ‘long tail’ big data [1] – data that is produced in relatively small quanta by such numerous sources that the overall amount is as large or larger than that produced by the far fewer number of large producers. Management and access to the data over the wide area network is performed with standard Internet protocols and has typically relied on the client-server paradigm. But the unique characteristics of big data require us to explore alternate architectures – architectures that take us away from process-centric communications towards data-centric communications.
Traditionally, client-server protocols have been the workhorse of Internet communications from email to World Wide Web, and from web services to e-commerce and social media. The client-server architecture has been sufficient for lower data volumes because the communications were primarily between known end-points (IP addresses) and did not involve a fast-changing resource environment. But the data scape is changing rapidly. As we move into the big data era,
the increased prevalence of rapidly changing or more heterogeneous sets of data producers and consumers suggests that traditional methods of communication will not suffice. Clients are rapidly evolving: getting more powerful, data-hungry, and mobile. Moreover, in this new environment, the data producers are also changing and becoming increasingly mobile: for example streaming videos from hand-held devices and biometric information from medical devices.
In message-oriented systems such as the DataBridge, producers and consumers loosely couple over the message bus with no common point of data aggregation connecting data producers and consumers. In contrast, the client-server protocol employs static service end-points and every client knows the appropriate server node with which to exchange data. In a big data environment there may be more than one end-point that provides similar information and these hosts might also be dynamically moving (or coming on and going off) across the wide area network. In this environment the concept of connecting to a static end point for a particular service is no longer sufficient.
We propose a message-oriented communication interface for handling the high volume and high velocity data exchanges possible in big data environments. We begin by discussing some of the advantages and capabilities of message oriented communication protocols with special emphasis on the DDS and AMQP protocols. We continue by examining several use case scenarios where the message orientation is appropriate.
2. Message Oriented Architectures
2.1 The Basics
Messages and various queuing systems for processing them efficiently have been topics of research interest since at least 1907 when F. Johannsen, of the Copenhagen Telephone Company published a paper entitled “Waiting times and number of calls” [2]. Agner Erlang published the first mathematically rigorous paper on the topic, “The theory of probability and telephone conversations” [3], in 1909. These two papers inaugurated the study of what has since become known as queuing theory. These initial studies concerned human-initiated telephone communications, but the basic principles carry over to automated communications between autonomous computer programs.
Many modern computer operating systems provide message queuing as a high performance option for inter-process and inter-thread communication. Examples include both real time operating systems such as VxWorks, VRTX and pSOS+ and general-purpose operating systems such as Linux and the other UNIX variants. The message queuing built in to these systems
is limited to communication between processes executing on the same hosts.
A type of software known as “Message Oriented Middleware” (MOM) provides messaging for software systems encompassing geographically distributed hosts. A fair number of commercial MOM implementations exist today. These include IBM’s WebSphere MQ [4], Microsoft Message Queuing [5] and StormMQ [6]. There have also been several open source implementations; these include JBoss Messaging [7], Apache Active MQ [8] and RabbitMQ [9].
These systems represent a number of different implementations of the MOM architecture. In this paper we will explain the advantages of a message-oriented architecture and discuss two of the most popular paradigms: the Message Centric and Data Centric paradigms. We will also discuss several Big Data system scenarios and the considerations that motivated the authors to select the Message Centric MOM architecture for the DataBridge system.
2.2 Advantages of Message Oriented Architecures
Message queue-based systems have a number of advantages over service-based systems. To start, functional units can be completely independent and decoupled. The interactions amongst the various components are restricted to the communications backbone. In addition, various applications of the system can be decoupled in time. Because many messaging implementations provide persistent queues it is not always necessary that the consuming application for a message be running when the producer of the message deposits the message into the backbone. Furthermore, message queues provide broadcast capability while service calls are by definition unicast. Once a message is deposited into the backbone, as many consumers as are interested in it can access it. These consumers can be either multiple instances of the same client configured to divide the work or distinct applications processing the message with their own individual algorithms. Finally, messages in the queue can be prioritized either by the message producer explicitly or by the message consumer or the message bus. Different consumer applications can prioritize the same message differently. This provides a more flexible application environment than a traditional service environment. In general, message queuing middleware provides these advantages by implementing a distributed network of message managers (messaging system aka postal
network) that can store and forward messages.
2.3 A Data Centric Architecture: The DDS Model
The Data Distribution Service (DDS) provides a global data space that is accessed by the message providers and consumers. The message data is organized into topics and types, which are similar to tables and table schemas in the SQL world. Data publishers write data to the global data space and subscribers read from the space. A message bus transfers data directly from the publishers to the consumers without the intervention of a broker. In addition to transferring the data, the message bus exposes and manages a robust set of quality of service (QoS) characteristics. These include how reliably the messages are delivered, the required latency for message delivery and how many times to attempt message delivery
before failing. A relationship, termed a “contract”, is made by the middleware between publishers and subscribers based on the QoS the publishers agree to provide and the QoS the consumers require.
These QoS based contracts allow the DDS model to support a very flexible set of transport options including TCP, UDP, multicast UDP, and shared memory. Consequently, DDS can support very large numbers of clients/subscribers with varying degrees of reliability and performance.
One other interesting detail about DDS is that the standard specifies the application programming interface (API). Bindings are provided for many common languages. The practical impact of this is that code is easily moved between vendor implementations of the standard. More details about DDS can be accessed via the Data Distribution Service Portal [10] maintained by the Object Management Group consortium.
2.4 A Message Centric Architecture: The AMQP Model
The Advanced Message Queuing Protocol (AMQP) [11] supports a message queuing and routing (including point-to-point and publish-notify-subscribe) mechanism for exchange of messages and has features that make it very reliable and highly secure. The AMQP provides vendor independence, platform independence, and service independence. The AMQP, as implemented by RabbitMQ and used in the DataBridge system, is an MOM system where producing applications send information to intermediating resources known as brokers. The brokers, in turn, distribute the messages to the consumer applications. The following terms are germane to the AMQP:
• Exchange: An exchange is the “mailbox” to which a
message is sent by a producer and from which the message is retrieved by a consumer.
• Queue: A queue is the virtual location to which
messages sent to an exchange are routed. Once the message is delivered to the queue it is either retrieved by the consumers on demand or delivered on a subscription basis to consumers monitoring the queue. Queues have several important properties, such as whether or not they are durable or exclusive.
• Binding: A binding is the set of rules that determine
exactly how the exchange distributes the message to the queues.
• Header: A header is information that is included along
with the message that is used to affect the distribution of messages from producers to consumers. Headers can be used to tag a message with elements of a controlled vocabulary that is then interpreted by possible consumers of the message. We can think of this usage of the header as providing a ‘meme’ for the message. Messages may be tagged with more than one meme.
There are four Exchange Types supported by AMQP. First, the direct exchange delivers messages from exchanges to client-queues based on matching the routing key specified for a message to a key specified when the queue was created.
Second, the fanout exchange delivers arriving messages to every client-queue subscribed in the exchange. Third, the topic exchange delivers arriving messages to the queues based on a match between the message routing key and a pattern asserted for the queue. Finally, the header exchange delivers arriving messages to queues when the headers of the arriving message matches the value set for the queue. More details about the AMQP protocol can be found in [12].
2.5 Design Considerations
While both DDS and AMQP implement message queuing, there are a number of differences not just in their philosophies, as described above, but in the real world consequences of choosing either one. Key differences between the two include the following:
• AMQP supports only TCP as a transport protocol and
requires an acknowledgement of each message. This ensures reliability at the cost of performance. DDS supports both TCP and UDP (including multicast), so DDS excels in a scenario with large numbers of consumers.
• The AMQP standard includes support for routing,
while the DDS standard does not, though some DDS vendors do offer routing. Routing simplifies load balancing and allows pattern based message selection.
• The DDS standard includes a number of different
QoS attributes, while the AMQP standard does not. This allows much more control of delivery parameters such as latency and redundancy at the cost of increased complexity.
• In AMQP systems, the individual components of the
system communicate with each other through brokers located at known addresses. This is simple to configure but somewhat inflexible. By contrast, components of a DDS system find each other by specifying the messages they either provide or require. The matching is then done primarily by the DDS software.
• The DDS standard includes the specification of the
API while the AMQP standard does not. Switching between various DDS implementations will therefore be simpler than replacing the AMQP component of the system.
For an overview of the DDS and AMQP systems including a more thorough comparison of their respective strengths and weaknesses, please see [13].
3. Big Data Scenarios
We now present three hypothetical scenarios that we assert are good candidates for an MOA. We are currently designing and implementing the DataBridge system discussed in the third
scenario.
3.1 Scenario 1
Consider sensors that measure temperature and forward that data to a messaging system: anyone with a smart phone can use it to transmit their location and temperature to the
messaging system asserting a unique meme. Suppose a meteorologist is performing real-time modeling for predicting micro-weather (possibly hail or tornado) in a certain region. She can subscribe to a temp-sensor meme, get real-time data at locations of interest, and modify her model input in real-time to get a more accurate weather prediction. Similar scenarios can be envisioned for other applications including better earthquake detection, traffic information, flight delay predictions, and map of communicable disease outbreaks. Similar types of crowd-sourced predictions are being attempted in traditional client-server social media, but we believe that the superior flexibility of the message-oriented architecture, as described in the previous section, is the correct choice for this type of scientific workflow.
3.2 Scenario 2
Consider a scientist interested in data that relates to her field of interest, for example the thyroid gland. He can subscribe to the messaging system using this particular meme and receive notifications whenever any new data is reported on this subject. This can be very helpful to both novice researchers in a field and to experienced scientists. Moreover, if the memes are part of a controlled vocabulary, the researcher can also get notified about other data of interest. Similar types of data may be of interest to scientists in related areas of research such as biology and environmental science. Such topic-based data sharing will be greatly enhanced by using the message oriented distribution architecture, compared to the limited abilities of
the traditional client-server architecture.
3.3 Scenario 3
In DataBridge, we plan to implement a suite of sociometric network algorithms (SNA) that are applied to a collection of datasets to define semantic distances between them. In the current implementation, we have several relevance algorithms implemented for testing. These semantic similarities can be used to cluster the data into communities. As a new dataset is ingested into the DataBridge system, multiple algorithms can be applied and a new derived dataset is placed into appropriate communities based on semantic similarities. In the DataBridge vision, a scientist can subscribe to a meme (based on a data community of interest) and get notified as a new dataset of interest is added to the system. Scientists can also submit their own relevance and clustering algorithms that may further partition a community and can subscribe to be notified when communities of interest are updated.
The above scenario is one example of how a message-oriented paradigm can enrich publication, discovery, and access to the growing multitude of scientific datasets in the long tail of science. These ideas are not new - social media has adopted similar paradigms to deal with user-generated content such as micro-blogging, crowd sourcing, online sharing, content production, virtual worlds, and gaming. The Ocean Observatories Initiative [14] is also defining their marine network using MOM. Our belief is that the message-oriented pattern will be essential for large-scale data sharing systems with integrated data processing. The message-oriented backbone that we are building for DataBridge will provide a prototype for testing scalability, extensibility, and usability of this architecture in a big data environment.
4. The DataBridge Project
The DataBridge system [15, 16] is designed to be highly distributed and highly extensible. DataBridge is comprised not just of a sizable number of components, but a variety of different types of components. Each component type will have a defined interface to a message passing communication backbone.
In designing the DataBridge we have selected the RabbitMQ implementation of AMQP. Note that we also believe we could have built the DataBridge successfully with a DDS system, but several considerations motivated our choice of AMQP. Referring back to the key differences we mentioned in the Design Considerations section above, these are:
• We don’t foresee the DataBridge having a large
number (> 100) of discrete components, so we prefer the reliability of an acknowledged TCP protocol.
• Pattern/meme based message selection is part of the
DataBridge design so the inherent routing capabilities provided by AMQP systems are attractive.
• The DataBridge is not a real time system, so the QoS
functionality provided by DDS is not worth the increased complexity of configuration.
• The simplicity of the AMQP broker-based component
connection system seems sufficient for the DataBridge system, though we do see the advantages of the system-based matching provided by DDS. As the DataBridge system grows, we recognize the possibility that this estimation will change.
• Our intent is to provide a simple AMQP based
communication class library for individual system components to use. This will allow us to change the AMQP implementation as needed without imposing a coding burden on the components. We would have provided such a library even if we had decided on a DDS based system so we feel comfortable with the lack of a standardized AMQP API.
Figure 1 depicts the core components of the DataBridge system and their designed interactions with the AMQP based middleware.
The MOM middleware bus is the orchestration mechanism that ties together all the loosely coupled components. Each of the component types can have multiple implementations and multiple instances of these implementations. For example, there might be several data reformatters – one each for reformatting DVN metadata to RDF format, DICOM metadata to AVU-triplets, or FITS metadata to XML schema. Each of these reformatters can subscribe to messages that are related to metadata in the input format(s) they are designed to process. As long as each unit understands the portions of the controlled message vocabulary relevant to itself, it can work independently of interactions with the rest of the system. New reformatters and other data harvesters can be made part of the DataBridge system by just identifying the message ‘meme’ they are interested in. These units can publish their results as messages to the MOM using specific memes that might be of interest to other consumers. Hence, the asynchronously connected system provides a loosely coupled architecture that comes together to share data products and “camp on” messages of interest. This provides an open framework where these producers and consumers of data products can join and leave as their interest dictates - similar to how people interact in social media. An advantage of the MOM architecture is that one can have multiple workflows configured using messages and they can work asynchronously without much human interaction. Indeed the MOM can be easily used to implement a Map-Reduce paradigm on a distributed dynamically changing set of workers.
Authentication, authorization, security, and privacy are of concern in such an extensible architecture, but AMQP-based middleware provides administrative facilities to alleviate such
concerns and provide strong communication reliability.
Here we briefly explain how the different components apply the AMQP model through an example of how datasets from a data producer such as the Dataverse Network (DVN) [17, 18] will flow through the DataBridge. The DVN is an e-science collaboration environment used to publish, share, reference, extract, and analyze large numbers of research studies that are, individually, typically small to medium in size. The DVN serves many fields of study and uses, and complies with many content-agnostic standards. DVN will be a source for data in the DataBridge project.
In this example, a metadata gatherer harvests the metadata for all datasets in a particular DVN (producer) instance. At this point our specified MOA allows several choices. Either the metadata will be formatted as a message and sent directly to an appropriately configured topic exchange, or, if the metadata is large, it will be written to a file and the URL for the file will be the content of the message. In either case, the metadata is now available to whatever system component is interested in it. The metadata can then be processed by a reformatter interested in a specific metadata format, or a metadata database that can store the metadata in RDF format or by both. These components will read the message, locate the data in either the URL or the
message, and process it as appropriate. Themetadata database
component will then post a message to the queuing system specifying that metadata is available; similarly the reformatter will post a message about the reformatted metadata.
Eventually the metadata will be of interest to a relevance engine that will pull this message and retrieve the metadata. The relevance engine will process the harvested and re-formatted metadata to calculate the similarity of the new or updated set of datasets to other datasets using various relevance algorithms and sociometric network analysis algorithms. The output of the relevance engine is a similarity network represented by a non-directional graph where the nodes of the graph are the individual datasets in the Dataverse and the edges represent the similarity measures encoded in the relevance engine. Note that each individual algorithm will produce a new set of edges in a multi-dimensional network.
While one set of edges might represent similarity derived from information about users of the dataset, another set of edges may be the result of comparing the metadata provided with the datasets. A comprehensive section on the various distance measures that can be implemented as the relevance algorithm is included in [15]. As before, either the graph will be formatted as a message and sent directly to an appropriately configured topic exchange or it will be written to a file and the URL for the file will be the content of the message. At this point a network writer process will handle the task of adding the graph to a network database. Finally, this multi-dimensional relevance network information is made available to the visualization and query components (consumer) using an analogous message based protocol. Figure 2 is a visual representation of the data flow that implements one variant of this example.
Figure 2: Data flow diagram including messages and components.
5. Conclusions
The 5V characteristics of Big Data impose new and unprecedented demands on communications between various components of an analytic system. These demands motivate our investigation of system designs intended to replace traditional tightly coupled service-oriented client-server architectures. In this paper we discuss several different types of message-oriented architectures and discuss our selection of one that allows for servers and clients, data producers and consumers, and analyzers and synthesizers to exchange data efficiently in a virtual collaboration environment based on a controlled vocabulary of message memes. We explore this paradigm through our design of the DataBridge project – an e-Science collaboration system that seeks to enable researchers by discovering and exploiting new modalities of linkages between data sets.
Acknowledgment
The DataBridge project is funded by the NSF Office of Cyberinfrastructure 1247652, 1247602, OCI-1247663 grant, “BIGDATA: Mid-Scale: ESCE: DCM: Collaborative Research: DataBridge - A Sociometric System for Long tail Science Data Collections”, (2012-2015).
References
[1] C. Palmer, M. Cragin, P. Heidorn, and L. Smith, “Data curation for the long tail of science: The case of environmental sciences.” Paper presented at the Third International Digital Curation Conference, Washington, DC., Dec. 2007.
[2] F. Johannsen, “Waiting times and number of calls”, Reprinted in Post
Office Electrical Engineers Journal, London, October 1910
[3] A. Erlang, “The Theory of Probabilities and Telephone Conversations”,
Nyt Tidsskrift for Matematik, Vol. 20, No. B. (1909), pp 33 – 39
[4] WebSphere MQ, http://www-03.ibm.com/software/products/en/wmq [5] Message Queuing (MSMQ)
http://msdn.microsoft.com/en-us/library/ms711472(v=vs.85).aspx [6] StormMQ http://stormmq.com/
[7] JBoss Messaging http://www.jboss.org/jbossmessaging/ [8] Apache ActiveMQ, http://activemq.apache.org [9] RabbitMQ, http://www.rabbitmq.com
[10] Data Distribution Service Portal, http://portals.omg.org/dds/ [11] Advanced Message Queuing Protocol, http://www.amqp.org
[12] AMQP Model Explained, http://www.rabbitmq.com/tutorials/amqp-concepts.html
[13] S.Schneider, “What’s The Difference Between DDS and AMQP”,
Electronic Design, April 15, 2013,
http://electronicdesign.com/embedded/what-s-difference-between-dds-and-amqp
[14] The Ocean Observatories Initiative, http://www.oceanobservatories.org [15] A. Rajasekar, H. Kum, M. Crosas, J. Crabtree, S. Sankaran, H. Lander,
T. Carsey, G. King, and J. Zhan. “The DataBridge”, Science Journal, ASE Vol 2, No 1, 2013.
[16] Databridge, http://www.databridge.web.unc.edu
[17] G. King, “An Introduction to the Dataverse Network as an Infrastructure for Data Sharing”, Sociological Methods & Research, vol 36(2), 2007, pp. 173-199.
[18] DVN, The Dataverse Network, http://thedata.org