Not Your Usual Threading
The Object-based Parallel Evaluation Environment, or OPEE, project explores an alternative approach to using multiple threads for system design. OPEE was purpose-built to solve problems encountered in systems that involve significant processing and shared resources. Shared resources are components such as file writers, hash tables, and summary data collectors. A trading system design that touches on several of these types of resources is explored in this article.
In simple systems that only receive a request, perform a lookup, and return data, there are many solutions that work well. Any decent event handling framework will do. Event Machine is a very good framework for this type of system. As soon as those requests require modifications of data or the processing steps become more CPU intensive, the solutions become more complex. The roots of most of the problems encountered are resource contention and inefficient use of threads or memory.
Instead of speaking in the abstract about the issues encountered with threaded systems, let's look at an example that illustrates them. The selected example will be described along with a classic solution, an analysis of that solution, and the solution using the OPEE approach.
Real Life Example
Let's start with a real life example of a system that I encountered. The system was not implemented in Ruby but the principles are the same. As far as I know the system is still in production, although the details have been changed slightly in this article to simplify the example.
The example is a trading application that receives messages in FIX format, processes those messages, and eventually sends an order to an exchange. When a message arrives it must be stored and parsed. The parsed order object is then passed to an internal matching engine, a notification is sent to the customer, an order is sent to an exchange, and all steps are logged to provide an audit trail.
Some of the steps such as the parsing take significant CPU time to complete. Other steps such as storing to a database and logging use a shared resource that can only be accessed by a single thread at a time. Assume for this example that only one database connection is used for storing the messages. Due to the way the matching engine is implemented it is also limited to one thread access at a time as is the delivery of notifications to customers.
[Figure: Trade Processing Steps]
The approach taken for the implementation was to write the processing steps into one body of code and then expand that solution to have multiple threads execute the same body of code in parallel. That is pretty much the way the system evolved.
How it was Implemented
Implementation of the trading system used a fixed number of threads. That number was configurable but was fixed once the application was running. There was also a feeder thread that accepted FIX messages and initiated the processing of each message. Each thread ran through the processing steps in order. When a shared resource such as logging was used, a mutex ensured that only one thread at a time accessed that resource. The last processing step was to batch up orders and send them to the exchange. Once a thread had completed the processing of an order or message, the thread picked up the next message to process.
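The structure just described can be sketched in a few lines of Ruby. This is only an illustration of the pattern, not the original system (which was not written in Ruby). The class name, the pipe-delimited message format, and the arrays standing in for the database and the log are all assumptions made for the sketch.

```ruby
require 'thread'

# Sketch of the classic design: every worker runs every step in order and
# takes a mutex around each shared resource. All names are hypothetical.
class ClassicProcessor
  attr_reader :stored, :log

  def initialize(thread_count)
    @input       = Queue.new
    @stored      = []          # stands in for the single database connection
    @log         = []          # stands in for the audit log
    @store_mutex = Mutex.new
    @log_mutex   = Mutex.new
    @workers = Array.new(thread_count) do
      Thread.new do
        # A worker picks up the next message only after finishing the last one.
        while (msg = @input.pop) != :stop
          process(msg)
        end
      end
    end
  end

  # Called by the feeder thread.
  def submit(msg)
    @input << msg
  end

  def shutdown
    @workers.size.times { @input << :stop }
    @workers.each(&:join)
  end

  private

  def process(msg)
    @store_mutex.synchronize { @stored << msg }   # waits if another worker is storing
    order = msg.split('|')                        # stand-in for FIX parsing
    @log_mutex.synchronize { @log << "parsed #{order.first}" }
  end
end

classic = ClassicProcessor.new(4)
10.times { |i| classic.submit("order-#{i}|BUY|100") }
classic.shutdown
puts classic.stored.size   # prints 10
```

Every worker can end up waiting on either mutex, which is the blocking behavior analyzed in the following paragraphs.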
Since all steps were performed one after the other in sequence, each thread took turns being blocked as it tried to use each resource. The final step attempted to batch orders together because sending orders to the exchange was more efficient in batches. This meant threads blocked on the last step in the processing until there were either enough orders for a batch or it was not worth waiting any longer. The approach allowed each resource to be used at the same time until threads stalled waiting to send orders to the exchange. This resulted in behavior where latencies would increase in incremental steps as the system became busier. This approach also limited the lowest latency for processing an order to the sum of the times required for each step. Since all steps had to be completed in sequence, the throughput was limited by the number of threads and the total processing time for each order.
Latency, or message processing time, increased and became more bursty as the system became more heavily loaded due to the limited number of threads and the need to block on the last processing step until a batch was filled. A new message could not be processed until a thread became available. That meant no internal matching could be done on new orders until the older ones had been fully processed and sent to the exchange. This also reduced the opportunity for internal matching before sending to the exchange, resulting in lost revenue.
There was no explicit throttling control on the system other than the time it took to complete the processing at a given resource. That resulted in the input feeder thread backing up until input buffers became full. That may be the correct place to throttle the system, but there was no choice involved in that decision and no option to change where the system was throttled if that was desired.
The root causes of the symptoms manifested were poor thread utilization, excessive blocking on resources, and serialization of processing.
Poor thread utilization leaves threads blocked with work still to be done. The solution described has threads blocking while waiting for resources at the same time new orders are waiting to be processed. This is due to limiting the number of threads available for processing and expecting each thread to have a turn at using a resource. The design decision to serialize processing also plays into the poor thread utilization. An ideal design will keep all threads at full utilization as long as there is processing to be completed. With a resource as a bottleneck that is not always possible, but striving for that goal should result in a good design.
The use of multiple threads attempting to access a resource and blocking until that resource was available coupled with serialization of the processing steps resulted in resources being idle when there were still orders that needed to use that resource. This also bound the processing thread to that resource until it could make use of it.
One problem that was not encountered was deadlocking when two threads attempt to use the same pair of resources in different order. These problems were avoided by moving the mutex protection of the finer grained resources out to encompass sets of smaller resources. This circumvented the problem at the cost of longer blocking times.
Lack of control over throttling of orders results in backups where they are not desired. In this case backups resulted in blocking new orders. In different situations it could also lead to running out of memory. Throttle control is needed when making use of multiple threads.
[Figure: Classic Trade System Design]
Using OPEE design principles as the foundation of the design yields a very different system. The goals are to always keep input moving into the system, to access resources in parallel, and to minimize or eliminate blocked threads. One possible design is described by the paths an order takes through the processing steps.
OPEE Approach Details
The processing steps become Actors and each Actor is driven by a dedicated thread. Each Actor accepts requests and places those requests on a queue. When the Actor is ready it takes the next request off the queue and processes it. Each Actor owns the resources under its control and uses only one thread to process requests, so there is no need for a mutex to protect the resources. The only use of a mutex is when adding a request to the Actor's queue. That takes almost no time and the time required is constant. Armed with these principles, the design is then made up of Actors and the data being processed.
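To make the idea concrete, here is a minimal actor written with nothing but Ruby's core Thread and Queue. It is a sketch of the principle only and should not be read as OPEE's implementation or API. The SketchActor and LogActor classes, the close method, and the signature given to ask here are assumptions made for the illustration.

```ruby
require 'thread'

# Minimal actor sketch: one thread, one queue, no mutex around the resource.
# Illustration only; this is not OPEE's implementation.
class SketchActor
  def initialize
    @queue  = Queue.new        # thread safe; enqueueing is the only synchronized step
    @thread = Thread.new do
      loop do
        method_name, args = @queue.pop
        break if method_name == :__stop__
        send(method_name, *args)   # always runs on the actor's own thread
      end
    end
  end

  # Queue a request and return immediately; the method's result is discarded.
  def ask(method_name, *args)
    @queue << [method_name, args]
    nil
  end

  # Process everything queued so far, then stop the actor's thread.
  def close
    @queue << [:__stop__, []]
    @thread.join
  end
end

class LogActor < SketchActor
  attr_reader :lines

  def initialize
    @lines = []                # the resource owned by this actor
    super
  end

  private

  # Only ever called from the actor's thread, so no mutex is needed.
  def log(text)
    @lines << text
  end
end

logger = LogActor.new
writers = 4.times.map do |t|
  Thread.new { 25.times { |i| logger.ask(:log, "t#{t} msg#{i}") } }
end
writers.each(&:join)
logger.close
puts logger.lines.size   # prints 100
```

Four threads log concurrently, yet the array is only ever touched by the actor's single thread.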
The first Actor gathers order messages from the input sockets. These messages are immediately distributed using a fanout to both a store and a parser work queue. The input Actor is the only Actor that does not have a limit on the request queue size, which keeps the input channel from being blocked. Since parsing of the FIX formatted order messages takes some CPU time, that work is distributed across a configurable number of parser Actors that take messages off the parser work queue and parse each message into an order object. The resulting order objects are then sent to both the matching engine and the customer notification Actors.
The matching engine Actor attempts to match the order with existing orders already in the engine and then either passes the order on to the exchange Actor or informs the customer notifier of a successful match.
If the order is sent to the exchange Actor it is sent at whatever time is optimal, either as part of a batch or alone. After delivery a notification is sent to the customer notification Actor.
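The fanout and the parser work queue can be sketched with plain Ruby queues. Everything here is hypothetical: the queue names, the pipe-delimited stand-in for a FIX message, and the choice of three parsers are assumptions for the illustration, and OPEE's own classes are not used.

```ruby
require 'thread'

store_q  = Queue.new   # input to the single store thread
parse_q  = Queue.new   # shared work queue feeding the parsers
orders_q = Queue.new   # parsed orders headed for matching and notification

stored = []
store_thread = Thread.new do
  while (msg = store_q.pop) != :stop
    stored << msg      # only this thread touches `stored`
  end
end

parser_count = 3       # would be configurable in a real system
parsers = Array.new(parser_count) do
  Thread.new do
    # Each parser takes the next message only when it is ready for one.
    while (msg = parse_q.pop) != :stop
      id, side, qty = msg.split('|')   # stand-in for FIX parsing
      orders_q << { id: id, side: side, qty: qty.to_i }
    end
  end
end

# Fanout: hand each message to both paths without waiting for either.
fanout = ->(msg) { [store_q, parse_q].each { |q| q << msg } }

20.times { |i| fanout.call("order-#{i}|BUY|#{100 + i}") }

store_q << :stop
parser_count.times { parse_q << :stop }
(parsers + [store_thread]).each(&:join)

puts "stored=#{stored.size} parsed=#{orders_q.size}"   # prints stored=20 parsed=20
```

Storing and parsing proceed in parallel, and no scheduler is needed to balance the parsers because each one pulls work at its own pace.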
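One way an exchange Actor could decide between sending a batch and sending a lone order is a size-or-age rule. The sketch below is an assumption about how such a policy might look, not the rule used by the system described. The OrderBatcher class, its parameters, and the explicit timestamps passed in for the demonstration are all hypothetical.

```ruby
# Hypothetical batching policy: flush when the batch is full or when the
# oldest pending order has waited at least max_wait seconds.
class OrderBatcher
  attr_reader :sent

  def initialize(max_size, max_wait)
    @max_size = max_size
    @max_wait = max_wait
    @pending  = []
    @oldest   = nil    # arrival time of the oldest pending order
    @sent     = []     # stands in for the exchange connection
  end

  # Called on the exchange actor's thread for each arriving order.
  def add(order, now = Time.now.to_f)
    @oldest ||= now
    @pending << order
    flush if @pending.size >= @max_size
  end

  # Called periodically on the same thread to catch orders that waited too long.
  def tick(now = Time.now.to_f)
    flush if @oldest && now - @oldest >= @max_wait
  end

  private

  def flush
    @sent << @pending
    @pending = []
    @oldest  = nil
  end
end

batcher = OrderBatcher.new(3, 0.05)
batcher.add(:a, 0.00)
batcher.add(:b, 0.01)
batcher.add(:c, 0.02)   # third order fills the batch
batcher.add(:d, 0.03)
batcher.tick(0.04)      # too early, :d keeps waiting
batcher.tick(0.09)      # :d has waited long enough and is sent alone
p batcher.sent          # prints [[:a, :b, :c], [:d]]
```

Because the decision is made inside the Actor, upstream threads queue their orders and move on instead of stalling until a batch fills.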
At each step a message is created and sent to the logger. None of those messages block but are queued up on the logger Actor's input queue.
OPEE Approach Effectiveness
The OPEE design effectively eliminates blocked threads, as each Actor will attempt to process any request it receives and will not block on resource availability since there is only one thread accessing any given resource. Threads are never blocked when there is work to be done. By keeping the total number of threads less than or equal to the number of CPUs on the machine, context switching can be drastically reduced.
CPU intensive operations such as the parsing can be spread out over multiple parser threads as no shared resource is involved. Each parser takes a new message only when it is ready. There is no scheduler needed. This allows the parsing activity to be scaled to match the performance of other processing steps.
Parallel use of resources is supported using a fanout approach. At the start message storage and parsing occur in parallel. Later matching and notification occur in parallel. This allows latency to be less than the sum of all the processing step times.
By setting up maximums on the queue lengths of all but the input Actor, other Actors put back pressure on the input Actor if they are too busy. This causes the input Actor to queue the incoming messages after taking them from the input sockets. Of course the acknowledgement of the order may be delayed but it has been received.
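Ruby's core SizedQueue gives a simple way to model this kind of back pressure. The sketch below is one possible arrangement under stated assumptions, not how OPEE enforces its queue limits. The queue names, the limit of two pending requests, and the forwarding thread standing in for the input Actor are all hypothetical.

```ruby
require 'thread'

inbox   = Queue.new          # input side: unbounded, accepting a message never waits
bounded = SizedQueue.new(2)  # downstream actor: at most 2 pending requests

processed = []
worker = Thread.new do
  while (msg = bounded.pop) != :stop
    sleep 0.01               # simulate a slow processing step
    processed << msg
  end
end

# The forwarding thread waits whenever `bounded` is full, so the backlog
# accumulates in `inbox` rather than downstream.
forwarder = Thread.new do
  while (msg = inbox.pop) != :stop
    bounded << msg
  end
  bounded << :stop
end

10.times { |i| inbox << "msg-#{i}" }   # returns immediately every time
inbox << :stop
[forwarder, worker].each(&:join)

puts processed.size   # prints 10
```

The limit on the downstream queue decides where the backlog sits, which is the throttling choice the classic design never offered.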
[Figure: OPEE Trade System Design]
How OPEE is Different
The intent of OPEE is to encourage alternate ways of viewing solutions to problems. As the number of CPUs or cores in a machine increases, making efficient use of threads can have a dramatic effect on system performance. Taking advantage of these expanded capabilities often requires thinking differently about solutions.
Separation of Data and Processing
By separating the data elements, let's call them a job, from the processing elements, the Actors in the case of OPEE, it is easier to implement parallel processing systems. Care should be taken in the job to make sure sections of the job are not modified, or at least not modified by two Actors at the same time. This can be accomplished with the proper design of a process flow. It can also be accomplished by isolating and encapsulating parts of the job for specific Actors.
Each Actor has its own execution thread and often has one or more resources that it makes use of. Modifications of those resources or variables should only be done through private methods. Private methods are called through the internal queue in the Actor, either by calling the ask() method or through the method_missing() method.
By using a separate thread for each Actor, the OS does the thread scheduling instead of those details having to be worked out in Ruby. In the future OPEE may make use of a C extension to implement the threading or possibly even a lighter weight thread replacement.
No Mutex Blocking
With resources held by Actors there is no need to use a mutex to avoid simultaneous access. The Actor deals with that by only using one thread to access the resource. The lack of method returns also discourages attempts to block waiting on a resource. It is possible to hack around this limitation but it is better to find a design that does not need that feature.
Flow Forward, No Returns
Actor methods should not have return values. To be more precise, methods that modify Actor variables should all be asynchronous methods implemented as private methods and invoked using the ask() method of the Actor. Any return value is ignored. By following this guideline there is no need for the use of a mutex, and designs must follow a flow instead of a stack design pattern. Each Actor method is invoked in a "call and forget" manner, forcing a rethinking of the design approach when compared to the more classic patterns.
OPEE systems naturally lend themselves to a flow chart model with multiple parallel paths through the chart or graph. The designs have to consider the dynamic aspects of what is taking place instead of a simple one path down and up a stack. Fan outs of processing paths may have to be merged back into a collector before an operation is considered complete. Other patterns such as work queues may also have to be used.
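A collector that merges fanned-out paths might look like the following. This is a sketch of the pattern in plain Ruby and not an OPEE class. The Collector name, its report and close methods, and the :matched and :notified branch names are assumptions for the illustration.

```ruby
require 'thread'

# Hypothetical collector: a job is complete once every expected branch reports.
class Collector
  attr_reader :completed

  def initialize(expected)
    @expected  = expected                      # branches that must report per job
    @seen      = Hash.new { |h, k| h[k] = [] } # job id => branches reported so far
    @completed = []
    @queue     = Queue.new
    @thread    = Thread.new do
      while (item = @queue.pop) != :stop
        job_id, branch = item
        @seen[job_id] << branch
        if (@expected - @seen[job_id]).empty?
          @completed << job_id
          @seen.delete(job_id)
        end
      end
    end
  end

  # Call and forget: queue the report and return immediately.
  def report(job_id, branch)
    @queue << [job_id, branch]
    nil
  end

  def close
    @queue << :stop
    @thread.join
  end
end

collector = Collector.new([:matched, :notified])
branches = [:matched, :notified].map do |branch|
  Thread.new { 5.times { |job_id| collector.report(job_id, branch) } }
end
branches.each(&:join)
collector.close
p collector.completed.sort   # prints [0, 1, 2, 3, 4]
```

The order in which jobs complete depends on thread timing, which is why the result is sorted before printing.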
Queues Are Your Friend
Queues are used internally in each Actor to queue up async method invocations. There is also a WorkQueue Actor that can be used to distribute jobs across multiple workers. The use of queues and lack of returns encourages a pipelining design pattern. This tends to make better use of resources that would normally be blocking resources in a design where multiple parallel threads all perform the same set of operations.
Out of Band Error Handling
In a flow forward design error handling is still needed. An out of band approach fits in nicely. Errors can be handled in one centralized bit of code. Logging is much the same. If an Actor raises an Exception it passes that Exception on to a central error handler or logger. Central error handling encourages consistent handling of exceptional conditions and better designs.
That's About It
OPEE offers a different perspective on the design of systems. It requires a more careful design but simplifies implementation by avoiding some of the mutex and deadlocking issues common in many multi-threaded systems.
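As a rough illustration of the out of band idea, a worker can rescue its own exceptions and pass them to a separate queue that a single handler drains. This is a sketch in plain Ruby under stated assumptions and does not show OPEE's error handling API. The queue names, the hash layout of an error report, and the use of Integer() as a stand-in parsing step are all hypothetical.

```ruby
require 'thread'

errors  = Queue.new   # central, out of band error channel
handled = []
error_handler = Thread.new do
  while (err = errors.pop) != :stop
    # One place decides how every error is reported.
    handled << "#{err[:actor]}: #{err[:error].class}: #{err[:error].message}"
  end
end

requests = Queue.new
results  = []
parser = Thread.new do
  while (msg = requests.pop) != :stop
    begin
      results << Integer(msg)   # raises ArgumentError on bad input
    rescue StandardError => e
      errors << { actor: 'parser', error: e }   # hand off and keep flowing
    end
  end
end

%w[1 2 oops 4].each { |m| requests << m }
requests << :stop
parser.join
errors << :stop
error_handler.join

p results      # prints [1, 2, 4]
puts handled   # one line describing the ArgumentError raised for "oops"
```

The parser never stops to deal with the failure itself, so the forward flow continues while the error is handled elsewhere.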