Tuesday, December 29, 2015

OutputCache & Instrument using Flatwhite

If you are looking for OutputCache for WebApi, go here

What is Flatwhite?

Flatwhite is an AOP library with MVC and WebAPI ActionFilter style using Castle dynamic proxy. There are many libraries out there to help you intercept a method call such as PostSharp, recently CodeCop and they're really cool tools. However, I've been using Castle dynamic proxy for a many years and I think it offers enough needs for my projects. Therefore, Flatwhite is an opinionated library to facilitate usages of Castle dynamic proxy for method interceptions.

Current release only supports Autofac but I think other IOC containers also use Castle dynamic proxy when they come to interception so they will be supported in the future.

You can create MethodFilterAttribute to add custom logic to any methods as soon as it is interceptable by Castle Dynamic Proxy (virtual not final). Flatwhite has a built-in OutputCacheFilter to cache method result which can auto refresh stale content. You can use Flatwhite simply for caching or extending behavior of your code such as profiling, logging by implement MethodFilterAttribute similar to MVC's ActionFilterAttribute

When to use Flatwhite?

You have classes implemented interfaces and registered using Autofac (for now). You have a need to intercept method calls so you possibly have 2 quick options:

  • Use Autofac.Extras and call EnableInterfaceInterceptor() on type registrations then create/register custom IInterceptor.
  • Or use Flatwhite, implement an MethodFilterAttribute and decorate on the methods on your interfaces which you want to intercept.

As mentioned above, Flatwhite has a built-in OutputCacheFilter to cache method output. It works for methods that have a return value both sync and async methods. Beside caching, you can also implement MethodFilterAttribute and ExceptionFilterAttribute to add custom logic to your code.

How to use Flatwhite?

** Required packages: and
For now, Flatwhite needs to be used with Autofac (except Flatwhite.WebApi package). It requires Castle Dynamic proxy to intercept methods so it's a requirement to have public interface or your methods must be virtual and not final to be intercepted.

For caching:

1/ Enable class interceptor
If you modify the class to make the method virtual and decorate the method with OutputCacheAttribute, you will register the class like this:
public class UserService
    [OutputCache(Duration = 2, VaryByParam = "userId")]
    public virtual object GetById(Guid userId) 
        // ...
var builder = new ContainerBuilder().EnableFlatwhite();

2/ Enable interface interceptor
If the methods are not virtual, but the class implements an interface, you can decorate the methods on the interface with OutputCacheAttribute and register the type like this
public interface IUserService
    [OutputCache(Duration = 2, VaryByParam = "userId")]
    object GetById(Guid userId);

    object GetByEmail(string email);

    IEnumerable<object> GetRoles(Guid userId);

var builder = new ContainerBuilder().EnableFlatwhite();

3/ Quick enable cache on all methods
If you don't want to decorate the OutputCache attribute on the interface, you can do like this to enable cache on all methods
var builder = new ContainerBuilder().EnableFlatwhite();

4/ Choose the method to cache without using Attribute filter
If you want to cache on just some methods, you can selectively do like below. Again, it works only on virtual methods if you are registering class service; interface services are fine.
var builder = new ContainerBuilder().EnableFlatwhite();
                              .ForMember(x => x.GetById(Argument.Any<Guid>()))

                              .ForMember(x => x.GetComments(Argument.Any<Guid>(), Argument.Any<int>()))
                              .WithChangeMonitors((i, context) => 
                                    return new[] {new YourCustomCacheChangeMonitor()};

5/ Enable interceptors on all previous registrations
If you're a fan of assembly scanning, you can decorate the OutputCache attribute on classes & interfaces you want to cache and enable them by RegisterModule FlatwhiteBuilderInterceptModule before building the container
var builder = new ContainerBuilder();

// Register other types normally

// Register FlatwhiteBuilderInterceptModule at the end
var container = builder.Build();

Note that you don't have to call EnableFlatwhite() after creating ContainerBuilder like the other methods.

6/ Auto refresh stale data
Flatwhite can auto refresh the stale content if you set StaleWhileRevalidate with a value greater than 0. This should be used with Duration to indicates that caches MAY serve the cached result in which it appears after it becomes stale, up to the indicated number of seconds The first call comes to the service and gets a stale cache result will also make the cache system auto refresh once in the background. So if the method is not called many times in a short period, it's better to turn on AutoRefresh to make the cache alive and refreshed as soon as it starts to be stale
public interface IBlogService
    // For method with too many cache variations because of VaryByParam settings
    [OutputCache(Duration = 5, VaryByParam = "tag, from, authorId", StaleWhileRevalidate = 5)]
    IEnumerable<object> Search(string tag, DateTime from, Guid authorId);    

    // For method with not many cache variations and data is likely to changed every 5 seconds
    [OutputCache(Duration = 5, VaryByParam = "blogId", StaleWhileRevalidate = 5)]
    object GetById(int blogId);    

    // You can turn on AutoRefresh to keep the cache active if there are limited variations of the cache
    [OutputCache(Duration = 5, VaryByParam = "blogId", StaleWhileRevalidate = 5, AutoRefresh = true)]
    IEnumerable<string> GetBlogCategory();    

7/ Using CacheProfile
public interface IUserService
    object GetById(Guid userId);

Profile setting default file name is cacheProfile.yaml located at the same folder of your app.config/web.config file and has a "yaml like" format:
-- Cache profile settings, everything is case-sensitive
-- Profile name
    -- Profile propertyName:propertyValue, start with a tab or 4+ empty spaces


You can implement another IOutputCacheProfileProvider and set to Global.OutputCacheProfileProvider or simply change the location/name of the yaml file. At the moment, only yaml file is supported.

8/ Revalidate cache
Even though you can use AutoRefresh or StaleWhileRevalidate to auto refresh cache data. Some time you want to remove the cache item after you call a certain method. You can use RevalidateAttribute to remove the cache item or some related cache items. Decorate the attribute on another method and the cache item will be removed once the method is invoked successfully. On example below, when you call method DisableUser, because it has the Revalidate attribute decorated with "User" as the key, all related caches created for method with attribute OutputCache which has RevalidationKey = "User" will be reset.
public interface IUserService
    [OutputCache(Duration = 2, StaleWhileRevalidate = 2, VaryByParam = "userId", RevalidationKey = "User")]
    object GetById(Guid userId);

    [OutputCache(Duration = 2, VaryByParam = "userId", RevalidationKey = "User")]
    Task<object> GetByIdAsync(Guid userId); 

    void DisableUser(Guid userId);  

Unfortunately, this is not working for distributed services. That means the method is called on one server cannot notify the other service instances on remote servers. However, it's technically achievable to extend this filter using queueing or something like that to notify remote system.

For additional logic before/after calling methods

Flatwhite is inspired by WebAPI and ASP.NET MVC ActionFilterAttribute, so it works quite similar. The base filter attribute has following methods. So simply implement your filter class and do whatever you want.
public abstract class MethodFilterAttribute : Attribute

    public virtual void OnMethodExecuting(MethodExecutingContext methodExecutingContext);    
    public virtual Task OnMethodExecutingAsync(MethodExecutingContext methodExecutingContext);   
    public virtual void OnMethodExecuted(MethodExecutedContext methodExecutedContext);    
    public virtual Task OnMethodExecutedAsync(MethodExecutedContext methodExecutedContext);    
If you decorate the filter on async methods, only OnMethodExecutingAsync and OnMethodExecutedAsync are called. During the filters are being executed, if the Result value is set to the MethodExecutingContext, the remaining filters will be ignored.

For error handling

Similar to MethodFilterAttribute, you can implement ExceptionFilterAttribute to provide custom error handling logic. If the property MethodExceptionContext.Handled is true, all remaining ExceptionFilter will be ignored.
public abstract class ExceptionFilterAttribute : Attribute
    public virtual void OnException(MethodExceptionContext exceptionContext);    
    public virtual Task OnExceptionAsync(MethodExceptionContext exceptionContext);       

What's else?

Flatwhite for WebAPI: https://github.com/vanthoainguyen/Flatwhite/wiki/Flatwhite.WebApi
Wiki: https://github.com/vanthoainguyen/Flatwhite/wiki

Wednesday, June 25, 2014

Book Review RabbitMQ Essentials

I've had a chance to read through the book "RabbitMQ Essentials" published by Packtpub.com

This book is an excellent book for those who is thinking about integrating their software architect with RabbitMQ. The book is using an Application Inbox to step by step walk you through different technical decision to design and build an Inbox using RabbitMQ. Well, I personally wouldn't build an Inbox application using RabbitMQ because I think RabbitMQ is not a database, messages suppose to be consumed as fast as possible and shouldn't be queued up. Another debate is that it would require you to have each queue for each users which is not ideal to me. The book mentioned we can create thousand of queues without any problem, I wouldn't say yes or say no to that claim because I've never tried it but I wouldn't try to do that to my solution if I have other approach. I've been experimenting enough problem from RabbitMQ cluster network partitioning that made me to rebuild the cluster from scratch. Having thousand queues to backup and restore when disaster happen is possibly a pain. However, I think the application example in this book is fine and a perfect example to help someone who has never used RabbitMQ to look into messaging world. I strongly think you would learn all the skills you need to work with RabbitMQ from basic knowledge such as Exchange, Queue and Binding to setting up a HA cluster or using federation.

In some beginning chapters, It was very interesting to know why RabbitMQ hasn't supported latest AMQP standard and I completely agree with that. I would expect to read more about RabbitMQ ops in later chapters such as trouble shooting network partition, or ideal cluster set-up depend on hosting environments but It's not there in the book. Anyway, here is a brief table of contents:

  • Chapter 1. A Rabbit Spring to Life
  • Chapter 2. Creating an Application Inbox
  • Chapter 3. Switching to Server-push
  • Chapter 4. Handling Application Logs
  • Chapter 5. Tweaking Message Delivery
  • Chapter 6. Smart Message Routing
  • Chapter 7. Taking RabbitMQ to Production
  • Chapter 8. Testing and Tracing Applications

In summary, If you are about to learn RabbitMQ, you should read it. It's short enough for you to read on the train but still cover all important aspect. If you are using RabbitMQ but still want to reinforce your knowledge about RabbitMQ, you should also read it. The only thing I think the book doesn't have is more real-life production experiences from people. But I'm sure we can always find it from online community.


How did we recover from RabbitMQ network partition

We have a RabbitMQ cluster of 3 nodes hosting with Storm. Everything has been very smooth for a years without a problem until Storm upgraded their switches recently, twice actually. Unfortunately, both times they caused network partition problem to our cluster even though Storm said that the upgrade wouldn't affect their customers, just a little bit latency between servers :).

Anyway, our mode in the cluster is pause_minority which based on an incorrect assumption. It worked great when a single node crashed for some reason but it's absolutely a bad choice in network partition. In this case, all these 3 servers couldn't communicate with any of the other 2 so they all thought they were minority and paused. As a result, the whole cluster stopped working even after the server could ping each others again.

So what did I do? It was a little panic actually because every second we don't have the queue, we lost real time data for that moment. I tried to rabbitmqctl stop_app, none of these servers responded immediately. So I tried to CTRL C and rabbitmqctl start_app again, same thing, it didn't seem to response. I thought the backlog on the queue/disk needed time to start the app. Fortunately, I had a backup json file of the whole cluster so I decided to reset everything and rebuild the cluster from scratch accepting that we would lost all the backlog existing in those queues.

With this approach, I had to kill the beam.smp process, delete all files/folders in mnesia directory, reboot the nodes and restored the backup file. That was cool and whole cluster configuration was load successfully and we had the cluster back. It took me a few hours to make this happen, we lost the real time data and the backlog but we had a way to get those data back from other sources so It was just some more hours for us to get the system back to normal.

One thing to note here about the second network partition, those servers recovered quickly after the switch upgrade but they seemed to lost all the backlog on their queues, apparently they were all in pause_minotiry mode but I would expect they wouldn't lost data on the queue. I was pretty sure those data was published to the queue with persistence mode 1 which made the servers write the data to disk. I guess RabbitMQ was right when saying that "Other undefined and weird behaviour may occur" during a network partition.

After this disaster, I've changed our cluster mode to autoheal and wait to see how it would cope with the next network partition. With this setting, we can have 2 nodes cluster and we could have 2 nodes running separately after the next network partition but at least we wouldn't lost the backlog. I've been thinking about another approach using federation which possibly a better approach for us in such not so reliable network situation where the second nodes will have everything published to the first node via a upstream. Anyway, we'll wait and see.

Hope this information help someone ;)

Monday, June 23, 2014

Using RabbitMQ Header exchange with Burrow.NET

In RabbitMQ, messages are published to the Exchange. However, subscription will be made against queues. Binding is the key to make the right messages to go to the expected queue. In this post, we will setup an exchange to filter messages that have ip = 1"" in the header go to an expected queue. This will involve following steps:
  • - Create an exchange type Header
  • - Create a queue
  • - Bind the queue to the exchange with following arguments
       ip = ""
       x-match = "all"
  • - Publish messages to the exchange with the ip in the header
  • - Subscribe the the queue and process such messages

RabbitMQ header exchange is capable of filtering multiple header value by allowing us to set x-match = all or any. And using a combination of the bindings will facilitate any complex routing logics. In this post, we are filtering only the ip in the header so x-match = all or any doesn't matter. Creating exchange, queue and binding can be done manually. However, below is the code to do those using Burrow.NET library:
Assume that the exchange name is : Burrow.Demo.E.Headers
the queue name is                : Burrow.Demo.Q.Message
and the message to use is        : IpMessage

1/ Setup queue, exchange and binding using Burrow.NET

var setup = new RabbitSetup(_connectionString);
var routeSetupData = new RouteSetupData
    RouteFinder = new ConstantRouteFinder("Burrow.Demo.E.Headers", "Burrow.Demo.Q.Message", "IpMessage"),
    ExchangeSetupData = new ExchangeSetupData
        ExchangeType = ExchangeType.Headers,
    QueueSetupData = new QueueSetupData
        MessageTimeToLive = 100000,
    SubscriptionName = SubscriptionName,
routeSetupData.OptionalBindingData["ip"] = "";
routeSetupData.OptionalBindingData["x-match"] = "all";

Normally, an application should have an implementation of IRouteFinder, I use ConstantRouteFinder in this example for simple demonstration. After executing the above code, if you check the created queue or exchange, you should see the expected binding.

Note: I set MessageTimeToLive = 100 seconds, it's optional.

2/ Publishing data to RabbitMQ Exchange

var tunnel = RabbitTunnel.Factory.Create(_connectionString);
tunnel.SetSerializer(new JsonSerializer());
tunnel.SetRouteFinder(new ConstantRouteFinder("Burrow.Demo.E.Headers", "Burrow.Demo.Q.Message", "IpMessage"));
tunnel.Publish(new IpMessage(), new Dictionary<string, object>{{"ip", ""}});

3/ Subscribe data from Burrow.Demo.Q.Message

var tunnel = RabbitTunnel.Factory.Create(_connectionString);
tunnel.SetSerializer(new JsonSerializer());
tunnel.SetRouteFinder(new ConstantRouteFinder("Burrow.Demo.E.Headers", "Burrow.Demo.Q.Message", "IpMessage"));
tunnel.Subscribe(new SubscriptionOption<IpMessage>
    MessageHandler = msg => { /* Handle the message here */},
    SubscriptionName = "AwesomeApp",
    QueuePrefetchSize = 10, 
    BatchSize = 2 // 2 threads

Summary, I normally make a wrapper class on top of the publishing code to set the required header value to the dictionary from the message fields. We should have in mind fields in the header we're going to use to create the correct binding. However, if later in your project, a new field is added and it's a criteria to put in the header for routing messages, a new binding including that field should be added and we can simply delete the old binding.


Thursday, July 18, 2013

Monitor RabbitMQ queues & count total messages

- I used to do the hard way which is creating another queue which is bound to the same exchange with same parameters, routing keys. Then consume from that queue and count the message in 5 minutes, 15 minutes, 1 hour etc.

- I've just got another idea which is more simple. The solution is similar to the first approach, but you apply "Message TTL" parameter when creating the queue. So you will have to create 3 queues with Message TTL to 5 minutes, 15 minutes, 1 hour. At any point, the total messages in each queue is the actual number you want. And with latest Burrow.NET (From 1.0.17) you can actually count the message in the queue this way:

ITunnel tunnel = RabbitTunnel.Factory.Create();
// Set routeFinder if you have custom routefinder
var messageCount = tunnel.GetMessageCount<YourMessageType>("");

//Or if you want to count all the messages in priority queues:

ushort maxLevel = 5;
ITunnel tunnel = RabbitTunnel.Factory.WithPrioritySupport()
// Set routeFinder if you have custom routefinder 
var messageCount = tunnel.GetMessageCount<YourMessageType>("", maxLevel);