Monthly Archives: September 2016

A C# Programmer’s Guide to Installing, Running and Messaging with ActiveMQ

I’ve recently been experimenting with message queues. I’ve used MSMQ in the past, but never with any complexity, and so I thought I’d spend some time investigating ActiveMQ. There are a number of articles and courses out there, but for some reason, C# seems to be the poor relation. So, here’s a C# programmer’s guide to installing and running ActiveMQ.

Download and Run ActiveMQ

ActiveMQ is an Apache project, and it can be downloaded from the Apache ActiveMQ website.

The specific version that I was working with was 5.14.0.

Once you’ve downloaded the archive, extract it; I extracted it to my Downloads folder, so the next step is to navigate to that directory in a command prompt:

apache-activemq-5.14.0-bin\apache-activemq-5.14.0\bin

Then type:

activemq start
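Before writing any messaging code, it can be useful to confirm that the broker actually came up. The following is a minimal sketch, not part of ActiveMQ or its examples: BrokerCheck and IsPortOpen are hypothetical names, and it assumes the broker is listening on localhost with the default port of 61616.

```csharp
using System;
using System.Net.Sockets;

public static class BrokerCheck
{
    // Hypothetical helper: returns true if something accepts TCP connections on host:port.
    public static bool IsPortOpen(string host, int port)
    {
        try
        {
            using (var client = new TcpClient())
            {
                client.Connect(host, port);
                return true;
            }
        }
        catch (SocketException)
        {
            // Connection refused or host unreachable
            return false;
        }
    }

    public static void Main()
    {
        // 61616 is assumed to be the broker port; change it if yours differs
        bool up = IsPortOpen("localhost", 61616);
        Console.WriteLine(up ? "Broker port is open" : "Nothing is listening on 61616");
    }
}
```

This only proves that something is listening on the port, not that it is ActiveMQ, so treat it as a quick sanity check.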

There is a sample application that comes in the box, and it can be found here:

apache-activemq-5.14.0-bin\apache-activemq-5.14.0\examples\openwire\csharp\ActiveMQExamples

The examples that they supply do work out of the box, and they are not a bad place to start, if you don’t want to read the rest of this post.

Queues versus Topics

It is important, for reasons that will become apparent shortly, to understand how and why these two concepts differ before writing any code. Let’s start with topics. These are a publish/subscribe mechanism: a message published to a topic is delivered to every subscriber that is listening at that moment, so both the publisher and the subscriber must be running for it to work. If your app wasn’t listening, then that’s hard luck. The use cases here are situations where a message is time sensitive; for example, a stock price has just changed, or a server needs the client to refresh because there is more data. There are such things as durable topics, but for now, let’s leave topics as described here.

Queues, on the other hand, have a persistent nature. Once you add a message to the queue, it will remain there until it is handled. Use cases for this might include a notification to send an e-mail, a chat program, or a request to place a sales order. The queue is read on a first in, first out basis, and each message is delivered to only one consumer, so you can load balance a queue: that is, you can have n listeners, and between them they will process the messages from the queue in order. If you were to do this with a topic, every listener would receive every message.
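The difference can be illustrated with a toy, in-memory model. This is not ActiveMQ code: DeliveryDemo and its methods are made up for the illustration, and the simple round-robin is only a stand-in for however the broker actually chooses a consumer.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: contrasts topic fan-out with queue hand-off.
public static class DeliveryDemo
{
    // Topic: every subscriber that is listening right now gets its own copy.
    // With no subscribers, the message is simply dropped.
    public static void PublishToTopic(string message, List<List<string>> subscribers)
    {
        foreach (List<string> inbox in subscribers)
        {
            inbox.Add(message);
        }
    }

    // Queue: messages wait until taken, and each one goes to exactly one consumer.
    // Assumes at least one consumer is supplied.
    public static void DrainQueue(Queue<string> queue, List<List<string>> consumers)
    {
        int next = 0;
        while (queue.Count > 0)
        {
            consumers[next].Add(queue.Dequeue());
            next = (next + 1) % consumers.Count;   // simple round-robin
        }
    }

    public static void Main()
    {
        var a = new List<string>();
        var b = new List<string>();
        var listeners = new List<List<string>> { a, b };

        PublishToTopic("price changed", listeners);
        Console.WriteLine($"Topic: A has {a.Count}, B has {b.Count}");   // 1 and 1

        a.Clear();
        b.Clear();
        var queue = new Queue<string>(new[] { "order 1", "order 2", "order 3" });
        DrainQueue(queue, listeners);
        Console.WriteLine($"Queue: A has {a.Count}, B has {b.Count}");   // 2 and 1
    }
}
```

With the topic, both listeners end up with the same message; with the queue, the three orders are shared out between them.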

Publish and Subscribe to a Topic

Start off by creating a new console application: you might want to call it something like SendMessage, or Blancmange. Then, add the ActiveMQ NuGet package (Apache.NMS.ActiveMQ).

Here’s the code:

// Requires: using System; using Apache.NMS;

static void Main(string[] args)
{
    // Send each line typed into the console; a blank line exits
    while (true)
    {
        string text = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(text)) return;
        SendNewMessage(text);
    }
}

private static void SendNewMessage(string text)
{
    // Despite its name, this is used as a topic here
    string topic = "TextQueue";

    Console.WriteLine($"Adding message to topic: {topic}");

    string brokerUri = "activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();

        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetTopic(topic))
        using (IMessageProducer producer = session.CreateProducer(dest))
        {
            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            producer.Send(session.CreateTextMessage(text));
            Console.WriteLine($"Sent message: {text}");
        }
    }
}

The important thing here is the broker address. When you set up ActiveMQ, 61616 is the default port, but obviously this, along with the various security settings, etc., can be changed.
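If the address does change, it may be easier not to hard-code it. The following is a rough sketch under some assumptions: BrokerSettings and BuildBrokerUri are hypothetical names, ACTIVEMQ_HOST and ACTIVEMQ_PORT are made-up environment variables, and the credentials are placeholders that only matter if your broker has been configured to require them.

```csharp
using System;
using Apache.NMS;

public static class BrokerSettings
{
    // Hypothetical helper: builds the broker URI in the same form used in this post.
    public static string BuildBrokerUri(string host, int port)
    {
        return $"activemq:tcp://{host}:{port}";
    }

    public static void Main()
    {
        // Made-up variable names; fall back to the defaults used in this post
        string host = Environment.GetEnvironmentVariable("ACTIVEMQ_HOST") ?? "localhost";

        int port;
        if (!int.TryParse(Environment.GetEnvironmentVariable("ACTIVEMQ_PORT"), out port))
        {
            port = 61616;
        }

        var factory = new NMSConnectionFactory(BuildBrokerUri(host, port));

        // Placeholder credentials; use the parameterless CreateConnection() if none are needed
        using (IConnection connection = factory.CreateConnection("user", "password"))
        {
            connection.Start();
            Console.WriteLine($"Connected to {host}:{port}");
        }
    }
}
```

The publisher above does not need any of this to run against a default local install; it only shows where the settings could come from.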

Running the publisher and typing a message results in:

[Screenshot: activemq1]

So, it looks like we’re sending a message. We can check whether or not we are by navigating to:

http://localhost:8161/

This provides you with an admin site. The default username/password is: admin/admin

Then navigate to the topics:

http://localhost:8161/admin/topics.jsp

And you should see the messages queued up:

[Screenshot: activemq2]

As you can see, TextQueue has 4 messages at the minute.

Subscriber

Now we need to write a subscriber for those messages.

// Requires: using System; using Apache.NMS;

static void Main(string[] args)
{
    Console.WriteLine("Waiting for messages");

    // Keep reading from the topic until something other than a text message arrives
    while (ReadNextMessage())
    {
        Console.WriteLine("Successfully read message");
    }

    Console.WriteLine("Finished");
}

static bool ReadNextMessage()
{
    string topic = "TextQueue";

    string brokerUri = "activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    // Note: a new connection is opened for every message that is read
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetTopic(topic))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            // Blocks until a message arrives
            IMessage msg = consumer.Receive();
            ITextMessage txtMsg = msg as ITextMessage;
            if (txtMsg != null)
            {
                Console.WriteLine($"Received message: {txtMsg.Text}");

                return true;
            }

            // msg can be null if the consumer was closed while waiting
            Console.WriteLine("Unexpected message type: " + (msg?.GetType().Name ?? "none"));
        }
    }

    return false;
}

The eagle-eyed amongst you might notice that the code is almost identical to the publisher; you need the same NuGet package for both the publisher and subscriber.

Topics Caveat

Okay, there’s a hugely important caveat here, which a smarter man than me would have instantly realised: if you run the subscriber now, nothing will happen. This is because the topic messages are only sent to active subscribers. In order for the above code to work, the subscriber needs to be running when the messages are sent.

So, provided that you have an active subscriber when you publish your message, the above code will send whatever you type into the console to the subscriber.
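For completeness, here is roughly what the durable topics mentioned earlier might look like, combined with a subscriber that keeps a single consumer open rather than reconnecting for each message. Treat it as a sketch rather than a tested recipe: the client ID and subscription name are made up, and the subscription has to have been registered by running this once before the broker will hold anything for it.

```csharp
using System;
using Apache.NMS;

public static class DurableSubscriber
{
    public static void Main()
    {
        var factory = new NMSConnectionFactory("activemq:tcp://localhost:61616");

        using (IConnection connection = factory.CreateConnection())
        {
            // A durable subscription is identified by the client ID plus the
            // subscription name; both values are made up for this example.
            // The client ID is set before the connection is started.
            connection.ClientId = "text-subscriber-1";
            connection.Start();

            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
            {
                ITopic topic = session.GetTopic("TextQueue");

                using (IMessageConsumer consumer =
                    session.CreateDurableConsumer(topic, "text-subscription", null, false))
                {
                    // Handle messages as they arrive instead of polling
                    consumer.Listener += message =>
                    {
                        var textMessage = message as ITextMessage;
                        Console.WriteLine(textMessage != null
                            ? $"Received message: {textMessage.Text}"
                            : "Unexpected message type");
                    };

                    Console.WriteLine("Listening; press Enter to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}
```

Whether messages published while this subscriber is offline survive a broker restart will also depend on the delivery mode the publisher uses, so it is worth experimenting with that setting.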

Queues

So, the code for using queues looks very similar, but is conceptually different. Here’s the SendMessage code:

// Requires: using System; using Apache.NMS;

static void Main(string[] args)
{
    // Send each line typed into the console; a blank line exits
    while (true)
    {
        string text = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(text)) return;
        SendNewMessageQueue(text);
    }
}

private static void SendNewMessageQueue(string text)
{
    string queueName = "TextQueue";

    Console.WriteLine($"Adding message to queue: {queueName}");

    string brokerUri = "activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();

        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageProducer producer = session.CreateProducer(dest))
        {
            // NonPersistent messages are not saved to the broker's message store,
            // so they will not survive a broker restart
            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            producer.Send(session.CreateTextMessage(text));
            Console.WriteLine($"Sent message: {text}");
        }
    }
}
 

This can be run and, again, you can check the queue:

[Screenshot: activemq3]

But, crucially, the following code, even if run afterwards, will still read the queue:


// Requires: using System; using Apache.NMS;

static void Main(string[] args)
{
    Console.WriteLine("Waiting for messages");

    // Read messages off the queue; Receive() blocks while the queue is empty
    while (ReadNextMessageQueue())
    {
        Console.WriteLine("Successfully read message");
    }

    Console.WriteLine("Finished");
}

static bool ReadNextMessageQueue()
{
    string queueName = "TextQueue";

    string brokerUri = "activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

    // Note: a new connection is opened for every message that is read
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            // Blocks until a message arrives
            IMessage msg = consumer.Receive();
            ITextMessage txtMsg = msg as ITextMessage;
            if (txtMsg != null)
            {
                Console.WriteLine($"Received message: {txtMsg.Text}");

                return true;
            }

            // msg can be null if the consumer was closed while waiting
            Console.WriteLine("Unexpected message type: " + (msg?.GetType().Name ?? "none"));
        }
    }

    return false;
}

Here it is:

[Screenshot: activemq4]
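One thing to be aware of is that the reader above never reaches "Finished", because Receive() waits indefinitely once the queue is empty. A possible variation, sketched here on the assumption that five seconds of silence means the queue has been drained, is to pass a timeout to Receive and reuse a single connection for the whole run. QueueDrainer is a hypothetical name.

```csharp
using System;
using Apache.NMS;

public static class QueueDrainer
{
    public static void Main()
    {
        var factory = new NMSConnectionFactory("activemq:tcp://localhost:61616");

        using (IConnection connection = factory.CreateConnection())
        {
            connection.Start();

            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
            using (IDestination dest = session.GetQueue("TextQueue"))
            using (IMessageConsumer consumer = session.CreateConsumer(dest))
            {
                // Receive(TimeSpan) returns null if nothing arrives within the timeout;
                // the five seconds is an arbitrary choice for this sketch
                IMessage msg;
                while ((msg = consumer.Receive(TimeSpan.FromSeconds(5))) != null)
                {
                    var textMessage = msg as ITextMessage;
                    Console.WriteLine(textMessage != null
                        ? $"Received message: {textMessage.Text}"
                        : "Unexpected message type: " + msg.GetType().Name);
                }
            }
        }

        Console.WriteLine("Finished");
    }
}
```

The trade-off is that a slow publisher could be mistaken for an empty queue, so the timeout would need choosing with that in mind.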

Conclusion

So, it’s quite straightforward to get something working out of the box with ActiveMQ – certainly easier than I expected.

Lock a TFS branch through builds

If you subscribe to the feature branching method of source control in TFS, then once a release is cut, the release branch needs to be locked. There are other reasons that you might want to lock a branch, but this was my specific use case when I came up with this.

There are a dozen ways to do this: you can simply delete the branch, or you can remove check-in permissions. However, you could also create a custom build, which prevents check-ins. This isn’t perfect, but it does give you a lot more flexibility than some of the other options.

How

Create a new build definition:

[Screenshot: lockbranch1]

Then remove all the standard components. As a base level, it probably looks something like this:

[Screenshot: lockbranch2]

Now, set the build as a gated check-in for your branch, and the next time you try and check something into that branch, you’ll end up with a build failure:

[Screenshot: lockbranch3]

Why?

The advantage of this method is mainly flexibility. When someone attempts to check into this branch, you execute a custom workflow; so you can send an e-mail, give a custom message, connect to an IoT device that administers a small electric shock, etc. You don’t have to blanket reject check-ins: you can call the TFS API or interact with services; you could even implement some kind of rudimentary approval system for it.

Caveats

There are a couple of issues with doing this: firstly, if someone tries to check into two gated branches at the same time, they are given a choice of which build to run; as far as I’m aware, there’s no way around this (obviously, this means that this solution is not watertight). Although, again, one of the advantages of the flexibility is that you could probably check for this in the build.