Tuesday, September 24, 2013

EasyNetQ: IConsumerDispatcher

logo_design_150

EasyNetQ has always had a single dispatcher thread that runs user message handlers. This means that a slow handler on one queue can cause other handlers on other queues to wait. The intention is that one shouldn’t write long running consumers. If you are doing long running IO you should use the SubsribeAsync method and return a task that completes when the long running IO completes.

A recent change to EasyNetQ has been to make the dispatcher a separate abstraction. This means you can replace it with your own implementation if desired.

IConsumerDispatcher

Inside EasyNetQ the dispatcher receives deliveries from the RabbitMQ C# Client library and places the delivery information on an internal concurrent queue. By default, all consumers share a single internal queue. A single dispatcher thread pulls deliveries from the queue and then asks the consumer to invoke the user message handler.

The consumer dispatcher implementation is very simple, it simply maintains a queue of Action and a thread which takes those actions from the end of the queue and runs them. You could use the same pattern whenever you need to marshal an action onto a single thread. I wrote about this more general terms here.

public class ConsumerDispatcher : IConsumerDispatcher
{
    private readonly Thread dispatchThread;
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private bool disposed;

    public ConsumerDispatcher(IEasyNetQLogger logger)
    {
        Preconditions.CheckNotNull(logger, "logger");

        dispatchThread = new Thread(_ =>
            {
                try
                {
                    while (true)
                    {
                        if (disposed) break;

                        queue.Take()();
                    }
                }
                catch (InvalidOperationException)
                {
                    // InvalidOperationException is thrown when Take is called after 
                    // queue.CompleteAdding(), this is signals that this class is being
                    // disposed, so we allow the thread to complete.
                }
                catch (Exception exception)
                {
                    logger.ErrorWrite(exception);
                }
            }) { Name = "EasyNetQ consumer dispatch thread" };
        dispatchThread.Start();
    }

    public void QueueAction(Action action)
    {
        Preconditions.CheckNotNull(action, "action");
        queue.Add(action);
    }

    public void Dispose()
    {
        queue.CompleteAdding();
        disposed = true;
    }
}

An implementation of IConsumerDispatcherFactory maintains a single instance of IConsumerDispatcher:

public class ConsumerDispatcherFactory : IConsumerDispatcherFactory
{
    private readonly Lazy<IConsumerDispatcher> dispatcher;

    public ConsumerDispatcherFactory(IEasyNetQLogger logger)
    {
        Preconditions.CheckNotNull(logger, "logger");
        
        dispatcher = new Lazy<IConsumerDispatcher>(() => new ConsumerDispatcher(logger));
    }

    public IConsumerDispatcher GetConsumerDispatcher()
    {
        return dispatcher.Value;
    }
    
    public void Dispose()
    {
        if (dispatcher.IsValueCreated)
        {
            dispatcher.Value.Dispose();
        }
    }
}

If you wanted to use an alternative dispatch strategy; say for example you (quite reasonably) wanted a new dispatch thread for each consumer, you would simply implement an alternative IConsumerDispatcherFactory and register it with EasyNetQ like this:

var bus = RabbitHutch.CreateBus("host=localhost", 
    x => x.Register<IConsumerDispatcherFactory>(_ => new MyConsumerDispatcherFactory()));

Happy messaging!

1 comment:

çağatay kalan said...

Hi,
I think it will be better to change the interface to use the consumers queue so for some very specific queues, a seperate dispatcher thread can be used but for all the others, default implementation will be enough. What do you think ?