- Follow up several posts about RabbitMQ and Burrow.NET, this post is about the Priority implementation for RabbitMQ in Burrow.NET. Actually, We have it implemented in the library a few weeks ago. However, the code probably has bugs and we need more time to tune and fix those bugs. At this point, I'm confident to say the implementation is quite stable and It's time for a blog post.
- Basically, the implementation is following this post. We split the logical queue to several physical queues and use a memory queue to aggregate messages from those physical queues. The in-memory-queue is implemented using the IntervalHeap from C5 library and the code rests in side Burrow.Extras package as I hope RabbitMQ will support priority in the future. If they do, it will be the good time to get rid of this implementation without modifying much code in core Burrow.NET core. - We've followed the same pattern mentioned in Doug Barth post. We let each physical queue decide how much unacked messages can be delivered to client using PrefetchSize so if you have n priority, the internal queue will have capacity of n x PrefetchSize. It will block RabbitMQ from sending more messages to client if the internal memory queue is full so the client has to process and ack messages from high priorty to low priority order and then get more messages.
- The physical queues have the same prefetch size and we would recommend to keep the prefetch size approximately as double as the processing velocity. if you set prefetsize too high compare to the processing velocity, the only drawback we can think of is that the internal memory queue will unnecessary contain so much messages, and that could lead to high memory consumption.
- In our production code, we need 5 priorities so there will be 5 threads created to consume messages from those physical queues. We got several issues with ThreadPool and TPL in our production environment as the window service was getting slow after a while running. We figured out that should never used ThreadPool for long running processing. I'll post about this topic later in Aug but for now if you have a long running window service and it takes time plus lot of IO operations to process your messages, use normal Thread instead of threads from pool. In order to do that, set following line at your BootStrapper:
Global.DefaultTaskCreationOptionsProvider = () => TaskCreationOptions.LongRunning;
- This implementation uses an exchange type Header to delivery messages with a priority value to the expected queue. Someone said the exchange Header has lower performance than the others but I'm think it's good because I do need the messages alive after server reboot, that means I need the messages to be persited to disk by setting the PersistentMode to true. That eventually reduces the maximum potential performance of RabbitMQ as mentioned in their docs. Anyway, it's not very difficult to switch to different exchange type such as Topic or Direct if we really need.
- So, here are the steps to setup and consume from the priority queues:
1/ Declare exchange and queues
ExchangeSetupData exchangeSetupData = new HeaderExchangeSetupData(); QueueSetupData queueSetupData = new PriorityQueueSetupData(3) { SubscriptionName = "YourApplicationName", }; var connectonString = ConfigurationManager.ConnectionStrings["RabbitMQ"].ToString(); var environment = "PROD"; Func<string, string, IRouteFinder> factory = (environment, exchangeType) => new YourRouteFinderImplementation(); var setup = new PriorityQueuesRabbitSetup(factory, Global.DefaultWatcher, connectonString, environment); setup.SetupExchangeAndQueueFor<YourMessageType>(exchangeSetupData, queueSetupData);
2/ Publish a priority message
using(var tunnel = RabbitTunnel.Factory.WithPrioritySupport().Create().WithPrioritySupport()) { tunnel.SetSerializer(new JsonSerializer()); tunnel.SetRouteFinder(new YourRouteFinderImplementation()); uint priority = 3; tunnel.Publish(new YourMessageType() , priority); }
3/ Consume from a logical queue with priority support asynchronously
const ushort maxPriorityLevel = 3; Global.PreFetchSize = 64; var tunnel = RabbitTunnel.Factory.WithPrioritySupport() .Create().WithPrioritySupport(); tunnel.SubscribeAsync<YourMessageType>("YourApplicationName", maxPriorityLevel, msg => { // Process message here });
4/ Consume from a logical queue with priority support synchronously
MessageDeliverEventArgs arg = null; var subscriber = tunnel.Subscribe<YourMessageType>("YourApplicationName", maxPriorityLevel, (msg, eventArgs) => { arg = eventArgs; // Process message here }); // Later on subscriber.Ack(arg.ConsumerTag, arg.DeliveryTag);
Hope it can help someone's problem
Cheers