The Sequencer (part 2)

In my last post I made a promise: showing the implementation of Sequencer

So please, bear with me while I review the requirements. The sequencer

  1. Is a facade: wraps an execution context, such as a thread or a pool of threads
  2. Prevents race conditions: enforces sequential execution of submitted tasks
  3. Guarantees ordering: submitted tasks are processed in the order they are submitted
  4. Avoid useless consumptions of resources.

The first one is simply a design choice, no need to elaborate. Next is how to prevent race conditions: as the execution basis is a lambda/delegate/runable of some sort, we need to prevent concurrent execution of those.

A simplistic state machine can help us. The possible states are running and waiting; transitions are triggered by incoming tasks.
If the machine is in the ‘waiting‘ state, it transitions to ‘running‘ and executes the task; when the task is done, it transitions back to ‘waiting‘.
But if the machine is in the ‘running‘ state, execution cannot start, it needs to wait for the appropriate state.
It can be implemented with a lock, the ‘running‘ state being embodied by the lock being held. What interests me is how we deal with conflicts, i.e. when a task comes in while the state is still ‘running‘.
If we respect the lock semantic, the incoming request is blocked until we reached the desired state. Blocking implies waste and potential deadlock. So no go there.

We need to complexify a bit our logic. So, when a task is submitted for execution and the state is ‘running‘, we can store it for a differed execution. There is no need to keep to current thread blocked.
As a consequence, the transition out of ‘running‘ is now: when the task is executed, if there is at least another task pending execution, we execute it (state remains ‘running‘). When there are no pending task, we can safely transition to ‘waiting‘.

Ok, time to look at some code. Disclaimer: this implementation is for educational purpose, I know it will look ugly to some, slow to others,… you name it. But I do not care, so bear with me.
For non C# readers: an Action is a parameter-less procedure ( i.e. no return value).


using System;
using System.Collections.Generic;
using System.Threading;
namespace Sequencer
{
public class Sequencer
{
private Queue<Action> _pendingTasks = new Queue<Action>();
private bool _isRunning;
private readonly object _lock = new Object();
public void Dispatch(Action action)
{
// here we queue a call to our own logic
ThreadPool.QueueUserWorkItem( (x)=> Run((Action) x), action );
}
// run when the pool has available cpu time for us.
private void Run(Action action)
{
lock(_lock) {
if (_isRunning) {
// we need to store and stop
_pendingTasks.Enqueue(action);
return;
}
// ok, we can run
_isRunning = true;
}
while (true) {
// execute the next action
action ();
// we check if others are available
lock (_lock) {
if (_pendingTasks.Count == 0) {
_isRunning = false;
return;
}
// pop the next task
action = _pendingTasks.Dequeue();
}
}
}
}
}

view raw

Sequencer_1.cs

hosted with ❤ by GitHub

As you can see, the actual implementation is slightly more complex than described earlier. Indeed, we need to cater for two atomic transactions:

  1. On a new task: starting it right away or queuing it
  2. When the task is done: fetching a new one or ‘freeing’ the sequencer

Lets look at the requirements

  • Requirement #1: we wrap an existing execution mechanism (the .Net thread pool). Nothing fancy here, it will be improved in a future revision
  • Requirement #2: done as well, it has been specifically for that purpose
  • Requirement #3: we did nothing specific here, but on the other hand it looks ok
  • Requirement #4: by using a differed execution mechanism, we prevented the need to block threads, something we know is expensive

So, are we done now?

Nope, requirements #3 is definitely not covered. On a multicore machine (basically any machine nowadays), multiple threads (from the thread pool) can start to work on tasks at the same time, and there will be a race to queue them in the sequencer.

We need to refine this implementation.
What would be your proposal to have ordering guarantee?