Azure

Azure Service Fabric: Inter Service Communication: Naming Service

Azure Service Fabric: Inter Service Communication: DNS Naming Service

In this article, we will look at how you can call the actor service which we created in the previous post and send data to it. We will be building a stateless application which will connect with IoT Hub and pull data from it.

Azure Service Fabric: Inter Service Communication: Service to Service Communication –

Service fabric generally supports any protocol for communication between services. In a distributed microservice environment its often difficult to keep track of where a particular microservice is running as a service might be moved for various reasons like failover, resource load balancing, upgrades. In this kind of scenario, the endpoint address of a particular service can change. For this service fabric out of the box provides a System Service called Naming Service which maintains a table mapping individual services to its endpoint address. While doing inter-service communication, you can directly utilize the unique service name which does not change over the lifetime of the service. The name of a service in service fabric looks like “fabric://{ApplicationName}/{ServiceName}.”

Service Fabric provides a separate DNS service for applications which already have an existing URL name. Service Fabric also allows you to bring your DNS service as well.

Out of the box, Service Fabric provides a specific set of communication API options for reliable services.

  1. HTTP: Applications written in any language can utilize this communication protocol. For C# applications you can also use ASP.NET WebAPI as well.
  2. WCF: If you have a C# application which uses the WCF as the communication channel then you can utilize the same communication channel inside Service Fabric. This functionality is available only for Windows-based clusters.
  3. Service Remoting: The fastest and the most natural way and the default option to communicate between two services is by using the service remoting functionality. The service remoting functionality is provided by the Reliable Service Framework and can be used for both Java and C# applications. Service Remoting functionality out of the box handles service address resolution, connection retry and error handling. Service Remoting provided strongly typed RPC calls for communicating with your services.

We will now demonstrate how services can communicate with each other in our ServiceFabric Project. Let’s add a stateless application by right-clicking on the Service Fabric project -> Add -> New Service Fabric Services and then select a stateless ‘.Net core 2.0‘ application.

Adding a Stateless Service

This will add a new service to the solution. The newly created stateless application project contains a program.cs and an EventProcessorHostServie.cs file. The program.cs includes the main() method which registers this service. The EventProcessorHost file derives from the StatelessService class and contains the Runasync method which is the entry point for the service and a CreateServiceInstanceListener method used for creating a listener

IoTHub provides an EventHub compatible endpoint through which we can retrieve events/messages. We will be using the EventProcessorHost agent to receive event data. To use the EventProcessorHost, we need to create a class first which implements the IEventProcessor interface. The IEventProcessor interface has four methods

  • OpenAsync – Used for initializing the EventProcessor
  • CloseAsync – Called when the EventProcessor is stopped
  • ProcessErrorAsync – Used when there is an error while receiving the messages from the IoTHub
  • ProcessEventsAsync – The actual method which receives batches of messages.

Let’s create a class and name it as EventProcessorReceiver.cs. This class will implement the IEventProcessor. Then add the EventHubs Nuget packages for ‘.Net Standard’. We will need both the “Microsoft.Azure.EventHubs” and “Microsoft.Azure.EventHubs.Procesor”. Now we will go to the ProcessEventsAsync method and add our logic.


  public async Task ProcessEventsAsync(PartitionContext context, IEnumerable messages)
        {
            if (messages == null)
            {
                return;
            }
            try
            {
                
                foreach (EventData message in messages)
                {
                    
                    AircraftSensorModel sensorEvent = Newtonsoft.Json.JsonConvert.DeserializeObject
                                                        (Encoding.UTF8.GetString(message.Body.Array));
                    var actor = GetActor(sensorEvent.AircraftId);
                    await actor.AddSensorData(sensorEvent.datetime, sensorEvent);
                }

                await context.CheckpointAsync();
            }
            catch (Exception e)
            {
                ServiceEventSource.Current.Message(e.Message);
            }
        }

The ProcessEventsAsync provides batches of EventData messages from the IoTHub. We need to enumerate through the messages batch and for each of the message deserialize it to our model object and then call the GetActor() method which returns an object for the specific actor instance which will be responsible for maintaining the state of this particular aircraft. We are calling the AddSensorData on the actor instance to store the sensor data.

Once all the messages are processed in the batch, we will call the PartitionContext. CheckpointAsync() to checkpoint the progress we have made in the Eventhub Partition stream. This will ensure that if for some reason our code crashes we will start processing messages from after the checkpoint and avoid processing the same messages again.

Next, we will implement the GetActor method.


private IFlightActor GetActor(long aircraftId)
        {
            if (collection.ContainsKey(aircraftId))
            {
                return collection[aircraftId];
            }
            else
            {
                lock (collection)
                {
                    var flightActor = ActorProxy.Create(
                                        new ActorId(aircraftId),
                                        new Uri("fabric:/FlightDataTracker/FlightActorService"));
                    collection.Add(aircraftId, flightActor);
                    return flightActor;
                }
            }
        }

The GetActor method uses the ActorProxy to create an instance of the FlightActorService if it does not exist already. Notice that we pass the actor Id while creating the Actor service. This actor id uniquely identifies a particular actor instance which is responsible for maintaining the state of a specific aircraft. We are also passing the name of the service as a parameter. The name will be resolved to the endpoint address of the service using the Service Fabric Naming Service.

We will now create another class and name it as EventHostProcessorListner.cs. This class will be used to register our EventProcessorHost class.  In this class, we will implement the ICommunicationListener which has three methods OenAsync, CloseAsync, and Abort. We will be registering our EventProcessorHost inside the OpenAsync method.


public async Task OpenAsync(CancellationToken cancellationToken)
        {
            var eventProcessorHost = new EventProcessorHost(
                                    EventHubName,
                                    ConsumerGroupName,
                                    EventHubConnectionString,
                                    StorageConnectionString,
                                    StorageContainerName);
            await eventProcessorHost.RegisterEventProcessorAsync();
            
            return this.EventHubName;
        }

While registering the EventProcessor, we need to provide specific information

  1. EventHubName – The name of the IoTHub in this case
  2. ConsumerGroupName– The consumer group in IoTHub allows processors to process messages from IoTHub partitions parallelly. ConsumerGroup provides a view of the IoTHub for each processor. When you create an IoTHub, a consumer group comes by default and its named as “$Default.” You can create a separate one as well by going to the “Endpoints” section then click on events.
  3. EventHubConnectionString– IoTHub provides a EventHub compatible connection string. This is available if you go to the Endpoint section in your IoTHub and then select events.
  4. StorageConnectionString– To use EventProcessorHost separate storage needs to be created. Specify the connection string of the storage account here.
  5. StorageContainername– Create an empty container inside the storage account and provide the name.

Consumer Group Creation and Event Hub Compatible Connection String

Next we call the RegisterEventProcessorAsync and specifies our EventProcessor(EventProcessorReceiver) class. Now our EventProcessor is configured now we need to set up the listener for our stateless service. In our EventProcessorHost class modify the CreateServiceListener method and add the below code.


protected override IEnumerable CreateServiceInstanceListeners()
        {
            return new[]
            {
                new ServiceInstanceListener(s => new EventHostProcessorListner(s))
            };
        }

Now our Service Fabric application is ready. We will test this by creating a simple console application which will simulate the process of sending events to IoTHub. Before we write our device simulator, we will go ahead and create an IoTHub resource in Azure. IoTHub allows for per device authentication. So we will register our device first inside IoTHub. Once our device is registered inside IoTHub, a connection string will be created which will contain the SharedAccessKey for device authentication.

IoTHub Device Registration

Let’s create the console application and add the following code


class Program
    {
        static DeviceClient deviceClient;
        static string connectionString = "Your Connection String";
        static void Main(string[] args)
        {

            Console.WriteLine("Send Cloud-to-Device messagen");
            deviceClient = DeviceClient.CreateFromConnectionString(connectionString);
            while (true)
            {
                var data = GetSensorData();
                SendCloudToDeviceMessageAsync(data).Wait();
                Thread.Sleep(2000);
            }
            
        }

        private static AircraftSensorModel GetSensorData()
        {
            var aircraft = new AircraftSensorModel();
            aircraft.AircraftId = 26;
            aircraft.datetime = DateTime.UtcNow;
            aircraft.Temperature = 60;
            aircraft.Humidity = 70;
            aircraft.DeviceId = 61234789;
            aircraft.EngineRPM = 15000;

            return aircraft;
        }

        private async static Task SendCloudToDeviceMessageAsync(AircraftSensorModel data)
        {
            var payload = JsonConvert.SerializeObject(data);
            
            var commandMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(payload));
            await deviceClient.SendEventAsync(commandMessage);

            Console.WriteLine("Msg Sent" + payload);
            Console.WriteLine("=========================");
        }


        struct AircraftSensorModel
        {
            public long AircraftId { get; set; }
            public long DeviceId { get; set; }
            public double Temperature { get; set; }
            public double Humidity { get; set; }
            public int EngineRPM { get; set; }
            public DateTime datetime { get; set; }
        }
   }

We will need the “Microsoft.Azure.Devices.Client” NuGet package to make a call to send our payload.

Next, we will add an ASP.Net Core WebAPI project to our service fabric application. This API will allow us to retrieve data from the actor instance. Let’s add another service to this service fabric project. This time we will select a Stateless ASP.Net Core Project and name it as FlightDataTracker.API.

API Project Structure

API Project Structure

If you are familiar with an ASP.NET core web API, then the project structure  will look familiar. The program.cs registers the service. The API.cs class is where you set up the Service Instance Listener. Now, since this is a .Net Core WebApi project we will use the WebHostBuilder to configure this app with Kestrel. As its a WebAPI you have the Startup.cs file where you can add services to the container using the ConfigureServices method or configure the HTTP pipeline using the Configure method. We will add a new controller and write code to call the Actor Service.

Here is our controller code.

[Route("api/[controller]")]
    public class FlightDataActorController : Controller
    {
        private IFlightActor actor;
        
        [HttpGet("GetAircraftSensorData/{aircraftId}")]
        public async Task<List> GetAircraftSensorData(int aircraftId)
        {
            var actor = GetActor(aircraftId);
            return await actor.GetSensorData();
        }

        private IFlightActor GetActor(int aircraftId)
        {
            return ActorProxy.Create(
                new ActorId(aircraftId),
                new Uri("fabric:/FlightDataTracker/FlightActorService"));
        }
    }

We are using the actor proxy to get an instance of the FlightActor Service and then calling the
GetSensorData() method to retrieve the sensor data.

Now its time to test our application. We will be using the service fabric local cluster to deploy our code. If you have already installed the Service Fabric SDK then search for the Service Fabric Local Cluster Manager in your machine and run it.

Service Fabric Local Cluster Manager Running in Dev Machine

Service Fabric local cluster manager provides a 1 node cluster or a 5 node cluster in which you can test your application. Right-click on your Service Fabric project and select Publish.

Service Fabric Application Publish

Service Fabric Application Publish

Select the Connection Endpoint as  Local Cluster and the Application Parameter File as Local.5Node.xml as we will be deploying our application in a 5 node local cluster. Next click on publish

Once Published click on the Service Fabric Local cluster and select manage the local cluster.

Manage Loca Cluster

Manage Local Cluster

This will open up the Service Fabric Explorer. The ‘explorer‘ is a great tool available out of the box. Here you can monitor the health of your applications, check errors, create new instances of your service. We will talk about the explorer in a later article.

For the time being you should be able to see something like this.

Service Fabric Explorer

Service Fabric Explorer

If there are any errors while registering the service, you should be able to see errors here.

So now our services are running. We will run our FlightDataSim console application.

Aircraft Sensor Simulator Application

Aircraft Sensor Simulator Application

Next, we will use postman to make a call to our API and here is the result.

postman

postman

In our next post we will be looking at raising alerts from our actor service whenever there is an anomaly is detected in the sensor data.

Disclaimer: The Questions and Answers provided on https://www.gigxp.com are for general information purposes only. We make no representations or warranties of any kind, express or implied, about the completeness, accuracy, reliability, suitability or availability with respect to the website or the information, products, services, or related graphics contained on the website for any purpose.

What's your reaction?

Excited
0
Happy
0
In Love
0
Not Sure
0
Silly
0
Sumanta Kar
A Software Developer by profession working on Azure and .Net for more than 4 years. Passionate about architecture, design and latest technologies. A passionate football fan and a gamer at other times.

    You may also like

    Comments are closed.

    More in:Azure