Thursday, July 26, 2012

Priority With RabbitMQ implementation in .NET

- Following up on several posts about RabbitMQ and Burrow.NET, this post covers the priority implementation for RabbitMQ in Burrow.NET. We actually implemented it in the library a few weeks ago, but the code probably had bugs and we needed more time to tune and fix them. At this point, I'm confident that the implementation is quite stable, and it's time for a blog post.

- Basically, the implementation follows this post. We split the logical queue into several physical queues and use an in-memory queue to aggregate messages from those physical queues. The in-memory queue is implemented using the IntervalHeap from the C5 library, and the code lives in the Burrow.Extras package because I hope RabbitMQ will support priority in the future. If it does, that will be a good time to get rid of this implementation without modifying much code in the Burrow.NET core.

- We've followed the same pattern mentioned in Doug Barth's post. Each physical queue uses PrefetchSize to decide how many unacked messages can be delivered to the client, so if you have n priorities, the internal queue will have a capacity of n x PrefetchSize. When the internal memory queue is full, RabbitMQ is blocked from sending more messages to the client, so the client has to process and ack messages in order from high priority to low priority before it gets more.
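
The aggregation described above can be sketched roughly as follows. This is only a minimal illustration, not the actual Burrow.Extras code: the class name `BoundedPriorityBuffer` is hypothetical, and it uses the built-in `PriorityQueue<TElement, TPriority>` (available from .NET 6) as a stand-in for the C5 IntervalHeap. It assumes a higher number means a higher priority.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the in-memory queue that aggregates the physical queues.
public sealed class BoundedPriorityBuffer<T>
{
    private readonly object _gate = new object();
    // PriorityQueue is a min-heap, so the priority is negated to dequeue the highest first.
    // The sequence number keeps arrival order among messages of the same priority.
    private readonly PriorityQueue<T, (long, long)> _heap = new PriorityQueue<T, (long, long)>();
    private long _sequence;

    public BoundedPriorityBuffer(int priorityLevels, int prefetchSize)
    {
        // n priorities x PrefetchSize, as described in the post.
        Capacity = priorityLevels * prefetchSize;
    }

    public int Capacity { get; }

    // Called by the consumer of each physical queue; blocks while the buffer is full.
    public void Enqueue(T message, uint priority)
    {
        lock (_gate)
        {
            while (_heap.Count >= Capacity) Monitor.Wait(_gate);
            _heap.Enqueue(message, (-(long)priority, _sequence++));
            Monitor.PulseAll(_gate);
        }
    }

    // Called by the processing code; blocks while the buffer is empty.
    public T Dequeue()
    {
        lock (_gate)
        {
            while (_heap.Count == 0) Monitor.Wait(_gate);
            T message = _heap.Dequeue();
            Monitor.PulseAll(_gate);
            return message;
        }
    }
}
```

In the real setup the broker stops delivering once each physical queue has PrefetchSize unacked messages, so the buffer should never need to block a producer; the blocking `Enqueue` here just makes the capacity limit explicit.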

- The physical queues all have the same prefetch size, and we recommend keeping it at approximately double the processing velocity. If you set the prefetch size too high compared to the processing velocity, the only drawback we can think of is that the internal memory queue will unnecessarily hold too many messages, which could lead to high memory consumption.
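
To make the sizing rule concrete, here is a small worked sketch. The helper names are hypothetical, and it assumes that "processing velocity" means messages processed per second, which the rule above does not spell out.

```csharp
using System;

public static class PrefetchSizing
{
    // Assumption: velocity is measured in messages per second.
    public static ushort RecommendedPrefetch(double messagesPerSecond)
    {
        double doubled = Math.Ceiling(messagesPerSecond * 2);
        // Keep the value between 1 and ushort.MaxValue; a prefetch count of 0 means "no limit" in AMQP.
        return (ushort)Math.Max(1, Math.Min(ushort.MaxValue, doubled));
    }

    // Upper bound on the number of messages held in the internal memory queue.
    public static int BufferCapacity(int priorityLevels, ushort prefetchSize)
    {
        return priorityLevels * prefetchSize;
    }
}
```

For example, a consumer that handles about 32 messages per second would get a prefetch size of 64, and with 5 priorities the internal queue could hold up to 320 messages.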

- In our production code we need 5 priorities, so 5 threads are created to consume messages from those physical queues. We had several issues with ThreadPool and TPL in our production environment, as the Windows service was getting slow after running for a while. We figured out that we should never use ThreadPool threads for long-running processing. I'll post about this topic later in August, but for now, if you have a long-running Windows service and it takes time plus a lot of IO operations to process your messages, use normal threads instead of threads from the pool. To do that, set the following line in your BootStrapper:
Global.DefaultTaskCreationOptionsProvider = () => TaskCreationOptions.LongRunning;
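
For readers less familiar with the option, here is a rough sketch of what starting one long-running consumer per priority looks like with plain TPL calls. `PriorityConsumers` and the `consume` delegate are hypothetical names for illustration, not Burrow.NET APIs. `TaskCreationOptions.LongRunning` is a hint to the scheduler; with the default scheduler it typically results in a dedicated thread rather than a pool thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PriorityConsumers
{
    // Starts one long-running task per priority level.
    // `consume` is a hypothetical per-queue loop that receives its priority and a cancellation token.
    public static Task[] Start(int priorityLevels, Action<int, CancellationToken> consume, CancellationToken token)
    {
        var tasks = new Task[priorityLevels];
        for (int i = 0; i < priorityLevels; i++)
        {
            int priority = i; // copy the loop variable so each task captures its own value
            tasks[i] = Task.Factory.StartNew(
                () => consume(priority, token),
                token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }
        return tasks;
    }
}
```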

- This implementation uses a Headers exchange to deliver messages with a priority value to the expected queue. Someone said the Headers exchange has lower performance than the other types, but I think it's good enough because I need the messages to survive a server reboot, which means they have to be persisted to disk by setting PersistentMode to true. Persistence itself already reduces the maximum potential performance of RabbitMQ, as mentioned in their docs. Anyway, it's not very difficult to switch to a different exchange type such as Topic or Direct if we really need to.
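
As a rough illustration of how a Headers exchange picks the queue, the sketch below mimics the `x-match = all` rule in plain C#, with no broker involved. The header name `Priority` and the queue names are assumptions made for this example; they are not necessarily what Burrow.NET uses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HeaderRouting
{
    // x-match = all: every binding argument (ignoring "x-" keys) must equal the message header of the same name.
    public static bool MatchesAll(IDictionary<string, object> bindingArgs, IDictionary<string, object> headers)
    {
        return bindingArgs
            .Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal))
            .All(kv => headers.TryGetValue(kv.Key, out var value) && object.Equals(value, kv.Value));
    }

    // Returns the first queue whose binding matches the message headers, or null if none does.
    public static string FindQueue(
        IDictionary<string, IDictionary<string, object>> bindings,
        IDictionary<string, object> headers)
    {
        return bindings
            .Where(b => MatchesAll(b.Value, headers))
            .Select(b => b.Key)
            .FirstOrDefault();
    }
}
```

With one binding per priority value, a message published with `Priority = "2"` ends up only in the queue bound with that same value.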

- So, here are the steps to setup and consume from the priority queues:

1/ Declare exchange and queues

ExchangeSetupData exchangeSetupData = new HeaderExchangeSetupData();
QueueSetupData queueSetupData = new PriorityQueueSetupData(3)
{
    SubscriptionName = "YourApplicationName",
};

var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"].ToString();
var environment = "PROD";

// The lambda parameter is named `env` so it does not clash with the local `environment` above.
Func<string, string, IRouteFinder> factory = (env, exchangeType) => new YourRouteFinderImplementation();
var setup = new PriorityQueuesRabbitSetup(factory, Global.DefaultWatcher, connectionString, environment);
setup.SetupExchangeAndQueueFor<YourMessageType>(exchangeSetupData, queueSetupData);

2/ Publish a priority message

using (var tunnel = RabbitTunnel.Factory.WithPrioritySupport().Create().WithPrioritySupport())
{
    tunnel.SetSerializer(new JsonSerializer());
    tunnel.SetRouteFinder(new YourRouteFinderImplementation());

    uint priority = 3;

    tunnel.Publish(new YourMessageType(), priority);
}

3/ Consume from a logical queue with priority support asynchronously

const ushort maxPriorityLevel = 3;
Global.PreFetchSize = 64;
// Same factory chain as in step 2.
var tunnel = RabbitTunnel.Factory.WithPrioritySupport()
                         .Create().WithPrioritySupport();

tunnel.SubscribeAsync<YourMessageType>("YourApplicationName", maxPriorityLevel, msg =>
{
    // Process message here
});

4/ Consume from a logical queue with priority support synchronously

MessageDeliverEventArgs arg = null;
var subscriber = tunnel.Subscribe<YourMessageType>("YourApplicationName", maxPriorityLevel, (msg, eventArgs) =>
{
    // Keep the delivery details so the message can be acked later
    arg = eventArgs;
    // Process message here
});

// Later on
subscriber.Ack(arg.ConsumerTag, arg.DeliveryTag);

Hope this helps someone with the same problem.

Monday, July 23, 2012

DotNet Hotfix KB 2640103

I have a Windows service that has been running pretty well for a long time. Recently I installed a .NET Framework extended update, and it probably caused this error:

Description: The process was terminated due to an internal error in the .NET Runtime at IP 6B484BC2 (6B300000) with exit code 80131506

This error message led me to this support page. It seems that many people have hit the same problem and have been trying to download this hotfix. I think Microsoft has not officially released it yet.

So if you run into the same problem and need the hotfix, download it here, more info:

The problem seems to be fixed since I installed it, as my Windows service has been running smoothly for 2 days without crashing. It used to crash randomly within a day.

Good luck.