This was one of the features that interested me most in message queues. Basically, you have an application, and you want to communicate to it; that is, Joe Bloggs the user is sat there, tapping away at his keyboard, and I want to send him a message that interrupts him. There are dozens of use cases for this: the user has entered an order and there’s a problem with it, the user’s account has been locked and needs to be logged out, we want to alert them that there’s a data change so that they can refresh their data.
The relevant part of message queuing here is a topic; which allows me to send an alert to one of more listeners.
To this end, I’ve extended my helper class to include a TopicHelper:
public class TopicHelper : IDisposable
{
IReceiveTopicBehaviour \_receiveBehaviour = null;
ConnectionFactory \_factory = new ConnectionFactory() { HostName = "localhost" };
IConnection \_connection = null;
IModel \_channel = null;
EventingBasicConsumer \_consumer = null;
private void SetupChannel()
{
if (\_connection == null)
{
\_connection = \_factory.CreateConnection();
\_channel = \_connection.CreateModel();
}
}
private void SetupConsumer()
{
if (\_channel == null) throw new Exception("Channel is not set-up");
if (\_connection == null) throw new Exception("Connection is not set-up");
if (\_consumer == null)
\_consumer = new EventingBasicConsumer(\_channel);
}
public void ReceiveTopic(string topicName, string key, IReceiveTopicBehaviour behaviour)
{
\_receiveBehaviour = behaviour;
SetupChannel();
SetupConsumer();
\_channel.ExchangeDeclare(topicName, "topic");
var queue = \_channel.QueueDeclare();
var queueName = queue.QueueName;
\_channel.QueueBind(queue: queueName,
exchange: topicName,
routingKey: key);
\_consumer.Received += Consumer\_Received;
\_channel.BasicConsume(queueName, true, \_consumer);
}
private void Consumer\_Received(object sender, BasicDeliverEventArgs e)
{
var body = e.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
\_receiveBehaviour.OnReceive(message);
}
public void SendTopic(string topicName, string key, string data)
{
SetupChannel();
\_channel.ExchangeDeclare(topicName, "topic");
\_channel.BasicPublish(topicName, key, null, Encoding.UTF8.GetBytes(data));
}
public void Dispose()
{
\_connection.Dispose();
\_channel.Dispose();
}
}
There’s a lot of code here, but basically there’s only two methods of note: SendTopic() and ReceiveTopic(). I’ve also made it a disposable class. The next thing we want is a listener; for this, I’ve used a WPF app, but any application should be able to do this:
The call to set-up the alert is this:
public partial class MainWindow : Window
{
private TopicHelper \_topicHelper = new TopicHelper();
public MainWindow()
{
InitializeComponent();
Task.Run(() =>
{
ReceiveTopic recTopic = new ReceiveTopic();
\_topicHelper.ReceiveTopic("Alerts", "thisApp", recTopic);
});
}
}
I’ve used the code behind because I’m just proving a point. Obviously, in real life, this would be some abstraction in the business layer. The main thing to note is the ReceiveTopic class that is instantiated and passed through; here’s its implementation:
public class ReceiveTopic : IReceiveTopicBehaviour
{
public void OnReceive(string message)
{
System.Windows.MessageBox.Show(message);
}
}
This, effectively, allows me to provide custom functionality on the receive. If you find that you’re using this to pass in the same one-liner, you could adapt this to simply pass in an action.
The final piece of the puzzle is the send alert; in my case this is a console app:
Here’s the code for the main function:
static void Main(string[] args)
{
Console.WriteLine("Alert: ");
string alertText = Console.ReadLine();
using (TopicHelper helper = new TopicHelper())
{
helper.SendTopic("Alerts", "thisApp", alertText);
}
Console.WriteLine("Finished");
Console.ReadLine();
}
Does it work?
Yes, of course.