In the above implementation, the following actions are performed:
• In the init method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".
• In the process method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).
• In the punctuate method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.
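The three steps above can be approximated without any Kafka dependency. The following is a minimal sketch, not the Processor API itself: a plain `TreeMap` stands in for the "Counts" state store, a `List` stands in for the downstream processor, and the class and method names (`WordCountSketch`, `forwarded`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a word-count processor; it does not use Kafka classes.
class WordCountSketch {
    // Plays the role of the local state store named "Counts".
    private final Map<String, Long> counts = new TreeMap<>();
    // Plays the role of the downstream processor.
    private final List<String> downstream = new ArrayList<>();

    // Called once per received record: split the value into words and update their counts.
    void process(String value) {
        for (String word : value.split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1L, Long::sum);
            }
        }
    }

    // Called on a schedule: iterate the store and forward every aggregated count.
    void punctuate() {
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            downstream.add(entry.getKey() + "=" + entry.getValue());
        }
    }

    List<String> forwarded() {
        return downstream;
    }

    public static void main(String[] args) {
        WordCountSketch processor = new WordCountSketch();
        processor.process("hello kafka");
        processor.process("hello streams");
        processor.punctuate();
        System.out.println(processor.forwarded()); // [hello=2, kafka=1, streams=1]
    }
}
```

In a real processor the store would be fault-tolerant and the forwarding would go through the processor context; the sketch only shows the order of the steps.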
Processor Topology
With the customized processors defined in the Processor API, developers can use the TopologyBuilder to build a processor topology by connecting these processors together:
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
The above code builds the topology in several steps. Here is a quick walkthrough:
• First of all a source node named "SOURCE" is added to the topology using the addSource method, with one Kafka topic "src-topic" fed to it.
• Three processor nodes are then added using the addProcessor method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.
• Finally three sink nodes are added to complete the topology using the addSink method, each piping from a different parent processor node and writing to a separate topic.
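To make the shape of this topology concrete, the sketch below records the same parent-to-child edges in a plain map. It is only an illustration of the resulting graph: `TopologyShape`, `addNode`, and `childrenOf` are hypothetical names, and no Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a processor topology as a parent -> children map.
class TopologyShape {
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Register a node and attach it to each named parent (this sketch requires parents to exist already).
    TopologyShape addNode(String name, String... parents) {
        children.putIfAbsent(name, new ArrayList<>());
        for (String parent : parents) {
            if (!children.containsKey(parent)) {
                throw new IllegalArgumentException("unknown parent: " + parent);
            }
            children.get(parent).add(name);
        }
        return this;
    }

    List<String> childrenOf(String name) {
        return children.get(name);
    }

    // The same seven nodes as in the walkthrough above.
    static TopologyShape example() {
        return new TopologyShape()
            .addNode("SOURCE")
            .addNode("PROCESS1", "SOURCE")
            .addNode("PROCESS2", "PROCESS1")
            .addNode("PROCESS3", "PROCESS1")
            .addNode("SINK1", "PROCESS1")
            .addNode("SINK2", "PROCESS2")
            .addNode("SINK3", "PROCESS3");
    }

    public static void main(String[] args) {
        TopologyShape topology = example();
        System.out.println("SOURCE   -> " + topology.childrenOf("SOURCE"));
        System.out.println("PROCESS1 -> " + topology.childrenOf("PROCESS1"));
    }
}
```

Note that "PROCESS1" ends up with three children: the two other processors and its own sink.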
Local State Store
Note that the Processor API is not limited to accessing the current records as they arrive; it can also maintain local state stores that keep recently arrived records for use in stateful processing operations such as aggregation or windowed joins. To take advantage of these local state stores, developers can use the TopologyBuilder.addStateStore method when building the processor topology to create the local state store and associate it with the processor nodes that need to access it; or they can connect an already created local state store with existing processor nodes through TopologyBuilder.connectProcessorAndStateStores.
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
    // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
    .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    // connect the state store "COUNTS" with processor "PROCESS2"
    .connectProcessorAndStateStores("PROCESS2", "COUNTS")
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");
In the next section we present another way to build the processor topology: the Kafka Streams DSL.
High-Level Streams DSL
To build a processor topology using the Streams DSL, developers can use the KStreamBuilder class, which extends TopologyBuilder. A simple example is included with the source code for Kafka in the streams/examples package. The rest of this section will walk through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example source code for details.
KStream and KTable
The DSL uses two main abstractions. A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is considered to be an update of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered a create). To illustrate the difference between KStreams and KTables, let's imagine the following two data records are being sent to the stream:
("alice", 1) --> ("alice", 3)
If these records were a KStream and the stream processing application were to sum the values, it would return 4. If these records were a KTable, it would return 3, since the last record would be considered an update.
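The difference between the two interpretations can be reproduced in a few lines of plain Java. This is a sketch of the semantics only, using the two "alice" records from the example above; `StreamVsTable` and its methods are hypothetical names, not part of the Streams DSL.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of record-stream versus changelog-stream semantics.
class StreamVsTable {
    // Record-stream view: every record is an independent datum, so all values are summed.
    static long sumAsStream(String[] keys, long[] values) {
        long sum = 0;
        for (long value : values) {
            sum += value;
        }
        return sum;
    }

    // Changelog view: a later record replaces the earlier value for the same key.
    static long sumAsTable(String[] keys, long[] values) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            latest.put(keys[i], values[i]);
        }
        long sum = 0;
        for (long value : latest.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        String[] keys = {"alice", "alice"};
        long[] values = {1L, 3L};
        System.out.println(sumAsStream(keys, values)); // 4
        System.out.println(sumAsTable(keys, values));  // 3
    }
}
```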
Create Source Streams from Kafka
Either a record stream (defined as KStream ) or a changelog stream (defined as KTable ) can be created as a source stream from one or more Kafka topics (for KTable you can only create the source stream from a single topic).
KStreamBuilder builder = new KStreamBuilder();
KStream source1 = builder.stream("topic1", "topic2");
KTable source2 = builder.table("topic3", "stateStoreName");
Windowing a stream
A stream processor may need to divide data records into time buckets, i.e. to window the stream by time. This is usually needed for join and aggregation operations, etc. Kafka Streams currently defines the following types of windows:
• Hopping time windows are windows based on time intervals. They model fixed-size, (possibly) overlapping windows. A hopping window is defined by two properties: the window's size and its advance interval (aka "hop"). The advance interval specifies by how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.
• Tumbling time windows are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window's size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.
• Sliding windows model a fixed-size window that slides continuously over the time axis; here, two data records are said to be included in the same window if the difference between their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. In Kafka Streams, sliding windows are used only for join operations, and can be specified through the JoinWindows class.
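The window definitions above can be checked with a little arithmetic. The sketch below assumes windows are aligned to the epoch, start at multiples of the advance interval, and cover the half-open range [start, start + size); `WindowSketch` and its method names are hypothetical and are not the library's TimeWindows or JoinWindows classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that computes window membership for a record timestamp.
class WindowSketch {
    // Start times of all hopping windows [start, start + size) that contain the timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long first = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Sliding-window test: two records match if their timestamps differ by at most the window size.
    static boolean inSameSlidingWindow(long timestampA, long timestampB, long sizeMs) {
        return Math.abs(timestampA - timestampB) <= sizeMs;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long oneMinute = 60 * 1000L;
        long record = 450_000L; // a record 7.5 minutes after the epoch

        // Hopping: size 5 minutes, advance 1 minute -> the record falls into 5 windows.
        System.out.println(windowStartsFor(record, fiveMinutes, oneMinute));
        // Tumbling: size equals advance -> exactly one window.
        System.out.println(windowStartsFor(record, fiveMinutes, fiveMinutes));
    }
}
```

With a 5-minute size and a 1-minute advance, the record at 450000 ms belongs to the windows starting at 180000, 240000, 300000, 360000, and 420000 ms; in the tumbling case it belongs only to the window starting at 300000 ms.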
Joins
A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely. In Kafka Streams, you may perform the following join operations:
• KStream-to-KStream Joins are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size.
Here, a newly received record from one of the streams is joined with the other stream's records within the specified window interval to produce one result for each matching pair based on user-provided ValueJoiner . A new KStream instance representing the result stream of the join is returned from this operator.
• KTable-to-KTable Joins are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream's materialized state stores to produce one result for each matching pair based on user-provided ValueJoiner. A new KTable instance representing the result stream of the join, which is also a changelog stream of the represented table, is returned from this operator.
• KStream-to-KTable Joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich a stream of user activities (KStream) with the latest user profile information (KTable).
Only records received from the record stream will trigger the join and produce results via ValueJoiner , not vice versa (i.e., records received from the changelog stream will be used only to update the materialized state store). A new KStream instance representing the result stream of the join is returned from this operator.
Depending on the operands, the following join operations are supported: inner joins, outer joins and left joins. Their semantics are similar to the corresponding operators in relational databases.
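The asymmetry of the KStream-to-KTable join (only stream records trigger output, table records only update state) can be sketched in plain Java. This is an illustration under simplifying assumptions: a `HashMap` stands in for the materialized state store, a `BiFunction` stands in for the user-provided ValueJoiner, left-join behavior is assumed (a missing table entry is passed as null), and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of a stream-to-table left join; it does not use Kafka classes.
class StreamTableJoinSketch {
    // Materialized view of the changelog stream: latest value per key.
    private final Map<String, String> table = new HashMap<>();
    private final List<String> output = new ArrayList<>();
    // Stand-in for the user-provided ValueJoiner.
    private final BiFunction<String, String, String> joiner;

    StreamTableJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // A changelog record only updates the materialized state; it produces no output.
    void onTableRecord(String key, String value) {
        table.put(key, value);
    }

    // A stream record triggers a lookup and emits exactly one joined result.
    void onStreamRecord(String key, String value) {
        output.add(joiner.apply(value, table.get(key)));
    }

    List<String> results() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join =
            new StreamTableJoinSketch((activity, region) -> activity + "-" + region);
        join.onStreamRecord("alice", "click"); // no profile yet: joined with null
        join.onTableRecord("alice", "eu");
        join.onStreamRecord("alice", "view");
        join.onTableRecord("alice", "us");     // update only, no output
        join.onStreamRecord("alice", "buy");
        System.out.println(join.results());    // [click-null, view-eu, buy-us]
    }
}
```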
Transform a stream
A list of transformation operations is provided for KStream and KTable respectively. Each of these operations may generate one or more KStream and KTable objects and can be translated into one or more connected processors in the underlying processor topology. All these transformation methods can be chained together to compose a complex processor topology. Since KStream and KTable are strongly typed, all these transformation operations are defined as generic functions where users can specify the input and output data types.
Among these transformations, filter, map, mapValues, etc., are stateless transformation operations and can be applied to both KStream and KTable. Users can usually pass a customized function to these operations as a parameter, such as a Predicate for filter or a KeyValueMapper for map:
// written in Java 8+, using lambda expressions
KStream mapped = source1.mapValues(record -> record.get("category"));
Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise they do not require a state store associated with the stream processor. Stateful transformations, on the other hand, require access to an associated state for processing and producing outputs. For example, in join and aggregate operations, a windowing state is usually used to store all the records received so far within the defined window boundary. The operators can then access these accumulated records in the store and compute based on them.
// written in Java 8+, using lambda expressions
// (the String key type in Windowed<String> is assumed for illustration)
KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
    () -> 0L, // initial value
    (aggKey, value, aggregate) -> aggregate + 1L, // aggregating value
    TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
    Serdes.Long() // serde for aggregated value
);

KStream joined = source1.leftJoin(source2,
    (record1, record2) -> record1.get("user") + "-" + record2.get("region")
);
Write streams back to Kafka
At the end of the processing, users can choose to (continuously) write the final result streams back to a Kafka topic through KStream.to and KTable.to.
joined.to("topic4");
If your application needs to continue reading and processing the records after they have been materialized to a topic via to above, one option is to construct a new stream that reads from the output topic; Kafka Streams provides a convenience method called through :
// equivalent to:
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream materialized = joined.through("topic4");
Besides defining the topology, developers will also need to configure their applications in StreamsConfig before running them. A complete list of Kafka Streams configs can be found here.
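As a rough illustration of what such a configuration looks like, the sketch below builds a `java.util.Properties` object with two commonly required settings. The keys "application.id" and "bootstrap.servers" are Kafka Streams configuration names; the values and the `StreamsSettingsSketch` class are hypothetical placeholders. In an application, properties like these would typically be wrapped in a StreamsConfig and handed to KafkaStreams together with the builder.

```java
import java.util.Properties;

// Hypothetical holder for application settings; the values are placeholders.
class StreamsSettingsSketch {
    static Properties settings() {
        Properties props = new Properties();
        // Identifies the application; assumed name for illustration.
        props.put("application.id", "my-streams-app");
        // Where to reach the Kafka cluster; assumed local broker address.
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        Properties props = settings();
        System.out.println(props.getProperty("application.id"));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```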
The contents of this website are © 2016 Apache Software Foundation under the terms of the Apache License v2. Apache Kafka, Kafka, and the Kafka logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.