Monthly Archives: October 2016

Acknowledging a Message Using RabbitMQ

Conceptually, message queues work in a similar fashion: you send a message to an exchange, the exchange allows people to read the message, and you have functionality that ensures the original message arrives with the destined recipient. Acknowledgement of a message is basically a way to ensure that delivery. Having said that, the two main message brokers that I’ve been investigating deal with this in a slightly different manner (albeit, the same things happen in the end).

If you want to follow through, it might be an idea to use the code from my first post as a starting point.

Let’s change the send code first to send a few messages:

static void Main(string[] args)
{            
    for (int i = 1; i <= 100; i++)
    {
        string msg = $"test{i}";
 
        SendNewMessage(msg);        
    } 
            
}

Now we have 100 messages.

rabbitack1

Next, we’ll look at the receiving code:

public void ReceiveNextMessage()
{
    var result = _channel.QueueDeclare("NewQueue", true, false, false, null);
    Console.WriteLine(result);
 
    EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
    consumer.Received += Consumer_Received;
 
    _channel.BasicConsume("NewQueue", false, consumer);
 
}

BasicConsume() has a parameter called “noAck”. It took me a while to work this out, but noAck means that it doesn’t expect an acknowledgement; that is, it will automatically acknowledge receipt. So, noAck = True mean automatically acknowledge, and noAck = False means manually acknowledge. That not entirely uncomplicated.

The received event looks a bit like this:

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    //if (message.Contains("3")) 
    //   throw new Exception("Error here !");
 
    //_channel.BasicAck(e.DeliveryTag, false);
 
    Console.WriteLine(message);
}

I’ve left the error and the commented out BasicAck in on purpose. If you run this, unlike with ActiveMQ, where you will get a message at a time, you will get all messages in the queue (because it’s event based). Add in the BasicAck() to acknowledge the queue and you’re good to go.

If you add in the error at this stage, you can see that, in exactly the same way as ActiveMQ, it will only acknowledge the correct message. What you will also see here is we have the poison message scenario that I discussed in this post on ActiveMQ.

IMHO, this is where RabbitMQ beats ActiveMQ hands down. The following code is the simplest version of dealing with a poison message:

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    try
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
 
        if (message.Contains("3"))
            throw new Exception("Error here !");
 
        _channel.BasicAck(e.DeliveryTag, false);
 
        Console.WriteLine(message);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
        _channel.BasicNack(e.DeliveryTag, false, true);
    }
}

So, we have a problem, and we issue a Nack. What the Nack does is allow you to re-queue the message. The code above allows the queue to process, but just moves all the bad ones to the back. The obvious problem here is that it the problem isn’t transient, we’ll keep coming back to them. It does, however, get around the problem – the queue is no longer blocked.

The solution, as it was with ActiveMQ, is to put them in a “dead letter queue”; however, unlike ActiveMQ, this is remarkably easy. Firstly, let’s refactor our queue creation a little:

public void ReceiveNextMessage()
{
    Dictionary<string, object> args = DeadLetterHelper.CreateDeadLetterQueue(_channel);
 
    // How declare the queue and pass in the dead letter exchange
    var result = _channel.QueueDeclare("NewQueue", true, false, false, args);
    Console.WriteLine(result);
 
    EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
    consumer.Received += Consumer_Received;
 
    _channel.BasicConsume("NewQueue", false, consumer);
 
}

You’ll notice that we go to a new helper method called: “CreateDeadLetterQueue()” and return a dictionary; which is, in turn, passed through to our new queue. The CreateDeadLetterQueue() function looks like this:

public static Dictionary<string, object> CreateDeadLetterQueue(IModel channel,
    string deadLetterExchange, string deadLetterRoutingKey, string deadLetterQueue)
{
    // Declare dead letter exchange                     
    channel.ExchangeDeclare(deadLetterExchange, "direct");
    Dictionary<string, object> args = new Dictionary<string, object>()
    {
        { "x-dead-letter-exchange", deadLetterExchange },
        { "x-dead-letter-routing-key", deadLetterRoutingKey }
    };
 
    // Bind the exchange to a queue
    channel.QueueDeclare(deadLetterQueue, true, false);
    channel.QueueBind(queue: deadLetterQueue,
                    exchange: deadLetterExchange,
                    routingKey: deadLetterRoutingKey);
    return args;
}

There’s effectively two steps. Firstly we need a dead letter exchange, and this needs a routing key (in this case, “dead-letter”). Next, we declare the DeadLetterQueue with the same routing key. Finally, return the argument list, which allows the linking of the main queue to the dead letter queue.

Now we are going to change the receive code so that it doesn’t re-queue:

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    try
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
 
        if (message.Contains("3"))
            throw new Exception("Error here !");
 
        _channel.BasicAck(e.DeliveryTag, false);
 
        Console.WriteLine(message);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
        _channel.BasicNack(e.DeliveryTag, false, false);
    }
}

And running it results in a dead letter queue full of all our dodgy data:

rabbitack2

If you start getting errors when you run this, try referring to this article.

References

RabbitMQ documentation on the subject

RabbitMQ Change Queue

Whilst trying to add a dead letter exchange to an existing queue, I got the following error:

Exception thrown: ‘RabbitMQ.Client.Exceptions.OperationInterruptedException’ in RabbitMQ.Client.dll

Additional information: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text=”PRECONDITION_FAILED – inequivalent arg ‘x-dead-letter-exchange’ for queue ‘NewQueue’ in vhost ‘/’: received the value ‘dl.exchange’ of type ‘longstr’ but current is none”, classId=50, methodId=10, cause=

There is a clue is in the error message:

…received the value ‘dl.exchange’ of type ‘longstr’ but current is none

So, the problem is that the exchange is changing.

One solution is to delete the queue:

rabbitmqchange1

This didn’t feel right, so I asked, and it turns out you can’t do this in code.

Also, remember that both the publisher and subscriber need to change the code, otherwise, the publisher will re-declare the queue without the exchange. Not that this happened to me – I realised that straight away, and didn’t spend an hour trying to work out why even deleting the queue didn’t work.

Mesh Colliders in Unity

Mesh colliders are, generally speaking, a bad idea in Unity. The reason being that they cause collision based on the detailed mesh that forms the object. This is bad, basically because this generates many collision points. Consider this object:

meshcollider1

As you can see, it’s a basic cylinder (a cup), so a box collider would add eight collision points. A mesh collider generates a point for each of the 496 vertices.

meshcollider2

So, whilst it’s more accurate, it uses more resources.

Okay, so now I’ve said why you shouldn’t use a mesh collider, I’ll cover how to use them.

Firstly, you need to add a mesh collider:

meshcollider3

There are three important things to note here:

  1. There is a mesh collider
  2. It is convex
    1. The convex flag allows the collider to collide with other colliders
  3. It is not a trigger
    1. The trigger flag turns the collision into a programmatic notification only

The manual for this is here.

Acknowledging a Message in Active MQ

Following on from my previous post on Active MQ, I’m now going to explore creating a mechanism whereby the message can fail.

The main issue with the trial project was that it used an auto acknowledge:

using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

There are a number of considerations here; firstly, what if the message that you read errors – we want to retry; but secondly, what happens is the message repeatedly errors (this type of message is known as a poison message).

The Problem

Here’s the send code again from the last post:

string queueName = "TextQueue";

Console.WriteLine($"Adding message to queue topic: {queueName}");

string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

using (IConnection connection = factory.CreateConnection())
{
    connection.Start();

    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageProducer producer = session.CreateProducer(dest))
    {                     
        producer.DeliveryMode = MsgDeliveryMode.Persistent;

        var msg = session.CreateTextMessage();
        producer.Send(msg);
                                                
        Console.WriteLine($"Sent {text} messages");
        
    }
}            

Other than splitting the message out, I haven’t changed anything. Okay, so let’s run that and check the queue:

msgrec1

msgrec2

Now, I’m going to change the receive code slightly:

    string queueName = "TextQueue";
 
    string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
    NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
    using (IConnection connection = factory.CreateConnection())
    {
        connection.Start();
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using (IDestination dest = session.GetQueue(queueName))
        using (IMessageConsumer consumer = session.CreateConsumer(dest))
        {
            IMessage msg = consumer.Receive();
            if (msg is ITextMessage)
            {
                ITextMessage txtMsg = msg as ITextMessage;                        
 
                Console.WriteLine($"Received message: {txtMsg.Text}");
 
                message = txtMsg.Text;
 
                throw new Exception("Test"); // <-- May cause problems
                
 
                return true;
            }
            else
            {
                Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
            }
        }                
    }

As you can see, there is now an issue in the code; for some reason, it is repeatedly throwing an error entitled “Test”. I can’t work out why (maybe I’ll post a question on StackOverflow later), but when I run that, despite crashing, the message is read, and the queue is now empty.

Obviously, this is an issue: if that message was “DebitBankAccountWith200000” then someone is going to wish that the person that wrote this code hadn’t automatically acknowledged it.

Firstly, how do we stop the auto acknowledge?

There are basically two alternatives to auto acknowledge (there are more, but we’ll only look at two here): client acknowledge, and transactional acknowledgement. I’ll leave transactional acknowledgement for another day.

Client Acknowledge

This method is basically the manual version. You’re telling ActiveMQ that you will, or will not acknowledge the message yourself. Now, let’s alter the receive code slightly:

using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
using (IDestination dest = session.GetQueue(queueName))
using (IMessageConsumer consumer = session.CreateConsumer(dest))
{
    IMessage msg = consumer.Receive();
    if (msg is ITextMessage)
    {
        ITextMessage txtMsg = msg as ITextMessage;                        

        Console.WriteLine($"Received message: {txtMsg.Text}");

        message = txtMsg.Text;

        throw new Exception("Test");
        msg.Acknowledge();
        

        return true;
    }
    else
    {
        Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
    }
}                

As you can see, I’ve changed two main parts here; the first is that I’ve changed that AcknowledgmentMode to ClientAcknowledge and I’ve added a call to the acknowledge method on the message.

Now let’s re-run the send and receive and see what happens to the queue.

msgrec3

Unfortunately, I still haven’t worked out why it’s crashing, but here’s the queue; still safely with the message:

msgrec4

We had an error, it crashed, but because it was never acknowledged, it’s still safe and sound in the queue. When we run the receive again, hopefully the bug will have magically disappeared and the message will successfully process.

Poison Messages

The concept of a poison message is where the issue with the message, resides in the message; the situation described above is not a poison message because the message is fine; but code is erroring. Once the code above is fixed, the message can be processed; however, let’s have a look at a different error scenario; here’s some new receive code:

string queueName = "TextQueue";
 
string brokerUri = $"activemq:tcp://localhost:61616";  // Default port
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
 
using (IConnection connection = factory.CreateConnection())
{
    connection.Start();
    using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge))
    using (IDestination dest = session.GetQueue(queueName))
    using (IMessageConsumer consumer = session.CreateConsumer(dest))
    {
        IMessage msg = consumer.Receive();
        if (msg is ITextMessage)
        {
            ITextMessage txtMsg = msg as ITextMessage;                        
 
            Console.WriteLine($"Received message: {txtMsg.Text}");
 
            message = txtMsg.Text;
 
            if (message.First() != 't')
                throw new Exception("Message is invalid");
            msg.Acknowledge();
            
 
            return true;
        }
        else
        {
            Console.WriteLine("Unexpected message type: " + msg.GetType().Name);
        }
    }                
}

This time, we have some code that actually processes the message and, based on the contents, does something; in this case, it throws an error where the message doesn’t start with ‘t’. So, the rules are simple; messages start with ‘t’. Let’s run the send code again and try some messages:

msgrec5

And now let’s receive these messages (incidentally, while testing this, my notes on starting two projects might be useful):

msgrec6

Okay – so we’ve come across a message that we can’t process. This has yet to be acknowledged, so it’s still safe and sound in the queue. We’ll simply restart the listener and pick it up:

msgrec7

Ah – okay. So, we have a problem. “nexttest” is causing an error with the queue, but if we don’t acknowledge it, we’re going to keep picking it up and erroring.

The Antidote

Once we know that the message is causing a problem, we can send it to a special queue; here’s the code to capture the error:

ITextMessage txtMsg = msg as ITextMessage;
 
try
{
    Console.WriteLine($"Received message: {txtMsg.Text}");
 
    message = txtMsg.Text;
 
    if (message != null && message.First() != 't')
    {
        // The message has a problem, and so we need to file it away without losing it
 
        throw new Exception("Message is invalid");
    }
    msg.Acknowledge();
 
    return true;
}
catch
{
    ResendMsg(session, msg);
    msg.Acknowledge(); // Acknoweledge the message from the original queue
}

And here’s the new method, ResendMsg:


private static void ResendMsg(ISession session, IMessage msg)
{
    var deadLetterQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("ActiveMQ.DLQ");
    IMessageProducer producer = session.CreateProducer(deadLetterQueue);
    producer.Send(msg);
}

The first time this executes, it will throw and catch the error, and then resend to a dead letter queue:

msgrec8

Subsequent runs can proceed past the problem message, and the message itself remains intact:

msgrec9

Start Two Project Simultaneously

In Investigating message queues with RabbitMQ and ActiveMQ, I came across a problem that I’ve never considered before. Starting two projects at the same time.

The well-known (at least to me) way of doing this was to start the start-up project, and then right-click the second project, and select Start New Instance:

2proj

However, this is laborious when you’re constantly starting both projects. So, the alternative is to right click the solution, and select Set StartUp Projects… :

2proj2

Then you can select to start multiple projects:

2proj3

A small thing, but saves a lot of time.

A C# Programmer’s Guide to Installing, Running and Messaging with RabbitMQ

Following this year’s trip to DDD North I got speaking to someone about Rabbit MQ. I’d previously done some research on Active MQ, however, given that there are, realistically, only two options in this market, I thought I should have a look at the competition.

Install from here.

You also need to install Erlang.

After the install… nothing happens. No matter how long you wait, assuming it’s just your slow laptop (don’t want to talk about that anymore). Navigate to:

rabbitmq1

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.0\sbin

In a DOS prompt, type:

rabbitmq-plugins enable rabbitmq_management

rabbitmq2

Rabbit will now run as a service, which also feels much cleaner that ActiveMQs console app style interface.

Now, log-in to http://localhost:15672/, and log-in as guest/guest:

rabbitmq3

Code

Just like with ActiveMQ, there is a NuGet package that you can use for interacting with the exchange:

https://www.nuget.org/packages/RabbitMQ.Client. Your project needs .Net 4.5.1 or higher; otherwise, you’ll see this:

rabbitmq4

If it’s not, upgrade your project to 4.5.1:

rabbitmq5

Okay, so now we’re up and running, here’s the code for the send:

static void Main(string[] args)
{            
    while (true)
    {
        string msg = Console.ReadLine();
        if (string.IsNullOrWhiteSpace(msg))
        {
            break;
        }
 
        SendNewMessage(msg);
    } 
    
}
private static void SendNewMessage(string message)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);
 
        channel.BasicPublish("", "NewQueue", null, Encoding.UTF8.GetBytes(message));                
 
    }
}

Just like with ActiveMQ, you can now just look at the queue:

rabbitmq6

A quick caveat here:


var result = channel.QueueDeclare("NewQueue");

Doesn’t work because exclusive defaults to ‘true’, meaning that only your session can see it.

Receiving a Message


static void Main(string[] args)
{
    while (true)
    {
        ReceiveNextMessage();
 
        string exit = Console.ReadLine();
        if (!string.IsNullOrWhiteSpace(exit)) break;
    }
 
}
 
private static void ReceiveNextMessage()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var result = channel.QueueDeclare("NewQueue", true, false, false, null);
        Console.WriteLine(result);
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume("NewQueue", true, consumer);
 
        System.Threading.Thread.Sleep(1000);
 
    }
}
 
private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

rabbitmq7

Why the Thread.Sleep? The reason is that, because the message consumer is event based, you need to stop tidying up the objects until you’re done. Obviously, sleeping for 1 second has its issues. I suspect this could be better achieved using a TaskCompletionSource, of even just a Console.ReadLine(), but for the purposes of this post, it suffices.

Topics

The concept of topics seems to be the same as ActiveMQ; however, the implementation is different, and there were a few dead-ends to go down first.

The following code, for example:


var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("NewTopic", "testTopic");
    
    channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
 
}

Gives the error: COMMAND_INVALID – unknown exchange type ‘testTopic’

rabbitmq8

RabbitMQ seems to have some magic strings that determine the message type. This works:

 
private static void SendNewTopic(string message)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("NewTopic", "topic"); // topic is a magic string
    
    channel.BasicPublish("NewTopic", "test", null, Encoding.UTF8.GetBytes(message));
 
}

The “test” is a routing code. These seem to be quite an involved system; however, it basically tells the exchange where the message is going.

rabbitmq9

Receive


private static void ReceiveNextTopic()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("NewTopic", "topic");
        var queueName = channel.QueueDeclare().QueueName;
 
        channel.QueueBind(queue: queueName,
                        exchange: "NewTopic",
                        routingKey: "test");
 
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
 
        channel.BasicConsume(queueName, true, consumer);
 
        Console.ReadLine();
    }
}

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
}

Notice the routing key; this (broadly speaking) needs to match the sender’s routing key. The purpose seems to be to allow a complex mapping between publisher and subscriber.

Conclusion

In comparison to ActiveMQ, RabbitMQ seems to be a more actively maintained code base; a quick comparison of the GitHub repo of each reveals more activity in Rabbit. Rabbit also seems a little cleaner in the way it’s run and installed; however, it also seems to have more moving parts.

There are a number of companies that offer support for ActiveMQ commercially; however, the company that wrote RabbitMQ offer commercial support, should you need it.

Can you run Active and RabbitMQ together?

No. No, you can’t. They both seem to want to use the same resources, and so if you want to use them both on the same machine, you can, but only one at a time.

Acknowledgements

I used the following websites extensively in this article:

http://arcware.net/installing-rabbitmq-on-windows/

https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

Asynchronous Behaviour in Applications using Message Queues

This was one of the features that interested me most in message queues. Basically, you have an application, and you want to communicate to it; that is, Joe Bloggs the user is sat there, tapping away at his keyboard, and I want to send him a message that interrupts him. There are dozens of use cases for this: the user has entered an order and there’s a problem with it, the user’s account has been locked and needs to be logged out, we want to alert them that there’s a data change so that they can refresh their data.

The relevant part of message queuing here is a topic; which allows me to send an alert to one of more listeners.

To this end, I’ve extended my helper class to include a TopicHelper:

public class TopicHelper : IDisposable
{
    IReceiveTopicBehaviour _receiveBehaviour = null;
    ConnectionFactory _factory = new ConnectionFactory() { HostName = "localhost" };
    IConnection _connection = null;
    IModel _channel = null;
    EventingBasicConsumer _consumer = null;
 
    private void SetupChannel()
    {
        if (_connection == null)
        {
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
        }
    }
 
    private void SetupConsumer()
    {
        if (_channel == null) throw new Exception("Channel is not set-up");
        if (_connection == null) throw new Exception("Connection is not set-up");
 
        if (_consumer == null)
            _consumer = new EventingBasicConsumer(_channel);
    }
 
    public void ReceiveTopic(string topicName, string key, IReceiveTopicBehaviour behaviour)
    {
        _receiveBehaviour = behaviour;
 
        SetupChannel();
        SetupConsumer();
 
        _channel.ExchangeDeclare(topicName, "topic");
        var queue = _channel.QueueDeclare();
        var queueName = queue.QueueName;
 
        _channel.QueueBind(queue: queueName,
        exchange: topicName,
        routingKey: key);
 
        _consumer.Received += Consumer_Received;
 
        _channel.BasicConsume(queueName, true, _consumer);
 
    }
 
    private void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        var body = e.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(message);
 
        _receiveBehaviour.OnReceive(message);
    }
 
    public void SendTopic(string topicName, string key, string data)
    {
        SetupChannel();
 
        _channel.ExchangeDeclare(topicName, "topic");
        _channel.BasicPublish(topicName, key, null, Encoding.UTF8.GetBytes(data));            
 
    }
 
    public void Dispose()
    {            
        _connection.Dispose();
        _channel.Dispose();            
    }
}

There’s a lot of code here, but basically there’s only two methods of note: SendTopic() and ReceiveTopic(). I’ve also made it a disposable class. The next thing we want is a listener; for this, I’ve used a WPF app, but any application should be able to do this:

rabbitalert1

The call to set-up the alert is this:

public partial class MainWindow : Window
{
    private TopicHelper _topicHelper = new TopicHelper();
    public MainWindow()
    {
        InitializeComponent();
 
        Task.Run(() =>
        {
            ReceiveTopic recTopic = new ReceiveTopic();
            _topicHelper.ReceiveTopic("Alerts", "thisApp", recTopic);
        });
    }
}

I’ve used the code behind because I’m just proving a point. Obviously, in real life, this would be some abstraction in the business layer. The main thing to note is the ReceiveTopic class that is instantiated and passed through; here’s its implementation:


public class ReceiveTopic : IReceiveTopicBehaviour
{
    public void OnReceive(string message)
    {
        System.Windows.MessageBox.Show(message);
    }
}

This, effectively, allows me to provide custom functionality on the receive. If you find that you’re using this to pass in the same one-liner, you could adapt this to simply pass in an action.

The final piece of the puzzle is the send alert; in my case this is a console app:

rabbitalert2

Here’s the code for the main function:


static void Main(string[] args)
{
    Console.WriteLine("Alert: ");
    string alertText = Console.ReadLine();
 
    using (TopicHelper helper = new TopicHelper())
    {
        helper.SendTopic("Alerts", "thisApp", alertText);
    }
 
    Console.WriteLine("Finished");
    Console.ReadLine();
}

Does it work?

Yes, of course.

rabbitalert3