Tag Archives: Message Queue

Using Azure Functions to Send an E-mail Alert from a Service Bus

In a previous post, I discussed creating an Azure service bus that sends an e-mail as an action once a message has expired; in another, I covered Azure Functions and setting up a basic one.

These two pieces of functionality seem to be crying out to be together. After all, if your functionality to send an e-mail is in the cloud, you don’t have to worry about your server being down (which, if your message has expired, is a real possibility).

Create the Azure Function

The first thing to do is to create the Azure function to send an e-mail. Remember that we’ll be hooking into the service bus, and so we’ll create the function a little differently.

The first few steps are the same, though:

The new function is here:

We’ll create a custom function again:

Although this looks familiar from the last post, the next part does differ slightly. This time, we’ll set up a Service Bus Trigger:

This requires the connection string to your service bus…

As you can see above, the service bus connection is blank, and there are no possible entries… onto App Settings:

App Settings

On the App Settings tab, you can configure settings that pertain to your Azure Function App. Select “Manage App Settings”. Here we can set up a connection string:

Now, we should be able to see that from the Function:
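Besides feeding the trigger binding, values added as app settings are exposed to the function's process as environment variables, so the code can read them by name. A minimal sketch, assuming a hypothetical setting called AlertSmtpPassword (the setting name and the AppSettings helper are illustrative and not something the portal creates):

```csharp
using System;

public static class AppSettings
{
    // Reads an app setting by name. App settings are surfaced to the
    // function as environment variables, so a plain lookup is enough.
    public static string GetRequired(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            // Fail loudly rather than carrying on with an empty secret.
            throw new InvalidOperationException($"App setting '{name}' is missing or empty.");
        }
        return value;
    }
}
```

Something like AppSettings.GetRequired("AlertSmtpPassword") could then stand in for a password typed directly into the function body.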

Does it work?

What does this function do out of the box?

Well, having populated the queue with 50 messages that time out after 30 seconds, the function kicked in and started logging that it was picking up messages after 30 seconds – so that’s a promising sign!

The messages are processed and removed from the dead letter queue. This process happens so quickly, it’s easy (as I did) to interpret this as a bug (i.e. messages are not being dead-lettered). However, as we can see from the function logs – they are.

This did, however, leave me with a concern that the messages were being disposed of before they had been successfully processed. To check this, I changed the function slightly:

So, it crashes correctly:

And here, safe and sound, are 50 freshly dead-lettered messages:

Function Code

Now we have a function, we need to make it send an e-mail… so we’ll need some code. Let’s start with what we created here.


using System;
using System.Threading.Tasks;
using System.Net.Mail;

public static void Run(string myQueueItem, TraceWriter log)
{
    log.Info($"Start C# ServiceBus queue trigger function processed message: {myQueueItem}");

    System.Net.Mail.MailMessage message = new System.Net.Mail.MailMessage();
    message.To.Add("to.address@hotmail.co.uk");
    message.Subject = "Message in queue has expired";
    message.From = new System.Net.Mail.MailAddress("from.address@hotmail.co.uk");
    message.Body = messageText;
    System.Net.Mail.SmtpClient smtp = new System.Net.Mail.SmtpClient("smtp.live.com");
    smtp.Port = 587;
    smtp.UseDefaultCredentials = false;
    smtp.Credentials = new System.Net.NetworkCredential("my.address@hotmail.co.uk", "p@ssw0rd");
    smtp.EnableSsl = true;
    smtp.Send(message);

    log.Info($"End C# ServiceBus queue trigger function processed message: {myQueueItem}");
}


This doesn’t work:

2017-06-27T16:47:56.928 Function started (Id=1188dbdb-4963-4e55-af5c-4be1f71a1ca5)
2017-06-27T16:47:56.928 Start C# ServiceBus queue trigger function processed message
2017-06-27T16:47:56.928 Function completed (Failure, Id=1188dbdb-4963-4e55-af5c-4be1f71a1ca5, Duration=0ms)
2017-06-27T16:47:57.147 Exception while executing function: Functions.ServiceBusQueueTriggerCSharp1. mscorlib: Exception has been thrown by the target of an invocation. f-ServiceBusQueueTriggerCSharp1__-1971403142: Cannot complete.
2017-06-27T16:47:57.557 Exception while executing function: Functions.ServiceBusQueueTriggerCSharp1. mscorlib: Exception has been thrown by the target of an invocation. f-ServiceBusQueueTriggerCSharp1__-1971403142: Cannot complete.

Debugging Azure

A quick side note on debugging Azure. There are a number of resources with details of how this should work on the web, and I’ll probably have a later post of my own experiences, but it’s a pretty flaky experience, and I ended up using trial and error to determine the issue.

Working code

using System;
using System.Threading.Tasks;

public static void Run(string myQueueItem, TraceWriter log)
{
    log.Info($"Start C# ServiceBus queue trigger function processed message: {myQueueItem}");

    System.Net.Mail.MailMessage message = new System.Net.Mail.MailMessage();
    
    message.To.Add("to.address@hotmail.co.uk");    
    message.Subject = "Message in queue has expired";    
    message.From = new System.Net.Mail.MailAddress("from.address@hotmail.co.uk");
    message.Body = myQueueItem;
        
    System.Net.Mail.SmtpClient smtp = new System.Net.Mail.SmtpClient("smtp.live.com");
    smtp.Port = 587;
    smtp.UseDefaultCredentials = false;
    smtp.Credentials = new System.Net.NetworkCredential("my.address@hotmail.co.uk", "p@ssw0rd");
    smtp.EnableSsl = true;
    smtp.Send(message);

    log.Info($"End C# ServiceBus queue trigger function processed message: {myQueueItem}");
}

So, the problem was just that I was referencing an unknown variable (messageText). I’m unsure exactly why I needed to travel to the mountains of Mordor to determine this – a simple error message in the online text would have sufficed.
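One way to catch this kind of mistake before it reaches the portal is to keep the message-building code in a small method that can be compiled and run locally, where an undefined name such as messageText is reported by the compiler straight away. A minimal sketch; the AlertMail class and its Build method are hypothetical names, and the addresses are the same placeholders used above:

```csharp
using System;
using System.Net.Mail;

public static class AlertMail
{
    // Builds the notification e-mail for one expired queue item.
    // Nothing is sent here, so this can be exercised without an SMTP server.
    public static MailMessage Build(string queueItem)
    {
        if (queueItem == null) throw new ArgumentNullException(nameof(queueItem));

        var message = new MailMessage
        {
            From = new MailAddress("from.address@hotmail.co.uk"),
            Subject = "Message in queue has expired",
            Body = queueItem
        };
        message.To.Add("to.address@hotmail.co.uk");
        return message;
    }
}
```

In the function itself, smtp.Send(AlertMail.Build(myQueueItem)) would then take the place of the inline construction.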

The other issue that I faced was a security challenge; however, once I’d persuaded Azure that this really was me, everything sprang into life:

Credit Considerations

Unlike in previous posts, where I’ve identified the Azure cost to be negligible, functions are the fastest way to use up credit that I have found so far, especially functions such as the one I’ve created here. I left the (non-working) function above active, but failing, all night, and it used up over £40 worth of credit, continually trying, and failing, to process the dead-letter queue… I think the lights might even have dimmed in Redmond for a split second! The moral of the story is: be careful when you’re debugging this. You can’t just leave at the end of the night with a function that doesn’t work, but is still active.

Summary

This concept is extremely compelling. I can have a service bus queue that is processed and monitored by an Azure function. If aliens land and steal the entire office, all the servers, dev PCs and programmers, this function will continue to run. There is obviously a mindset shift here, and it doesn’t make sense to move everything into this kind of model, but consider the possibilities. Imagine a system that books holidays: it processes the customer request and adds it to a queue; the aeroplane booking system picks that from the queue and books the ticket on the plane; the car hire system takes the message to book a car; once they’re all complete, they add respective messages to say so (but remain agnostic of each other). Finally, if any one part of the system fails, an Azure function could sit there monitoring and cancel the whole lot. I’ve never worked in this kind of industry, so there’s a lot that I’ve probably not considered, but the essence is that you can have active functionality on (even catastrophic) failure, which is a brand new concept.

References

https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus

https://stackoverflow.com/questions/10043219/view-content-of-an-azure-service-bus-queue

Service Bus Explorer:

https://code.msdn.microsoft.com/Service-Bus-Explorer-f2abca5a

http://markheath.net/post/remote-debugging-azure-functions

Sending e-mails:

https://stackoverflow.com/questions/25216202/smtp-live-com-mailbox-unavailable-the-server-response-was-5-7-3-requested-ac

Azure Service Bus – Send an e-mail on Message Timeout

A message queue has, in its architecture, two main points of failure; the first is the situation where a message is added to a queue, but never read (or at least not read within a specified period of time); this is called a Dead Letter, and it is the subject of this post. The second is where the message is corrupt, or it breaks the reading logic in some way; that is known as a Poison Message.

There are a number of reasons that a message might not get read in the specified time: the service reading and processing the messages might not be keeping up with the supply, it might have crashed, or the network connection might have failed.

One possible thing to do at this stage is to have a process that automatically notifies someone that a message has ended up in the dead letter queue.

Step One – specify a timeout

Here’s how you would specify a timeout on the message specifically:

            BrokeredMessage message = new BrokeredMessage(messageBody)
            {
                MessageId = id,
                TimeToLive = new TimeSpan(0, 5, 0)
            };

Or, you can create a default on the queue from the QueueDescription (typically, this would be done when you initially create the queue):

                QueueDescription qd = new QueueDescription("TestQueue")
                {
                    DefaultMessageTimeToLive = new TimeSpan(0, 5, 0)
                };
                nm.CreateQueue(qd);

Should these values differ, the shortest time will be taken.
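That rule is simple enough to express directly. The sketch below only illustrates the "shortest wins" behaviour described above, using plain TimeSpan values; EffectiveTimeToLive is a hypothetical helper and not part of the Service Bus SDK:

```csharp
using System;

public static class Ttl
{
    // Returns whichever of the two timeouts is shorter: the one set on the
    // message itself or the default configured on the queue.
    public static TimeSpan EffectiveTimeToLive(TimeSpan messageTtl, TimeSpan queueDefaultTtl)
    {
        return messageTtl < queueDefaultTtl ? messageTtl : queueDefaultTtl;
    }
}
```

With a message timeout of five minutes and a queue default of one minute, this returns one minute.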

What happens to the message by default?

I’ve added a message to the queue using the default timeout of 5 minutes; here it is happily sitting in the queue:

Looking at the properties of the queue, we can determine that the “TimeToLive” is, indeed, 5 minutes:

In addition, you can see that, by default, the flag telling Service Bus to move the message to a dead letter queue is not checked. This means that the message will not be moved to the dead letter queue.

5 Minutes later:

Nothing has happened to this queue, except time passing. The message has now been discarded. It seems an odd behaviour; however, as with ReadAndDelete Locks there may be reasons that this behaviour is required.

Step Two – Dead Letters

If you want to actually do something with the expired message, the key is a concept called “Dead Lettering”. The following code will direct the Service Bus to put the offending message into the “Dead Letter Queue”:


                QueueDescription qd = new QueueDescription("TestQueue")
                {
                    DefaultMessageTimeToLive = new TimeSpan(0, 5, 0),
                    EnableDeadLetteringOnMessageExpiration = true
                };
                nm.CreateQueue(qd);

Here’s the result for the same test:

Step Three – Doing something with this…

Okay – so the message hasn’t been processed, and it’s now sat in a queue specially designed for that kind of thing, so what can we do with it? One possible thing is to create a piece of software that monitors this queue. This is an adaptation of the code that I originally created here:

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Start();

            if (!InitialiseClient())
            {
                Console.WriteLine("Unable to initialise client");
            }
            else
            {
                while (true)
                {
                    string message = ReadMessage("TestQueue/$DeadLetterQueue");

                    if (string.IsNullOrWhiteSpace(message)) break;
                    Console.WriteLine($"{DateTime.Now}: Message received: {message}");
                }
            }

            sw.Stop();
            Console.WriteLine($"Done ({sw.Elapsed.TotalSeconds}) seconds");
            Console.ReadLine();
        }

        private static bool InitialiseClient()
        {
            Uri uri = ServiceManagementHelper.GetServiceUri();
            TokenProvider tokenProvider = ServiceManagementHelper.GetTokenProvider(uri);

            NamespaceManager nm = new NamespaceManager(uri, tokenProvider);
            return nm.QueueExists("TestQueue");
        }

        private static string ReadMessage(string queueName)
        {
            QueueClient client = QueueManagementHelper.GetQueueClient(queueName, true);

            BrokeredMessage message = client.Receive();
            if (message == null) return string.Empty;
            string messageBody = message.GetBody<string>();

            //message.Complete();

            return messageBody;
        }
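The queue name passed to ReadMessage above follows a fixed convention: the dead letter queue is addressed as the parent queue's path followed by /$DeadLetterQueue. A small sketch of building that path; DeadLetterPath.For is a hypothetical helper (the Microsoft.ServiceBus.Messaging library also offers QueueClient.FormatDeadLetterPath for the same job):

```csharp
using System;

public static class DeadLetterPath
{
    private const string Suffix = "/$DeadLetterQueue";

    // Builds the dead letter sub-queue path for a given queue,
    // e.g. "TestQueue" becomes "TestQueue/$DeadLetterQueue".
    public static string For(string queuePath)
    {
        if (string.IsNullOrWhiteSpace(queuePath))
        {
            throw new ArgumentException("Queue path is required.", nameof(queuePath));
        }
        // Tolerate a trailing slash so the result never contains "//".
        return queuePath.TrimEnd('/') + Suffix;
    }
}
```

Building the path in one place avoids a mistyped suffix, which would otherwise only surface at runtime.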

If this was all that we had to monitor the queue, then somebody’s job would need to be to watch this application. That may make sense, depending on the nature of the business; however, we could simply notify the person in question that there’s a problem. Now, if only the internet had a concept of an offline messaging facility that works something akin to the postal service, only faster…

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Start();

            if (!InitialiseClient())
            {
                Console.WriteLine("Unable to initialise client");
            }
            else
            {
                while (true)
                {
                    string message = ReadMessage("TestQueue/$DeadLetterQueue");

                    if (string.IsNullOrWhiteSpace(message)) break;
                    Console.WriteLine($"{DateTime.Now}: Message received: {message}");

                    Console.WriteLine($"{DateTime.Now}: Send e-mail");
                    SendEmail(message);
                }
            }

            sw.Stop();
            Console.WriteLine($"Done ({sw.Elapsed.TotalSeconds}) seconds");
            Console.ReadLine();
        }

        private static void SendEmail(string messageText)
        {
            System.Net.Mail.MailMessage message = new System.Net.Mail.MailMessage();
            message.To.Add("notification.address@hotmail.co.uk");
            message.Subject = "Message in queue has expired";
            message.From = new System.Net.Mail.MailAddress("my.address@hotmail.co.uk");
            message.Body = messageText;
            System.Net.Mail.SmtpClient smtp = new System.Net.Mail.SmtpClient("smtp.live.com");
            smtp.Port = 587;
            smtp.UseDefaultCredentials = false;
            smtp.Credentials = new System.Net.NetworkCredential("my.address@hotmail.co.uk", "passw0rd");
            smtp.EnableSsl = true;
            smtp.Send(message);
        }

In order to prevent a torrent of mails, you might want to put a delay in this code, or even maintain some kind of list so that you only send one mail per day.
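A minimal sketch of that idea, assuming a single monitoring process that keeps its state in memory (so the limit resets if the process restarts). EmailThrottle is a hypothetical class; the current time is passed in so that the behaviour can be checked without waiting a day:

```csharp
using System;

public sealed class EmailThrottle
{
    private readonly TimeSpan _minimumGap;
    private DateTime? _lastSentUtc;

    public EmailThrottle(TimeSpan minimumGap)
    {
        _minimumGap = minimumGap;
    }

    // Returns true, and records the time, if no e-mail has been sent yet
    // or if at least the minimum gap has passed since the last one.
    public bool TryAcquire(DateTime nowUtc)
    {
        if (_lastSentUtc.HasValue && nowUtc - _lastSentUtc.Value < _minimumGap)
        {
            return false;
        }
        _lastSentUtc = nowUtc;
        return true;
    }
}
```

In the loop above, SendEmail(message) would then be wrapped in a check such as if (throttle.TryAcquire(DateTime.UtcNow)), with the throttle created once as new EmailThrottle(TimeSpan.FromDays(1)).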

References

https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.queuedescription.enabledeadletteringonmessageexpiration?view=azureservicebus-4.0.0#Microsoft_ServiceBus_Messaging_QueueDescription_EnableDeadLetteringOnMessageExpiration

https://www.codit.eu/blog/2015/01/automatically-expire-messages-in-azure-service-bus-how-it-works/

https://stackoverflow.com/questions/9851319/how-to-add-smtp-hotmail-account-to-send-mail

A C# Programmer’s Guide to Queues and Sending a Message with Azure Service Bus

I have previously written about message queue systems. The big two, as far as I can see, are ActiveMQ and RabbitMQ.

Microsoft have always had MSMQ*, but it’s not really a message broker as such (I believe that you can get similar behaviour using NServiceBus, but have never tried that myself). However, with Azure comes the Azure Service Bus.

The first thing that you need to do is set up an Azure account. Note that Microsoft offer Azure as a paid service, and so this is not free. However, they also offer free trials and free Azure credit if you have an MSDN subscription.

Log on to:

https://portal.azure.com

Namespace

Namespaces are an important concept in Azure. They basically allow you to split a single Azure account across many functions, but what that means is that everything you do relates to a specific namespace.

To add one, first, pick a pricing tier:

Make sure that your Namespace isn’t taken:

You’ll then get an alert to say it worked:

If you refresh, you should now see your namespace:

Create Test Project

I always try to start with a console app when trying new stuff. Add the NuGet reference:

It is my understanding that, as with ActiveMQ and RabbitMQ, these client libraries are an abstraction over a set of HTTP Post calls. In the case of Azure, I believe that, behind the scenes, it uses WCF to handle all this.

Using the Namespace

Using a message queue system such as RabbitMQ or ActiveMQ, you need a message queue server, and a URL that relates to it. However, one of the things Azure allows you to do is to abstract that; for example:

        static void Main(string[] args)
        {
            Console.WriteLine($"Getting service bus URI...");
            Uri uri = ServiceBusEnvironment.CreateAccessControlUri("pcm-servicebustest");
            Console.WriteLine($"Service Bus URI: {uri.ToString()}");
            Console.ReadLine();
        }

This tells me what the URI of the message queue broker is:

Adding a message to a queue

In order to do anything with a message queue in Azure, you need a token; effectively, this provides a level of security.

Tokens

Get the key:

You can store these details in the app/web.config, or you can use them programmatically:

        private static TokenProvider GetTokenProvider(Uri uri)
        {
            Console.WriteLine($"Getting token...");
            TokenProvider tp = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "JWh82nkstIAi4w5tW6MEj7GKQfoiZlwBYjHx9wfDqdA=");                                                

            Console.WriteLine($"Token {tp.ToString()}");
            return tp;
        }

Queues

Putting the above calls together, we can now create a queue in Azure:

        private static void CreateNewQueue(Uri uri, TokenProvider tokenProvider)
        {
            Console.WriteLine($"Creating new queue...");
            NamespaceManager nm = new NamespaceManager(uri, tokenProvider);

            Console.WriteLine($"Created namespace manager for {nm.Address}");
            if (nm.QueueExists("TestQueue"))
            {
                Console.WriteLine("Queue already exists");
            }
            else
            {
                Console.WriteLine("Creating new queue");
                QueueDescription qd = nm.CreateQueue("TestQueue");
            }
        }

Incidentally, the act of creating a queue appears to have cost £0.24. If you have MSDN, you should get £40 credit each month (at the time of writing).

Now we have a queue, let’s put some messages on it.

Adding a message

        private static void AddNewMessage(string id, string messageBody, string queueName)
        {
            BrokeredMessage message = new BrokeredMessage(messageBody)
            {
                MessageId = id
            };

            string connectionString = GetConnectionString();
            
            QueueClient queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
            queueClient.Send(message);
        }

The Connection String can be found here:

We can now see that a message has, indeed, been added to the queue:

At this time, this is about as much as you can see from this portal.

Errors

These are some errors that I encountered during the creation of this post, and their solutions.

System.UnauthorizedAccessException

System.UnauthorizedAccessException: ‘The token provider was unable to provide a security token while accessing ‘https://pcm-servicebustest-sb.accesscontrol.windows.net/WRAPv0.9/’. Token provider returned message: ‘The remote name could not be resolved: ‘pcm-servicebustest-sb.accesscontrol.windows.net’’.’

The cause is not an invalid secret

That’s because this line:

TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider("RootManageSharedAccessKey", "jjdsjdsjk");

Gives the error:

System.ArgumentException: ‘The ‘issuerSecret’ is invalid.’

The fix…

This code is littered throughout the web:

TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider("RootManageSharedAccessKey", "jjdsjdsjk");

But the correct code was:

TokenProvider tp = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "JWh82nkstIAi4w5tW6MEj7GKQfoiZlwBYjHx9wfDqdA=");                                                

System.ArgumentNullException: ‘Queue name should be specified as EntityPath in connectionString.’

Or: 40400: Endpoint not found.

Microsoft.ServiceBus.Messaging.MessagingEntityNotFoundException: ‘40400: Endpoint not found., Resource:sb://pcm-servicebustest.servicebus.windows.net/atestqueue. TrackingId:48de75d7-fb01-4fa9-b72e-20a5dc090a8d_G11, SystemTracker:pcm-servicebustest.servicebus.windows.net:aTestQueue, Timestamp:5/25/2017 5:23:27 PM

Means (obviously) that the following code:

QueueClient.CreateFromConnectionString(connectionString, queueName);

Either doesn’t have the queue name, or it is wrong.
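A cheap guard against the first of these is to check the inputs before creating the client. The sketch below parses a connection string of the usual Key=Value;Key=Value shape and works out which queue name would be used. QueueNameResolver is hypothetical, and the rule it applies (an explicit queue name wins, otherwise fall back to EntityPath) is an assumption about sensible behaviour rather than something the SDK does for you:

```csharp
using System;

public static class QueueNameResolver
{
    // Returns the explicit queue name if one was given; otherwise looks for
    // an EntityPath entry in the connection string. Throws if neither exists.
    public static string Resolve(string connectionString, string queueName)
    {
        if (!string.IsNullOrWhiteSpace(queueName)) return queueName;

        string[] parts = (connectionString ?? string.Empty)
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (string part in parts)
        {
            // Split on the first '=' only: key values may end in '=' padding.
            int eq = part.IndexOf('=');
            if (eq <= 0) continue;

            string key = part.Substring(0, eq).Trim();
            string value = part.Substring(eq + 1).Trim();
            if (string.Equals(key, "EntityPath", StringComparison.OrdinalIgnoreCase) && value.Length > 0)
            {
                return value;
            }
        }

        throw new ArgumentException("No queue name supplied and no EntityPath in the connection string.");
    }
}
```

This cannot tell whether the name is actually right; for that, the nm.QueueExists call used earlier is the check that would catch a 40400 before the send does.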

References

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-exceptions

https://blogs.msdn.microsoft.com/brunoterkaly/2014/08/07/learn-how-to-create-a-queue-place-and-read-a-message-using-azure-service-bus-queues-in-5-minutes/

https://stackoverflow.com/questions/18558299/servicebus-throws-401-unauthorized-error

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

https://msdn.microsoft.com/en-us/library/jj542433.aspx?f=255&MSPPError=-2147217396

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-multi-tier-app-using-service-bus-queues

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

* Microsoft probably haven’t ALWAYS had MSMQ. There was probably a time in the early 90’s where they didn’t have a message queue system at all.

Message Persistence in RabbitMQ and BenchmarkDotNet

(Note: if you want to follow the code on this, you might find it easier if you start from the project that I create here.)

A queue in a message broker can be persistent, which means that, should you have a power failure (or just shut down the server), when it comes back, the queue is still there.

So, we can create a durable (persistent) queue, like this:

var result = channel.QueueDeclare("NewQueue", true, false, false, args);

The second parameter indicates that the queue is durable. Let’s send it some messages:


using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

static void Main(string[] args)
{
    // Publish 100 test messages to the durable queue
    for (int i = 1; i <= 100; i++)
    {
        string msg = $"test{i}";

        SendNewMessage(msg);
    }
}

private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        Dictionary<string, object> args =
            DeadLetterHelper.CreateDeadLetterQueue(channel,
            "dl.exchange", "dead-letter", "DeadLetterQueue");

        // Parameters: queue name, durable, exclusive, autoDelete, arguments
        var result = channel.QueueDeclare("NewQueue", true, false, false, args);
        Console.WriteLine(result);

        // No properties are passed, so the message itself is not marked persistent
        channel.BasicPublish("", "NewQueue", null, Encoding.UTF8.GetBytes(message));
    }
}

Now we have 100 messages:

Let’s simulate a server reboot:

Following the reboot, the messages are gone:

Admittedly, that doesn’t sound very durable!

Why?

The reason for this is that the durability of the queue doesn’t affect the durability of the messages in it. A durable queue survives a restart, but that alone doesn’t make its messages durable.

How can it be made persistent?

Let’s change our send code a little:


private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        Dictionary<string, object> args = 
            DeadLetterHelper.CreateDeadLetterQueue(channel,
            "dl.exchange", "dead-letter", "DeadLetterQueue");
 
        var result = channel.QueueDeclare("NewQueue", true, false, false, args);
        Console.WriteLine(result);
 
        IBasicProperties prop = channel.CreateBasicProperties();
        prop.Persistent = true;
 
        channel.BasicPublish("", "NewQueue", prop, Encoding.UTF8.GetBytes(message));                
 
    }
}

The only difference is the addition of the IBasicProperties parameter, with the Persistent flag set. Now we’ll re-run the same test; here are the messages:

And after restarting the service:

As you can see, the messages are still there, and you can see the time difference where they’ve been restored to the queue after a failure.

Speed – Introducing BenchmarkDotNet

I suppose the main question here is what price do you pay for durability. This gives me a chance to play with a new tool that I heard about a little while ago: BenchmarkDotNet.

It’s quite straightforward to use, just add the NuGet package:

Install-Package BenchmarkDotNet

There’s a bit of refactoring; I effectively ripped out the send and called it from a separate class:


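using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

class Program
{
    static void Main(string[] args)
    {
        // Runs every [Benchmark] method in SpeedTest and prints a summary
        BenchmarkRunner.Run<SpeedTest>();
    }
}

public class SpeedTest
{
    // true: send as a persistent message
    [Benchmark]
    public void SendNewMessagePersist()
    {
        MessageHelper helper = new MessageHelper();
        helper.SendStringMessage("Test", "NewQueue", true);
    }

    // false: send as a non-persistent message
    [Benchmark]
    public void SendNewMessageNonPersist()
    {
        MessageHelper helper = new MessageHelper();
        helper.SendStringMessage("Test", "NewQueue", false);
    }
}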

I then ran this:

And it produced this:

So, it is a bit slower to persist the message. I’m not sure how helpful this information is: I probably could have guessed beforehand that persisting the message would be slower. Having said that, I am quite impressed with BenchmarkDotNet!

A C# Programmer’s Guide to Installing, Running and Messaging with ActiveMQ

I’ve recently been experimenting with message queues. I’ve used MSMQ in the past, but never with any complexity, and so I thought I’d spend some time investigating ActiveMQ. There are a number of articles and courses out there, but for some reason, C# seems to be the poor relation. So, here’s a C# programmer’s guide to installing and running ActiveMQ.

Download and Run ActiveMQ

ActiveMQ is from Apache, and the download link is here.

The specific version that I was working with was 5.14.0.

Once you’ve downloaded the archive, extract it; I extracted it to my Downloads folder, so the next step is to navigate to that directory in a command prompt:

apache-activemq-5.14.0-bin\apache-activemq-5.14.0\bin

Then type:

activemq start

There is a sample application that comes in the box, and it can be found here:

apache-activemq-5.14.0-bin\apache-activemq-5.14.0\examples\openwire\csharp\ActiveMQExamples

The examples that they supply do work out of the box, and they are not a bad place to start, if you don’t want to read the rest of this post.

Queues versus Topics

It is important, for reasons that will become apparent shortly, to understand how and why these two concepts differ before writing any code. Let’s start with Topics. These are effectively a way to communicate between two end points; the important thing here is that both must be present for it to work. When you publish a topic message, it is published to any “listeners”. If your app wasn’t listening, then that’s hard luck. The use cases here are situations whereby a message might be time sensitive; for example, a stock price has just changed, or a server needs the client to refresh because there is more data. There are such things as durable topics, but for now, let’s leave topics as described here.

Queues, on the other hand, have a persistent nature. Once you add a message to the queue, it will remain there until it is handled. Use cases for this might include a notification to send an e-mail, a chat program, or a request to place a sales order. The queue will be read on a first in, first out basis, and so you can load balance a queue: that is, you can have n listeners, and between them they will process the messages in order from the queue, with each message going to only one listener. If you were to do this with a topic, they would all receive the same message at the same time.
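The difference can be modelled in a few lines without a broker at all. The sketch below is a toy, in-memory illustration of the two delivery styles described above; InMemoryTopic and InMemoryQueue are hypothetical classes, not ActiveMQ types, and they ignore durable topics, acknowledgements and networking entirely:

```csharp
using System;
using System.Collections.Generic;

// Topic: every subscriber listening at publish time gets its own copy.
public sealed class InMemoryTopic
{
    private readonly List<Action<string>> _subscribers = new List<Action<string>>();

    public void Subscribe(Action<string> handler) => _subscribers.Add(handler);

    // Returns how many subscribers received the message; the message is
    // not stored, so with no subscribers it is simply lost.
    public int Publish(string message)
    {
        foreach (Action<string> handler in _subscribers) handler(message);
        return _subscribers.Count;
    }
}

// Queue: messages wait until a consumer takes them, in first in, first out
// order, and each message is handed to exactly one consumer.
public sealed class InMemoryQueue
{
    private readonly Queue<string> _messages = new Queue<string>();

    public void Send(string message) => _messages.Enqueue(message);

    public bool TryReceive(out string message)
    {
        if (_messages.Count > 0)
        {
            message = _messages.Dequeue();
            return true;
        }
        message = null;
        return false;
    }
}
```

Publishing to the topic before anyone has subscribed returns 0 and the message is gone, whereas a message sent to the queue is still there when a consumer eventually calls TryReceive.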

Publish and Subscribe to a Topic

Start off by creating a new console application: you might want to call it something like SendMessage, or Blancmange. Then, add the ActiveMQ NuGet package.

Here’s the code:

static void Main(string[] args)
{
    while (true)
    {
        string text = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(text)) return;
        SendNewMessage(text);
    }
}

private static void SendNewMessage(string text)
{ 
    string topic = "TextQueue";

    Console.WriteLine($"Adding message to queue topic: {topic}");

    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();

        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetTopic(topic))
        using (IMessageProducer producer = session.CreateProducer(dest))
        { 
            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            producer.Send(session.CreateTextMessage(text));
            Console.WriteLine($"Sent {text} messages");
        }
    }       		      
}

The important thing here is the broker address. When you set up ActiveMQ, 61616 is the default port, but obviously this, along with the various security settings, etc., can be changed.

This results in:

So, it looks like we’re sending a message. We can check whether or not we are by navigating to:

http://localhost:8161/

This provides you with an admin site. The default username/password is: admin/admin

Then navigate to the topics:

http://localhost:8161/admin/topics.jsp

And you should see the messages queued up:

As you can see, TextQueue has 4 messages at the minute.

Subscriber

Now we need to write a subscriber for those messages.

static void Main(string[] args)
{
    Console.WriteLine("Waiting for messages");
 
    // Read all messages off the queue
    while (ReadNextMessage())
    {
        Console.WriteLine("Successfully read message");
    }
 
    Console.WriteLine("Finished");
}
 
static bool ReadNextMessage()
{            
    string topic = "TextQueue";
 
    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetTopic(topic))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage)
            {
                ITextMessage txtMsg = msg as ITextMessage;
                string body = txtMsg.Text;
 
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }
 
    return false;            
}

The eagle-eyed amongst you might notice that the code is almost identical; you need the same NuGet package for both the publisher and subscriber.

Topics Caveat

Okay, there’s a hugely important caveat here, which a smarter man than me would have instantly realised: if you run the subscriber now, nothing will happen. This is because the topic messages are only sent to active subscribers. In order for the above code to work, the subscriber needs to be running when the messages are sent.

So. Providing that you have an active subscriber when you publish your message, the above code will send whatever you type into the console to the subscriber.

Queues

So, the code for using queues looks very similar, but is conceptually different. Here’s the SendMessage code:

static void Main(string[] args)
{
    while (true)
    {
        string text = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(text)) return;
        SendNewMessageQueue(text);
    }
}

private static void SendNewMessageQueue(string text)
{ 
    string queueName = "TextQueue";

    Console.WriteLine($"Adding message to queue: {queueName}");

    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();

        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageProducer producer = session.CreateProducer(dest))
        { 
            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            producer.Send(session.CreateTextMessage(text));
            Console.WriteLine($"Sent {text} messages");
        }
    }            
}
 

This can be run and, again, you can check the queue:

But, crucially, the following code, even if run afterwards, will still read the queue:


static void Main(string[] args)
{
    Console.WriteLine("Waiting for messages");
 
    // Read all messages off the queue
    while (ReadNextMessageQueue())
    {
        Console.WriteLine("Successfully read message");
    }
 
    Console.WriteLine("Finished");
}
 
static bool ReadNextMessageQueue()
{            
    string queueName = "TextQueue";
 
    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage)
            {
                ITextMessage txtMsg = msg as ITextMessage;
                string body = txtMsg.Text;
 
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }
 
    return false;            
}

Here it is:

Conclusion

So, it’s quite straightforward to get something working out of the box with ActiveMQ – certainly easier than I expected.