Category Archives: Message Queue

Implicitly Acknowledging a Message from Azure Service Bus

In this post I discussed receiving, processing and acknowledging a message using the Azure Service Bus. There are two ways to acknowledge a message received from the queue (both are common to all the message broker systems that I’ve used so far). That is, you either take the message, process it, and then go back to the broker to tell it you’re done (explicit acknowledgement); or you remove the message from the queue and then process it (implicit acknowledgement).

Explicit Acknowledgement / PeekLock

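With PeekLock, the message is locked while the client processes it, and is only removed once the client tells the broker that it’s done. If the message is not processed within a period of time, then it will be unlocked and returned to the queue to be picked up by the next client request.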

The code for this is as follows (it is also the default behaviour in Azure Service Bus):

QueueClient queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.PeekLock);

Remember that, with this code, if you don’t call:

message.Complete();

then you will read the same message over and over again.

Implicit Acknowledgement / ReadAndDelete

Here, if the message is not processed within a period of time, or fails to process, then it is likely lost. So, why would you ever use this method of acknowledgement? Well, speed is the main reason; because you don’t need to go back to the server, you potentially increase the whole transaction speed; furthermore, there is clearly work involved for the broker in maintaining the state of a message on the queue, expiring the message lock, etc.

The code for the implicit acknowledgement is:

QueueClient queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.ReceiveAndDelete);
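The difference between the two modes is easiest to see when processing fails. The following is a minimal sketch using a toy in-memory queue, not the Service Bus client: ToyQueue, ToyReceiveMode, ExpireLock and ReceiveModeDemo are all hypothetical names invented for the illustration, and ExpireLock simply stands in for the broker's lock timing out.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory queue that imitates the two receive modes.
// This is NOT the Service Bus client; every name here is hypothetical.
public enum ToyReceiveMode { PeekLock, ReceiveAndDelete }

public class ToyQueue
{
    private readonly LinkedList<string> _messages = new LinkedList<string>();
    private readonly ToyReceiveMode _mode;
    private string _lockedMessage; // handed out under PeekLock, not yet completed

    public ToyQueue(ToyReceiveMode mode) { _mode = mode; }

    // Messages the broker still holds (including a locked one).
    public int Count
    {
        get { return _messages.Count + (_lockedMessage != null ? 1 : 0); }
    }

    public void Send(string body) { _messages.AddLast(body); }

    public string Receive()
    {
        if (_messages.Count == 0) return null;

        string body = _messages.First.Value;
        _messages.RemoveFirst();

        // Under PeekLock the broker keeps hold of the message until Complete().
        if (_mode == ToyReceiveMode.PeekLock) _lockedMessage = body;
        return body;
    }

    // Explicit acknowledgement: only meaningful under PeekLock.
    public void Complete() { _lockedMessage = null; }

    // Stands in for the lock timing out: the message becomes visible again.
    public void ExpireLock()
    {
        if (_lockedMessage == null) return;
        _messages.AddFirst(_lockedMessage);
        _lockedMessage = null;
    }
}

public static class ReceiveModeDemo
{
    // Receives one message, "crashes" before completing it, and reports
    // how many messages the queue still holds afterwards.
    public static int RemainingAfterFailedProcessing(ToyReceiveMode mode)
    {
        var queue = new ToyQueue(mode);
        queue.Send("test");

        try
        {
            string body = queue.Receive();
            throw new InvalidOperationException("Processing failed for: " + body);
        }
        catch (InvalidOperationException)
        {
            queue.ExpireLock(); // Complete() was never called, so the lock lapses
        }

        return queue.Count;
    }
}
```

In this toy, a failure under PeekLock leaves the message on the queue for the next receive, while the same failure under ReceiveAndDelete leaves the queue empty and the message gone.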

References

https://docs.microsoft.com/en-us/rest/api/servicebus/peek-lock-message-non-destructive-read

Reading a Message From an Azure Service Bus Queue

In a previous post, I documented how to create a new application using Azure Service Bus and add a message to the queue. In this post, I’ll cover how to read that message from the queue, and how to deal with acknowledging the receipt.

The Code

The code from this post can be found here.

The code uses a lot of hard-coded strings and static methods, and this is because it makes it easier to see exactly what is happening and when. This is not intended as an example of production code, more as a cut-and-paste repository.

Reading a Message

Most of the code that we’ve written can simply be re-hashed for the receipt. First, initialise the queue as before:

            Uri uri = ServiceManagementHelper.GetServiceUri();
            TokenProvider tokenProvider = ServiceManagementHelper.GetTokenProvider(uri);

            NamespaceManager nm = new NamespaceManager(uri, tokenProvider);
            if (!nm.QueueExists("TestQueue")) return;

Obviously, if the queue doesn’t exist for reading, there’s limited point in creating it. The next step is to set-up a queue client:

            string connectionString = GetConnectionString();

            QueueClient queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);

            return queueClient;

The connection string is found here:

Finally, ask for the next message:

            BrokeredMessage message = queueClient.Receive();
            string messageBody = message.GetBody<string>();
            Console.WriteLine($"Message received: {messageBody}");

And we can see the contents of the queue:

If we run again:

We can see that, despite being read, the message is still sat in the queue:

Acknowledging the Message

To explicitly acknowledge a message, just call the Complete method on the message object:

            BrokeredMessage message = queueClient.Receive();
            string messageBody = message.GetBody<string>();

            message.Complete();

            Console.WriteLine($"Message received: {messageBody}");

And the message is gone:

Summary and Cost

We now have a basic, working, message queue. But one thing that I always worry about with Azure is how much this costs. Let’s run a send and receive for 100 messages with the content: “test” as above.

The first thing is to change the code slightly so that it reads through all messages (not just the first):

                while (true)
                {
                    string message = ReadMessage("TestQueue");

                    if (string.IsNullOrWhiteSpace(message)) break;
                    Console.WriteLine($"Message received: {message}");
                }

        private static string ReadMessage(string queueName)
        {
            QueueClient client = QueueManagementHelper.GetQueueClient(queueName);

            BrokeredMessage message = client.Receive();
            if (message == null) return string.Empty;
            string messageBody = message.GetBody<string>();

            message.Complete();

            return messageBody;
        }

Then run this to clear the queue. client.Receive has a default timeout, so if there are no messages it will pause before returning null. This timeout is a very useful feature. Most of this post was written on a train with a flaky internet connection, and this mechanism provided a resilient way to allow communications to continue when the connection was available.

And change the send code:


            string message = Console.ReadLine();

            for (int i = 1; i <= 100; i++)
            {
                AddNewMessage("1", message, "TestQueue");
            }

Next, the current credit on my account:

Let’s run 100 messages:

That looks familiar. Let’s try 10,000:

I’ve added some times to this, too. It’s processing around 10 / second, which is not astoundingly quick. It’s worth mentioning again that this post was written largely on a train, but still, 10 messages per second means that 10K messages will take around 17 minutes. It is faster when you have a reliable (non-mobile) internet connection, but still. Anyway, back to cost. 10K messages still showed up as a zero cost.

But Azure is a paid service, so this has to start costing money. This time, I’m using a 1000-character string as the message, and sending that 100,000 times.

After this, the balance was the same; however, the following day, it dropped slightly to £36.94. So, as far as I can tell, the balance is updated based on some kind of job that runs each day (which means that the balance might not be updated in real-time).

I asked this question here.

The published pricing details are here, but it looks like you should be able to post around 500,000 messages (1M operations, assuming one send and one receive per message) before you start incurring cost.
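The arithmetic behind these estimates is easy to check. The sketch below takes the 1M-operation figure above at face value and assumes that each message costs two operations (one send, one receive); that assumption is a reading of the numbers, not something confirmed by the pricing page. ServiceBusEstimates and its members are names made up for this example.

```csharp
using System;

public static class ServiceBusEstimates
{
    // Assumption: each message costs one send plus one receive operation.
    public const int OperationsPerMessage = 2;

    // How many messages fit inside a given allowance of operations.
    public static long MessagesWithinAllowance(long freeOperations)
    {
        return freeOperations / OperationsPerMessage;
    }

    // How long a batch takes at a steady throughput.
    public static TimeSpan TimeToProcess(int messageCount, double messagesPerSecond)
    {
        return TimeSpan.FromSeconds(messageCount / messagesPerSecond);
    }
}
```

With 1,000,000 operations, MessagesWithinAllowance gives 500,000 messages; and TimeToProcess(10000, 10) comes out at 1,000 seconds, or a little under 17 minutes.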

References

https://insidethecpu.com/2015/11/06/levaraging-azure-service-bus-with-c/

https://www.simple-talk.com/cloud/cloud-data/an-introduction-to-windows-azure-service-bus-brokered-messaging/

https://msdn.microsoft.com/en-gb/library/hh868041.aspx?f=255&MSPPError=-2147217396

https://stackoverflow.com/questions/14831281/how-does-the-service-bus-queueclient-receive-work

Acknowledging a Message Using RabbitMQ

Conceptually, message queues work in a similar fashion: you send a message to an exchange, the exchange allows people to read the message, and you have functionality that ensures the original message arrives with the destined recipient. Acknowledgement of a message is basically a way to ensure that delivery. Having said that, the two main message brokers that I’ve been investigating deal with this in a slightly different manner (albeit, the same things happen in the end).

If you want to follow through, it might be an idea to use the code from my first post as a starting point.

Let’s change the send code first to send a few messages:

static void Main(string[] args)
{            
    for (int i = 1; i <= 100; i++)
    {
        string msg = $"test{i}";
 
        SendNewMessage(msg);        
    } 
            
}

Now we have 100 messages.

Next, we’ll look at the receiving code:

public void ReceiveNextMessage()
{
    var result = _channel.QueueDeclare("NewQueue", true, false, false, null);
    Console.WriteLine(result);
 
    EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
    consumer.Received += Consumer_Received;
 
    _channel.BasicConsume("NewQueue", false, consumer);
 
}

BasicConsume() has a parameter called “noAck”. It took me a while to work this out, but noAck means that the broker doesn’t expect an acknowledgement; that is, the message is automatically acknowledged on delivery. So, noAck = true means automatically acknowledge, and noAck = false means manually acknowledge. That’s not entirely uncomplicated.

The received event looks a bit like this:

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    //if (message.Contains("3")) 
    //   throw new Exception("Error here !");
 
    //_channel.BasicAck(e.DeliveryTag, false);
 
    Console.WriteLine(message);
}

I’ve left the error and the BasicAck in, commented out, on purpose. If you run this, unlike with ActiveMQ, where you get one message at a time, you will get all the messages in the queue (because it’s event based). Add in the BasicAck() to acknowledge each message and you’re good to go.

If you add in the error at this stage, you can see that, in exactly the same way as ActiveMQ, it will only acknowledge the correct message. What you will also see here is we have the poison message scenario that I discussed in this post on ActiveMQ.

IMHO, this is where RabbitMQ beats ActiveMQ hands down. The following code is the simplest version of dealing with a poison message:

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    try
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
 
        if (message.Contains("3"))
            throw new Exception("Error here !");
 
        _channel.BasicAck(e.DeliveryTag, false);
 
        Console.WriteLine(message);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
        _channel.BasicNack(e.DeliveryTag, false, true);
    }
}

So, we have a problem, and we issue a Nack. What the Nack does is allow you to re-queue the message. The code above allows the queue to process, and simply puts the bad messages back on the queue. The obvious problem here is that, if the problem isn’t transient, we’ll keep coming back to them. It does, however, get around the problem: the queue is no longer blocked.
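One way to stop a permanently bad message from circulating forever is to cap the number of attempts. The sketch below is not RabbitMQ code: it uses a hypothetical in-memory queue that carries an attempt counter on each message (ToyMessage, RetryLimitSketch and MaxAttempts are invented names), and how you would track attempts against a real broker is left open.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message wrapper; the attempt counter lives on the message.
public class ToyMessage
{
    public string Body;
    public int Attempts;
}

public static class RetryLimitSketch
{
    public const int MaxAttempts = 3; // arbitrary limit chosen for the example

    // Drains the work queue. Successfully handled bodies are added to
    // 'processed'; the return value lists the bodies that were given up on.
    public static List<string> Process(IEnumerable<string> bodies, List<string> processed)
    {
        var work = new Queue<ToyMessage>();
        foreach (string body in bodies)
            work.Enqueue(new ToyMessage { Body = body });

        var setAside = new List<string>();

        while (work.Count > 0)
        {
            ToyMessage message = work.Dequeue();
            message.Attempts++;

            try
            {
                if (message.Body.Contains("3"))
                    throw new InvalidOperationException("Error here !");

                processed.Add(message.Body); // the "ack" path
            }
            catch (InvalidOperationException)
            {
                if (message.Attempts < MaxAttempts)
                    work.Enqueue(message);       // comparable to a Nack that re-queues
                else
                    setAside.Add(message.Body);  // comparable to a Nack that does not
            }
        }

        return setAside;
    }
}
```

Feeding this test1 to test5 processes four messages and sets test3 aside after three attempts, so the loop always terminates even when the error is not transient.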

The solution, as it was with ActiveMQ, is to put them in a “dead letter queue”; however, unlike ActiveMQ, this is remarkably easy. Firstly, let’s refactor our queue creation a little:

public void ReceiveNextMessage()
{
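    // The exchange, routing key and queue names below are examples
    Dictionary<string, object> args = DeadLetterHelper.CreateDeadLetterQueue(
        _channel, "dead-letter-exchange", "dead-letter", "DeadLetterQueue");
 
    // Now declare the queue and pass in the dead letter exchange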
    var result = _channel.QueueDeclare("NewQueue", true, false, false, args);
    Console.WriteLine(result);
 
    EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
    consumer.Received += Consumer_Received;
 
    _channel.BasicConsume("NewQueue", false, consumer);
 
}

You’ll notice that we go to a new helper method called: “CreateDeadLetterQueue()” and return a dictionary; which is, in turn, passed through to our new queue. The CreateDeadLetterQueue() function looks like this:

public static Dictionary<string, object> CreateDeadLetterQueue(IModel channel,
    string deadLetterExchange, string deadLetterRoutingKey, string deadLetterQueue)
{
    // Declare dead letter exchange                     
    channel.ExchangeDeclare(deadLetterExchange, "direct");
    Dictionary<string, object> args = new Dictionary<string, object>()
    {
        { "x-dead-letter-exchange", deadLetterExchange },
        { "x-dead-letter-routing-key", deadLetterRoutingKey }
    };
 
    // Declare the dead letter queue and bind it to the exchange
    channel.QueueDeclare(deadLetterQueue, true, false, false, null);
    channel.QueueBind(queue: deadLetterQueue,
                    exchange: deadLetterExchange,
                    routingKey: deadLetterRoutingKey);
    return args;
}

There are effectively three steps. Firstly, we need a dead letter exchange, and this needs a routing key (in this case, “dead-letter”). Next, we declare the DeadLetterQueue and bind it to the exchange with the same routing key. Finally, we return the argument list, which allows the linking of the main queue to the dead letter queue.

Now we are going to change the receive code so that it doesn’t re-queue:

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    try
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
 
        if (message.Contains("3"))
            throw new Exception("Error here !");
 
        _channel.BasicAck(e.DeliveryTag, false);
 
        Console.WriteLine(message);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
        _channel.BasicNack(e.DeliveryTag, false, false);
    }
}

And running it results in a dead letter queue full of all our dodgy data:

If you start getting errors when you run this, try referring to this article.

References

RabbitMQ documentation on the subject

Acknowledging a Message in Active MQ

Following on from my previous post on Active MQ, I’m now going to explore creating a mechanism whereby the message can fail.

The main issue with the trial project was that it used an auto acknowledge:

using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

There are a number of considerations here: firstly, what if the message that you read errors? We want to retry. Secondly, what happens if the message repeatedly errors? (This type of message is known as a poison message.)

The Problem

Here’s the send code again from the last post:

string queueName = "TextQueue";

Console.WriteLine($"Adding message to queue topic: {queueName}");

string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();

    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageProducer producer = session.CreateProducer(dest))
    {                     
        producer.DeliveryMode = MsgDeliveryMode.Persistent;

        var msg = session.CreateTextMessage(text);
        producer.Send(msg);
                                                
        Console.WriteLine($"Sent {text} messages");
        
    }
}            

Other than splitting the message out, I haven’t changed anything. Okay, so let’s run that and check the queue:

Now, I’m going to change the receive code slightly:

    string queueName = "TextQueue";
 
    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage)
            {
                ITextMessage txtMsg = msg as ITextMessage;                        
 
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                message = txtMsg.Text;
 
                throw new Exception("Test"); // <-- May cause problems
                
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }

As you can see, there is now an issue in the code; for some reason, it is repeatedly throwing an error entitled “Test”. I can’t work out why (maybe I’ll post a question on StackOverflow later), but when I run that, despite crashing, the message is read, and the queue is now empty.

Obviously, this is an issue: if that message was “DebitBankAccountWith200000” then someone is going to wish that the person that wrote this code hadn’t automatically acknowledged it.

Firstly, how do we stop the auto acknowledge?

There are basically two alternatives to auto acknowledge (there are more, but we’ll only look at two here): client acknowledge, and transactional acknowledgement. I’ll leave transactional acknowledgement for another day.

Client Acknowledge

This method is basically the manual version. You’re telling ActiveMQ that you will, or will not acknowledge the message yourself. Now, let’s alter the receive code slightly:

using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
using (IDestination dest = session.GetQueue(queueName))
using (IMessageConsumer consumer = session.CreateConsumer(dest))
{
    IMessage msg = consumer.Receive();
    if (msg is ITextMessage)
    {
        ITextMessage txtMsg = msg as ITextMessage;                        

        Console.WriteLine($"Received message: {txtMsg.Text}");

        message = txtMsg.Text;

        throw new Exception("Test");
        msg.Acknowledge();
        

        return true;
    }
    else
    {
        Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
    }
}                

As you can see, I’ve changed two main parts here: I’ve changed the AcknowledgementMode to ClientAcknowledge, and I’ve added a call to the Acknowledge method on the message.

Now let’s re-run the send and receive and see what happens to the queue.

Unfortunately, I still haven’t worked out why it’s crashing, but here’s the queue; still safely with the message:

We had an error, it crashed, but because it was never acknowledged, it’s still safe and sound in the queue. When we run the receive again, hopefully the bug will have magically disappeared and the message will successfully process.

Poison Messages

The concept of a poison message is one where the issue resides in the message itself; the situation described above is not a poison message because the message is fine, but the code is erroring. Once the code above is fixed, the message can be processed; however, let’s have a look at a different error scenario; here’s some new receive code:

string queueName = "TextQueue";
 
string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
using (IConnection connection = factory.CreateConnection())
{
    connection.Start();
    using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageConsumer consumer = session.CreateConsumer(dest))
    {
        IMessage msg = consumer.Receive();
        if (msg is ITextMessage)
        {
            ITextMessage txtMsg = msg as ITextMessage;                        
 
            Console.WriteLine($"Received message: {txtMsg.Text}");
 
            message = txtMsg.Text;
 
            if (message.First() != 't')
                throw new Exception("Message is invalid");
            msg.Acknowledge();
            
 
            return true;
        }
        else
        {
            Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
        }
    }                
}

This time, we have some code that actually processes the message and, based on the contents, does something; in this case, it throws an error where the message doesn’t start with ‘t’. So, the rules are simple; messages start with ‘t’. Let’s run the send code again and try some messages:

And now let’s receive these messages (incidentally, while testing this, my notes on starting two projects might be useful):

Okay – so we’ve come across a message that we can’t process. This has yet to be acknowledged, so it’s still safe and sound in the queue. We’ll simply restart the listener and pick it up:

Ah – okay. So, we have a problem. “nexttest” is causing an error with the queue, but if we don’t acknowledge it, we’re going to keep picking it up and erroring.

The Antidote

Once we know that the message is causing a problem, we can send it to a special queue; here’s the code to capture the error:

ITextMessage txtMsg = msg as ITextMessage;
 
try
{
    Console.WriteLine($"Received message: {txtMsg.Text}");
 
    message = txtMsg.Text;
 
    if (message != null && message.First() != 't')
    {
        // The message has a problem, and so we need to file it away without losing it
 
        throw new Exception("Message is invalid");
    }
    msg.Acknowledge();
 
    return true;
}
catch
{
    ResendMsg(session, msg);
    msg.Acknowledge(); // Acknowledge the message from the original queue
}

And here’s the new method, ResendMsg:


private static void ResendMsg(ISession session, IMessage msg)
{
    var deadLetterQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("ActiveMQ.DLQ");

    // Dispose of the producer once the message has been forwarded
    using (IMessageProducer producer = session.CreateProducer(deadLetterQueue))
    {
        producer.Send(msg);
    }
}

The first time this executes, it will throw and catch the error, and then resend to a dead letter queue:

Subsequent runs can proceed past the problem message, and the message itself remains intact:

A C# Programmer’s Guide to Installing, Running and Messaging with RabbitMQ

Following this year’s trip to DDD North I got speaking to someone about Rabbit MQ. I’d previously done some research on Active MQ, however, given that there are, realistically, only two options in this market, I thought I should have a look at the competition.

Install from here.

You also need to install Erlang.

After the install… nothing happens. No matter how long you wait, assuming it’s just your slow laptop (don’t want to talk about that anymore). Navigate to:

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.0\sbin

In a DOS prompt, type:

rabbitmq-plugins enable rabbitmq_management

Rabbit will now run as a service, which also feels much cleaner than ActiveMQ’s console app style interface.

Now, navigate to http://localhost:15672/, and log in as guest/guest:

Code

Just like with ActiveMQ, there is a NuGet package that you can use for interacting with the exchange:

https://www.nuget.org/packages/RabbitMQ.Client. Your project needs .Net 4.5.1 or higher; otherwise, you’ll see this:

If your project targets an earlier version, upgrade it to 4.5.1:

Okay, so now we’re up and running, here’s the code for the send:

static void Main(string[] args)
{            
    while (true)
    {
        string msg = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(msg))
        {
            break;
        }
 
        SendNewMessage(msg);
    } 
    
}

private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);
 
        channel.BasicPublish("", "NewQueue", null, Encoding.UTF8.GetBytes(message));                
 
    }
}

Just like with ActiveMQ, you can now just look at the queue:

A quick caveat here:


var result = channel.QueueDeclare("NewQueue");

This doesn’t work because exclusive defaults to ‘true’, meaning that only your connection can see it.

Receiving a Message


static void Main(string[] args)
{
    while (true)
    {
        ReceiveNextMessage();
 
        string exit = Console.ReadLine();
        if (!string.IsNullOrWhiteSpace(exit)) break;
    }
 
}
 
private static void ReceiveNextMessage()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume("NewQueue", true, consumer);
 
        System.Threading.Thread.Sleep(1000);
 
    }
}
 
private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

Why the Thread.Sleep? The reason is that, because the message consumer is event based, you need to hold off disposing of the connection and channel until you’re done. Obviously, sleeping for 1 second has its issues. I suspect this could be better achieved using a TaskCompletionSource, or even just a Console.ReadLine(), but for the purposes of this post, it suffices.
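As a rough idea of what the TaskCompletionSource approach might look like, here is a minimal sketch. It does not use the RabbitMQ client: ToyConsumer is a hypothetical stand-in for an event-based consumer, and its Received event has a simpler signature than the real one, so the handler would need adapting.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an event-based consumer; not the RabbitMQ client.
public class ToyConsumer
{
    public event Action<string> Received;

    // Raises Received on a thread-pool thread, much as a broker callback might.
    public Task Deliver(string body)
    {
        return Task.Run(() =>
        {
            Action<string> handler = Received;
            if (handler != null) handler(body);
        });
    }
}

public static class WaitForMessageSketch
{
    // Subscribes to the consumer and returns a task that completes
    // with the body of the first message that arrives.
    public static Task<string> FirstMessage(ToyConsumer consumer)
    {
        var completion = new TaskCompletionSource<string>();

        Action<string> onReceived = null;
        onReceived = body =>
        {
            consumer.Received -= onReceived; // one message is enough
            completion.TrySetResult(body);
        };

        consumer.Received += onReceived;
        return completion.Task;
    }
}
```

The caller would then wait on the returned task with a timeout, for example FirstMessage(consumer).Wait(TimeSpan.FromSeconds(5)), in place of the fixed sleep; the wait ends as soon as a message arrives rather than after an arbitrary second.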

Topics

The concept of topics seems to be the same as ActiveMQ; however, the implementation is different, and there were a few dead-ends to go down first.

The following code, for example:


var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("NewTopic", "testTopic");
    
    channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
 
}

Gives the error: COMMAND_INVALID – unknown exchange type ‘testTopic’

RabbitMQ seems to have some magic strings that determine the exchange type. This works:

 
private static void SendNewTopic(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("NewTopic", "topic"); // topic is a magic string

        channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
    }
}

The “test” is a routing key. These seem to be quite an involved system; however, it basically tells the exchange where the message is going.

Receive


private static void ReceiveNextTopic()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("NewTopic", "topic");
        var queueName = channel.QueueDeclare().QueueName;
 
        channel.QueueBind(queue: queueName,
                        exchange: "NewTopic",
                        routingKey: "test");
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume(queueName, true, consumer);
 
        Console.ReadLine();
    }
}

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

Notice the routing key; this (broadly speaking) needs to match the sender’s routing key. The purpose seems to be to allow a complex mapping between publisher and subscriber.
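The “broadly speaking” is there because a topic exchange allows wildcards in the key used for the binding: keys are split into words by dots, * stands for exactly one word, and # stands for zero or more words. The following is a simplified illustration of that matching rule, not the broker’s own implementation; TopicMatchSketch is a made-up name.

```csharp
using System;

public static class TopicMatchSketch
{
    // True if a message published with 'routingKey' would be routed
    // to a queue bound with 'bindingKey' on a topic exchange.
    public static bool Matches(string bindingKey, string routingKey)
    {
        return Matches(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Matches(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: it is a match only if the words are too.
        if (p == pattern.Length) return w == words.Length;

        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more words: try every possible split.
            for (int next = w; next <= words.Length; next++)
            {
                if (Matches(pattern, p + 1, words, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a routing key word to compare with.
        if (w == words.Length) return false;

        bool wordMatches = pattern[p] == "*" || pattern[p] == words[w];
        return wordMatches && Matches(pattern, p + 1, words, w + 1);
    }
}
```

So a binding of “test” only matches the routing key “test”, as in the code above, whereas a hypothetical binding such as “stock.*” would match “stock.msft” but not “stock.msft.price”, and “stock.#” would match both.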

Conclusion

In comparison to ActiveMQ, RabbitMQ seems to be a more actively maintained code base; a quick comparison of the GitHub repo of each reveals more activity in Rabbit. Rabbit also seems a little cleaner in the way it’s run and installed; however, it also seems to have more moving parts.

There are a number of companies that offer support for ActiveMQ commercially; with RabbitMQ, the company that wrote it offers commercial support, should you need it.

Can you run Active and RabbitMQ together?

No. No, you can’t. They both seem to want to use the same resources, and so if you want to use them both on the same machine, you can, but only one at a time.

Acknowledgements

I used the following websites extensively in this article:

http://arcware.net/installing-rabbitmq-on-windows/

https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

A C# Programmer’s Guide to Installing, Running and Messaging with ActiveMQ

I’ve recently been experimenting with message queues. I’ve used MSMQ in the past, but never with any complexity, and so I thought I’d spend some time investigating ActiveMQ. There are a number of articles and courses out there, but for some reason, C# seems to be the poor relation. So, here’s a C# programmer’s guide to installing and running ActiveMQ.

Download and Run ActiveMQ

Active MQ is from Apache, and the download link is here.

The specific version that I was working with was 5.14.0.

Once you’ve downloaded the archive, extract it; I extracted it to my Downloads folder, so the next step is to navigate to that directory in a command prompt:

apache-activemq-5.14.0-bin\apache-activemq-5.14.0\bin

Then type:

activemq start

There is a sample application that comes in the box, and it can be found here:

apache-activemq-5.14.0-bin\apache-activemq-5.14.0\examples\openwire\csharp\ActiveMQExamples

The examples that they supply do work out of the box, and they are not a bad place to start, if you don’t want to read the rest of this post.

Queues versus Topics

It is important, for reasons that will become apparent shortly, to understand how and why these two concepts differ before writing any code. Let’s start with Topics. These are effectively a way to communicate between two end points; the important thing here is that there must be both for it to work. When you publish a topic message, it is published to any “listeners”. If your app wasn’t listening then that’s hard luck. The use cases here are situations whereby a message might be time sensitive; for example, a stock price has just changed, or a server needs the client to refresh because there is more data. There are such things as durable topics, but for now, let’s leave topics as described here.

Queues on the other hand have a persistent nature. Once you add a message to the queue, it will remain there until it is handled. Use cases for this might include a notification to send an e-mail, a chat program, or a request to place a sales order. The queue will be read on a first in, first out basis, and so you can load balance a queue: that is, you can have n listeners, and they will all process the messages in order from the queue. If you were to do this with the topic, they would all receive the same message at the same time.
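The difference in delivery can be sketched with a couple of in-memory lists. This is a toy model, not ActiveMQ code: QueueVsTopicSketch is a hypothetical name, and handing queue messages out round-robin is an assumption made to keep the example deterministic, not a statement about how the broker shares out work.

```csharp
using System;
using System.Collections.Generic;

public static class QueueVsTopicSketch
{
    // Queue: each message goes to exactly one of the competing consumers.
    // Round-robin is an assumption of this toy, not a broker guarantee.
    public static List<string>[] DeliverAsQueue(IList<string> messages, int consumerCount)
    {
        if (consumerCount < 1) throw new ArgumentOutOfRangeException("consumerCount");

        List<string>[] inboxes = CreateInboxes(consumerCount);
        for (int i = 0; i < messages.Count; i++)
        {
            inboxes[i % consumerCount].Add(messages[i]);
        }
        return inboxes;
    }

    // Topic: every subscriber that is listening gets its own copy.
    // With no subscribers, the messages are simply dropped.
    public static List<string>[] DeliverAsTopic(IList<string> messages, int subscriberCount)
    {
        List<string>[] inboxes = CreateInboxes(subscriberCount);
        foreach (string message in messages)
        {
            foreach (List<string> inbox in inboxes)
            {
                inbox.Add(message);
            }
        }
        return inboxes;
    }

    private static List<string>[] CreateInboxes(int count)
    {
        var inboxes = new List<string>[count];
        for (int i = 0; i < count; i++) inboxes[i] = new List<string>();
        return inboxes;
    }
}
```

With four messages and two listeners, the queue version gives each listener two messages, while the topic version gives each listener all four; and a topic with no listeners delivers nothing at all.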

Publish and Subscribe to a Topic

Start off by creating a new console application: you might want to call it something like SendMessage, or Blancmange. Then, add the ActiveMQ NuGet package.

Here’s the code:

static void Main(string[] args)
{
    while (true)
    {
        string text = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(text)) return;
        SendNewMessage(text);
    }
}

private static void SendNewMessage(string text)
{ 
    string topic = "TextQueue";

    Console.WriteLine($"Adding message to queue topic: {topic}");

    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();

        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetTopic(topic))
        using (IMessageProducer producer = session.CreateProducer(dest))
        { 
            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            producer.Send(session.CreateTextMessage(text));
            Console.WriteLine($"Sent {text} messages");
        }
    }       		      
}

The important thing here is the broker address. When you set-up ActiveMQ, 61616 is the default port, but obviously this, along with the various security settings, etc, can be changed.

Running the publisher results in:

[Image: activemq1]

So, it looks like we’re sending a message. We can check whether or not we are by navigating to:

http://localhost:8161/

This provides you with an admin site. The default username/password is: admin/admin

Then navigate to the topics:

http://localhost:8161/admin/topics.jsp

And you should see the messages queued up:

[Image: activemq2]

As you can see, TextQueue has 4 messages at the minute.

Subscriber

Now we need to write a subscriber for those messages.

static void Main(string[] args)
{
    Console.WriteLine("Waiting for messages");
 
    // Read messages from the topic as they arrive
    while (ReadNextMessage())
    {
        Console.WriteLine("Successfully read message");
    }
 
    Console.WriteLine("Finished");
}
 
static bool ReadNextMessage()
{            
    string topic = "TextQueue";
 
    string brokerUri = "activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetTopic(topic))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage txtMsg)
            {
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }
 
    return false;            
}

The eagle-eyed amongst you might notice that the code is almost identical; you need the same NuGet package for both the publisher and subscriber.

Topics Caveat

Okay, there’s a hugely important caveat here, which a smarter man than me would have instantly realised: if you run the subscriber now, nothing will happen. This is because the topic messages are only sent to active subscribers. In order for the above code to work, the subscriber needs to be running when the messages are sent.

So, provided that you have an active subscriber when you publish your message, the above code will send whatever you type into the console to the subscriber.
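One thing to watch with the subscriber shown earlier is that ReadNextMessage opens and closes a connection for every message, so anything published between two calls can be missed for exactly the same reason. Here is a minimal sketch of a subscriber that stays connected instead, assuming the same NMS package and using the consumer's Listener event rather than Receive; the TopicListener class name is just for illustration.

```csharp
using System;
using Apache.NMS;

static class TopicListener
{
    static void Main()
    {
        string topic = "TextQueue";
        NMSConnectionFactory factory = new NMSConnectionFactory("activemq:tcp://localhost:61616");

        using (IConnection connection = factory.CreateConnection())
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetTopic(topic))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            // The handler is invoked for each message for as long as the consumer stays open
            consumer.Listener += message =>
            {
                if (message is ITextMessage textMessage)
                {
                    Console.WriteLine($"Received message: {textMessage.Text}");
                }
                else
                {
                    Console.WriteLine($"Unexpected message type: {message.GetType().Name}");
                }
            };

            // Start delivery only once the handler is attached
            connection.Start();

            Console.WriteLine("Subscribed; press Enter to exit");
            Console.ReadLine();
        }
    }
}
```

Start this first, then run the publisher; each line you type should appear in the subscriber's console.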

Queues

So, the code for using queues looks very similar, but is conceptually different. Here’s the SendMessage code:

static void Main(string[] args)
{
    while (true)
    {
        string text = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(text)) return;
        SendNewMessageQueue(text);
    }
}

private static void SendNewMessageQueue(string text)
{ 
    string queueName = "TextQueue";

    Console.WriteLine($"Adding message to queue: {queueName}");

    string brokerUri = "activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();

        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageProducer producer = session.CreateProducer(dest))
        { 
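            // NonPersistent messages are not written to the persistent store and are lost if the broker restarts;
            // use MsgDeliveryMode.Persistent if they must survive a restart
            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            producer.Send(session.CreateTextMessage(text));
            Console.WriteLine($"Sent message: {text}");
        }
    }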
}
 

This can be run and, again, you can check the queue:

[Image: activemq3]

But, crucially, the following code, even if run afterwards, will still read the queue:


static void Main(string[] args)
{
    Console.WriteLine("Waiting for messages");
 
    // Read all messages off the queue
    while (ReadNextMessageQueue())
    {
        Console.WriteLine("Successfully read message");
    }
 
    Console.WriteLine("Finished");
}
 
static bool ReadNextMessageQueue()
{            
    string queueName = "TextQueue";
 
    string brokerUri = "activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage txtMsg)
            {
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }
 
    return false;            
}

Here it is:

[Image: activemq4]
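Two things are worth noting about the queue reader above: Receive() with no arguments blocks until a message arrives, so the loop never actually reaches "Finished"; and with AutoAcknowledge the message is treated as handled as soon as it has been received. The following is a sketch of one alternative, not the only way to do it: it sends with persistent delivery, receives with a timeout, and only acknowledges once the message has been processed. The ReliableQueueExample class name and the two-second timeout are arbitrary choices for illustration.

```csharp
using System;
using Apache.NMS;

static class ReliableQueueExample
{
    const string BrokerUri = "activemq:tcp://localhost:61616";  // Default port
    const string QueueName = "TextQueue";

    static void Send(string text)
    {
        NMSConnectionFactory factory = new NMSConnectionFactory(BrokerUri);

        using (IConnection connection = factory.CreateConnection())
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(QueueName))
        using (IMessageProducer producer = session.CreateProducer(dest))
        {
            connection.Start();

            // Persistent delivery asks the broker to store the message so it survives a restart
            producer.DeliveryMode = MsgDeliveryMode.Persistent;
            producer.Send(session.CreateTextMessage(text));
        }
    }

    static bool TryReceive(TimeSpan timeout)
    {
        NMSConnectionFactory factory = new NMSConnectionFactory(BrokerUri);

        using (IConnection connection = factory.CreateConnection())
        using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
        using (IDestination dest = session.GetQueue(QueueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            connection.Start();

            // Returns null if nothing arrives within the timeout
            IMessage msg = consumer.Receive(timeout);
            if (msg == null) return false;

            if (msg is ITextMessage txtMsg)
            {
                Console.WriteLine($"Received message: {txtMsg.Text}");
            }

            // Tell the broker we are done only after the message has been handled
            msg.Acknowledge();
            return true;
        }
    }

    static void Main()
    {
        Send("Hello from the queue");

        while (TryReceive(TimeSpan.FromSeconds(2)))
        {
            Console.WriteLine("Successfully read message");
        }

        Console.WriteLine("Finished");
    }
}
```

If the process dies before Acknowledge is called, the broker should redeliver the message to the next consumer, which is the same trade-off as the explicit acknowledgement described in the Azure Service Bus posts.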

Conclusion

So, it’s quite straightforward to get something working out of the box with ActiveMQ – certainly easier than I expected.