Thursday, July 18, 2013

Monitor RabbitMQ queues & count total messages


- I used to do it the hard way: create another queue bound to the same exchange with the same parameters and routing keys, then consume from that queue and count the messages over 5 minutes, 15 minutes, 1 hour, etc.

- I've just got another idea which is simpler. The solution is similar to the first approach, but you apply the "Message TTL" parameter when creating the queue. So you create 3 queues with Message TTL set to 5 minutes, 15 minutes and 1 hour. At any point, the total number of messages in each queue is the actual number you want. And with the latest Burrow.NET (from 1.0.17) you can count the messages in a queue this way:



ITunnel tunnel = RabbitTunnel.Factory.Create();
// Set routeFinder if you have custom routefinder
var messageCount = tunnel.GetMessageCount<YourMessageType>("");


//Or if you want to count all the messages in priority queues:

ushort maxLevel = 5;
ITunnel tunnel = RabbitTunnel.Factory.WithPrioritySupport()
                             .Create().WithPrioritySupport();
// Set routeFinder if you have custom routefinder 
var messageCount = tunnel.GetMessageCount<YourMessageType>("", maxLevel);
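The monitoring queues themselves need the per-queue TTL argument when they are declared. If you create them in code rather than via the management UI, a sketch with the raw RabbitMQ .NET client could look like this (exchange, queue and routing-key names are placeholders):

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // The broker drops messages older than 5 minutes, so the queue length
    // is always "messages received in the last 5 minutes"
    var args = new Dictionary<string, object> { { "x-message-ttl", 5 * 60 * 1000 } };
    channel.QueueDeclare("Monitor.YourMessageType.5min",
                         durable: true, exclusive: false, autoDelete: false,
                         arguments: args);
    // Bind with the same routing key as the real queue so both see the same traffic
    channel.QueueBind("Monitor.YourMessageType.5min", "YourExchange", "YourRoutingKey");
}
```

Repeat with 15 * 60 * 1000 and 60 * 60 * 1000 for the 15-minute and 1-hour windows.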


Cheers

Monday, October 15, 2012

Deploy .NET Windows service artifact to a remote server from TeamCity (Updated)



- This post is an update of my previous post. It's basically the same approach, 90% similar, except I won't use remote PowerShell this time. Configuring remote PowerShell is such a pain in the ass, I reckon. I had to set up a deployment on another box which does have IIS, but someone in my team had already removed the Default Web Site so I couldn't find any quick and easy way to make remote PowerShell work. The error message PowerShell prints out is so stupid that it doesn't even tell me what it wants:

“Set-WSManQuickConfig : The client cannot connect to the destination specified in the request. Verify that the service on the destination is running and is accepting requests. Consult the logs and documentation for the WS-Management service running on the destination, most commonly IIS or WinRM. If the destination is the WinRM service, run the following command on the destination to analyze and configure the WinRM service: “winrm quickconfig”. At line:50 char:33 + Set-WSManQuickConfig <<<< -force + CategoryInfo : InvalidOperation: (:) [Set-WSManQuickConfig], InvalidOperationException + FullyQualifiedErrorId : WsManError,Microsoft.WSMan.Management.SetWSManQuickConfigCommand“

- OK, there is another reason to hate Microsoft. It's time to abandon remote PS. I tried PsTools before remote PowerShell and got other problems, so I won't waste time going back to that very old tool, as PowerShell is much more powerful. So writing a simple console WCF application to communicate between TeamCity and the remote server is my choice.

- And the tool's name is DD, which is short for "Distributed Deployment". In this post, I'll sum up in detail how to set up deployment for a Windows service from TeamCity.

- Unlike a web application, a Windows service is often a long-running background process. A .NET Windows service has an OnStop method for you to clean up resources before stopping, which is cool. HOWEVER, when you try to stop the service using "net stop servicename", it does stop the service but the process will not end as fast as it could. I reckon a .NET service process can host multiple Windows services (classes which inherit from ServiceBase), so that could be the reason the services manager waits a little while for all potential services within a process to stop before ending the main process.

- In some cases like mine, I want the service to stop as soon as it can, so I have to somehow call Environment.Exit to end the process asap. Apparently I cannot use TASKKILL like in the previous post, as that was such a hacky way and it could corrupt my data. So my approach is to let the program listen for a command: when it receives an exit signal, the app cleans up resources and invokes Environment.Exit. If you need something like this, read on.

I/ Things you need to prepare:

  • Remote server: aka target server, the server we'll install the service on
  • TeamCity server: aka build server, a server that has TeamCity installed
  • 1 project slot in TeamCity: I want to deploy whenever I click "Run" on a TeamCity deployment project instead of auto-deploying after a code build, so it'll cost you 1 project out of the 20 available in free TeamCity
  • Wget: this tool will download the compressed artifact from TeamCity
  • 7zip: this tool will decompress the artifact
  • DD: this tool does 2 things: listens for a deployment command from TeamCity and sends an exit signal to your long-running Windows service


II/ Code involved:

1/ Assume your service class is AwesomeService.cs. Implement the interface ISignalListener and add a WhenStop callback:
public partial class AwesomeService : ServiceBase, ISignalListener
{
    public Action WhenStop { get; set; }
    public void Exit()
    {
        Stop();
        if (WhenStop != null)
        {
            WhenStop();
        }
    }
    
    // ...
}

2/ In your service code, anywhere before the service starts (e.g. Program.cs), add this code:
var svc = new AwesomeService();            
//NOTE: Wait for signal from Teamcity Deployment
Server.Start<ISignalListener>(svc, "AwesomeService");
Timer timer;
svc.WhenStop = () =>
{
    // NOTE: Will end process after 1 second
    timer = new Timer(o => Environment.Exit(0), null, 1000, Timeout.Infinite);
};
ServiceBase.Run(svc);
// ...

3/ Prepare the deploy.bat script. I like to control the deployment process in a batch file instead of implementing the steps in a program, as I think people will have their own steps and a batch file is simple enough to manage. Again, this batch file will basically do these things:
  • Use wget to download the latest artifact from TeamCity.
  • Use 7zip to extract the artifact, copy it to a temporary folder.
  • Save the artifact to the Artifacts folder
  • Back up the whole current service folder
  • Stop the target Windows service
  • Copy over new files and old configuration files
  • Start the target Windows service
  • Clean up

Here is the basic script that anyone can use; keep it somewhere, as we'll need to copy the batch file to the remote server.
@echo off
SETLOCAL
:: 0/ --------- Set some local variables
SET Environment.ExecutingFolder="C:\Deployment"
SET Environment.7zip="C:\Program Files (x86)\7-Zip\7z.exe"
SET Environment.Wget="C:\Deployment\wget.exe"

SET TeamCity.User=your-teamcity-account
SET TeamCity.Password=your-teamcity-password
SET TeamCity.BuildTypeId=teamcity-build-type
SET TeamCity.Artifact=awesomeservice.{build.number}.zip

SET AwesomeService.TargetFolderName=AwesomeService
SET AwesomeService.TargetFolderPath=C:\AwesomeService
SET AwesomeService.ServiceName=AwesomeService
SET AwesomeService.ImageName=AwesomeService.exe
CD /D %Environment.ExecutingFolder%
 
ECHO 1/ --------- Get latest artifact from TeamCity, AwesomeService
%Environment.Wget% -q --http-user=%TeamCity.User% --http-password=%TeamCity.Password% --auth-no-challenge http://your.teamcity.url.com/repository/download/%TeamCity.BuildTypeId%/.lastSuccessful/%TeamCity.Artifact%
REN *.zip* *.zip
ECHO Found following artifact
DIR /B *zip


ECHO 2/ --------- Extract the artifact to folder __Temp ---------------
%Environment.7zip% e -y -o__Temp *.zip


ECHO 3/ --------- Store the artifact ------------------ 
MOVE /Y *.zip Artifacts\


ECHO 4/ --------- Backup current service folder --------------- 
for %%a in (%AwesomeService.TargetFolderPath%) do set Temp.LastDate=%%~ta
SET Temp.LastDate=%Temp.LastDate:~6,4%-%Temp.LastDate:~0,2%-%Temp.LastDate:~3,2% %Temp.LastDate:~11,2%%Temp.LastDate:~14,2%%Temp.LastDate:~17,2%
ECHO Last deployment: %Temp.LastDate%
ECHO Now backup files to folder %AwesomeService.TargetFolderName%.%Temp.LastDate%
XCOPY /E /I /H /R /Y %AwesomeService.TargetFolderPath% "%AwesomeService.TargetFolderName%.%Temp.LastDate%"


ECHO 5/ --------- Stop %AwesomeService.ServiceName% service ---------------
DD AwesomeService /wait 50
ECHO Wait 2 more seconds
ping 1.1.1.1 -n 1 -w 2000 > NUL

ECHO 6/ --------- Deploy new files and copy over old configs ----------------------
ECHO ... Deploy latest assemblies
XCOPY /E /H /R /Y __Temp %AwesomeService.TargetFolderPath%

ECHO ... Deploy old configs 
COPY /Y "%AwesomeService.TargetFolderName%.%Temp.LastDate%\*.config" %AwesomeService.TargetFolderPath%

ECHO ... Delete log files 
DEL /F /Q %AwesomeService.TargetFolderPath%\Logs\log.txt* > NUL
:: PAUSE here only when testing the script manually; it would hang an unattended deployment

ECHO 7/ --------- Start %AwesomeService.ServiceName% service ---------------
net start %AwesomeService.ServiceName% 

ECHO 8/ --------- Cleanup --------------------------------- 
::DEL /F /Q /S *jsessionid*
RD /S /Q __Temp
ENDLOCAL

- The URL to the artifact in TeamCity will look like:
http://teamcity-ip-address:80/repository/download/bt2/3907:id/service.name.1.12.1234.zip - so depending on the build type id of your artifact, change it in the above deploy.bat.

III/ Setup steps:

* On Remote Server

- Download and install 7zip
- Assume that you put all custom tools and Deploy.bat in C:\Deployment. Create folder C:\Deployment\Artifacts to store your TeamCity artifacts.
The Deployment folder should look like this:
 Volume in drive C is OS
 Volume Serial Number is ABCD-EFGH

 Directory of C:\Deployment

14/10/2012  02:59 PM    <DIR>          .
14/10/2012  02:59 PM    <DIR>          ..
14/10/2012  02:58 PM    <DIR>          Artifacts
14/10/2012  02:58 PM            38,912 DD.exe
14/10/2012  02:58 PM             2,558 Deploy.bat
14/10/2012  02:58 PM           401,408 wget.exe
               4 File(s)        442,878 bytes
               3 Dir(s)  405,532,868,608 bytes free

- Run the DD tool:
C:\Deployment\DD -listen 5555 -token YourS3cur!tyTok3n

- Or install it as a Windows service; remember to start it once installed ;)
C:\Windows\Microsoft.NET\Framework64\v4.0.30319\InstallUtil.exe /listen=5555 /token=YourS3cur!tyTok3n /i C:\Deployment\DD.exe

* On Team City Server

- Copy DD.exe and put somewhere such as C:\Program Files\DD.exe
- Add new project such as: Awesome service deployment
- Add 1 build step that runs DD.exe with these command parameters:

-execute C:\Deployment\deploy.bat -target remoteserver-ip:5555 -token YourS3cur!tyTok3n



That's it. Your AwesomeService should be deployed whenever you click Run on the deployment project in TeamCity. Obviously you can adjust some things in deploy.bat to suit your needs; let me know if you have any problems.

Cheers

Tuesday, October 2, 2012

RPC with Burrow.NET and RabbitMQ


RPC is definitely nothing new; I've just implemented it in a slightly different way. Burrow.RPC is some code I made recently for my project. It helps applications communicate in an RPC style using Burrow.NET.

To use Burrow.RPC you need the Burrow.NET package, and that's all. If you need some more utilities like JsonSerializer, you have to grab Burrow.Extras. And if you only need Burrow.NET for your RabbitMQ stuff, Burrow.RPC is absolutely not necessary.

The way Burrow.RPC works is that it wraps your method call in a request object and waits for a response object. Everything is done via Castle DynamicProxy on the client and .NET reflection on the server, so you don't have to write much extra code to make your application work in an RPC style.

Let's say you have a "basic" interface and its implementation, and now you want this existing code to work remotely. Just download the Burrow.RPC package, declare the client using RpcFactory, create a listener on the server side (also using RpcFactory), and it just works.

I mentioned a "basic" interface because this library works for all basic methods except those with Action or Func parameters. It can work with methods that have "out" or "ref" params as long as the parameter is serializable; just don't make the method too fancy ;) and it will work. I recommend using JsonSerializer from the Burrow.Extras package as it uses Json.NET, which is pretty awesome.

It's time for some sample. Given that you have following interface and its implementation:
public interface ISomeService
{
    [Async] // This method will be called asynchronously
    void Delete(string userId);        
    
    [RpcTimeToLive(100)] // in seconds
    void SendWelcomeMessage(EmailMessage message);
    
    IEnumerable<User> Get(int page, int pageSize, out int totalCount);    
}

public class SomeServiceImplementation : ISomeService
{
     // Implementation here
}

There are some things to note about the above interface:

  1. The Async attribute on the Delete method makes this "void" method work asynchronously. That means the client code will not wait for a response after calling the method. It's pretty convenient in some cases when you don't need to wait for the result. Consequently, you cannot use the Async attribute on methods that have a return type or an "out" param.
  2. The RpcTimeToLive attribute on SendWelcomeMessage makes the request valid for 100 seconds. If the server is too busy to pick up messages from the request queue, and by the time it gets the chance more than 100 seconds have passed since the request was created, a TimeOutException will be thrown on the server and the client will certainly get that Exception.
  3. The last method of this interface has an out param and a return type, so whenever you call this method, the client thread is blocked until it receives the result from the server. The out parameter "totalCount" will definitely have a value.

So in the client code, you need an instance of ISomeService, which is just a proxy object created by Castle DynamicProxy. All method calls on this object are intercepted, and each call together with its parameters is wrapped in an RpcRequest which is eventually published to the RabbitMQ server:

var client = RpcFactory.CreateClient<ISomeService>();
client.Delete("van-the-shark");

Ideally, the client code is a class which has a dependency on ISomeService, and we just have to register the proxy instance using your favorite IoC container.
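With Autofac, for instance, the registration could be as simple as this sketch (using RpcFactory.CreateClient as shown above):

```csharp
var builder = new ContainerBuilder();

// Any class depending on ISomeService gets the RPC proxy and
// never needs to know the implementation lives in another process
builder.Register(c => RpcFactory.CreateClient<ISomeService>())
       .As<ISomeService>()
       .SingleInstance();

var container = builder.Build();
```

If you share it as a singleton like this, remember that Burrow.RPC is not thread safe for singleton objects.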

On the server side, we do a similar thing: there must be a real instance of ISomeService which eventually handles the method call.

ISomeService realService = new SomeServiceImplementation();
IRpcServerCoordinator server = RpcFactory.CreateServer<ISomeService>(realService, serverId: "TestApp");
server.Start();

The RpcFactory basically uses Burrow.NET to subscribe to the request queue. When a request comes in, it tries to map the request to a valid method on ISomeService and delegates the invocation to the correct method of the real service object. After that, the return value (if any), together with all the params which may have changed during the method call, is wrapped in an RpcResponse object and sent back to the response queue. Please explicitly specify the generic type when creating the server, as the type is part of the queue/exchange names for requests and responses.
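Conceptually (this is a simplification, not Burrow.RPC's actual code, and the RpcRequest member names here are made up), the server-side dispatch boils down to plain reflection:

```csharp
// Hypothetical request shape for illustration
public class RpcRequest
{
    public string MethodName { get; set; }
    public object[] Params { get; set; }
}

public object HandleRequest(RpcRequest request, ISomeService realService)
{
    var method = typeof(ISomeService).GetMethod(request.MethodName);

    // Invoke on the real implementation; out/ref parameters mutate request.Params
    var result = method.Invoke(realService, request.Params);

    // result + request.Params are then wrapped in an RpcResponse
    // and published to the caller's response queue
    return result;
}
```

(The real library also has to match parameter types, since GetMethod by name alone is ambiguous for overloaded methods.)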

There is one interesting thing to note here: every instance of the proxied client has a separate response queue. Let's say you have 2 instances of your application running on different machines, and both create the same RPC request and send it to the same request queue; the server somehow needs to know where to send the response so it reaches the correct client. That's why the response address (which is a queue name) is different for each client. If the service is a singleton shared across different objects in your client code, you have to guarantee that RPC method calls from different threads are synchronized properly. Otherwise, you could face a silly situation where the result of the second call arrives in the code block where the first call happened. Anyway, too much information will only confuse; just know that Burrow.RPC is not thread safe for singleton objects.

By default, Burrow.RPC creates the queues and exchanges for you. That's necessary because the response queue should be deleted once the client disconnects; when the client connects again, a new response queue is generated. You can change the exchange name, type and queue names by implementing the following interface:
/// <summary>
/// This route finder is created for RPC communication.
/// Implement this interface if you want to use custom naming conventions for your Response/Request queue
/// </summary>
public interface IRpcRouteFinder
{
    /// <summary>
    /// If set to true, the library will create exchange and queue for you
    /// </summary>
    bool CreateExchangeAndQueue { get; }

    /// <summary>
    /// Default can be empty as the empty exchange is the built-in exchange
    /// </summary>
    string RequestExchangeName { get; }

    /// <summary>
    /// Should be either direct or fanout
    /// </summary>
    string RequestExchangeType { get; }
    
    /// <summary>
    /// If RequestExchangeName is empty, Burrow.RPC will route the RpcRequest object to this queue by publishing the msg to the empty exchange with the routing key is equal to this queue name
    /// </summary>
    string RequestQueue { get; }

    /// <summary>
    /// The response queue must be unique per instance of the RPC client
    /// <para>If you have 2 instances of the rpc clients, these instances should subscribe to different response queue as the responses from the rpc server must be routed to correct client</para>
    /// </summary>
    string UniqueResponseQueue { get; }
}
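For example, a minimal custom route finder could look like this sketch (all names are made up):

```csharp
public class MyRpcRouteFinder : IRpcRouteFinder
{
    // Generated once so every 'get' returns the same queue for this client instance
    private readonly string _responseQueue =
        "MyApp.Rpc.Responses." + Guid.NewGuid().ToString("N");

    public bool CreateExchangeAndQueue { get { return true; } }

    // Empty string = the built-in default exchange: requests are routed by queue name
    public string RequestExchangeName { get { return string.Empty; } }

    public string RequestExchangeType { get { return "direct"; } }

    public string RequestQueue { get { return "MyApp.Rpc.Requests"; } }

    public string UniqueResponseQueue { get { return _responseQueue; } }
}
```

The Guid suffix keeps the response queue unique per client instance, which is exactly the property the interface comment asks for.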

There are a few more things about the code, but I think it's better for you to find out yourself if you're interested. There are quite a few comments in the code which should make things clear, even though I always believe that if I have to comment to make things clear, the code itself is not clear enough. This blog post shows only basic usage of the library. Furthermore, there will probably be changes to this RPC API in the future, so I'll leave it at that. Please grab the source code from github and run the TestRPC console demo project to see how it works. A Nuget package is also available: Burrow.RPC

Cheers.

Tuesday, September 11, 2012

Dual-invocation using Autofac interception - AOP


- The project I've been working on used to have SQL as its primary datastore. We're implementing another datastore using MongoDB, as it will replace the SQL one soon. During the migration, we have 2 versions of the website running in parallel: the production version using SQL, which is viewed by customers, and the other using MongoDB, which is being tested by our staff. Once the Mongo version is finished, it will keep running in parallel with the SQL version until we're completely confident that all the features work fine. During that time, any changes users make to the SQL db have to be applied to MongoDB as well.

- There were quite a few solutions in my mind. However, we already had some DataAccess dlls to access the SQL database, and another implementation of the same interfaces talking to MongoDB had been built. So the options narrowed down to something like an adapter pattern which takes 2 instances of the same interface, invokes a method on the first one and then invokes the same method on the second instance. Being a good fan of composition over inheritance, I was thinking of something like:
public class DualService<T> : T // conceptual only: C# doesn't actually allow inheriting from a type parameter
{
    private readonly T _primary;
    private readonly T _secondary;
    
    public DualService(T primary, T secondary)    
    {
        _primary = primary;
        _secondary = secondary;    
    }
    
    public void MethodA()
    {
        _primary.MethodA();
        _secondary.MethodA();    
    }
    
    // So on
}

- Well, this approach is good only if the generic T interface has just a few methods. Ours has over 30 methods and counting ;) (it's implemented by the "outsource team"). So I dropped this option and thought about Autofac interception instead. The interceptor I'm gonna implement will take the secondary implementation and invoke the same method as the invocation if:

  1. The method is not decorated with IgnoreInterceptAttribute
  2. The method has a void return type (well, a command should not have a return type here, if you're curious about the reason)
  3. Or the method is decorated with DualInvocationAttribute

- Here is the implementation. This approach requires Castle.Core and AutofacContrib.DynamicProxy2. The 2 attributes are just simple markers I created to enhance the filter:
[AttributeUsage(AttributeTargets.Method)]
public class IgnoreInterceptAttribute : Attribute
{
}

[AttributeUsage(AttributeTargets.Method)]
public class DualInvocationAttribute : Attribute
{
}

public class DualInvocation<T> : IInterceptor where T: class 
{
    private readonly T _secondaryImplementation;
    private readonly Action<Exception> _onSecondaryImplementationInvokeError;

    public DualInvocation(T secondaryImplementation, Action<Exception> onSecondaryImplementationInvokeError = null)
    {
        _secondaryImplementation = secondaryImplementation;
        _onSecondaryImplementationInvokeError = onSecondaryImplementationInvokeError ?? (ex => Trace.WriteLine(ex));
    }

    public void Intercept(IInvocation invocation)
    {
        invocation.Proceed();
        var returnType = invocation.Method.ReturnType;
        var att = invocation.Method.GetCustomAttributes(typeof(IgnoreInterceptAttribute), true);
        if (att.Any())
        {
            return;
        }

        if (returnType != typeof(void))
        {
            att = invocation.Method.GetCustomAttributes(typeof(DualInvocationAttribute), true);
            if (!att.Any())
            {
                return;
            }
        }

        var methodInfo = invocation.Method;
        try
        {
            methodInfo.Invoke(_secondaryImplementation, invocation.Arguments);
        }
        catch (Exception ex)
        {
            _onSecondaryImplementationInvokeError(ex);
        }
    }
}

- You might wonder how to use this class; take a look at the unit tests. You should do something, either in code or in Autofac xml config, similar to what I did in the SetUp.
- Here is a dummy interface to demonstrate the Autofac registrations:
public interface ISomeService
{
    void Update();

    void Update(int userId);

    [IgnoreIntercept]
    void Update(int userId, bool multi);

    int GetUser(int userId);

    [DualInvocation]
    bool DeleteUser(int userId);
}

- And here are the tests. They use NSubstitute to mock the interface (pretty similar to Moq, etc.):
[TestFixture]
public class MethodIntercept
{
    private ISomeService primary,
                         secondary;

    private IContainer container;
    private Action<Exception> exceptionHandler;

    [SetUp]
    public void RegisterAutofacComponents()
    {
        primary = Substitute.For<ISomeService>();
        secondary = Substitute.For<ISomeService>();

        var builder = new ContainerBuilder();

        builder.RegisterInstance(secondary).Named<ISomeService>("secondary").SingleInstance();

        builder.RegisterInstance(primary)
               .Named<ISomeService>("primary").SingleInstance()
               .EnableInterfaceInterceptors()
               .InterceptedBy(typeof (DualInvocation<ISomeService>));

        builder.RegisterType<DualInvocation<ISomeService>>()
               .WithParameters(new Parameter[]
               {
                   new ResolvedParameter((p, c) => p.Name == "secondaryImplementation", (p, c) => c.ResolveNamed("secondary", typeof (ISomeService))),
                   new NamedParameter("onSecondaryImplementationInvokeError", exceptionHandler)
               })
               .AsSelf()
               .InstancePerLifetimeScope();

        container = builder.Build();
    }

    [Test]
    public void Should_invoke_secondary_method_if_it_is_not_ignored()
    {
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action
        primaryService.Update();

        // Assert
        secondaryService.Received(1).Update();
    }

    [Test]
    public void Should_invoke_secondary_overload_method_if_it_is_not_ignored()
    {
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action
        primaryService.Update(1);

        // Assert
        secondaryService.Received(1).Update(1);
    }

    [Test]
    public void Should_invoke_secondary_overload_method_if_it_is_decorated_by_DualInvocationAttribute()
    {
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action
        primaryService.DeleteUser(1);

        // Assert
        secondaryService.Received(1).DeleteUser(1);
    }

    [Test]
    public void Should_not_invoke_secondary_method_if_it_is_not_void()
    {
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action
        primaryService.GetUser(1);

        // Assert
        secondaryService.DidNotReceive().GetUser(Arg.Any<int>());
    }

    [Test]
    public void Should_not_invoke_secondary_method_if_it_is_ignored()
    {
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action
        primaryService.Update(1, true);

        // Assert
        secondaryService.DidNotReceive().Update(Arg.Any<int>(), Arg.Any<bool>());
    }

    [Test]
    public void Should_not_throw_exception_if_cannot_invoke_secondary_method()
    {
        // Arrange
        exceptionHandler = ex => Trace.WriteLine(ex);
        secondary.When(x => x.Update()).Do(callInfo => { throw new Exception(); });

        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action
        primaryService.Update();

        // Assert
        secondaryService.Received(1).Update();
    }
}


- I reckon this approach is clean and simple, and it works on my machine. Cheers.

Thursday, July 26, 2012

Priority With RabbitMQ implementation in .NET



- Following up several posts about RabbitMQ and Burrow.NET, this post is about the priority implementation for RabbitMQ in Burrow.NET. Actually, we implemented it in the library a few weeks ago; however, the code probably had bugs and we needed more time to tune and fix them. At this point, I'm confident to say the implementation is quite stable and it's time for a blog post.

- Basically, the implementation follows this post. We split the logical queue into several physical queues and use a memory queue to aggregate messages from those physical queues. The in-memory queue is implemented using the IntervalHeap from the C5 library, and the code lives inside the Burrow.Extras package, as I hope RabbitMQ will support priority in the future. If they do, it will be a good time to get rid of this implementation without modifying much code in the Burrow.NET core.

- We've followed the same pattern mentioned in Doug Barth's post. We let each physical queue decide how many unacked messages can be delivered to the client using PrefetchSize, so if you have n priorities, the internal queue will have a capacity of n x PrefetchSize. RabbitMQ is blocked from sending more messages to the client when the internal memory queue is full, so the client has to process and ack messages in high-priority-to-low-priority order before getting more messages.

- The physical queues have the same prefetch size, and we recommend keeping the prefetch size at roughly double the processing velocity. If you set the prefetch size too high compared to the processing velocity, the only drawback we can think of is that the internal memory queue will unnecessarily hold many messages, which could lead to high memory consumption.
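To illustrate the aggregation idea (a conceptual sketch, not Burrow's internal code), an IntervalHeap from C5 ordered by priority behaves like this:

```csharp
using C5;
using System.Collections.Generic;

public class PrioritizedMessage
{
    public uint Priority { get; set; }
    public string Body { get; set; }
}

public class ByPriority : IComparer<PrioritizedMessage>
{
    public int Compare(PrioritizedMessage x, PrioritizedMessage y)
    {
        return x.Priority.CompareTo(y.Priority);
    }
}

// Consumers on the physical queues Add() into the heap;
// the dispatcher always takes the highest priority first
var heap = new IntervalHeap<PrioritizedMessage>(new ByPriority());
heap.Add(new PrioritizedMessage { Priority = 1, Body = "low" });
heap.Add(new PrioritizedMessage { Priority = 5, Body = "high" });

var next = heap.DeleteMax(); // the Priority = 5 message comes out first
```

When the heap holds n x PrefetchSize messages, the consumers stop pulling, which stops RabbitMQ from pushing more.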

- In our production code, we need 5 priorities, so 5 threads are created to consume messages from the physical queues. We got several issues with the ThreadPool and TPL in our production environment, as the Windows service was getting slow after running for a while. We figured out that you should never use the ThreadPool for long-running processing. I'll post about this topic later in Aug, but for now: if you have a long-running Windows service and it takes time plus lots of IO operations to process your messages, use a normal Thread instead of threads from the pool. To do that, add the following line to your BootStrapper:
Global.DefaultTaskCreationOptionsProvider = () => TaskCreationOptions.LongRunning;


- This implementation uses a Headers exchange to deliver messages with a priority value to the expected queue. Someone said the Headers exchange has lower performance than the others, but I think it's good enough because I need the messages to survive a server reboot, which means I need them persisted to disk by setting PersistentMode to true. That eventually reduces the maximum potential performance of RabbitMQ anyway, as mentioned in their docs. In any case, it's not very difficult to switch to a different exchange type such as Topic or Direct if we really need to.

- So, here are the steps to setup and consume from the priority queues:

1/ Declare exchange and queues

ExchangeSetupData exchangeSetupData = new HeaderExchangeSetupData();
QueueSetupData queueSetupData = new PriorityQueueSetupData(3)
{
    SubscriptionName = "YourApplicationName", 
};

var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"].ToString();
var environment = "PROD";

Func<string, string, IRouteFinder> factory = (env, exchangeType) => new YourRouteFinderImplementation();
var setup = new PriorityQueuesRabbitSetup(factory, Global.DefaultWatcher, connectionString, environment);
setup.SetupExchangeAndQueueFor<YourMessageType>(exchangeSetupData, queueSetupData);

2/ Publish a priority message

using(var tunnel = RabbitTunnel.Factory.WithPrioritySupport().Create().WithPrioritySupport())
{
    tunnel.SetSerializer(new JsonSerializer());
    tunnel.SetRouteFinder(new YourRouteFinderImplementation());

    uint priority = 3;

    tunnel.Publish(new YourMessageType() , priority);     
}

3/ Consume from a logical queue with priority support asynchronously

const ushort maxPriorityLevel = 3;
Global.PreFetchSize = 64;
var tunnel = RabbitTunnel.Factory.WithPrioritySupport()
                         .Create().WithPrioritySupport();

tunnel.SubscribeAsync<YourMessageType>("YourApplicationName", maxPriorityLevel, msg => {
    // Process message here
});

4/ Consume from a logical queue with priority support synchronously

MessageDeliverEventArgs arg = null;
var subscriber = tunnel.Subscribe<YourMessageType>("YourApplicationName", maxPriorityLevel, (msg, eventArgs) =>
{
    arg = eventArgs;
    // Process message here 
});

// Later on
subscriber.Ack(arg.ConsumerTag, arg.DeliveryTag);


Hope this helps someone with a similar problem
Cheers