
In order to analyze streaming data using STREAMMINE3G, data analysts can either choose from a set of provided standard operators or implement their own specially crafted operators as user-defined functions (UDFs) by subclassing a well-defined operator interface.

3.2.1 Event Format

Events in STREAMMINE3G are provided as simple byte arrays, giving users maximum flexibility in data representation. However, when custom operators are combined with standard operators, events must use the same predefined data format and serialization mechanisms as the standard operators in order to ensure compatibility and correct processing of events.

Users not tied to specific standard operators or serialization frameworks can choose from a wide variety of publicly available techniques and frameworks for their internal data representation. One of the most popular frameworks in this area is Google Protocol Buffers [Pro15], which not only provides convenient mechanisms for serializing complex data structures but also allows seamless data exchange across platforms due to its generic binary format. However, the use of such frameworks imposes additional overhead on event processing, which can lower the overall system's performance. If the data can be represented in a non-nested way using solely flat data structures, and the user has opted to implement the business logic using STREAMMINE3G's native C++ interface, simple techniques such as static or dynamic casts as provided in C/C++ can be used instead.
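For the Java interface, a comparable framework-free encoding of a flat record can be sketched with java.nio.ByteBuffer, which writes and reads fixed-width fields back to back. The FlatSensorEvent class, its two fields, and its byte layout are hypothetical and only serve as an illustration; they are not part of STREAMMINE3G.

```java
import java.nio.ByteBuffer;

/** Hypothetical flat event (sensor id plus value); not part of STREAMMINE3G. */
final class FlatSensorEvent {
    static final int SIZE = Integer.BYTES + Double.BYTES; // 4 + 8 bytes

    final int sensorId;
    final double value;

    FlatSensorEvent(int sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }

    /** Encodes both fields back to back into a fixed-size byte array. */
    byte[] toBytes() {
        return ByteBuffer.allocate(SIZE).putInt(sensorId).putDouble(value).array();
    }

    /** Decodes an event from a byte array produced by toBytes(). */
    static FlatSensorEvent fromBytes(byte[] event) {
        ByteBuffer buf = ByteBuffer.wrap(event);
        return new FlatSensorEvent(buf.getInt(), buf.getDouble());
    }
}

class FlatSensorEventDemo {
    public static void main(String[] args) {
        byte[] raw = new FlatSensorEvent(7, 21.5).toBytes();
        FlatSensorEvent decoded = FlatSensorEvent.fromBytes(raw);
        System.out.println(raw.length + " bytes, sensor " + decoded.sensorId
                + ", value " + decoded.value);
    }
}
```

Such a fixed layout only works for flat records; nested or variable-length data would need explicit length fields or a serialization framework.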

Event Timestamp

In addition to the event itself, users can provide a timestamp with each event if an ordering of events is desired. Event ordering is crucial for order-sensitive applications and for applications that require precise recovery to fully mask system crashes. Order sensitivity means that the output of an operator depends on the order in which events are received and processed, which is the case, e.g., when an operator uses non-commutative operations.

Timestamps in STREAMMINE3G are represented as 64-bit integers which can be interpreted as either physical or logical timestamps by the application developer. However, in order to ensure a correct ordering of events, operators must emit events with strictly monotonically increasing timestamps.
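One way an operator could satisfy the strict monotonicity requirement when deriving timestamps from a physical clock is sketched below. The MonotonicTimestamps helper is hypothetical and not part of STREAMMINE3G; it simply bumps a candidate timestamp whenever the clock has not advanced since the previous event.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not part of STREAMMINE3G) issuing strictly increasing 64-bit timestamps. */
final class MonotonicTimestamps {
    private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);

    /**
     * Returns max(candidate, previous + 1), so consecutive calls never
     * return an equal or smaller value than an earlier call.
     */
    long next(long candidate) {
        return last.updateAndGet(prev -> Math.max(candidate, prev + 1));
    }
}

class MonotonicTimestampsDemo {
    public static void main(String[] args) {
        MonotonicTimestamps ts = new MonotonicTimestamps();
        // Physical interpretation: wall-clock milliseconds, bumped on ties.
        long first = ts.next(System.currentTimeMillis());
        long second = ts.next(System.currentTimeMillis());
        System.out.println(second > first); // strictly increasing even within one millisecond
    }
}
```

For a purely logical interpretation, the same helper could be called with a constant candidate, in which case it degenerates into a simple counter.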

3.2.2 Event Loop/Process Method

When implementing custom operators, users have to subclass the provided operator interface and override the process method. The process method is the central point of operation when performing data processing in STREAMMINE3G: it takes as input an event (provided as a byte array), a state object (for implementing stateful operators), and a collector object needed for emitting new events to downstream operators.

Listing 3.1 shows an implementation of the classical word count application using STREAMMINE3G's Java interface. The implementation consists of two operators, a map and a reduce operator. In the given example, the incoming event (provided as a byte array) is first interpreted and converted into a string object (Line 7). Since the string represents a sentence, it is split into chunks using Java's StringTokenizer (Lines 8-13), and each individual word is then emitted as a new event using the collector's emit() method (Line 12). Note that the word is converted back into a byte array prior to passing it to the emit() method, using the getBytes() method provided by Java's String class.

Program Listing 3.1 Word count map and reduce operator.

 1 public class WordCountMapper implements Operator
 2 {
 3   ...
 4   /* process method of the map operator */
 5   public void process(..., byte[] event, Object state, Collector collector)
 6   {
 7     String sentence = new String(event);
 8     StringTokenizer st = new StringTokenizer(sentence);
 9     while (st.hasMoreElements())
10     {
11       String word = st.nextToken();
12       collector.emitEvent(..., word.getBytes());
13     }
14   }
15   ...
16 }
17
18 public class WordCountReducer implements Operator
19 {
20   ...
21   /* process method of the reduce operator */
22   public void process(..., byte[] event, Object state, Collector collector)
23   {
24     String word = new String(event);
25     Integer count = new Integer(1);
26     Map<String, Integer> map = (Map<String, Integer>) state;
27     if (map.containsKey(word))
28       count = new Integer(map.get(word).intValue() + 1);
29     map.put(word, count);
30   }
31   ...
32 }

As with the map operator, the reduce operator first converts the input event (i.e., the word) into a string (Line 24). However, contrary to the map operator, the reducer is stateful, where the state is composed of a hash map keeping track of the word frequencies. Hence, the generic state object which was provided as an additional parameter in the process() method is first cast to a Java Map object (Line 26) in order to update its counters accordingly (Lines 27-29).
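The interplay of the two operators can be tried out in isolation with the following self-contained sketch. Because the listing elides some parameters, the SketchOperator and SketchCollector interfaces below are simplified, hypothetical stand-ins and not the actual STREAMMINE3G interfaces; the mapper's collector hands each emitted word directly to the reducer instead of routing it through a network.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

/** Hypothetical, simplified stand-ins for the collector and operator interfaces. */
interface SketchCollector {
    void emit(byte[] event);
}

interface SketchOperator {
    void process(byte[] event, Object state, SketchCollector collector);
}

/** Stateless map operator: splits a sentence into words and emits each word. */
class SketchMapper implements SketchOperator {
    @Override
    public void process(byte[] event, Object state, SketchCollector collector) {
        StringTokenizer st = new StringTokenizer(new String(event, StandardCharsets.UTF_8));
        while (st.hasMoreTokens()) {
            collector.emit(st.nextToken().getBytes(StandardCharsets.UTF_8));
        }
    }
}

/** Stateful reduce operator: the state is a map from word to frequency. */
class SketchReducer implements SketchOperator {
    @Override
    @SuppressWarnings("unchecked")
    public void process(byte[] event, Object state, SketchCollector collector) {
        Map<String, Integer> counts = (Map<String, Integer>) state;
        // Insert 1 for a new word, otherwise add 1 to the existing count.
        counts.merge(new String(event, StandardCharsets.UTF_8), 1, Integer::sum);
    }
}

class WordCountSketch {
    /** Runs one sentence through the mapper and feeds every emitted word to the reducer. */
    static Map<String, Integer> count(String sentence) {
        Map<String, Integer> state = new HashMap<>();
        SketchOperator mapper = new SketchMapper();
        SketchOperator reducer = new SketchReducer();
        SketchCollector sink = e -> { }; // the reducer emits nothing in this sketch
        mapper.process(sentence.getBytes(StandardCharsets.UTF_8), null,
                word -> reducer.process(word, state, sink));
        return state;
    }

    public static void main(String[] args) {
        System.out.println(count("the cow jumped over the moon"));
    }
}
```

The sketch passes an explicit character set to the String conversions so that the byte representation does not depend on the platform default.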

3.2.3 Topology

Once the set of operators has been defined, i.e., in our example the map and reduce operator, the flow of events must be specified for carrying out a complete word count application. The flow of events is defined by an operator topology, which is also often referred to as a query. An operator topology in STREAMMINE3G is a directed acyclic graph² (DAG) specifying the set of upstream and downstream neighbors for each operator. Note that an operator in our system can have multiple upstream as well as multiple downstream neighbors. However, in our given example, the topology for the word count application is quite simple, as the map operator has only a single downstream operator, i.e., the reduce operator, as shown in Figure 3.1. Using this topology, events produced by the process() method of the map operator (using the collector object) will be routed through the network and provided as input events to the process() method of the reduce operator.

[Figure 3.1: diagram of the map and reduce operators and their generate(), process(), and emit() steps]

Figure 3.1: Word count topology (left) and event flow (right).
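To make the notion of upstream and downstream neighbors concrete, a topology can be thought of as a pair of adjacency lists. The TopologySketch class below is a hypothetical illustration and not the STREAMMINE3G topology API; operators are identified by plain strings, and the sketch rejects edges that would close a cycle, reflecting only the DAG definition given above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical topology description (not the STREAMMINE3G API). */
final class TopologySketch {
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();
    private final Map<String, List<String>> upstream = new LinkedHashMap<>();

    /** Adds the edge from -> to; rejects edges that would create a cycle. */
    void connect(String from, String to) {
        if (reaches(to, from)) {
            throw new IllegalArgumentException(
                    "edge " + from + " -> " + to + " would create a cycle");
        }
        downstream.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        upstream.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    List<String> downstreamOf(String op) {
        return downstream.getOrDefault(op, Collections.emptyList());
    }

    List<String> upstreamOf(String op) {
        return upstream.getOrDefault(op, Collections.emptyList());
    }

    /** Depth-first search: is target reachable from start along downstream edges? */
    private boolean reaches(String start, String target) {
        if (start.equals(target)) {
            return true;
        }
        for (String next : downstreamOf(start)) {
            if (reaches(next, target)) {
                return true;
            }
        }
        return false;
    }
}

class TopologySketchDemo {
    public static void main(String[] args) {
        TopologySketch wordCount = new TopologySketch();
        wordCount.connect("map", "reduce");
        System.out.println("downstream of map: " + wordCount.downstreamOf("map"));
        System.out.println("upstream of reduce: " + wordCount.upstreamOf("reduce"));
    }
}
```

An operator with several upstream or downstream neighbors is expressed by calling connect() once per edge.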

3.2.4 Input/Output Adapter and Event Generator

In order to feed an ESP application with events, some form of source operator is needed, either for converting data coming from external systems into the application-specific event format or for generating events based on some predefined patterns. Instead of defining an explicit source operator for data conversion and event production, the operator interface offers additional callbacks for these two purposes: onReadAdapter() for reading and writing raw data from TCP and file streams, and generate() for producing events. As in the process() method, those callbacks take a collector object as input for emitting events. However, those events are provided locally to the first operator defined in the topology rather than routed across the network, as shown in Figure 3.1.

Listing 3.2 shows an implementation of the generate() method for our word count example application. First, a static string array is defined and filled with random sentences in the constructor of the map operator (Lines 6-10). The periodically called generate() method randomly selects one sentence out of the previously defined pool of phrases, i.e., the static array (Line 17), and emits it for consumption by the process() method of the map operator using the collector object. Hence, the generate() method can be envisioned as the process() method of a virtual source operator.

² The architecture of STREAMMINE3G also supports cycles; however, cycles in ESP impose certain challenges

Program Listing 3.2 Word count application.

 1 public class WordCountMapper implements Operator
 2 {
 3   ...
 4   public WordCountMapper()
 5   {
 6     randomSentences[0] = "the cow jumped over the moon";
 7     randomSentences[1] = "an apple a day keeps the doctor away";
 8     randomSentences[2] = "four score and seven years ago";
 9     randomSentences[3] = "snow white and the seven dwarfs";
10     randomSentences[4] = "i am at two with nature";
11     rand = new Random(System.currentTimeMillis());
12   }
13
14   /* generate method of the map operator of the word count application */
15   public boolean generate(Collector collector)
16   {
17     String sentence = randomSentences[rand.nextInt(5)];
18     collector.emitEvent(..., sentence.getBytes());
19     return true;
20   }
21   ...
22 }

Note that from a software engineering point of view, it is also possible to make the source operator explicit through simple wrapping techniques, i.e., users would create a source operator class and override the process() method as in regular operators. The newly defined source operator is then instantiated as an object (class member) inside its successor operator, i.e., the map operator in our word count example, where the process() method of the source operator is called with each generate() call of the mapper instance, passing null for both the input event and the state.
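The wrapping technique described above might look as follows. The WrapOperator and WrapCollector interfaces are simplified, hypothetical stand-ins for the real interfaces, and the small run() method only simulates a runtime that hands the output of generate() locally to the mapper's own process() method.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins for the collector and operator interfaces. */
interface WrapCollector {
    void emit(byte[] event);
}

interface WrapOperator {
    void process(byte[] event, Object state, WrapCollector collector);
}

/** Explicit source operator: ignores its (null) input and emits a fixed sentence. */
class SentenceSource implements WrapOperator {
    @Override
    public void process(byte[] event, Object state, WrapCollector collector) {
        collector.emit("the cow jumped over the moon".getBytes(StandardCharsets.UTF_8));
    }
}

/** Map operator that owns the source operator as a class member. */
class WrappingMapper implements WrapOperator {
    private final WrapOperator source = new SentenceSource();

    /** Delegates each generate() call to the wrapped source, passing null for event and state. */
    public boolean generate(WrapCollector collector) {
        source.process(null, null, collector);
        return true;
    }

    @Override
    public void process(byte[] event, Object state, WrapCollector collector) {
        String sentence = new String(event, StandardCharsets.UTF_8).trim();
        for (String word : sentence.split("\\s+")) {
            collector.emit(word.getBytes(StandardCharsets.UTF_8));
        }
    }
}

class WrappingDemo {
    /** Simulated runtime: events from generate() are fed locally into the mapper's process(). */
    static List<String> run() {
        WrappingMapper mapper = new WrappingMapper();
        List<String> words = new ArrayList<>();
        mapper.generate(sentence -> mapper.process(sentence, null,
                w -> words.add(new String(w, StandardCharsets.UTF_8))));
        return words;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping the source in its own class allows it to be tested and replaced independently of the mapper, at the cost of one extra level of indirection per generated event.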