Monday, July 18, 2011

An Action Cache

Do you ever find yourself in a loop calling a method that expects an Action or a Func as an argument? Here’s an example from an EasyNetQ test method where I’m doing just that:

[Test, Explicit("Needs a Rabbit instance on localhost to work")]
public void Should_be_able_to_do_simple_request_response_lots()
{
    for (int i = 0; i < 1000; i++)
    {
        var request = new TestRequestMessage { Text = "Hello from the client! " + i.ToString() };
        bus.Request<TestRequestMessage, TestResponseMessage>(request, response =>
            Console.WriteLine("Got response: '{0}'", response.Text));
    }

    Thread.Sleep(1000);
}

My initial naive implementation of IBus.Request set up a new response subscription each time Request was called. Obviously this is inefficient. It would be much nicer if I could identify when Request is called more than once with the same callback and re-use the subscription.

The question I had was: how can I uniquely identify each callback? It turns out that action.Method.GetHashCode() reliably identifies a unique action. I can demonstrate this with the following code:

public class UniquelyIdentifyDelegate
{
    readonly IDictionary<int, Action> actionCache = new Dictionary<int, Action>();

    public void DemonstrateActionCache()
    {
        for (var i = 0; i < 3; i++)
        {
            RunAction(() => Console.Out.WriteLine("Hello from A {0}", i));
            RunAction(() => Console.Out.WriteLine("Hello from B {0}", i));

            Console.Out.WriteLine("");
        }
    }

    public void RunAction(Action action)
    {
        Console.Out.WriteLine("Method = {0}, Cache Size = {1}", action.Method.GetHashCode(), actionCache.Count);
        if (!actionCache.ContainsKey(action.Method.GetHashCode()))
        {
            actionCache.Add(action.Method.GetHashCode(), action);
        }

        var actionFromCache = actionCache[action.Method.GetHashCode()];

        actionFromCache();
    }
}


Here, I’m creating an action cache keyed on the action method’s hashcode. Then I’m calling RunAction a few times with two distinct action delegates. Note that they also close over a variable, i, from the outer scope.

Running DemonstrateActionCache() outputs the expected result:

Method = 59022676, Cache Size = 0
Hello from A 0
Method = 62968415, Cache Size = 1
Hello from B 0

Method = 59022676, Cache Size = 2
Hello from A 1
Method = 62968415, Cache Size = 2
Hello from B 1

Method = 59022676, Cache Size = 2
Hello from A 2
Method = 62968415, Cache Size = 2
Hello from B 2

Rather nice I think :)

Task Parallel Library: How To Write a Simple Delay Task

I just had a need for a delay task: a simple method that turns a Func<T> into a Task<T> that executes after a given delay.

The starting point for any Task creation based on an external asynchronous operation, like a Timer callback, is the TaskCompletionSource class. It provides methods to transition the task it creates to different states: you call SetResult when the operation completes, SetException if the operation fails, and SetCanceled if you want to cancel the task.

Here’s my RunDelayed method:

private static Task<T> RunDelayed<T>(int millisecondsDelay, Func<T> func)
{
    if (func == null)
    {
        throw new ArgumentNullException("func");
    }
    if (millisecondsDelay < 0)
    {
        throw new ArgumentOutOfRangeException("millisecondsDelay");
    }

    var taskCompletionSource = new TaskCompletionSource<T>();

    var timer = new Timer(self =>
    {
        ((Timer)self).Dispose();
        try
        {
            var result = func();
            taskCompletionSource.SetResult(result);
        }
        catch (Exception exception)
        {
            taskCompletionSource.SetException(exception);
        }
    });
    // Fire once after the delay; Timeout.Infinite for the period stops the timer repeating.
    timer.Change(millisecondsDelay, Timeout.Infinite);

    return taskCompletionSource.Task;
}

I simply create a new TaskCompletionSource and a Timer whose callback calls SetResult with the result of the given Func<T>. If the Func<T> throws, we catch the exception and call SetException instead. Finally we start the timer and return the Task.

You would use it like this:

var task = RunDelayed(1000, () => "Hello World!");
task.ContinueWith(t =>
{
    // 'Hello World!' is output a second later on a threadpool thread.
    Console.WriteLine(t.Result);
});

You can use the same technique to turn any asynchronous operation into a Task.

Note, however, that if your operation exposes an APM API, it’s much easier to use the Task.Factory.FromAsync method.
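For example, wrapping FileStream’s BeginRead/EndRead pair might look something like this minimal sketch (the file path and buffer size are just illustrations):

var stream = new FileStream(@"C:\temp\data.bin", FileMode.Open,
    FileAccess.Read, FileShare.Read, 4096, FileOptions.Asynchronous);
var buffer = new byte[1024];

// FromAsync pairs up the Begin/End methods and hands back a Task<int>.
var readTask = Task<int>.Factory.FromAsync(
    stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

readTask.ContinueWith(t =>
{
    stream.Dispose();
    Console.WriteLine("Read {0} bytes", t.Result);
});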

Thursday, July 14, 2011

EasyNetQ: How Should a Messaging Client Handle Errors?

EasyNetQ is my simple .NET API for RabbitMQ.

I’ve started thinking about the best patterns for implementing error handling in EasyNetQ. One of the aims of EasyNetQ is to remove as many infrastructure concerns from the application developer as possible. This means that the API should correctly handle any exceptions that bubble up from the application layer.

One of the core requirements is that we shouldn’t lose messages when the application throws. The question then becomes: where should the message that was being consumed when the application threw end up? There seem to be three choices:

  1. Put the failed message back on the queue it was consumed from.
  2. Put the failed message on an error queue.
  3. A combination of 1 and 2.

Option 1 has the benefit that it’s the out-of-the-box behaviour of AMQP. In the case of EasyNetQ, I would simply catch any exceptions, log them, and send a noAck command back to RabbitMQ. Rabbit would put the message at the back of the queue and then resend it when it got to the front.

Another advantage of this technique is that it gives competing consumers the opportunity to process the message. If you have more than one consumer on a queue, Rabbit will send the messages to each of them in turn, so this behaviour comes out-of-the-box.

The drawback of this method is the possibility of the queue filling up with failed messages. The consumer would just be cycling around throwing exceptions, and any messages that it might be able to consume would be slowed down by having to wait their turn amongst a long queue of failed messages.

Another problem is that it’s difficult to manually inspect the messages and selectively delete or retry them.

Option 2 is harder to implement. When an error occurs I would wrap the failed message in a special error message wrapper. This can include details about the type and location of the exception and other information such as stack traces. I would then publish the error message to an error exchange. Each consumer queue should have a matching error exchange. This gives the opportunity to bind generic error queues to all error exchanges, but also to have special case error consumers for particular queues.

I would need to write an error queue consumer to store the messages in a database. I would then need to provide the user with some way to inspect the messages alongside the error that caused them to arrive in the error queue, so that they could make an ignore/retry decision.

I could also implement some kind of wait-and-retry function on the error queue, but that would also add additional complexity.

Option 2 has the advantage that the original queue remains clear of failing messages. Failed messages and the error condition that caused the failure can be inspected together, and failed messages can be manually ignored or retried.

With the failed messages sitting in a database, it would also be simple to create a mechanism where those messages could be replayed on a developer machine to aid in debugging.

Option 3, a combination of 1 and 2, is what I’m now leaning towards as the best strategy. When a message fails initially, we simply noAck it and it goes back to the queue. AMQP provides a Redelivered flag, so when the message is consumed a second time we can be aware that it’s a retry. Unfortunately there doesn’t seem to be a retry count in AMQP, so the best we can do is allow for a single retry. This has the benefit that it gives a competing consumer a chance to process the message.

The lack of a retry count is a problem. One option some people use is to roll their own ‘nack’ mechanism: when an error occurs in the consumer, rather than sending a ‘nack’ to Rabbit and relying on the built-in behaviour, the client ‘acks’ the message to remove it from the queue, and then re-publishes it via the default exchange back to the originating queue. Doing this gives the client access to the message and allows a ‘retry count’ header to be set.
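A sketch of that idea with the raw RabbitMQ DotNet Client might look something like the following. To be clear, this is my illustration rather than anything EasyNetQ does today: the ‘x-retry-count’ header name, the single-retry limit, and the channel/args/queue/exchange variables are all assumptions.

// Inside the consumer's error handler. 'channel' is the IModel and 'args' the
// BasicDeliverEventArgs for the failed message; 'originalQueueName' and
// 'errorExchangeName' are whatever your topology defines. The "x-retry-count"
// header is an arbitrary name for this sketch, not an AMQP standard.
var headers = args.BasicProperties.Headers;
var retryCount = (headers != null && headers.Contains("x-retry-count"))
    ? Convert.ToInt32(headers["x-retry-count"])
    : 0;

// Ack first: this removes the message from the original queue.
channel.BasicAck(args.DeliveryTag, false);

if (retryCount < 1)
{
    var properties = args.BasicProperties;
    if (properties.Headers == null)
    {
        properties.Headers = new Hashtable();
    }
    properties.Headers["x-retry-count"] = retryCount + 1;

    // Re-publish via the default exchange, routed back to the originating queue.
    channel.BasicPublish("", originalQueueName, properties, args.Body);
}
else
{
    // Second failure: pass the message to the error exchange instead.
    channel.BasicPublish(errorExchangeName, "", null, args.Body);
}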

After the single retry we fall back to Option 2. The message is passed to the error queue on the second failure.

I would be very interested in hearing how other people have implemented error handling with AMQP/RabbitMQ.

Updated based on feedback on the 15th July

Wednesday, July 13, 2011

MEF DirectoryCatalog Fails to Load Assemblies

I had an interesting problem with the Managed Extensibility Framework yesterday. I’m using the DirectoryCatalog to load assemblies from a given directory. Pretty standard stuff. When I tested my host on my developer machine it earned the ‘works on my machine’ badge, but when I ran the host on one of our servers, it ignored all the assemblies.

Nothing loaded …

Hmm …

It turns out, after much digging and help from my Twitter crew, that the assembly loader used by MEF’s DirectoryCatalog ignores any files that have a URL Zone set. I described these zones in detail in my previous post here:

http://mikehadlow.blogspot.com/2011/07/detecting-and-changing-files-internet.html

Because we copy our plugins from a file share, Windows was marking them as belonging to the Intranet Zone. Thus the odd only-when-deployed behaviour.

How you deal with this depends on whether you think that files marked in this way represent a security threat or not. If you do, the best policy is to detect any assemblies in your DirectoryCatalog directory that have a Zone set and log them. You can do that with the System.Security.Policy.Zone class:

var zone = Zone.CreateFromUrl("file:///C:/temp/ZoneTest.doc");
if (zone.SecurityZone != SecurityZone.MyComputer)
{
    Console.WriteLine("File is blocked");
}
Console.Out.WriteLine("zone.SecurityZone = {0}", zone.SecurityZone);

If you don’t consider files copied from elsewhere a security concern, but rather a feature of your operating procedure, then you can clear the Zone flags from all the assemblies in the directory with the help of Richard Deeming’s Trinet.Core.IO.Ntfs library. I wrote a little class using this:

public class UrlZoneService
{
    public static void ClearUrlZonesInDirectory(string directoryPath)
    {
        foreach (var filePath in Directory.EnumerateFiles(directoryPath))
        {
            var fileInfo = new FileInfo(filePath);
            fileInfo.DeleteAlternateDataStream("Zone.Identifier");
        }
    }
}

I just run this before instantiating my DirectoryCatalog, and now network-copied assemblies load as expected.
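For completeness, the hosting code ends up looking something like this minimal sketch (the plugin path is illustrative):

var pluginPath = @"C:\MyApp\Plugins";

// Clear any URL Zone flags left over from the network copy...
UrlZoneService.ClearUrlZonesInDirectory(pluginPath);

// ...then let MEF load the assemblies as normal.
var catalog = new DirectoryCatalog(pluginPath);
var container = new CompositionContainer(catalog);
container.ComposeParts(this);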

Detecting and Changing a File’s Internet Zone in .NET: Alternate Data Streams

I spent most of yesterday investigating some weird behaviour in MEF, which I’ll discuss in another post. I was saved by Twitter in the guise of @Grumpydev, @jordanterrell and @SQLChap who came to the rescue and led me down a very interesting rabbit hole, to a world of URL Zones and Alternate Data Streams. Thanks chaps!

If you download a file from the internet on Windows 2003 or later, right click, and select properties, you’ll see something like this:

(Screenshot: the file’s properties dialog showing that the file is blocked.)

The file is ‘blocked’, which means that you will get various dialogues if you try to, say, run an executable with this flag set.

Any file on NTFS can have a ‘Zone’, as the flag is called. The values are described in this enumeration:

typedef enum tagURLZONE {
    URLZONE_INVALID = -1,
    URLZONE_PREDEFINED_MIN = 0,
    URLZONE_LOCAL_MACHINE = 0,
    URLZONE_INTRANET,
    URLZONE_TRUSTED,
    URLZONE_INTERNET,
    URLZONE_UNTRUSTED,
    URLZONE_PREDEFINED_MAX = 999,
    URLZONE_USER_MIN = 1000,
    URLZONE_USER_MAX = 10000
} URLZONE;

The Zone is not standard security information stored in the file’s ACL. Instead it uses a little-known feature of NTFS: ‘Alternate Data Streams’ (ADS).

Sysinternals provide a command line utility streams.exe that you can use to inspect and remove ADSs, including the Zone flag, on a file or a whole directory tree of files.

You can access a file’s Zone in .NET by using the System.Security.Policy.Zone class. Like this:

var zone = Zone.CreateFromUrl("file:///C:/temp/ZoneTest.doc");
if (zone.SecurityZone != SecurityZone.MyComputer)
{
Console.WriteLine("File is blocked");
}
Console.Out.WriteLine("zone.SecurityZone = {0}", zone.SecurityZone);

If you want to create, view and delete ADSs in .NET you will need to resort to P/Invoke; there is no support for them in the BCL. Luckily, Richard Deeming has done the work for us and created a set of classes that wrap the NTFS API. You can read about it here and get the code from GitHub here.

Using Richard’s library, you can list the ADSs for a file and their values like this:

var fileInfo = new FileInfo(path);

foreach (var alternateDataStream in fileInfo.ListAlternateDataStreams())
{
    Console.WriteLine("{0} - {1}", alternateDataStream.Name, alternateDataStream.Size);
}

// Read the "Zone.Identifier" stream, if it exists:
if (fileInfo.AlternateDataStreamExists("Zone.Identifier"))
{
    Console.WriteLine("Found zone identifier stream:");

    var s = fileInfo.GetAlternateDataStream("Zone.Identifier", FileMode.Open);
    using (TextReader reader = s.OpenText())
    {
        Console.WriteLine(reader.ReadToEnd());
    }
}
else
{
    Console.WriteLine("No zone identifier stream found.");
}

When I run this against a file downloaded from the internet I get this output:

Zone.Identifier - 26
Found zone identifier stream:
[ZoneTransfer]
ZoneId=3

You can see that the ZoneId = 3, so this file’s Zone is URLZONE_INTERNET.

You can delete an ADS like this:

var fileInfo = new FileInfo(path);
fileInfo.DeleteAlternateDataStream("Zone.Identifier");

And lastly, you can set the ZoneId like this. Here I’m changing a file to have an internet zone:

var fileInfo = new FileInfo(path);

var ads = new AlternateDataStreamInfo(path, "Zone.Identifier", null, false);
using (var stream = ads.OpenWrite())
using (var writer = new StreamWriter(stream))
{
    writer.WriteLine("[ZoneTransfer]");
    writer.WriteLine("ZoneId=3");
}

ADSs are very interesting, and open up a whole load of possibilities. Imagine storing application specific metadata in an ADS for example. I’d be very interested to hear if anyone has used them in this way.

Monday, July 11, 2011

RabbitMQ Subscriptions with the DotNet Client

RabbitMQ comes with a nice .NET client called, appropriately enough, the ‘RabbitMQ DotNet Client’. It does a good job of implementing the AMQP protocol in .NET and comes with excellent documentation, which is good because there are some interesting subtleties in its usage. This is because AMQP is designed with flexibility in mind and supports a mind-boggling array of possible messaging patterns. But as with any API, with flexibility comes complexity.

The aim of EasyNetQ, my simple messaging API for RabbitMQ on .NET, is to hide much of this complexity and provide an interface that is very simple to use. But in order to make it simple I have had to take away much of the flexibility of AMQP and instead provide a strongly opinionated view of one way of using RabbitMQ with .NET.

Today I’m going to discuss how subscriptions work with the RabbitMQ DotNet Client (RDC) and some of the choices that I’ve made in EasyNetQ.

You create a subscription using the RDC with the AMQP command ‘basic consume’. You pass in the name of the queue you want to consume from.

channel.BasicConsume(ackNackQueue, noAck, consumer);

If you use the default QueueingBasicConsumer, the RabbitMQ server then takes messages from the queue you specified and sends them over the network to the RDC. The RDC has a dedicated worker thread that listens to a TCP socket and pulls the messages off as they arrive and places them on a shared thread-safe queue. The client application, in my case EasyNetQ, pulls messages off the shared queue on its own thread and processes them as required. Once it has processed the message it can acknowledge that it has completed by sending an AMQP ‘basic ack’ command. At that point the RabbitMQ server removes the message from its queue.

(Diagram: messages flow from the queue on the RabbitMQ server, across the network to the RDC’s shared thread-safe queue, and on to the client application.)
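Putting the pieces together, a minimal consume loop with the RDC looks something like this sketch (the queue name and ProcessMessage are stand-ins for your own code):

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("my_queue", false, consumer); // noAck = false

while (true)
{
    // Blocks until the RDC's worker thread queues the next delivery.
    var delivery = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

    ProcessMessage(delivery.Body); // your application code goes here

    // Acknowledge; only now does Rabbit remove the message from its queue.
    channel.BasicAck(delivery.DeliveryTag, false);
}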

Now, what happens if messages are arriving faster than the user application can process them? The shared queue will gradually fill up with messages and eventually the process will run out of memory. That’s a bad thing. To fix this, you can limit the number of messages that RabbitMQ will send to the RDC before they are acknowledged with the Quality of Service prefetchCount setting.

channel.BasicQos(0, prefetchCount, false);

The default value for prefetchCount is zero, which means that there is no limit. If you set prefetchCount to any other positive value, that will be the maximum number of messages that the RDC’s queue will hold at any one time. Setting the prefetchCount to a reasonably high number will allow RabbitMQ to stream messages across the network more efficiently.

What happens if the shared queue is full of messages and my client application crashes? Won’t all the messages be lost? No, because messages are only removed from the RabbitMQ queue when the user application sends the basic ack message. The messages queued in the RDC’s shared queue are not acknowledged and so will not yet have been removed from the RabbitMQ queue.

However, if you pass true for ‘noAck’ when you call ‘basic consume’, the messages will be removed from the RabbitMQ queue as they are transmitted across the network. You would use this setting if you’re not worried about losing some messages, but need them to be transmitted as efficiently as possible.

For EasyNetQ, I’ve made the default settings 1000 messages for the prefetchCount and false for noAck. I’m assuming that most users will value reliability over performance. Eventually I hope to provide some kind of dial with settings like ‘high throughput, low reliability’ and ‘low throughput, high reliability’, but for now I’m going for reliability.

I’d be very interested to hear from anyone who’s using RabbitMQ with .NET and how they have configured these settings.

Sunday, July 10, 2011

What is a Closure?

This question came up at the last Brighton ALT.NET Beers. It proved almost impossible to discuss in words without seeing some code, so here’s my attempt to explain closures in C#. Wikipedia says:

In computer science, a closure (also lexical closure, function closure or function value) is a function together with a referencing environment for the nonlocal names (free variables) of that function. Such a function is said to be "closed over" its free variables. The referencing environment binds the nonlocal names to the corresponding variables in scope at the time the closure is created, additionally extending their lifetime to at least as long as the lifetime of the closure itself.

So a closure is a function that ‘captures’ or ‘closes over’ variables that it references from the scope in which it was created. Yes, hard to picture, but actually much easier to understand when you see some code.

var x = 1;

Action action = () =>
{
    var y = 2;
    var result = x + y;
    Console.Out.WriteLine("result = {0}", result);
};

action();

Here we first define a variable ‘x’ with a value of 1. We then define an anonymous function delegate (a lambda expression) of type Action. Action takes no parameters and returns no result, but if you look at the definition of ‘action’, you can see that ‘x’ is used. It is ‘captured’ or ‘closed over’ and automatically added to action’s environment.

When we execute action it prints out the expected result. Note that the original ‘x’ can be out of scope by the time we execute action and it will still work.
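For example, here the captured variable outlives the method that declared it:

static Action CreateAction()
{
    var x = 1; // local to CreateAction
    return () => Console.Out.WriteLine("x = {0}", x);
}

// elsewhere...
var action = CreateAction();
// CreateAction has returned and its 'x' is out of scope here,
// but the closure keeps it alive:
action(); // prints "x = 1"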

It’s interesting to look at ‘action’ in the debugger. We can see that the C# compiler has created a Target class for us and populated it with x:

(Debugger screenshot: the compiler-generated class referenced by action’s Target property, containing the captured x.)

Closures (along with higher order functions) are incredibly useful. If you’ve ever done any serious JavaScript programming you’ll know that they can be used to replace much of the functionality of object-oriented languages like C#. I wrote an example playing with this idea in C# a while back.

As usual, Jon Skeet covers closures in far more detail. Check this chapter from C# in Depth for more information, including the common pitfalls you can run into.

Tuesday, July 05, 2011

The First Rule of Threading: You Don’t Need Threads!

I’ve recently been introduced to a code base that illustrates a very common threading anti-pattern. Say you’ve got a batch of data that you need to process, but processing each item takes a significant amount of time. Doing each item sequentially means that the entire batch takes an unacceptably long time. A naive approach to solving this problem is to create a new thread to process each item. Something like this:

foreach (var item in batch)
{
    var itemToProcess = item;
    var thread = new Thread(_ => ProcessItem(itemToProcess));
    thread.Start();
}

The problem with this is that each thread takes significant resources to set up and maintain. If there are hundreds of items in the batch, we could find ourselves short of memory.

It’s worth considering why ProcessItem takes so long. Most business applications don’t do processor-intensive work. If you’re not protein folding, the reason your process is taking a long time is usually because it’s waiting on IO – communicating with a database or web services somewhere, or reading and writing files. Remember, IO operations aren’t just somewhat slower than processor-bound ones, they are many, many orders of magnitude slower. As Gustavo Duarte says in his excellent post What Your Computer Does While You Wait:

Reading from L1 cache is like grabbing a piece of paper from your desk (3 seconds), L2 cache is picking up a book from a nearby shelf (14 seconds), and main system memory is taking a 4-minute walk down the hall to buy a Twix bar. Keeping with the office analogy, waiting for a hard drive seek is like leaving the building to roam the earth for one year and three months.

You don’t need to keep a thread around while you’re waiting for an IO operation to complete. Windows will look after the IO operation for you, so long as you use the correct API. If you are writing these kinds of batch operations, you should always favour asynchronous IO over spawning threads. Most (though unfortunately not all) IO operations in the Base Class Library (BCL) have asynchronous versions based on the Asynchronous Programming Model (APM). So, for example:

string MyIoOperation(string arg)

Would have an equivalent pair of APM methods:

IAsyncResult BeginMyIoOperation(string arg, AsyncCallback callback, object state);
string EndMyIoOperation(IAsyncResult asyncResult);

You typically ignore the return value from BeginXXX and call EndXXX inside the delegate you provide for the AsyncCallback:

BeginMyIoOperation("Hello World", asyncResult =>
{
    var result = EndMyIoOperation(asyncResult);
}, null);

Your main thread doesn’t block when you call BeginMyIoOperation, so you can run hundreds of them in short order. Eventually your IO operations will complete and the callback you defined will be run on a worker thread in the CLR’s thread pool. Profiling your application will show that only a handful of threads are used while your hundreds of IO operations happily run in parallel. Much nicer!
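For example, here’s a sketch that kicks off many web requests without spawning a single thread yourself (the urls collection is illustrative):

foreach (var url in urls) // urls: an IEnumerable<string>, perhaps hundreds long
{
    var requestUrl = url; // copy for the closure (the foreach variable is shared in C# 4)
    var request = WebRequest.Create(requestUrl);
    request.BeginGetResponse(asyncResult =>
    {
        // Runs on a thread pool thread when the response arrives.
        using (var response = request.EndGetResponse(asyncResult))
        using (var reader = new StreamReader(response.GetResponseStream()))
        {
            Console.WriteLine("{0}: {1} chars", requestUrl, reader.ReadToEnd().Length);
        }
    }, null);
}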

Of course all this will become much easier with the async features of C# 5, but that’s no excuse not to do the right thing today with the APM.