I’ve recently been playing with the Azure Event Hub. This is basically a way of transmitting large amounts* of data between systems. In a later post, I may try and test these limits by designing some kind of game based on this.
As a quick disclaimer, it’s worth bearing in mind that I am playing with this technology, and so much of the content of this post can be found in the links at the bottom of this post - you won’t find anything original here - just a record of my findings. You may find more (and more accurate) information in those.
Event Hub Namespace
The first step, as with many Azure services, is to create a namespace:
For a healthy amount of data transference, you’ll pay around £10 per month.
Finally, we’ll create event hub within the namespace:
When you create the event hub, it asks how many partitions you need. This basically splits the message delivery; and it’s clever enough to work out, if you have 3 partitions and two listeners that one should have two slots, and one, one slot:
We’ll need an access policy so that we have permission to listen:
New Console Apps
We’ll need to create two applications: a producer and a consumer.
Let’s start with a producer. Create a new console app and add this NuGet library.
Here’s the code:
class Program
{
private static EventHubClient eventHubClient;
private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Publisher;SharedAccessKey=key;EntityPath=pcm-eventhub1";
private const string EhEntityPath = "pcm-eventhub1";
public static async Task Main(string[] args)
{
EventHubsConnectionStringBuilder connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
{
EntityPath = EhEntityPath
};
eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
while (true)
{
Console.Write("Please enter message to send: ");
string message = Console.ReadLine();
if (string.IsNullOrWhiteSpace(message)) break;
await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
}
await eventHubClient.CloseAsync();
Console.WriteLine("Press ENTER to exit.");
Console.ReadLine();
}
}
Consumer
Next we’ll create a consumer; so the first thing we’ll need is to grant permissions for listening:
We’ll create a second new console application with this same library and the processor library, too.
class Program
{
private const string EhConnectionString = "Endpoint=sb://pcm-testeventhub.servicebus.windows.net/;SharedAccessKeyName=Listener;SharedAccessKey=key;EntityPath=pcm-eventhub1";
private const string EhEntityPath = "pcm-eventhub1";
private const string StorageContainerName = "eventhub";
private const string StorageAccountName = "pcmeventhubstorage";
private const string StorageAccountKey = "key";
private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
static async Task Main(string[] args)
{
Console.WriteLine("Registering EventProcessor...");
var eventProcessorHost = new EventProcessorHost(
EhEntityPath,
PartitionReceiver.DefaultConsumerGroupName,
EhConnectionString,
StorageConnectionString,
StorageContainerName);
// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<EventsProcessor>();
Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();
// Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
}
}
class EventsProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
}
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
}
public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
}
return context.CheckpointAsync();
}
}
As you can see, we can now transmit data through the Event Hub into client applications:
Footnotes
*Large, in terms of frequency, rather than volume - for example, transmitting a small message twice a second, rather than uploading a petabyte of data
References
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-send
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-receive-eph