• No results found

VI. rxdn Framework: Modular Network Control with Reactive

6.3 Design and Implementation

6.3.2 Drivers

A driver is simply a function with accepts an Observable of any type as its sink

and returns an Observable of any type as its source. The run function is responsible for calling each driver with its corresponding sink and collecting all drivers’ source Observables into the sources object which is the input tomain.

There are three drivers implemented in rxdn, and it is a simple pattern to follow to add more. The simplest is the console logging driver. As logging is a side-effect, this functionality can be implemented as a driver. Note that there is nothing technically

stopping a component from including aconsole.log statement; however, starting with

a trivial console logging is useful to show how components and drivers interact. The source of the console driver is given in Figure 42. In it, observe that it (and every driver):

• Subscribes to the input sink Observable

• Performs some side-effect, most likely related to the input sink events

• Returns an Observable, most likely related to the result of the performed side- effect

In this case, there is no “result” from printing to the console, so the returned Observable is Observable.never(). This special static instantiation method returns an Observable which never emits any events. Failing to return an Observable is an error, as the TypeScript compiler will report, since it does not adhere to the interface of a driver. Notice that this driver is declared of type Driver<string, void>, which (in TypeScript

notation) denotes that the input is an Observable of type string and the output is

an Observable of type void, the “never” Observable. (Reference Figure 41 for the

1 import {Driver} from "../interfaces"; 2 import {Observable} from "rxjs"; 3

4 /**

5 * Logs sink to the console 6 */

7 export const consoleDriver: Driver<string, void> = sink => { 8 /* tslint:disable-next-line:no-console */

9 sink.subscribe(msg => console.log(msg)); 10 return <Observable<any>> Observable.never(); 11 };

Figure 42. The rxdn console driver

A driver for state is also included. While it is possible to have accumulated state

via the scan operator within a component, the state driver shows how state can be

separated as a side-effect and lays a foundation for distributed state.

The source of the state driver is given in Figure 43. Rather than directly exporting the driver function, this driver wraps the driver function within a creation function, allowing the application to provide an initial state with which to kick off the Observable.

As shown in Figure 40, the Subjects tying together the main function input with the

output of the drivers are of type ReplaySubject(1), which means the last emitted item will be cached and accessible to any subscribers, and for state to be added to by a component, requires an initial value. The state driver cannot have a hard-coded initial value as the driver itself is generic, as shown in Figure 43 by its type ofDriver<T, T>. This indicates that the type of the input Observable is the same as the type of the output Observable.

With this simple implementation, a component can get and set state from one or more instantiated state drivers. This driver could also be extended as a database client. For example, if state is kept as a JavaScript object type, this can be serialized with JSON and sent to a document store database like MongoDB.

1 import {Driver} from "../interfaces"; 2 import {Observable} from "rxjs"; 3

4 /**

5 * Generic state driver

6 * @param {T} initialState The initial state to use 7 * @returns {Driver<T, T>}

8 *

9 * @example

10 * const updateState = otherObservable

11 * .map(value => state => state.set("key", value)); 12 */

13 export const makeStateDriver: <T>(initialState: T) => 14 Driver<T, T> = <T>(initialState: T) =>

15 (sink: Observable<(state: T) => T>) =>

16 sink

17 .scan((state, changeFn) => changeFn(state), initialState)

18 .startWith(initialState)

19 .share();

Figure 43. The rxdn state driver

Finally and most importantly, rxdn includes an OpenFlow driver. The OpenFlow driver accepts connections from OpenFlow switches and maintains a map of socket IDs to socket objects, allowing it to send any OpenFlow message to any connected switch,

as dictated by the components through its sink input. The type of the OpenFlow

driver is Driver<OFEvent, OFEvent>, where OFEvent is defined in Figure 44; the id field is a concatenation of the socket remote address and remote port as a string, the event is an enumerated type indicating why the event is occurring (e.g., a switch connected, or a message was received), and where appropriate, the type may include an error object or OpenFlow message object. This is the interface used by each component to send and receive messages to and from OpenFlow switches. The algorithm for this driver is given in Algorithm 2.

1 export enum OFEventType { 2 Connection, 3 Disconnection, 4 Error, 5 Message, 6 } 7

8 export type OFEvent =

9 {id: string, event: OFEventType.Connection} | 10 {id: string, event: OFEventType.Disconnection} | 11 {id: string, event: OFEventType.Error, error: Error} |

12 {id: string, event: OFEventType.Message, message: OF.OpenFlowMessage};

Figure 44. The OFEvent type

Algorithm 2 rxdn OpenFlow driver

subscribe tosink input

for each message event in sink do socket ← lookup idin sockets map buffer ← callencode on message

write buffer to socket

end for

source← create new Observable of type OFEvent for each new connection do

add socket to socket map emit new connection

for each new buffer from socket do messages ← calldecode with buffer for each message in messages do

emit message end for

end for end for

Related documents