Thursday, May 31, 2012

RabbitMQ Exchanges & Queues naming convention with Burrow.NET



- If you need only 1 exchange and 1 queue, the naming convention is not a big concern. Even then, you may need exchanges and queues for different environments. In my projects I usually have 3 environments (DEV, UAT and PROD), and I've been using the following conventions for exchanges and queues:


Exchange : ProjectName.E.<ExchangeType>.<Environment>.<MessageTypeName>
Queue    : ProjectName.Q.<Environment>.<ConsumerName>.<MessageTypeName>

- The consumer name in the queue convention is optional. It makes sense to have different consumer names for different queues of the same message when a Fanout exchange is used, because in that case we have many clients interested in the same message. We cannot let these clients subscribe to the same queue, because only 1 of them would get each message while they all want it. A Fanout exchange is the best choice in that situation since it delivers a copy of every message you publish to all queues bound to it.


- In the example below we'll use the Direct exchange type, so we won't need ConsumerName in the queue name. A single message type will then have 1 exchange and 1 queue bound to it for every environment. As a result, for message type "RequestMessage" we'll have:


Exchanges:
  • ProjectName.E.Direct.DEV.RequestMessage
  • ProjectName.E.Direct.UAT.RequestMessage
  • ProjectName.E.Direct.PROD.RequestMessage

Queues:
  • ProjectName.Q.DEV.RequestMessage
  • ProjectName.Q.UAT.RequestMessage
  • ProjectName.Q.PROD.RequestMessage
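
- To make the convention concrete, here is a minimal sketch of a helper that composes these names. NamingConvention and its method names are made up for illustration and are not part of Burrow.NET; the helper only formats strings following the two patterns above.

```csharp
using System;

// Hypothetical helper (not part of Burrow.NET): it only formats names
// according to the convention described in this post.
public static class NamingConvention
{
    // ProjectName.E.<ExchangeType>.<Environment>.<MessageTypeName>
    public static string ExchangeName(string project, string exchangeType,
                                      string environment, string messageType)
    {
        return string.Format("{0}.E.{1}.{2}.{3}", project, exchangeType, environment, messageType);
    }

    // ProjectName.Q.<Environment>.<ConsumerName>.<MessageTypeName>
    // The consumer name is optional: pass null or "" to leave it out.
    public static string QueueName(string project, string environment,
                                   string consumerName, string messageType)
    {
        return string.IsNullOrEmpty(consumerName)
            ? string.Format("{0}.Q.{1}.{2}", project, environment, messageType)
            : string.Format("{0}.Q.{1}.{2}.{3}", project, environment, consumerName, messageType);
    }
}
```

Calling NamingConvention.QueueName("ProjectName", "DEV", null, "RequestMessage") gives "ProjectName.Q.DEV.RequestMessage", the first queue in the list above.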


- Creating queues and exchanges for every single message class is not a good idea, as we would pollute the server with a long list of queues and exchanges. We ran into exactly that problem and found a few better options. We can use 1 exchange for all those message types and bind different queues to it with different routing keys. So if we publish a message of type RequestA to the exchange with routing key "RequestA", the message will go to the queue "ProjectName.Q.PROD.RequestA", for instance. In our project we needed a Fanout exchange, as I mentioned above, so we created a wrapper class like the one below:

public class MessageWrapper
{
    public object MessageObject {get; set;}
}

- So we'll have only 1 queue for each client. The message is deserialized back into the same concrete object we published, so the client can check the type of MessageObject and decide what to do.
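
- A rough sketch of how a client could branch on the wrapped type is shown here. RequestA, RequestB and WrapperDispatcher are placeholder names invented for this example, and MessageWrapper is repeated only so the snippet stands on its own. It assumes the serializer really restores the concrete type of MessageObject, as described above.

```csharp
using System;

public class MessageWrapper
{
    public object MessageObject { get; set; }
}

// Placeholder message types, invented for this sketch.
public class RequestA { }
public class RequestB { }

public static class WrapperDispatcher
{
    // Returns a short description of what was done so the behaviour is easy to check.
    public static string Handle(MessageWrapper wrapper)
    {
        if (wrapper == null || wrapper.MessageObject == null)
        {
            throw new ArgumentNullException("wrapper");
        }

        // Decide what to do based on the runtime type of the wrapped object.
        if (wrapper.MessageObject is RequestA)
        {
            return "handled RequestA";
        }
        if (wrapper.MessageObject is RequestB)
        {
            return "handled RequestB";
        }

        // Unknown types are reported instead of being silently dropped.
        return "ignored " + wrapper.MessageObject.GetType().Name;
    }
}
```

In a real subscriber, the body of the subscribe callback would call something like WrapperDispatcher.Handle with the received wrapper.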

- Back to our story: with this convention in mind, all we have to do is implement IRouteFinder:

public class MyCustomRouteFinder : IRouteFinder
{
    private readonly string _environment;
    public MyCustomRouteFinder(string environment)
    {
        if (string.IsNullOrEmpty(environment))
        {
            throw new ArgumentNullException("environment");
        }
        _environment = environment;
    }

    public string FindExchangeName<T>()
    {
        return string.Format("ProjectName.E.{0}.{1}.{2}", GetExchangeTypeFor<T>().ToUpper(), _environment, typeof(T).Name);
    }

    public string FindRoutingKey<T>()
    {        
        return typeof(T).Name;
    }

    public string FindQueueName<T>(string subscriptionName)
    {
        if (GetExchangeTypeFor<T>() == ExchangeType.Fanout && string.IsNullOrEmpty(subscriptionName))
        {
            throw new ArgumentException("subscriptionName is required for a Fanout exchange", "subscriptionName");
        }

        var subscription = GetSubscriptionTextFor<T>(subscriptionName);
        
        return !string.IsNullOrEmpty(subscription)
               ? string.Format("ProjectName.Q.{0}.{1}.{2}", _environment, subscription, typeof(T).Name)
               : string.Format("ProjectName.Q.{0}.{1}", _environment, typeof(T).Name);
    }

    protected virtual string GetExchangeTypeFor<T>()
    {
        var messageType = typeof(T);
        var messageTypesNeedDirectExchange = new List<Type> { typeof(MessageA), 
                                                              typeof(MessageB) };

        return messageTypesNeedDirectExchange.Contains(messageType)
             ? ExchangeType.Direct
             : ExchangeType.Fanout;
    }

    protected virtual string GetSubscriptionTextFor<T>(string subscriptionName)
    {
        var typesDontNeedSubscriptionName = new List<Type> { typeof(MessageA), 
                                                             typeof(MessageB) };

        return typesDontNeedSubscriptionName.Contains(typeof(T)) 
             ? null 
             : subscriptionName;
    }
}

- Here is how to use it:

// Init
var environment = "DEV";
var routeFinder = new MyCustomRouteFinder(environment);
Global.DefaultSerializer = new JsonSerializer();

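// Set up exchanges and queues programmatically
// (the lambda parameter is named env because it must not reuse the name of the local variable "environment")
Func<string, string, IRouteFinder> routeFinderFactory = (env, exchangeType) => routeFinder;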
var setup = new RabbitSetup(routeFinderFactory, 
                            Global.DefaultWatcher, 
                            ConfigurationManager.ConnectionStrings["RabbitMQ"].ToString(), 
                            environment);
setup.SetupExchangeAndQueueFor<MessageA>(new ExchangeSetupData(), new QueueSetupData());
setup.SetupExchangeAndQueueFor<MessageB>(new ExchangeSetupData(), new QueueSetupData());

// Publish message 
using(var tunnel = RabbitTunnel.Factory.Create())
{
    tunnel.SetRouteFinder(routeFinder);
    tunnel.Publish<MessageA>(new MessageA());
    tunnel.Publish<MessageB>(new MessageB());
}

- And I swear by the moon and the stars in the sky that the messages will go to the correct queues. In my last post I mentioned the ConstantRouteFinder, which can be used for a predefined exchange name and queue name. However, I think having a convention is better in any case, even though we may have only 1 queue and 1 exchange.

- As a best practice, we should not share the development environment with the UAT or PROD environment, so the "DEV" exchanges and queues should not be created on the same server as those for "UAT" and "PROD". Install a RabbitMQ server on your own machine for development; it doesn't take long.




Happy messaging.

Wednesday, May 30, 2012

Messaging with RabbitMQ and Burrow.NET



- As a .NET developer, all I knew about the queueing world was MSMQ, because I rarely had the chance to use queues in any project. I got to know RabbitMQ during my last 2 projects and I'm pretty impressed by its reliability and stability. RabbitMQ is very easy to install on Windows or Linux. I wrote a blog post, "Install RabbitMQ on SliTaz Linux", in which I described step by step how to set up a RabbitMQ server on the very tiny SliTaz Linux distro. This is obviously not the first blog post about RabbitMQ in .NET, but I would like to take it as an opportunity to introduce a part of my project: Burrow.NET.

- Burrow.NET is a wrapper library around RabbitMQ.Client for .NET. I started this library from the EasyNetQ source code and improved it to fit our usage. However, I have made the library as generic as possible so it can be used in any project. Basically, Burrow.NET abstracts away the complexity of connection handling, error handling and logging, and since version 1.0.8 it includes a workaround for priority queues, which RabbitMQ does not support natively. In this post I'll briefly introduce the very basic features of Burrow.NET and keep other interesting topics for later posts.

I/ Use cases

1/ Let's say you have to build an async web service to serve a huge number of clients. A plain WCF service hosted in IIS or in a Windows service will struggle to handle that many requests. So RabbitMQ acts as a middle layer that queues the requests, and you can launch as many instances of a "processor application" as you need to handle them. This approach scales easily in the long term.

2/ You're building a workflow-based system in which the output of one sub-system is the input of another. In other words, your sub-systems need a proper way to communicate with each other asynchronously. Since these sub-systems run in different AppDomains, WCF looks like a good choice at first glance. However, someone like me will not like WCF as an option due to its configuration complexity. And in this use case WCF does not seem to be a proper solution, as we may need to scale one or several parts of the system.

- In my personal opinion, whenever you need asynchronous communication between applications, queueing is the best approach, and you should think about RabbitMQ :D. Well, I'm not saying RabbitMQ is objectively the best. There were a lot of mature solutions before RabbitMQ, but they came at a cost. That's one of the reasons we have the awesome RabbitMQ, and I personally think it's the best choice: it's fast, it's fashionable, it's free and it works :D.

II/ Burrow.NET

- As mentioned in the introduction, this code is a wrapper around the .NET RabbitMQ.Client. If you don't want to use a wrapper, simply grab RabbitMQ.Client.dll and use it directly, provided your need is as simple as publishing a message to a known exchange and queue. However, if you want something consistent, such as naming conventions for the whole solution because you may need a lot of queues, or if you expect your application to reconnect automatically after a connection problem, then Burrow.NET is a good candidate.

- I've been using it in a real project, a big system in which different applications communicate via RabbitMQ channels. I had to make the code as efficient as possible: establish only 1 connection per AppDomain to save socket descriptors, open only the necessary RabbitMQ channels to save resources, and consume the messages in the queue as fast as possible to keep the server's memory usage low.


Alright, enough promoting. Here is the main topic:

III/ Example

- Assume that you have to build the web service from use case 1. Your web service will not handle the messages immediately but will put the requests into a queue. There will be 1 or many instances of a Windows service which subscribe to that queue and process the requests. It's not so complicated, is it? First, grab the RabbitMQ lib from either NuGet or GitHub.

- In order to push messages to a RabbitMQ queue, we're gonna need an exchange, a queue and a binding. I really hope you already know about these things. When you publish a message, you actually send it to a RabbitMQ exchange. Depending on the exchange type, and on the queue and the binding between that queue and the exchange, your messages will be delivered to the proper queue. RabbitMQ has magic stuff to route your messages to the expected target and, believe me, you will love that feature. So let's say we will use the following exchange and queue:

exchange: RequestExchange
queue : RequestQueue

- The type of the above exchange will be "direct", and "RequestQueue" will be bound to it with a routing key such as "RequestMessage". That means if you publish a request object to that exchange with the routing key "RequestMessage", it will eventually go to the queue "RequestQueue". There are several exchange types, each suited to different needs, and I really recommend you read about them here: rabbitmq.com/tutorials/tutorial.
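
- If the difference between exchange types is still fuzzy, this toy in-memory model may help. It never talks to a RabbitMQ server; ToyExchange and its members are names made up for this sketch, and it only mimics two rules: a direct exchange delivers to queues whose binding key equals the routing key, while a fanout exchange ignores the routing key and delivers to every bound queue.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of exchange routing, for illustration only.
public class ToyExchange
{
    private readonly bool _isFanout;
    private readonly List<Tuple<string, Queue<string>>> _bindings =
        new List<Tuple<string, Queue<string>>>();

    public ToyExchange(bool isFanout)
    {
        _isFanout = isFanout;
    }

    // Bind a queue to this exchange with a binding key.
    public void Bind(Queue<string> queue, string bindingKey)
    {
        _bindings.Add(Tuple.Create(bindingKey, queue));
    }

    // Direct: deliver only to queues whose binding key equals the routing key.
    // Fanout: ignore the routing key and deliver a copy to every bound queue.
    public void Publish(string routingKey, string message)
    {
        foreach (var binding in _bindings)
        {
            if (_isFanout || binding.Item1 == routingKey)
            {
                binding.Item2.Enqueue(message);
            }
        }
    }
}
```

With a direct ToyExchange, a message published with routing key "RequestMessage" lands only in the queue bound with that key. With a fanout one, every bound queue gets its own copy.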

- Since the names above don't follow any convention at all, we'll use the ConstantRouteFinder in Burrow.NET.


- Here is the code to publish a message:

var routeFinder = new ConstantRouteFinder("RequestExchange", "RequestQueue", "RequestMessage");
var tunnel = RabbitTunnel.Factory.Create();
tunnel.SetSerializer(new JsonSerializer());
tunnel.SetRouteFinder(routeFinder);
tunnel.Publish<RequestMessage>(new RequestMessage());

- Subscribing to the above queue is very similar:

var routeFinder = new ConstantRouteFinder("RequestExchange", "RequestQueue", "RequestMessage");
var tunnel = RabbitTunnel.Factory.Create();
tunnel.SetSerializer(new JsonSerializer());
tunnel.SetRouteFinder(routeFinder);
tunnel.Subscribe<RequestMessage>("RabbitMQDemo", requestMsg => { 
    // Handle the requestMsg here
});

- Or, if you want to handle the messages in the queue asynchronously using multiple threads:

Global.DefaultConsumerBatchSize = 10; // Use at most 10 threads
tunnel.SubscribeAsync<RequestMessage>("RabbitMQDemo", requestMsg => { 
    // Handle the requestMsg here
});

- There is something to note here: the code above creates the RabbitMQ connection using the connection string in your application config. You'll need something like the following lines in your app.config:

<connectionStrings>
    <add name="RabbitMQ" connectionString="host=localhost;username=guest;password=guest"/>
</connectionStrings>
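
- To show how such a connection string breaks down into settings, here is a small sketch that splits it into key/value pairs. ConnectionStringParser is a hypothetical helper written for this post; it is not how Burrow.NET parses the string internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits "key=value;key=value" into a dictionary.
public static class ConnectionStringParser
{
    public static Dictionary<string, string> Parse(string connectionString)
    {
        if (connectionString == null)
        {
            throw new ArgumentNullException("connectionString");
        }

        // Keys are compared case-insensitively, so "Host" and "host" are the same setting.
        var settings = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (var part in parts)
        {
            var index = part.IndexOf('=');
            if (index <= 0)
            {
                throw new FormatException("Expected key=value but got: " + part);
            }
            settings[part.Substring(0, index).Trim()] = part.Substring(index + 1).Trim();
        }

        return settings;
    }
}
```

Parsing the connection string from the config above yields three settings: host, username and password.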

- If not, you can use other overloads of RabbitTunnel.Factory to provide the connection string. The "JsonSerializer" is an implementation of ISerializer using Json.NET, which is one of the best JSON serializers for .NET at the moment. It currently supports serializing and deserializing complex objects with inheritance hierarchies, so don't worry about that.

- Burrow.NET has a DefaultRouteFinder which generates the queue name and exchange name following a default convention, which I don't recommend using. In fact, if you like this library, implement your own IRouteFinder and set that instance on the tunnel like in the code above. In other words, if you don't use the ConstantRouteFinder as above, the code still works but it will create a different exchange and queue following the built-in convention. You can check out the latest source code from GitHub. There are 2 demo projects which demonstrate everything about the library, and 1 unit test project which reaches 99% test coverage ^^

- That's all you need for publishing and subscribing. Everything else is already handled by the library, and you can change that behaviour by implementing several interfaces and injecting them into the tunnel using your own TunnelFactory. But those are topics for other posts; now it's time for me to go to bed :D


Cheers

Sunday, May 20, 2012

Install RabbitMQ on SliTaz Linux





- I have played with RabbitMQ for a few months, so I tried to install it on a Linux distro running on VMware. I did this to set up a development environment at home, since I don't want to install so much stuff on my Win7 machine. Another reason was to have some fun with Linux, since it's been a long time since I last messed with it. Well, I did some googling to find some tiny distros. I like simple things that just work, and Ubuntu is too fancy for my purpose.

- SliTaz came out as the highest rated tiny Linux so I gave it a try. It took me a whole day of troubleshooting to install RabbitMQ on this distro. There were quite a few problems, such as the Erlang version, OpenSSL for mochiweb, and daemons on SliTaz, so I think it's worth writing down my experience in case someone would like to do something similar.

1/ The problems

  • RabbitMQ (currently 2.8.2) requires Erlang R12B-3 (www.rabbitmq.com/which-erlang.html). However, mochiweb requires R13B01, which is not available for SliTaz. I couldn't find any existing Erlang package with that version, so I had to build OTP from source, which I had never done before :D
  • There is no RabbitMQ package available for SliTaz, but I found out that we can convert the Debian package and use it on SliTaz.
  • Mochiweb requires OpenSSL, so we need not only the Erlang source but also the OpenSSL source. If you check the Erlang website, you will see this is mentioned there.
  • I want the RabbitMQ server to start automatically when the system starts, but the Debian daemon script that comes with the rabbitmq package does not work. So I have to use "screen" as a workaround.
  • And many many more :D but I promise it will be less painful if you follow the steps below.

2/ Installation steps:

- Assuming you have installed SliTaz successfully (if you use VMware, pick an IDE disk instead of SCSI; I had trouble getting the HDD detected with SCSI) and logged in as root, the first thing to do is enable SSH on the SliTaz server, because it's a lot easier to use "putty" than "sakura". You'll need to copy and paste a few things, and doing this with putty is damn simple. The SSH server in SliTaz is "dropbear". It does not start automatically when the system boots, so you have to add it to the SliTaz daemon list. In SliTaz, daemon scripts are located in the /etc/init.d/ folder and the main configuration file is /etc/rcS.conf. Simply add "dropbear" to the end of RUN_DAEMONS to make it start automatically. Save the file and reboot.

- When the system has rebooted, SSH to the SliTaz server and execute the following commands to install make, perl, gcc and the other tools required to compile Erlang:
tazpkg get-install make
tazpkg get-install perl
tazpkg get-install gcc
tazpkg get-install slitaz-toolchain --forced

- Then we'll get the Erlang and OpenSSL sources. These were the latest versions at the time of writing.
root@slitaz:/# mkdir /home/temp
root@slitaz:/# mkdir /home/temp/erlang
root@slitaz:/# mkdir /home/temp/openssl

root@slitaz:/# cd /home/temp/openssl
root@slitaz:/home/temp/openssl/# wget http://www.openssl.org/source/openssl-1.0.1c.tar.gz
root@slitaz:/home/temp/openssl/# gunzip -c openssl-1.0.1c.tar.gz | tar xf -

root@slitaz:/# cd /home/temp/erlang
root@slitaz:/home/temp/erlang/# wget http://www.erlang.org/download/otp_src_R15B01.tar.gz
root@slitaz:/home/temp/erlang/# gunzip -c otp_src_R15B01.tar.gz | tar xf -
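
- Now we will configure Erlang with the ssl flag (a requirement for mochiweb). Run this from the directory that contains the extracted configure script; if the tarball unpacked into its own sub-folder, cd into that folder first.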
root@slitaz:/home/temp/erlang/# ./configure --with-ssl=/home/temp/openssl/openssl-1.0.1c

- If everything is okay, you'll see this at the end:
..............
config.status: creating Makefile
config.status: creating ../make/i686-pc-linux-gnu/otp.mk
config.status: creating ../make/i686-pc-linux-gnu/otp_ded.mk
config.status: creating ../lib/ic/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/os_mon/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/crypto/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/orber/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/runtime_tools/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/tools/c_src/i686-pc-linux-gnu/Makefile
config.status: creating i686-pc-linux-gnu/config.h
config.status: creating include/internal/i686-pc-linux-gnu/ethread_header_config.h
config.status: creating include/i686-pc-linux-gnu/erl_int_sizes_config.h
*********************************************************************
**********************  APPLICATIONS DISABLED  **********************
*********************************************************************

jinterface     : No Java compiler found
odbc           : ODBC library - link check failed

*********************************************************************
*********************************************************************
**********************  APPLICATIONS INFORMATION  *******************
*********************************************************************

wx             : wxWidgets not found, wx will NOT be usable

*********************************************************************
*********************************************************************
**********************  DOCUMENTATION INFORMATION  ******************
*********************************************************************

documentation  :
                 fop is missing.
                 Using fakefop to generate placeholder PDF files.

*********************************************************************

- We will make and install Erlang:
root@slitaz:/home/temp/erlang/# make

root@slitaz:/home/temp/erlang/# make install

- Now, it's time to grab and install rabbitmq package
root@slitaz:/home/temp/erlang/# cd /home/temp
root@slitaz:/home/temp/# mkdir rabbitmq
root@slitaz:/home/temp/# cd rabbitmq

root@slitaz:/home/temp/rabbitmq/# wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.8.2/rabbitmq-server_2.8.2-1_all.deb
root@slitaz:/home/temp/rabbitmq/# tazpkg convert rabbitmq-server_2.8.2-1_all.deb
root@slitaz:/home/temp/rabbitmq/# tazpkg install rabbitmq-server-2.8.2-1.tazpkg

- We are almost done. Let's test things by checking rabbitmqctl
root@slitaz:/# rabbitmqctl
id: unknown user rabbitmq
sh: rabbitmqctl: unknown operand
su: unknown user rabbitmq

- It seems the converted package didn't create the rabbitmq user and the rabbitmq group for the server, so we have to do it manually:
root@slitaz:/# addgroup rabbitmq
root@slitaz:/# adduser -S rabbitmq -G rabbitmq

root@slitaz:/# rabbitmqctl status
Status of node rabbit@slitaz ...
Error: unable to connect to node rabbit@slitaz: nodedown

DIAGNOSTICS
===========

nodes in question: [rabbit@slitaz]

hosts, their running nodes and ports:
- slitaz: [{rabbitmqctl22218,45733}]

current node details:
- node name: rabbitmqctl22218@slitaz
- home dir: /home/rabbitmq
- cookie hash: vPSQrFu2FNfj5/GsGmwn5A==

root@slitaz:/#

- Awesome, rabbitmqctl works now. The node is reported as down only because the server isn't running yet, so it's time to start it:
root@slitaz:/# rabbitmq-server start
/usr/lib/rabbitmq/bin/rabbitmq-server: line 70: can't create /var/lib/rabbitmq/mnesia/rabbit@slitaz.pid: Permission denied
Activating RabbitMQ plugins ...
ERROR: Could not create dir /var/lib/rabbitmq/mnesia/rabbit@slitaz-plugins-expand (eacces)
root@slitaz:/#

- Hmm, the user we've created doesn't have permission on that folder. We have to chown:
root@slitaz:/# chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/mnesia

root@slitaz:/# rabbitmq-server start
Activating RabbitMQ plugins ...
0 plugins activated:

{"init terminating in do_boot",{{nocatch,{error,{cannot_log_to_file,"/var/log/rabbitmq/rabbit@slitaz.log",{error,eacces}}}},[{rabbit,ensure_working_log_handler,5,[]},{rabbit,ensure_working_log_handlers,0,[]},{rabbit,prepare,0,[]},{init,eval_script,8,[]},{init,do_boot,3,[]}]}}
init terminating in do_boot ()

- Hmm, we'll have to do the same thing for /var/log/rabbitmq
root@slitaz:/# chown -R rabbitmq:rabbitmq /var/log/rabbitmq

root@slitaz:/# rabbitmq-server start
Activating RabbitMQ plugins ...
0 plugins activated:


+---+   +---+
|   |   |   |
|   |   |   |
|   |   |   |
|   +---+   +-------+
|                   |
| RabbitMQ  +---+   |
|           |   |   |
|   v2.8.2  +---+   |
|                   |
+-------------------+
AMQP 0-9-1 / 0-9 / 0-8
Copyright (C) 2007-2012 VMware, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/

node           : rabbit@slitaz
app descriptor : /usr/lib/rabbitmq/lib/rabbitmq_server-2.8.2/sbin/../ebin/rabbit.app
home dir       : /home/rabbitmq
config file(s) : (none)
cookie hash    : vPSQrFu2FNfj5/GsGmwn5A==
log            : /var/log/rabbitmq/rabbit@slitaz.log
sasl log       : /var/log/rabbitmq/rabbit@slitaz-sasl.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@slitaz
erlang version : 5.9.1

-- rabbit boot start
starting file handle cache server                                     ...done
starting worker pool                                                  ...done
starting database                                                     ...done
starting database sync                                                ...done
starting codec correctness check                                      ...done
-- external infrastructure ready
starting plugin registry                                              ...done
starting auth mechanism cr-demo                                       ...done
starting auth mechanism amqplain                                      ...done
starting auth mechanism plain                                         ...done
starting statistics event manager                                     ...done
starting logging server                                               ...done
starting exchange type direct                                         ...done
starting exchange type fanout                                         ...done
starting exchange type headers                                        ...done
starting exchange type topic                                          ...done
-- kernel ready
starting alarm handler                                                ...done
starting node monitor                                                 ...done
starting cluster delegate                                             ...done
starting guid generator                                               ...done
starting memory monitor                                               ...done
-- core initialized
starting empty DB check                                               ...done
starting exchange, queue and binding recovery                         ...done
starting mirror queue slave sup                                       ...done
starting adding mirrors to queues                                     ...done
-- message delivery logic ready
starting error log relay                                              ...done
starting networking                                                   ...done
starting direct client                                                ...done
starting notify cluster nodes                                         ...done

broker running

- Muahahaha, it's running without a problem, except that no plugins are enabled. Press CTRL-C and choose "a" to stop the server, then enable the management plugin. Before that (this is optional), we can manually create a config file for RabbitMQ: /etc/rabbitmq/rabbitmq.config. Hope you know how to use vi :D. Here I let it use up to 80% of the guest's memory and set the free disk space limit to 5GB, which means RabbitMQ blocks publishers when free disk space drops below that. You can read about these settings on the RabbitMQ memory management page.
[
    {mnesia, [{dump_log_write_threshold, 1000}]},
    {rabbit, [{tcp_listeners, [5672]}, {vm_memory_high_watermark, 0.8}, {disk_free_limit, 5368709120}]}
].

- The dot at the end of the file is required :D

- To enable the management plugin, simply execute:
rabbitmq-plugins enable rabbitmq_management

- Finally, we have to make the RabbitMQ server start automatically when the system boots. We'll use screen. First, download and install screen:
root@slitaz:/# tazpkg get-install screen

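- And create the shell script that starts RabbitMQ in a screen session. I named it start-rabbitmq.sh and put it in /etc/init.d:
/etc/init.d/start-rabbitmq.sh
#!/bin/sh
# Add /usr/local/bin to the PATH (the default install location when Erlang is built from source)
PATH=$PATH:/usr/local/bin
export PATH
# Start the server detached, in a screen session named "rabbitmq"
screen -dmS rabbitmq /usr/sbin/rabbitmq-server start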

- Obviously, we have to chmod this file
root@slitaz:/# chmod +x /etc/init.d/start-rabbitmq.sh

- And the last step: add it to /etc/init.d/local.sh:
echo "Run rabbitmq startup script"
/etc/init.d/start-rabbitmq.sh


- Save and reboot, and you have the awesome RabbitMQ server running on SliTaz. If you want to reach it over the internet, find a dyndns service and do some port forwarding. The ports you need to forward are 5672 and 55672, the default ports for the server and for mochiweb.






Cheers