WINDOWS SERVICE BUS – SMART QUEUE LISTENER

 

Writing elegant software is an art. There are several ways to consume the items in a Service Bus queue, and the following code shows one of the more elegant ones. When you are writing scalable server-side software, the worker-boss pattern comes in handy.

Imagine boxes arriving on a conveyor belt, with workers at the end of it responsible for taking those boxes and loading them onto a truck. If the belt runs fast, you need more workers to keep up with the arriving boxes.

Similarly, in the software world, if queue items arrive at a higher rate, you want to be able to control the number of worker threads that process them. The following smart queue listener class is designed to achieve this goal. What you do with each message depends on your context, so the class takes a function pointer (term used loosely; in C# it is an Action<BrokeredMessage> delegate). This function is executed for every incoming message.
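Before bringing Service Bus into the picture, the worker-boss idea can be tried out with an in-memory queue. The following is only a minimal sketch under stated assumptions: the class name WorkerBossSketch and its Run method are hypothetical and not part of the listener, and a BlockingCollection<int> stands in for the real queue. The boss adds items, a configurable number of workers drain them, and a caller-supplied handler runs for every item.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of the worker-boss pattern; not part of the listener.
public static class WorkerBossSketch
{
    // Starts workerCount workers, feeds them itemCount items, runs handler on
    // each item, and returns how many items were processed in total.
    public static int Run(int workerCount, int itemCount, Action<int> handler)
    {
        using (var queue = new BlockingCollection<int>())
        {
            int processed = 0;
            var workers = new Task[workerCount];

            for (int i = 0; i < workerCount; i++)
            {
                workers[i] = Task.Factory.StartNew(() =>
                {
                    // Blocks until an item arrives; the loop ends once the boss
                    // has called CompleteAdding and the queue is empty.
                    foreach (int item in queue.GetConsumingEnumerable())
                    {
                        handler(item);
                        Interlocked.Increment(ref processed);
                    }
                }, TaskCreationOptions.LongRunning);
            }

            // The boss: puts the boxes on the belt, then signals "no more boxes".
            for (int item = 0; item < itemCount; item++)
            {
                queue.Add(item);
            }
            queue.CompleteAdding();

            // Wait for every worker to finish before reading the counter.
            Task.WaitAll(workers);
            return processed;
        }
    }
}
```

Calling WorkerBossSketch.Run(4, 20, item => Console.WriteLine(item)) from a Main method returns 20, though the order in which the items are printed varies from run to run because the workers compete for them. Raising or lowering the first argument is the "more workers for a faster belt" knob.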

Also note that this listener is non-blocking for the caller: the constructor starts the worker tasks in the background and returns immediately.

Enjoy the code and feel free to use it anywhere you want!

Code Snippet
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus.Messaging;

    public class SmartQueueListener
    {
        readonly int _threadCount;
        readonly BlockingCollection<Task> _dequeueTasks;

        // Callback executed for every incoming message.
        readonly Action<BrokeredMessage> _dequeAction;

        // Several tasks are created in a loop. They all share one cancellation
        // token source so they can be cancelled at once.
        readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

        public SmartQueueListener(int threadCount, Action<BrokeredMessage> dequeAction)
        {
            if (threadCount <= 0) throw new ArgumentOutOfRangeException("threadCount");
            if (dequeAction == null) throw new ArgumentNullException("dequeAction");

            _dequeAction = dequeAction;
            _threadCount = threadCount;
            _dequeueTasks = new BlockingCollection<Task>();

            for (int i = 0; i < threadCount; i++)
            {
                // Passed to the task as its state object.
                int taskId = i + 1;

                // Start a new dequeue task and register it in the collection of tasks internally managed by this component.
                _dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain,
                                                        taskId,
                                                        _cancellationTokenSource.Token,
                                                        TaskCreationOptions.LongRunning,
                                                        TaskScheduler.Default));
            }

            // Mark this collection as not accepting any more additions.
            _dequeueTasks.CompleteAdding();
        }

        // Asks all dequeue tasks to stop once their current Receive call returns.
        public void Stop()
        {
            _cancellationTokenSource.Cancel();
        }

        private void DequeueTaskMain(object state)
        {
            CancellationToken token = _cancellationTokenSource.Token;

            // Takes most of the information from the config file
            MessagingFactory messagingFactory = MessagingFactory.Create();
            QueueClient myQueueClient = messagingFactory.CreateQueueClient("OrderQueue");

            while (!token.IsCancellationRequested)
            {
                // Receive returns null when no message arrives within the timeout.
                // A short timeout lets the loop notice cancellation promptly.
                BrokeredMessage message = myQueueClient.Receive(TimeSpan.FromSeconds(5));
                if (message == null)
                {
                    continue;
                }

                Console.WriteLine("Task {0} received message: {1}, {2}, {3}", state, message.SequenceNumber, message.Label, message.MessageId);

                try
                {
                    _dequeAction(message);
                    message.Complete(); // the message was processed successfully
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Processing failed: {0}", ex.Message);
                    message.Abandon(); // the message still needs to be available in the queue
                }
            }

            myQueueClient.Close();
            messagingFactory.Close();
        }
    }
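The two settlement calls that appear in the listing, Complete and Abandon, are meant for different outcomes: Complete when the handler succeeded, Abandon when the message should stay in the queue. That decision can be isolated and exercised without a live queue. The sketch below is only an illustration: ISettleableMessage, RecordingMessage and SettlementSketch are hypothetical names introduced here as stand-ins for BrokeredMessage, and are not part of the Service Bus SDK.

```csharp
using System;

// Hypothetical stand-in for the two BrokeredMessage settlement calls.
public interface ISettleableMessage
{
    void Complete();
    void Abandon();
}

// Hypothetical test double that records which settlement call was made last.
public class RecordingMessage : ISettleableMessage
{
    public string LastCall { get; private set; }

    public void Complete() { LastCall = "Complete"; }
    public void Abandon() { LastCall = "Abandon"; }
}

public static class SettlementSketch
{
    // Runs the handler and settles the message according to the outcome.
    // Returns true when the message was completed, false when it was abandoned.
    public static bool ProcessAndSettle(ISettleableMessage message, Action<ISettleableMessage> handler)
    {
        try
        {
            handler(message);
        }
        catch (Exception ex)
        {
            // The handler failed: leave the message in the queue for another attempt.
            Console.WriteLine("Handler failed: " + ex.Message);
            message.Abandon();
            return false;
        }

        // The handler succeeded: remove the message from the queue.
        message.Complete();
        return true;
    }
}
```

Keeping the Complete call outside the try block means a failure while completing is not mistaken for a handler failure and followed by an Abandon on the same message.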

 

The following code segment shows how you could use this smart listener:

Code Snippet
    using System;
    using Microsoft.ServiceBus.Messaging;

    class Program
    {
        // Held in a static field so the listener stays referenced for the lifetime of the process.
        static SmartQueueListener _listener = null;

        static void Main(string[] args)
        {
            // Start five worker tasks; ProcessMessage runs for every message received.
            _listener = new SmartQueueListener(5, ProcessMessage);
            Console.WriteLine("Press ENTER to exit");
            Console.ReadLine();
        }

        static void ProcessMessage(BrokeredMessage message)
        {
            // Assumes the sender put an int in the message body.
            int myContent = message.GetBody<int>();
            Console.WriteLine("Message received: {0}, {1}, {2}", myContent, message.Label, message.MessageId);
        }
    }
