Azure Service Bus Listener with Azure Service Fabric

PUBLISHED ON APR 21, 2017 — AZURE, SERVICE BUS, SERVICE FABRIC, UNCATEGORISED

I’ve started playing around with Azure Service Fabric, and one of the things a friend wanted to do was listen for messages on a service bus and handle each one by doing some work with file processing and blob storage. Service Fabric makes it easy to run something like this at massive scale, so I thought I’d take a look at laying out the basics for him.

As such, this blog post is going to cover the basics of listening to an Azure Service Bus queue on Azure Service Fabric.

My sample code can be found on GitHub here.

First things first: before you can work with Service Fabric, you need to set up your dev environment.

Obviously, you’ll need an Azure account and to have a Service Bus and Queue set up – you can find info on that here if required.

The application I’m going to create will be very simple, consisting of only two parts:

- A stateless service
- A dummy client console application

The client is just for publishing a message onto the service bus – we need to be able to check this thing works, right?

The stateless service is what we’ll deploy to service fabric. This will listen to the service bus, and trigger a method when something is received.

Now create a new solution in Visual Studio using the Service Fabric Application template. From the window that appears, select a Stateless Service. I’ve called the application ServiceBusTrigger and the service TriggerService.

Other types of services may be more suitable depending on what you’re trying to achieve, but for my use case – receiving and processing messages – I don’t care about state.

Now we have the service scaffolded, let’s set up our service bus listener.

First, you’ll need to add the NuGet package for WindowsAzure.ServiceBus.

In the TriggerService class (or whatever name you gave to your service) you’ll see an override method called CreateServiceInstanceListeners.

This is where we’ll want to set up our listener, and in the future any other listeners you require for this service.

We need to create our ServiceInstanceListener – this will be a class that implements the ICommunicationListener interface. I created a folder to keep mine separate called ServiceListeners, and called my class ServiceBusQueueListener.

Implementing the ICommunicationListener interface you’ll see 3 methods – Abort, CloseAsync, and OpenAsync. We’ll use Abort and CloseAsync to close the connection to our service bus, and we’ll use OpenAsync to set up the client and start receiving messages.

Without any changes yet, your class should look like this:

internal class ServiceBusQueueListener : ICommunicationListener 
{ 
    public void Abort() 
    { 
        throw new NotImplementedException(); 
    } 
  
    public Task CloseAsync(CancellationToken cancellationToken) 
    { 
        throw new NotImplementedException(); 
    } 
  
    public Task<string> OpenAsync(CancellationToken cancellationToken) 
    { 
        throw new NotImplementedException(); 
    } 
}

We’ve already added the NuGet package for the service bus, so let’s start configuring that.

We only need a connection string and the name of a queue to set up our client. I’ve added these as private fields on the class, so we can set them in the constructor and then access them in the OpenAsync method. I’ve also added two more fields – one for the service bus QueueClient, and one for an Action<BrokeredMessage>.

The Action<BrokeredMessage> will also be passed in through the constructor, and we’ll invoke it whenever a message is received. This approach makes the listener quite reusable – you simply pass in the callback that handles incoming messages wherever the listener is created. If there’s a better approach, let me know! 🙂

In the OpenAsync method we’ll create our client from the connection string and queue name, and then set it up so that when a message is received, we invoke the callback Action.

I’ve also added a Stop method to close down the client, this will be called by both CloseAsync and Abort, as in this scenario we’ll want to do the same thing in both instances.

After these changes, your class should look something like this:

internal class ServiceBusQueueListener : ICommunicationListener 
{ 
    private string _queueName; 
    private string _connectionString; 
    private QueueClient _client; 
    private Action<BrokeredMessage> _callback; 
  
    public ServiceBusQueueListener(Action<BrokeredMessage> callback, string connectionString, string queueName) 
    { 
        // Set variables 
        _callback = callback; 
        _connectionString = connectionString; 
        _queueName = queueName; 
    } 
  
    public void Abort() 
    { 
        // Close down 
        Stop(); 
    } 
  
    public Task CloseAsync(CancellationToken cancellationToken) 
    { 
        // Close down 
        Stop(); 
        return Task.FromResult(true); 
    } 
  
    public Task<string> OpenAsync(CancellationToken cancellationToken) 
    { 
        _client = QueueClient.CreateFromConnectionString(_connectionString, _queueName); 
        _client.OnMessage(message => _callback.Invoke(message)); 
  
        // Return the uri - in this case, that's just our connection string 
        return Task.FromResult(_connectionString); 
    } 
  
    private void Stop() 
    { 
        if (_client != null && !_client.IsClosed) 
        { 
            _client.Close(); 
            _client = null; 
        } 
    } 
} 
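
On the "better approach" question, one option is to take an async handler instead of an Action<BrokeredMessage> and to complete or abandon each message explicitly. The following is only a rough sketch under a few assumptions: AsyncServiceBusQueueListener and its handler parameter are names I've made up for illustration, the MaxConcurrentCalls value is arbitrary, and the queue uses the default PeekLock receive mode. It also returns the queue name from OpenAsync rather than the connection string, since that return value is published as the listener's address.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

// Sketch only: a hypothetical async variant of ServiceBusQueueListener.
internal class AsyncServiceBusQueueListener : ICommunicationListener
{
    private readonly Func<BrokeredMessage, Task> _handler;
    private readonly string _connectionString;
    private readonly string _queueName;
    private QueueClient _client;

    public AsyncServiceBusQueueListener(Func<BrokeredMessage, Task> handler, string connectionString, string queueName)
    {
        _handler = handler;
        _connectionString = connectionString;
        _queueName = queueName;
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        _client = QueueClient.CreateFromConnectionString(_connectionString, _queueName);

        var options = new OnMessageOptions
        {
            AutoComplete = false,   // we complete the message ourselves below
            MaxConcurrentCalls = 4  // assumed value; tune for your workload
        };

        // Log errors raised by the message pump itself.
        options.ExceptionReceived += (sender, e) =>
            Trace.TraceError("Service Bus {0} failed: {1}", e.Action, e.Exception);

        _client.OnMessageAsync(async message =>
        {
            try
            {
                await _handler(message);
                await message.CompleteAsync();  // remove the message from the queue
            }
            catch (Exception)
            {
                await message.AbandonAsync();   // release the lock so it can be retried
            }
        }, options);

        // Publish the queue name as the address, not the connection string.
        return Task.FromResult(_queueName);
    }

    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        if (_client != null && !_client.IsClosed)
        {
            await _client.CloseAsync();
        }
    }

    public void Abort()
    {
        // Abort tears the client down without waiting for a graceful close.
        _client?.Abort();
    }
}
```

With this variant, the handler passed in from the service would have the shape `Task Handle(BrokeredMessage message)` rather than `void Handle(BrokeredMessage message)`.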

That’s most of the work done!

Now back in the service class, we’ll need to register this listener in the CreateServiceInstanceListeners method. Here we simply create a new ServiceInstanceListener that wraps a new instance of our ServiceBusQueueListener.

I’ve got the connection string and queue name as private fields again, and I’ve added a simple Test method that takes in a BrokeredMessage as a parameter, and will write the contents out to the Debug window. This will be the action we pass in to our listener.

All in, our service class now looks like this:

internal sealed class TriggerService : StatelessService 
{ 
    private string _connectionString = "your-connection-string"; 
    private string _queueName = "your-queue-name"; 
  
    public TriggerService(StatelessServiceContext context) 
        : base(context) 
    { 
    } 
  
    /// <summary>
    /// Optional override to create listeners (e.g., TCP, HTTP) for this service replica to handle client or user requests.
    /// </summary>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        Action<BrokeredMessage> testAction = Test;
        yield return new ServiceInstanceListener(context => new ServiceBusQueueListener(testAction, _connectionString, _queueName), "StatelessService-ServiceBusQueueListener");
    }

    private void Test(BrokeredMessage message)
    {
        Debug.WriteLine(message.GetBody<string>());
    }
}

And that’s all that’s required!

To test it, let’s add a console project to our solution to send something to the service bus – this is my DummyClient.

Again, add the NuGet package for the service bus, and set up the connection string and queue name.

We then want to create a connection again, the same as we did in our listener class.

Now all that’s needed is to create a message and send it using the client:

class Program 
{ 
    private static string _connectionString = "your-connection-string"; 
    private static string _queueName = "your-queue-name"; 

    static void Main(string[] args) 
    { 
        // Writing to the service bus for testing purposes 
        var client = QueueClient.CreateFromConnectionString(_connectionString, _queueName); 
        var message = new BrokeredMessage("Testing 123"); 
  
        client.Send(message); 

        // Release the connection once the message has been sent 
        client.Close(); 
    } 
} 

If you debug the service, and then run the DummyClient, you should now see your test message printed in the Output window!

I’ve added one last step to my sample on GitHub, and that’s to make use of environment variables from the config, so you can have the connection string and queue name swapped out depending on where you’re running the code.

The first step is to set up the default environment variables in ServiceManifest.xml - this is under the PackageRoot folder on your service.

In there, find the CodePackage node and, underneath the EntryPoint node, add a new EnvironmentVariables node. Under that, add two EnvironmentVariable nodes: one for the connection string, and one for the queue name.

Your code package node should now look like this:

<CodePackage Name="Code" Version="1.0.0">
  <EntryPoint>
    <ExeHost>
      <Program>TriggerService.exe</Program>
    </ExeHost>
  </EntryPoint>
  <EnvironmentVariables>
    <EnvironmentVariable Name="ServiceBusConnectionString" Value="." />
    <EnvironmentVariable Name="ServiceBusQueueName" Value="." />
  </EnvironmentVariables>
</CodePackage>

We also want to set these in ApplicationManifest.xml - this can be found under the 'ServiceBusTrigger' node (or whatever you called your project – not your service), under the ApplicationPackageRoot.

This time find the ServiceManifestImport node, and add a new EnvironmentOverrides node, setting the CodePackageRef to the one above.

Again, in here we want to add the two EnvironmentVariable nodes, however this time we’re going to set the value as a parameter. This section should look something like this:

<ServiceManifestImport>
  <ServiceManifestRef ServiceManifestName="TriggerServicePkg" ServiceManifestVersion="1.0.0" />
  <ConfigOverrides />
  <EnvironmentOverrides CodePackageRef="Code">
    <EnvironmentVariable Name="ServiceBusQueueName" Value="[ServiceBusQueueName]" />
    <EnvironmentVariable Name="ServiceBusConnectionString" Value="[ServiceBusConnectionString]" />
  </EnvironmentOverrides>
</ServiceManifestImport>
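
For the [ServiceBusQueueName] and [ServiceBusConnectionString] placeholders to resolve, the parameters also need to be declared in the Parameters node at the top of ApplicationManifest.xml. Something along these lines should do it; the TriggerService_InstanceCount entry is the one I'd expect the template to have generated already, and the empty DefaultValue entries are placeholders I've assumed:

```xml
<Parameters>
  <!-- Generated by the template -->
  <Parameter Name="TriggerService_InstanceCount" DefaultValue="-1" />
  <!-- Declared here so the EnvironmentOverrides placeholders can reference them -->
  <Parameter Name="ServiceBusQueueName" DefaultValue="" />
  <Parameter Name="ServiceBusConnectionString" DefaultValue="" />
</Parameters>
```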

Now we can override those values from our environment-specific parameter XML files. These are found under the ApplicationParameters folder – there are 3 by default: Cloud.xml, Local.1Node.xml and Local.5Node.xml.

In each of these files there is a Parameters node, under which we want to add two more Parameter nodes after the existing one. The name of the parameter should match the name in [ ] brackets that we set above, and the values should be the environment specific values you want:

<Parameters>
  <Parameter Name="TriggerService_InstanceCount" Value="1" />
  <Parameter Name="ServiceBusQueueName" Value="your-queue-name" />
  <Parameter Name="ServiceBusConnectionString" Value="your-connection-string" />
</Parameters>

Now that’s configured, the last step is to read these values and put them to use. Go back to the TriggerService class, and instead of hardcoding the queue and connection string values, we’ll read the environment variables by name.

After updating, your constructor should now look like this:

public TriggerService(StatelessServiceContext context) : base(context) 
{ 
    // Read the values supplied through the manifests and parameter files 
    _connectionString = Environment.GetEnvironmentVariable("ServiceBusConnectionString"); 
    _queueName = Environment.GetEnvironmentVariable("ServiceBusQueueName"); 
} 
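
If a variable is missing or still holds the "." placeholder from ServiceManifest.xml, the service would only fail later, when the queue client is created. As a rough sketch, a small helper could fail fast instead. RequiredSetting is a hypothetical class I've made up here; it isn't part of the sample or of any SDK:

```csharp
using System;

// Hypothetical helper for illustration only.
internal static class RequiredSetting
{
    // Returns the named environment variable, or throws if it is missing,
    // blank, or still set to the "." placeholder default from ServiceManifest.xml.
    public static string FromEnvironment(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);

        if (string.IsNullOrWhiteSpace(value) || value == ".")
        {
            throw new InvalidOperationException(
                $"Environment variable '{name}' is not set. Check ServiceManifest.xml and the ApplicationParameters files.");
        }

        return value;
    }
}
```

In the constructor this would be used as `_queueName = RequiredSetting.FromEnvironment("ServiceBusQueueName");`, and likewise for the connection string.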

And that’s it!

That’s all that’s required to set up a listener running in an Azure Service Fabric app. As you can see, it’s easy to switch out the service bus for any other listener you might want, or to extend the Action we passed in to handle anything we want when a message is received.

You can find my sample code on GitHub here.

As always, if you have any suggestions or objections please let me know! We’re all here to learn 🙂
