The Sequencer (part 2)


In my last post I made a promise: to show the implementation of the Sequencer.

So please bear with me while I review the requirements. The sequencer:

  1. Is a facade: wraps an execution context, such as a thread or a pool of threads
  2. Prevents race conditions: enforces sequential execution of submitted tasks
  3. Guarantees ordering: tasks are processed in the order they are submitted
  4. Avoids wasteful consumption of resources

The first one is simply a design choice, so no need to elaborate. Next is how to prevent race conditions: since the unit of execution is a lambda/delegate/runnable of some sort, we need to prevent concurrent execution of those.

A simplistic state machine can help us. The possible states are ‘running’ and ‘waiting’; transitions are triggered by incoming tasks.
If the machine is in the ‘waiting’ state, it transitions to ‘running’ and executes the task; when the task is done, it transitions back to ‘waiting’.
But if the machine is in the ‘running’ state, execution cannot start: it has to wait for the appropriate state.
This can be implemented with a lock, the ‘running’ state being embodied by the lock being held. What interests me is how we deal with conflicts, i.e. when a task comes in while the state is still ‘running’.
If we respect the lock semantics, the incoming request is blocked until we reach the desired state. Blocking implies wasted resources and potential deadlocks. So no go there.

We need to make our logic a bit more elaborate. When a task is submitted for execution and the state is ‘running’, we can store it for deferred execution. There is no need to keep the current thread blocked.
As a consequence, the transition out of ‘running’ becomes: when the current task is done, if there is at least one other task pending execution, we execute it (the state remains ‘running’). When there are no pending tasks, we can safely transition to ‘waiting’.

Ok, time to look at some code. Disclaimer: this implementation is for educational purposes; I know it will look ugly to some, slow to others… you name it. But I do not care, so bear with me.
For non-C# readers: an Action is a parameter-less procedure (i.e. no return value).


using System;
using System.Collections.Generic;
using System.Threading;

namespace Sequencer
{
    public class Sequencer
    {
        private readonly Queue<Action> _pendingTasks = new Queue<Action>();
        private readonly object _lock = new object();
        private bool _isRunning;

        public void Dispatch(Action action)
        {
            // here we queue a call to our own logic
            ThreadPool.QueueUserWorkItem(x => Run((Action)x), action);
        }

        // run when the pool has available cpu time for us
        private void Run(Action action)
        {
            lock (_lock)
            {
                if (_isRunning)
                {
                    // we need to store and stop
                    _pendingTasks.Enqueue(action);
                    return;
                }
                // ok, we can run
                _isRunning = true;
            }
            while (true)
            {
                // execute the next action
                action();
                // we check if others are available
                lock (_lock)
                {
                    if (_pendingTasks.Count == 0)
                    {
                        _isRunning = false;
                        return;
                    }
                    // pop the next task
                    action = _pendingTasks.Dequeue();
                }
            }
        }
    }
}

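
To give an idea of how the facade is meant to be used, here is a minimal usage sketch (the shared counter is only an illustration):

// Usage sketch: the sequencer guarantees the increments never run
// concurrently, so no explicit lock is needed around 'counter'.
var sequencer = new Sequencer.Sequencer();
var counter = 0;
for (var i = 0; i < 100; i++)
{
    sequencer.Dispatch(() => counter++);
}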

As you can see, the actual implementation is slightly more complex than described earlier. Indeed, we need to cater for two atomic transactions:

  1. On a new task: starting it right away or queuing it
  2. When the task is done: fetching a new one or ‘freeing’ the sequencer

Let's look at the requirements:

  • Requirement #1: we wrap an existing execution mechanism (the .Net thread pool). Nothing fancy here; it will be improved in a future revision
  • Requirement #2: done as well; the implementation has been designed specifically for that purpose
  • Requirement #3: we did nothing specific here, but on the other hand it looks ok
  • Requirement #4: by using a deferred execution mechanism, we avoided the need to block threads, something we know is expensive

So, are we done now?

Nope, requirement #3 is definitely not covered. On a multicore machine (basically any machine nowadays), multiple threads from the thread pool can start to work on tasks at the same time, and there will be a race to queue them in the sequencer.
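
To make the problem concrete, here is a sketch of how the ordering can break (using the Sequencer above; the list is only there for observation):

// Ordering-race sketch: Dispatch is called with n = 0, 1, 2, ...
// but the pool callbacks race to the lock inside Run(), so tasks
// can be enqueued, and therefore executed, out of order.
var sequencer = new Sequencer.Sequencer();
var observed = new List<int>();
for (var i = 0; i < 1000; i++)
{
    var n = i; // capture the loop variable by value
    sequencer.Dispatch(() => observed.Add(n));
}
// 'observed' is not guaranteed to read 0, 1, 2, ... 999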

We need to refine this implementation.
What would be your proposal to guarantee ordering?

14 thoughts on “The Sequencer (part 2)”

  1. To improve this “educational purpose” implementation of the Sequencer, I suggest that you call for contributions before you release the solution (like you usually do with your “spot the deadlock” game). Something like: “how would you improve that code to guarantee that tasks will be processed in the same order they are submitted?”


  2. A quick fix could be to move the first lock block to the Dispatch method.

    But I wonder why you didn’t go with a BlockingCollection + dedicated thread solution?


    1. Hello
      Moving the lock (and the task queueing) to the Dispatch method would indeed secure the order. But it would introduce another issue…


    2. Regarding the dedicated thread approach: the general idea is to avoid having a dedicated thread, so that you can have hundreds, thousands, or even more sequencers.
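
      For reference, here is a rough sketch of what that dedicated-thread alternative could look like (illustrative only; ThreadedSequencer is a made-up name, not an implementation discussed in the post):

      using System;
      using System.Collections.Concurrent;
      using System.Threading;
      // Dedicated-thread sketch: one worker thread drains a BlockingCollection,
      // which provides mutual exclusion and FIFO ordering for free, but costs
      // one thread per sequencer.
      public class ThreadedSequencer : IDisposable
      {
          private readonly BlockingCollection<Action> _tasks = new BlockingCollection<Action>();
          private readonly Thread _worker;

          public ThreadedSequencer()
          {
              _worker = new Thread(Consume);
              _worker.IsBackground = true;
              _worker.Start();
          }

          public void Dispatch(Action action)
          {
              _tasks.Add(action);
          }

          private void Consume()
          {
              // blocks until a task arrives or CompleteAdding is called
              foreach (var task in _tasks.GetConsumingEnumerable())
              {
                  task();
              }
          }

          public void Dispose()
          {
              _tasks.CompleteAdding();
              _worker.Join();
          }
      }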


  3. Possible fix for #3, but probably introducing other issues (I feel like a task could be forgotten on the queue with no one processing it…)


    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Collections.Concurrent;

    namespace Sequencer
    {
        public class Sequencer
        {
            private readonly ConcurrentQueue<Action> _pendingTasks = new ConcurrentQueue<Action>();
            private readonly object _running = new object();

            public void Dispatch(Action action)
            {
                // Queue the task
                _pendingTasks.Enqueue(action);
                // Schedule a processing run (may be a noop)
                ThreadPool.QueueUserWorkItem(x => Run());
            }

            // run when the pool has available cpu time for us
            private void Run()
            {
                if (Monitor.TryEnter(_running))
                {
                    try
                    {
                        Action taskToRun;
                        while (_pendingTasks.TryDequeue(out taskToRun))
                        {
                            taskToRun();
                        }
                    }
                    finally
                    {
                        Monitor.Exit(_running);
                    }
                }
            }
        }
    }



    1. I am afraid your fears may be justified indeed.
      In a way this code matches requirement #3, but mostly because you will discard any concurrent task.


      1. I object to the discarding of ‘any’ concurrent tasks 🙂

        Only tasks that are enqueued after the while loop has finished but before the _running lock is released, and whose Run() call has already been processed in the meantime, will be stuck in the queue until another task is dispatched. So no discarding, but potentially excessive (or infinite) waiting…

        But I agree that if there is a bug in the implementation of this kind of fundamental construct, it is worthless 🙂



        1. My bad indeed, so I stand corrected. For some reason I did not realize you moved the task queuing :-).
          Yes, some tasks may be arbitrarily delayed. But there is another issue as well, more subtle, that I will comment on tomorrow.
          Many thanks for your valuable contribution, and please go on.


  4. A simple implementation in Java that supports any number of sequencers on the same executor:


    package sequencer;

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Supplier;

    /**
     * @author <a href="http://twitter.com/aloyer">@aloyer</a>
     */
    public class Sequencer {
        private final ExecutionUnit executionUnit;
        private final AtomicBoolean shutdown = new AtomicBoolean();

        public Sequencer(Supplier<ExecutorService> executor) {
            this.executionUnit = new ExecutionUnit(executor);
        }

        public void dispatch(Runnable runnable) {
            executionUnit.dispatch(runnable);
        }

        private static class ExecutionUnit implements Runnable {
            private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
            private final Supplier<ExecutorService> executor;
            private volatile boolean isRunning = false;

            public ExecutionUnit(Supplier<ExecutorService> executor) {
                this.executor = executor;
            }

            public synchronized void dispatch(Runnable runnable) {
                queue.add(runnable);
                if (!isRunning) {
                    dispatchNext();
                }
            }

            public synchronized void taskDone() {
                isRunning = false;
                dispatchNext();
            }

            private synchronized void dispatchNext() {
                if (queue.isEmpty())
                    return;
                isRunning = true;
                executor.get().submit(this);
            }

            private void executeNextTask() {
                try {
                    Runnable runnable = queue.poll();
                    if (runnable != null)
                        runnable.run();
                } catch (Exception ie) {
                    exceptionPolicyHandle(ie);
                }
            }

            @Override
            public void run() {
                try {
                    executeNextTask();
                } catch (Exception ex) {
                    exceptionPolicyHandle(ex);
                } finally {
                    taskDone();
                }
            }

            private void exceptionPolicyHandle(Exception ex) {
                // ignore and continue to next task ?
            }
        }
    }



  5. Many thanks for your input. My impression is that it fulfills the requirements, so congrats!
    It is quite different from the one I have, so it may give me ideas for improvements.
    Once I have the opportunity to torture and test it a bit, I will give you more feedback.


    1. Smart implementation, indeed. I suspect exception handling would hurt the current code's simplicity.
      I will comment on it further in an upcoming post.


  6. Of course, this is only a basic implementation. I don't like the fact that the task queue cannot be monitored. A simple counter may help, but in a latency-sensitive context we could need more control over the queue, so that conflation strategies could be applied and actions could be taken when the queue gets too big. It might also be a good idea to have a well-defined set of dedicated worker threads instead of using the ThreadPool.
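
    Something like this, perhaps (a rough sketch of the counter idea; MonitoredSequencer and Backlog are illustrative names, not part of the original code):

    using System;
    using System.Collections.Generic;
    using System.Threading;
    // Sketch: the sequencer from the post, plus an Interlocked counter so the
    // backlog (tasks dispatched but not yet completed) can be observed.
    public class MonitoredSequencer
    {
        private readonly Queue<Action> _pendingTasks = new Queue<Action>();
        private readonly object _lock = new object();
        private bool _isRunning;
        private int _backlog;

        // tasks dispatched but not yet completed; readable from any thread
        public int Backlog { get { return Thread.VolatileRead(ref _backlog); } }

        public void Dispatch(Action action)
        {
            Interlocked.Increment(ref _backlog);
            ThreadPool.QueueUserWorkItem(x => Run((Action)x), action);
        }

        private void Run(Action action)
        {
            lock (_lock)
            {
                if (_isRunning)
                {
                    _pendingTasks.Enqueue(action);
                    return;
                }
                _isRunning = true;
            }
            while (true)
            {
                action();
                Interlocked.Decrement(ref _backlog);
                lock (_lock)
                {
                    if (_pendingTasks.Count == 0)
                    {
                        _isRunning = false;
                        return;
                    }
                    action = _pendingTasks.Dequeue();
                }
            }
        }
    }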

