• No results found

A job is registered with the system via an RPC call to the coordinator. This call must include a location of the query to be run and the graph file to be queried. The location of the files needs to be accessible to the nodes in the system. The coordinator will create a job configuration and generate an ID for the job. This job ID will then be returned to the caller so that it knows which queue to subscribe to in order to receive results.

The next step is for the coordinator to send a message to the partitioning queue, specifying the graph file that needs to be partitioned. A partitioning node then downloads the graph and partitions it. The partitions are saved to files and uploaded to storage accessible to the workers. The partitioner is also responsible for splitting each partitions vertex IDs into small sets, so that they can be fed to workers as input. These work unit messages are sent to the active job queue for the specific job and partition. The partitioner then sends a message to the partitioning completed queue that gives the job ID and the location of the partition files.

The coordinator then creates the partition task messages that contain the job and partition pairs. Once these messages are placed in the queue, workers can receive them and start processing the graph.

When a worker receives a task message, the coordinator remembers which task the worker has. This is so that if a worker fails, the coordinator knows which task needs to be placed back in the queue for another worker to complete. When a worker has the correct graph and query file loaded, the worker starts taking messages from the queue for the particular job/partition work unit queue.

There are two types of messages that can be in a job/partition task queue. The first is an input set for the particular partition. In this case, the worker starts the motif search from each of the vertices specified. The second type is a partial solution, upon which the worker will bind the motif to the last vertex in the chain and carry on processing. Workers can use proxy vertex objects for all of the vertices in the partial motif that are not in the current partition.

A job is complete when there are no active tasks for a job and the job/partition queues associated with a job are empty. If there is no more input to process, then there can be no more results, and hence the job is finished.

6.10

Scalability

DGPROC is designed to be horizontally scalable. To add a worker, run the DGPROC worker jar on a new machine and point it to the RabbitMQ host. The worker will then register itself with the coordinator, and request a task. Scaling is limited by the capacity of the RabbitMQ host to send, receive and store messages. If many large graphs are being processed simultaneously, RabbitMQ might not be able to store all of the input set and result messages in memory. The options are then to limit the number of simultaneous jobs, or allow the messages to be written to disk. If this limit is consistently hit, it may be more effective to store larger messages such as work units in a database. Each message then becomes a reference to the ID of the entry in the database that contains the full text of the message. This will introduce a small overhead, as each work unit will have to be looked up by the worker, but this is what databases are designed to handle. The advantage of this approach is that the RabbitMQ host will have a greatly reduced workload when dealing with the work unit messages.

Throughput of messages is unlikely to be a problem in any case, as RabbitMQ is designed for thousands of messages a second. However, it is not designed to be used as a database. Using persisted queues is significantly slower than non-persisted queues.

If the system outgrows a single host, two or more hosts can be used with each job having all its associated queues on a single server. This should make it possible to balance multiple jobs running simultaneously without needing to keep track of which part of a job is on what server.

6.11

Weaknesses

There are two main points of failure in this system. First, there is the message broker. If this fails, then none of the components can communicate, rendering the system useless. The second point of failure is the coordinator. If this fails, then the workers can continue to process data, but there will be no way to add new jobs, and all job state information will be lost.

Message broker failure can be mitigated by running RabbitMQ in a cluster using High Availability mode. This means that data is replicated across two or more servers so that if one fails, then others can handle the load. There is a risk a small amount of information may be lost if the cluster has not been

synchronized recently.

Using a central message broker, twice the bandwidth is required in order to send the message directly to the recipient. This is unlikely to be a problem due to the ubiquity of gigabit and beyond networks. The largest cause of messages is likely to be cross partition motif discovery or result messages. Both of these are likely to have relatively large message contents, which will increase the load on the RabbitMQ server.

It may not be effective to store the input messages on RabbitMQ, as it is designed for throughput, not as a data store. It might prove better to go with a task management framework that can communicate via messages. This would be relatively easy to do in the current design. A proper database could then be used to store the work unit and partial motif messages.

There is no streaming fault tolerance implemented in DGPROC. How fault tolerance could be applied to DGPROC is discussed in the next section.

6.12

Fault Tolerance in Streaming Systems