Lab O: Use Concurrency and Multiplicity Features
Overview
In this lab, you will use an application that demonstrates the way that StreamBase creates parallel regions and assigns them to platform threads at runtime. You will get the Java Virtual Machine (“JVM”) thread ID using a custom Java plugin function. You will also use multiplicity and data parallelism features both synchronously and asynchronously. You will use all three multiplicity dispatch modes: Broadcast, Round Robin, and Numeric.
You will run your application and observe the results.
Objectives
Create a simple Java plugin function to retrieve the JVM Thread ID
Examine and understand an application that’s designed to help investigate concurrency, parallel regions, multiplicity, data parallelism, and Java thread usage
Examine the results of running an application with a single parallel region
Examine the results of running an application with two parallel regions
Examine the results of synchronous multiplicity
Examine the results of concurrent multiplicity
Prerequisites
The following software installed and configured on your machine: TIBCO Streaming 10.5.0 or later
Access to the lab file ExecutionRulesLab.zip
Disclaimer
In order to illustrate some concepts more concretely, these lab exercises use details of the StreamBase EventFlow engine implementation that TIBCO has not documented. These details relate to the use of JVM threads and the behavior of mapping JVM threads to StreamBase parallel regions at runtime. TIBCO has not documented these details because they are subject to change without notice in future StreamBase releases, and users should not rely on the specifics of these behaviors in StreamBase application designs or implementations.
Directions
Lab O — Exercise 1: Import Application
Overview
In this exercise, you import a pre-built application into StreamBase Studio and examine its components.
Steps
1. Import a StreamBase project into your workspace. From the top menu, choose File > Import...
♦ Choose General > Existing Projects into Workspace, and click Next
♦ In the next screen, choose Select archive file
♦ Click Browse... and choose ExecutionRulesLab.zip
♦ Click Finish
♦ Click OK to Update the Maven POM if presented with such a dialog.
2. Examine the imported project contents and identify various applications.
In Package Explorer, note the following application imported under src/main/eventflow/com.tibco.example/executionruleslab:
♦ Concurrent.sbapp
Tip: Concurrent.sbapp will show a typecheck error. This error is intentional and will be resolved in the next exercise.
Also note the files under src/main/resources:
♦ items.sbfs
Lab O — Exercise 2: Create a Java plugin function to retrieve the JVM Thread ID
Overview
In this exercise, we will create a simple Java plugin function getThreadId() that returns the ID of the current JVM thread. This value will allow us to observe how EventFlow parallel regions are mapped to threads at runtime.
Steps
1. Make a custom simple function plugin called getThreadId() that returns java.lang.Thread.currentThread().getId()
Right-click on the ExecutionRules/src/main/java folder and select New > StreamBase Java Functions.
Fill in the first StreamBase Custom Java Function wizard dialog screen:
♦ The defaults are Type: Simple Function and New Class, which we will keep
♦ Package: com.tibco.example.executionruleslab
Click Next to proceed to the next dialog screen, Add/Edit StreamBase Custom Simple Function:
♦ Click the Add… button to bring up the Create StreamBase Simple Function dialog, and enter:
- Function Name: getThreadId
- Return Type: long (Java: long, fastest but no nulls)
- No Argument List arguments to add.
- Click OK to dismiss the dialog window.
♦ Ensure that the Create Function Aliases option is checked
♦ Click Finish to generate the new Java function’s source code as ThreadUtil.java in an Editor tab.
♦ Observe that the generated getThreadId() function is static.
Analysis: All StreamBase custom Java simple functions are static methods.
♦ In the getThreadId() function:
- delete the line
// TODO Implement function here
- replace the line
return 0L;
- with
return java.lang.Thread.currentThread().getId();
Analysis: In order to call a member (instance) method from a custom Java simple function, the function must first create or obtain an instance and then invoke the method on that instance. Here, the static method Thread.currentThread() supplies the Thread instance on which getId() is invoked.
♦ Save your changes
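For reference, the finished class might look like the minimal sketch below. It assumes the wizard generated a public class named ThreadUtil; the package declaration and any wizard-generated comments are omitted, so the file in your workspace may differ in those details.

```java
// Package declaration omitted; the wizard adds one for the package you entered.
public class ThreadUtil {

    /**
     * StreamBase simple functions are static methods. This one takes no
     * arguments and returns the ID of the JVM thread that invoked it.
     */
    public static long getThreadId() {
        // Thread.currentThread() is static and returns the calling thread's
        // Thread instance; getId() is then invoked on that instance.
        return java.lang.Thread.currentThread().getId();
    }
}
```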
2. Examine the getThreadId plugin function alias
Open src/main/configurations/engine.conf.
Observe that the engine.conf file contains an auto-generated pluginFunctions object for the getThreadId function to declare an alias getThreadId, so that the com.tibco.example.util.ThreadUtil.getThreadId() method may be invoked using just the alias name in a StreamBase expression, just like a built-in StreamBase function:
NOTE: the auto-generated engine.conf file shows a warning that pluginFunctions is a deprecated property. Newer releases generate the new CustomFunctionGroup.
Tip: Using a pluginFunctions object to declare an alias is optional for simple functions. It’s possible to use the calljava() function to call the function directly; in this case, that would be calljava('com.tibco.example.util.ThreadUtil', 'getThreadId'), which is less elegant.
3. Make sure Concurrent.sbapp typechecks
Open Concurrent.sbapp using one of these methods: select ExecutionRules/Concurrent.sbapp in the Package Explorer view and
♦ Press <F3>, or
♦ Double-click, or
♦ Right-click > Open
Click on the Concurrent.sbapp Editor tab. It should typecheck cleanly, with no errors and no red Map components.
If, however, Concurrent.sbapp presents a typecheck error such as “Unable to find builtin function or field "getThreadId": the field "getThreadId" is not available from any stream”, then:
♦ Select a blank spot on the application canvas and either:
- Right-click > Refresh Project Typecheck Environment, or
- Press <Ctrl-F5>
Tip: There are times when the Studio typechecker needs to discard its incremental state and re-typecheck the entire project. Adding a plugin function can be one of those times. Using Refresh Project Typecheck Environment can often resolve spurious typecheck errors.
Lab O — Exercise 3: Examine an EventFlow application that reports its parallel regions and Java threads
Overview
In this exercise, we will examine the EventFlow application that we will use as the basis for exploring the concurrency and multiplicity features of the StreamBase runtime as expressed in the EventFlow language.
Steps
1. Open the EventFlow application Concurrent.sbapp.
Double-click or right-click > Open or press <F3> on Concurrent.sbapp in the Package Explorer view to open the application.
The application should look like this:
2. Examine the Input Stream OrdersIn that uses OrderSchema as its schema.
Select the Editor tab of Concurrent.sbapp.
Select the OrdersIn Input Stream
With OrdersIn selected, open the Edit Schema tab in the StreamBase Properties view, which will look like this:
3. Examine MainMap’s Output Settings.
Select the MainMap Map operator
With MainMap selected, open the Output Settings tab in the StreamBase Properties view, which will look like this:
Analysis: The getThreadId() function returns the ID of the current JVM thread. This is the custom Java plugin function created in the previous exercise. The getParallelRoot() function returns a string that is unique for every parallel region in the current StreamBase server process.
hash(item) returns a long hash code of the value of the item field, which will be used later to select the instance to which each tuple is routed.
4. Examine OtherMap’s Output Settings
Select the Editor tab of Concurrent.sbapp.
Select the OtherMap Map operator
♦ With OtherMap selected, open the Output Settings tab in the StreamBase Properties view, which will look like this:
Analysis: This Map adds fields to hold the current thread ID and the parallel root for this operator. In addition, the Mod2 and Mod3 fields hold the item value’s hash code modulo 2 and modulo 3, respectively. In subsequent exercises, these values will help illustrate how tuples are routed to multiple instances of EventFlow components.
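The HashItem values observed in Exercise 6 (-2024022834 for deodorant, 105892297 for onion, 102719 for gum) match Java's String.hashCode() for those strings. The sketch below assumes that correspondence to reproduce the HashItem, Mod2, and Mod3 values in plain Java; it is an illustration, not the StreamBase implementation of hash(), and the class name HashModDemo is hypothetical.

```java
public class HashModDemo {

    // Assumption: for a string field, hash(item) yields the same value as
    // Java's String.hashCode(), widened to long. This matches the HashItem
    // values observed in this lab but is not documented behavior.
    static long hashItem(String item) {
        return item.hashCode();
    }

    public static void main(String[] args) {
        String[] items = {"deodorant", "onion", "gum"};
        for (String item : items) {
            long h = hashItem(item);
            // Java's % keeps the sign of the dividend; these three samples
            // all produce non-negative remainders.
            System.out.printf("%s hash=%d mod2=%d mod3=%d%n", item, h, h % 2, h % 3);
        }
    }
}
```

Running main() prints:
deodorant hash=-2024022834 mod2=0 mod3=0
onion hash=105892297 mod2=1 mod3=1
gum hash=102719 mod2=1 mod3=2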
5. Examine OutputMap’s Output Settings
Select the Editor tab of Concurrent.sbapp.
Select the OutputMap Map operator
With OutputMap selected, open the Output Settings tab in the StreamBase Properties view, which will look like this:
6. Examine the application’s output schema
Select the Editor tab of Concurrent.sbapp.
Select the OrdersOut Output Stream
With OrdersOut selected, open the Schema tab in the StreamBase Properties view, which will look like this:
Analysis: Each of the three Map operators has added fields to the schema that will allow us to observe some of the threading behavior of the StreamBase runtime with and without concurrency, and also to observe how tuples are routed to multiple instances.
Lab O — Exercise 4: Examine the results of running an application with a single parallel region
Overview
In this exercise, we will examine the results of running a single parallel region application in terms of thread usage.
Steps
1. Run the application
Select Concurrent.sbapp and run it, for example, by clicking on the “Run” icon (white/green arrow) on the Studio toolbar.
Tip: If running a StreamBase application does not cause Studio to automatically switch to the SB Test/Debug perspective, you can make the switch automatic: from the Studio menu, choose Window > Preferences > StreamBase Studio, check the Switch to SB Test/Debug perspective when running a default launch option, and then click OK.
2. Play a known feed of input data
In the SB Test/Debug perspective, select the Feed Simulations view. Select the line with Name src/main/configurations/items.sbfs
Press Run
The feed simulation will open the CSV file and send 100 tuples. This action should finish very quickly.
3. Observe the output data
In the Application Output view, select any output data row — the fields and values are displayed in a separate pane below
♦ Confirm that MainParallelRoot, OtherParallelRoot, OutputParallelRoot all have the value default
♦ Confirm that MainThreadId, OtherThreadId, OutputThreadId all have the same numeric value. The specific value will vary from run to run.
Analysis: Since there is only one parallel region in the application, the parallel root value will always be the same wherever we get it. For any given input tuple in the parallel region, the JVM thread will be the same during the entire processing of that tuple to completion.
It is likely that, when this small data set runs through this small application, the runtime will assign the same thread to the processing of every tuple, as it did in the run from which the screenshot above was taken. However, the runtime is free to assign a different thread to each tuple processed.
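The following plain-Java analogy mirrors this behavior; it is not a description of how the StreamBase runtime is built. A two-thread pool (standing in for the runtime) picks a thread for each "tuple", only one tuple is in flight at a time, and each tuple runs through three hypothetical stages named after MainMap, OtherMap, and OutputMap on a single thread. SingleRegionDemo and processTuple are illustrative names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleRegionDemo {

    // Processes one "tuple" through three hypothetical stages named after
    // MainMap, OtherMap, and OutputMap; each stage records its thread ID.
    static long[] processTuple() {
        long mainThreadId = Thread.currentThread().getId();
        long otherThreadId = Thread.currentThread().getId();
        long outputThreadId = Thread.currentThread().getId();
        return new long[] {mainThreadId, otherThreadId, outputThreadId};
    }

    public static void main(String[] args) throws Exception {
        // A two-thread pool stands in for the runtime: which thread runs a
        // given tuple is the pool's choice, not the caller's.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<long[]> task = SingleRegionDemo::processTuple;
        try {
            for (int i = 0; i < 5; i++) {
                // get() waits for completion, so only one tuple is in flight.
                long[] ids = pool.submit(task).get();
                System.out.printf("tuple %d: %d %d %d%n", i, ids[0], ids[1], ids[2]);
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Within each printed line the three IDs match; across lines, which pool thread appears is up to the pool, so the output varies from run to run.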
Lab O — Exercise 5: Examine the results of running an application with two parallel regions
Overview
In this exercise, we will examine the results of running a two parallel region application in terms of thread usage.
Steps
1. Modify Concurrent.sbapp to have two parallel regions
Click the OtherMap Map operator in the Concurrent.sbapp editor pane
In the Properties view, click on the Concurrency tab
Check the Run this component in a parallel region option
Click on the Editor pane to apply the change
Analysis: Notice that OtherMap is now displayed in a grey circle to indicate that it is in a separate parallel region from the rest of the operators in the module.
Tip: Usually a single operator such as Map does not provide enough computational weight to justify the overhead of parallel region queuing and scheduling. It’s much more common, and better practice, to make Module References concurrent. But using a Map here is convenient to illustrate the concepts without having to understand the logic of what might be contained in a sub-module example.
Save your project (File -> Save)
2. Run Concurrent.sbapp and play the recording
Select Concurrent.sbapp and run it, for example, by clicking on the “Run” icon (white/green arrow) on the Studio toolbar
In the SB Test/Debug perspective, select the Feed Simulations view. Select the line with Name src/main/configurations/items.sbfs
Press Run
Tip: The feed simulation will run very quickly and stop itself when done.
3. Observe the output
In the Application Output view, select any output data row — the fields and values are displayed in a separate pane below
♦ Confirm that both MainParallelRoot and OutputParallelRoot have the value default
♦ Confirm that OtherParallelRoot has the value default.OtherMap
Examine the values of MainThreadId, OtherThreadId, and OutputThreadId.
Analysis: In the screenshot below, there are three different thread IDs. However, the StreamBase runtime could have chosen the same thread for each; that choice is up to the runtime.
Even though MainMap and OutputMap are in the same parallel region, default, the runtime assigned a different thread to tuples processed downstream of the default.OtherMap region.
Because of the parallel region lock, even though more than one thread may be assigned to process tuples in the same region, no more than one tuple will ever be in process in a given region at any time.
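As a rough analogy (an assumption about the concept, not the runtime's actual locking code), the sketch below models one parallel region as a ReentrantLock. Four pool threads process 1000 hypothetical tuples, yet the in-flight counter inside the lock never exceeds one. RegionLockDemo and its methods are illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class RegionLockDemo {

    // Hypothetical model of one parallel region: any thread may be handed a
    // tuple, but it must hold the region's lock while processing it.
    private final ReentrantLock regionLock = new ReentrantLock();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    void process(int tuple) {
        regionLock.lock();
        try {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            // Operator logic for the tuple would run here.
        } finally {
            inFlight.decrementAndGet();
            regionLock.unlock();
        }
    }

    // Processes the given number of tuples on a pool of worker threads and
    // returns the largest number of tuples ever inside the region at once.
    int runWithThreads(int threads, int tuples) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tuples; i++) {
            final int tuple = i;
            pool.execute(() -> process(tuple));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tuples did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        int max = new RegionLockDemo().runWithThreads(4, 1000);
        System.out.println("max tuples in region at once: " + max);
    }
}
```

Running main() prints "max tuples in region at once: 1", even though four threads were handed work.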
Lab O — Exercise 6: Examine the results of synchronous multiplicity
Overview
In this exercise, we will examine the results and threading behavior of running an application that has a component with a multiplicity of 2 and no concurrency.
Steps
1. Modify the application to perform synchronous multiplicity with Broadcast dispatch
Click the OtherMap Map operator in the Concurrent.sbapp editor pane
In the Properties view, click on the Output Settings tab
♦ Select the OtherParallelRoot row
♦ Click and hold the down arrow to the right of the green plus (+) icon and select Add Row Below
- Name: OtherPath
- Expression: getPath()
Analysis: getPath() returns the qualified name of the path of the current component using StreamBase Path Notation. In particular, where the Multiplicity option ensures multiple instances of the component, the path name contains an instance number, such as :0, that will allow us to identify which instance processed the tuple.
In the Properties view, click on the Concurrency tab
Uncheck the Run this component in a parallel region option
Configure the Multiplicity option:
♦ Select the Multiple: radio button
♦ Number of instances: 2
♦ Ensure Dispatch Style for input port #1 is set to Broadcast
Click on the Editor pane to apply the changes
Save the application (File -> Save)
2. Observe the results of Broadcast multiplicity
Run the application
In the Manual Input view:
♦ item: onion
♦ qty: 1
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
Analysis: The Application Output view will show two output tuples. The Broadcast dispatch style sends the input tuple to each of the two multiple instances of OtherMap, so that one output tuple is emitted from OrdersOut for each of the multiple instances.
In the Application Output view, select the first output data row — the fields and values are displayed in a separate pane below
♦ Notice that the *ThreadId and *ParallelRoot fields behave as they did in Exercise 4; that is, every *ParallelRoot value is default, the *ThreadId values are all the same, and all the Map operators processed the tuple in the same JVM thread
♦ Confirm that OtherPath is default.OtherMap:0 or default.OtherMap:1
In the Application Output view, select the second output data row
♦ Confirm that OtherPath has a different value than it did for the tuple in the first row of the Application Output view.
Analysis: The Broadcast dispatch style ensures that each of the multiple instances gets a copy of each input tuple. In this case, since a Map operator always outputs exactly one tuple for each input tuple, this application, with Multiplicity set to Multiple and Number of instances set to 2, will output two tuples for each input tuple. However, since we turned off concurrency, all the processing occurs in the same thread, and each instance processes its copy of the tuple to completion before the other instance starts processing its copy.
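A minimal synchronous model of Broadcast dispatch, assuming each instance can be represented as a function from input tuple to output tuple (BroadcastDemo and broadcast are hypothetical names): every instance gets the same input, and each finishes on the caller's thread before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BroadcastDemo {

    // Synchronous Broadcast model: every instance receives the same input
    // tuple, and each one runs to completion on the caller's thread before
    // the next instance starts.
    static <T, R> List<R> broadcast(T tuple, List<Function<T, R>> instances) {
        List<R> outputs = new ArrayList<>();
        for (Function<T, R> instance : instances) {
            outputs.add(instance.apply(tuple));
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Two hypothetical OtherMap instances; each labels its output with a
        // path-style instance name and the ID of the thread that ran it.
        List<Function<String, String>> instances = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            final int n = i;
            instances.add(item -> item + " default.OtherMap:" + n
                    + " thread=" + Thread.currentThread().getId());
        }
        // One input tuple in, two output tuples out.
        broadcast("onion", instances).forEach(System.out::println);
    }
}
```

Both printed lines report the same thread ID, because both instances ran on the main thread.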
Stop the application
3. Modify the application to perform synchronous multiplicity with Round Robin dispatch
Click the OtherMap Map operator in the Concurrent.sbapp editor pane
Configure the Multiplicity option:
♦ Dispatch Style for input port #1: Round Robin
Click on the Editor pane to apply the changes
Save the application (File -> Save)
4. Observe the results of Round Robin multiplicity
Run the application
In the Manual Input view:
♦ item: onion
♦ qty: 1
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the first output data row — the fields and values are displayed in a separate pane below
♦ Observe that only one tuple has been output by the application in response to the input tuple
Analysis: Using the Round Robin dispatch style, each input tuple is processed by only one of the multiple instances, and thus a Map operator with multiple instances outputs only one tuple. The first tuple processed is routed to instance 0.
♦ Observe that OtherParallelRoot is default
♦ Confirm that OtherPath is default.OtherMap:0
♦ Observe that MainThreadId, OtherThreadId, and OutputThreadId have the same values
In the Manual Input view:
♦ item: onion
♦ qty: 2
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the second output data row
♦ Observe that OtherPath is default.OtherMap:1
Analysis: The second tuple processed was routed to instance 1, rather than instance 0.
In the Manual Input view:
♦ item: onion
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the third output data row
♦ Observe that OtherPath is default.OtherMap:0
Analysis: The third tuple processed was routed to instance 0, just like the first tuple. Using Round Robin, each subsequent input tuple is routed to the instance with the next higher instance number; after the highest-numbered instance, routing wraps around to instance 0 again.
In general, the Kth tuple processed (counting from K = 1) is routed to instance (K - 1) mod N, where N is the number of multiple instances configured.
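The routing rule can be sketched as a small counter-based dispatcher. RoundRobinDispatcher is a hypothetical helper for illustration, not a StreamBase API; it is not thread-safe, which is acceptable here only because this exercise's synchronous multiplicity dispatches from a single thread.

```java
public class RoundRobinDispatcher {

    private final int instances;
    private long dispatched = 0; // tuples dispatched so far

    RoundRobinDispatcher(int instances) {
        if (instances <= 0) {
            throw new IllegalArgumentException("instances must be positive");
        }
        this.instances = instances;
    }

    // The Kth call (K starting at 1) returns (K - 1) mod N.
    // Not thread-safe: intended for a single dispatching thread.
    int next() {
        int instance = (int) (dispatched % instances);
        dispatched++;
        return instance;
    }

    public static void main(String[] args) {
        RoundRobinDispatcher dispatcher = new RoundRobinDispatcher(2);
        for (int k = 1; k <= 5; k++) {
            System.out.println("tuple " + k + " -> default.OtherMap:" + dispatcher.next());
        }
    }
}
```

With N = 2, main() prints instances 0, 1, 0, 1, 0 for tuples 1 through 5, matching the first three tuples sent in this step.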
Stop the application
5. Modify the application to perform synchronous multiplicity with Numeric dispatch
Click the OtherMap Map operator in the Concurrent.sbapp editor pane
Configure the Multiplicity option:
♦ Dispatch Style for input port #1: Numeric
♦ Expression: hash(item)
Click on the Editor pane to apply the changes
Save the application (File -> Save)
6. Observe the results of Numeric multiplicity
Run the application
In the Manual Input view:
♦ item: deodorant
♦ qty: 1
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the output data row — the fields and values are displayed in a separate pane below
♦ Observe that the value of HashItem is -2024022834
♦ Observe that the value of OtherPath is default.OtherMap:0
In the Manual Input view:
♦ item: onion
♦ qty: 2
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the second output data row — the fields and values are displayed in a separate pane below
♦ Observe that the value of HashItem is 105892297
♦ Observe that the value of Mod2 is 1
♦ Observe that the value of OtherPath is default.OtherMap:1
In the Manual Input view:
♦ item: gum
♦ qty: 3
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the third output data row — the fields and values are displayed in a separate pane below
♦ Observe that the value of HashItem is 102719
♦ Observe that the value of Mod2 is 1
♦ Observe that the value of OtherPath is default.OtherMap:1
Analysis: Numeric dispatch routes each input tuple to a single instance. The instance chosen will be the value of the dispatch expression mod N, where N is the number of multiple instances.
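Applying that rule to the three sample items reproduces the instances observed in this step (and, with N = 3, in the next exercise). This sketch assumes, as before, that hash(item) equals String.hashCode() for strings, and it uses Math.floorMod so a negative dispatch value still yields an index in [0, N); how the runtime actually treats negative values is not stated in this lab. NumericDispatchDemo and instanceFor are hypothetical names.

```java
public class NumericDispatchDemo {

    // Instance index = dispatch value mod N. Math.floorMod keeps the result in
    // [0, N) for negative values too; the runtime's handling of negative
    // values is an assumption here.
    static int instanceFor(long dispatchValue, int instances) {
        return (int) Math.floorMod(dispatchValue, (long) instances);
    }

    public static void main(String[] args) {
        String[] items = {"deodorant", "onion", "gum"};
        for (String item : items) {
            long hashItem = item.hashCode(); // assumed equivalent of hash(item)
            System.out.printf("%s -> 2 instances: :%d, 3 instances: :%d%n",
                    item, instanceFor(hashItem, 2), instanceFor(hashItem, 3));
        }
    }
}
```

Running main() prints:
deodorant -> 2 instances: :0, 3 instances: :0
onion -> 2 instances: :1, 3 instances: :1
gum -> 2 instances: :1, 3 instances: :2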
Lab O — Exercise 7: Examine the results of concurrent multiplicity
Overview
In this exercise, we will examine the results and threading behavior of running an application that has a component with a multiplicity of 3 and concurrency.
Steps
1. Modify the application to perform concurrent (asynchronous) multiplicity
Click the OtherMap Map operator in the Concurrent.sbapp editor pane
In the Properties view, click on the Concurrency tab
♦ Configure the Multiplicity option:
- Select the Multiple: radio button
- Number of instances: 3
- Leave Dispatch Style: Numeric
♦ Re-check the Run this component in a parallel region option
Analysis: When a component has concurrent multiplicity, the component is displayed in a grey circle, and its multiplicity is shown in the upper-right corner of the operator.
Concurrent multiplicity means that each of the multiple instances will be its own parallel region. Tuples can therefore be processed concurrently by each instance.
Click on the Editor pane to apply the changes
Save the application (File -> Save)
2. Observe the results of Numeric multiplicity
Run the application
In the Manual Input view:
♦ item: deodorant
♦ qty: 1
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the output data row — the fields and values are displayed in a separate pane below
♦ Observe that the value of Mod3 is 0
♦ Observe that the value of OtherParallelRoot is default.OtherMap:0
In the Manual Input view:
♦ item: onion
♦ qty: 2
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the second output data row — the fields and values are displayed in a separate pane below
♦ Observe that the value of Mod3 is 1
♦ Observe that the value of OtherParallelRoot is default.OtherMap:1
In the Manual Input view:
♦ item: gum
♦ qty: 3
♦ Click on Send Data or press <Enter> to send a tuple to the OrdersIn input stream
In the Application Output view, select the third output data row — the fields and values are displayed in a separate pane below
♦ Observe that the value of Mod3 is 2
♦ Observe that the value of OtherParallelRoot is default.OtherMap:2
Analysis: Multiplicity and concurrency are independent properties of a component, and can work together. Multiplicity with Numeric or Round Robin dispatch may be used to partition tuples across multiple instances; concurrent multiplicity distributes the work of those partitions across multiple cores, where they can potentially be processed in parallel.
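To picture how concurrent multiplicity combines the two properties, the analogy below gives each of three hypothetical instances its own single-threaded executor (standing in for a per-instance parallel region) and routes each item with the Numeric rule. It is a sketch under the same hash(item) assumption as earlier, not how the StreamBase runtime schedules regions; ConcurrentMultiplicityDemo and run are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentMultiplicityDemo {

    // Analogy only: each of N instances gets its own single-threaded executor,
    // standing in for "each instance is its own parallel region". Work for one
    // instance runs one item at a time; different instances may run at the
    // same time on different cores.
    static List<String> run(String[] items, int instances) {
        List<ExecutorService> regions = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            regions.add(Executors.newSingleThreadExecutor());
        }
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String item : items) {
                // Numeric dispatch, assuming hash(item) equals String.hashCode().
                int instance = (int) Math.floorMod((long) item.hashCode(), (long) instances);
                pending.add(regions.get(instance).submit(() -> item
                        + " -> default.OtherMap:" + instance
                        + " thread=" + Thread.currentThread().getId()));
            }
            // Collect results in input order, waiting for each region to finish.
            List<String> results = new ArrayList<>();
            for (Future<String> f : pending) {
                try {
                    results.add(f.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            return results;
        } finally {
            regions.forEach(ExecutorService::shutdown);
        }
    }

    public static void main(String[] args) {
        run(new String[] {"deodorant", "onion", "gum"}, 3).forEach(System.out::println);
    }
}
```

The three items land on instances 0, 1, and 2, each handled by its own executor thread; the thread IDs at the end of each line vary from run to run.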