blexin

Sviluppo
Consulenza e
Formazione IT


Blog

Evolvi la tua azienda

Vediamo insieme gli strumenti che ci mette a disposizione Azure per la gestione dei messaggi

Un mondo di messaggi con Azure

Mercoledì 4 Marzo 2020

Negli articoli precedenti, abbiamo visto l’importanza di disaccoppiare le comunicazioni in applicazioni enterprise, e abbiamo utilizzato soluzioni on-premise, come RabbitMQ e astrazioni del trasporto, come Rebus. Il cloud offre comunque strumenti nativi per la gestione dei messaggi. In Azure, in particolare, abbiamo a disposizione tre servizi che hanno proprio lo scopo, in contesti differenti, di aiutarci a realizzare questo disaccoppiamento: Azure Service Bus, Azure Event Hub e Azure Event Grid.

Azure Service Bus è più adatto in quegli scenari enterprise dove si ha la necessità di scambiare messaggi il cui contenuto ha un alto valore. In questi contesti, chi crea il messaggio ha interesse che venga gestito da uno specifico destinatario, facendo differenza tra un messaggio che esprime una richiesta (spesso detto comando) e un evento, sottoscrivibile dalle parti di sistema interessate a conoscere che un dato “fatto” è accaduto (perfetto quindi per implementare scenari CQRS).

Event Hub e Event Grid si concentrano invece sul concetto di evento. Azure Event Hub è pensato per gestire serie di eventi o streaming in real-time, quindi con un particolare accento sulle performance della comunicazione. È particolarmente adatto ad applicazioni di analisi e di telemetria. Event Grid, invece, è pensato per quelle applicazioni o servizi event-based le cui elaborazioni sono attivate da eventi che notificano un cambiamento di stato, ideale quindi per l’integrazione di servizi diversi sul cloud o in scenari ibridi.

Analizziamoli singolarmente per farci un’idea delle loro funzionalità.

Azure Service Bus

Azure Service Bus (ASB) è un broker di messaggi per l’integrazione enterprise, il cui scopo è disaccoppiare applicazioni e servizi e offrire uno strumento di comunicazione affidabile per lo scambio di informazioni.
I messaggi possono essere inviati mediante l’uso di code o di topic. Se si utilizzano code, soluzione adottata per comunicazioni di tipo end-to-end, i messaggi sono ordinati e salvati all’arrivo e vengono consegnati solo quando vengono effettivamente richiesti. Si possono invece utilizzare i topics quando si vuole realizzare una comunicazione di tipo publish/subscribe. I messaggi sono inviati al topic da un publisher ed ogni subscriber riceve una copia del messaggio.

Azure Service Bus offre diverse funzionalità avanzate tra le quali:

  • Coda di Dead-Letter, dove vengono salvati i messaggi che non possono essere processati.
  • Consegna programmata, la possibilità di inviare i messaggi in maniera pianificata.
  • Supporto alle transazioni: ASB è un broker transazionale, e consente il raggruppamento di operazioni in relazione ad una delle entità di messaggistica quali code e topics. La transazionalità del broker garantisce l’assenza di perdite e duplicazione dei messaggi.
  • Supporto per l’invio in batch: si può ritardare l’invio dei messaggi e se, in questo lasso di tempo, arrivano nuovi messaggi dal mittente verranno inviati tutti in un singolo batch.
  • Eliminazione automatica della coda in caso di inattività.
  • Rilevazione di duplicati
  • Disaster Recovery Geografico: in caso di downtime questa opzione permette di continuare l’elaborazione in un data center presente in un'area geografica diversa.

Vediamo ora come possiamo inviare e ricevere messaggi con ASB utilizzando i topic.

Nell’esempio, avremo un publisher che pubblica messaggi su un topic di ASB ed un subscriber che si sottoscrive ad esso.

Andiamo sul portale Azure e creiamo una nuova risorsa Azure Service Bus.

Cliccando su Create ci viene mostrata la schermata per creare il Namespace, ovvero un contenitore per i componenti di messaggistica.

Scegliamo la versione standard o premium per avere la possibilità di usare una comunicazione di tipo publish/subscribe basata su topic. L’opzione Make this namespace zone redundant ci consente di replicare il namespace per avere ridondanza nelle availability zones. Selezionato il namespace, creiamo un topic, scegliendo tra le Entities disponibili la voce Topics:

Scegliamo il nome del topic e le caratteristiche che deve avere in termini di capacità, durata dei messaggi e rilevazione di duplicati, cliccando infine su Create:

Una volta creato il topic, possiamo creare una sottoscrizione ad esso. Troviamo la voce Subscription nelle Entities del Topic. Tra le opzioni di creazione, possiamo scegliere di inviare i messaggi scaduti ad una coda di sistema apposita per gestire queste situazioni, detta di deadletter, oppure abilitare le sessioni per avere una gestione FIFO (First-In-First-Out) dei messaggi.

Nella pagina di overview del Topic, vediamo la subscription creata, e possiamo notare che il message count di tale subscription è attualmente zero:

Creiamo adesso una console application che rappresenterà il nostro Publisher, e aggiungiamo il pacchetto Nuget Microsoft.Azure.ServiceBus. Istanziamo un TopicClient passando come parametro la connection string del Service Bus, che si può ottenere mediante il menu Settings > Shared access policies del Service Bus sul portale Azure cliccando sulla Policy, e quindi sul nome del topic che abbiamo creato. Inserito il numero di messaggi che vogliamo pubblicare, non facciamo altro che invocare il metodo SendAsync() del TopicClient a cui passiamo un messaggio che verrà inviato in maniera asincrona:

namespace AzureServiceBusPublisher
{
    class Program
    {
        const string AzureServiceBusConnectionString = "MyConnectionString";
        const string AzureServiceBusTopic = "demotopic";
        static ITopicClient topicClient;

        public static async Task Main(string[] args)
        {
            Console.WriteLine("Type the number of messages that you want to publish -> ");
            int numberOfMessage = int.Parse(Console.ReadLine());
            topicClient = new TopicClient(AzureServiceBusConnectionString, AzureServiceBusTopic);

            for (int i = 0; i < numberOfMessage; i++)
            {
                string messageBody = $"Demo Message {i}";
                var message = new Message(Encoding.UTF8.GetBytes(messageBody));
                await topicClient.SendAsync(message);
            }
            await topicClient.CloseAsync();
        }
    }
}

Collegandosi alla Overview di DemoTopic notiamo che il message count è pari al numero di messaggi che abbiamo scelto di inviare.

Realizziamo ora l’applicazione che farà da Subscriber, e sarà leggermente più complessa di quella realizzata per il publisher. In questo caso, istanziamo un SubscriptionClient che riceve come parametri la connection string del Service Bus, il nome del Topic e il nome della Subscription alla quale, per l’appunto, ci vogliamo sottoscrivere.
Per fare ciò, bisogna registrare un handler di messaggi, ovvero un metodo che si occupa di processarli e che abbiamo chiamato nell’esempio ProcessMessages. Questo metodo decodifica il messaggio, ne stampa il contenuto e completa l’elaborazione del messaggio eliminandolo dalla subscription.

Oltre al nome del metodo creato, subscriptionClient.RegisterMessageHandler() richiede come parametro anche un MessageHandlerOptions, in cui si vanno a definire alcune proprietà del message Handler.

namespace AzureServiceBusSubscriber
{
    class Program
    {
        const string AzureServiceBusConnectionString = "MyConnectionString";
        const string AzureServiceBusTopic = "demotopic";
        const string AzureServiceBusSubscription = "demoTopicSubscription";
        static ISubscriptionClient subscriptionClient;

        static async Task Main(string[] args)
        {
            subscriptionClient = new SubscriptionClient(AzureServiceBusConnectionString, AzureServiceBusTopic, AzureServiceBusSubscription);

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                AutoComplete = false,
                MaxConcurrentCalls = 1,                
            };

            subscriptionClient.RegisterMessageHandler(ProcessMessages, messageHandlerOptions);

            Console.ReadKey();

            await subscriptionClient.CloseAsync();

        }

        static async Task ProcessMessages(Message message, CancellationToken token)
        {
            string messageReceived = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine("Received -> " + messageReceived);
            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine($"Exception: {exceptionReceivedEventArgs.Exception}");
            return Task.CompletedTask;
        }
    }
}

Lanciamo il programma del Subscriber ed otteniamo il seguente risultato:

Il message count di DemoTopicSubscription è ritornato a zero e quindi abbiamo consumato tutti i messaggi pubblicati.

Azure Event Hubs

Azure Event Hubs viene definito come una piattaforma di streaming di Big Data e come un servizio di Event Ingestor. Piattaforma, in quanto, a tutti gli effetti, è parte della Platform-as-a-Service (PaaS), mentre Event Ingestor è riferito al fatto che consente di disaccoppiare il producer di uno stream di eventi dal consumer.

Event Hubs può ricevere ed elaborare milioni di eventi al secondo e può essere utilizzato sia per elaborazioni di analisi real-time o di telemetria sia per acquisire ed elaborare dati in batch o per archiviarli in uno storage (ad es. Azure Blob Storage).
Consente l’elaborazione simultanea del flusso dati da parte di applicazioni diverse, utilizzando un modello di consumer partizionato, in cui ogni consumer legge solo un sottoinsieme specifico dello stream di eventi. 

Gli Event Producers rappresentano qualsiasi entità che può inviare dati all’Event Hub tramite HTTP, AMQP o Apache Kafka (consente quindi di lavorare con applicazioni basate su Kafka). Le partizioni sono sequenze ordinate di eventi, ogni consumer group legge lo stream di eventi in maniera indipendente creando una vista separata dello stream per ogni applicazione interessata. Infine, ci sono gli Event Receiver ovvero le entità che leggono i dati dall’Event Hub. Vediamo un esempio di come possiamo inviare e ricevere Eventi con Event Hubs.

Sul portale Azure creiamo una nuova risorsa Event Hubs.

Clicchiamo su Create  e andiamo nella sezione del Namespace appena creato per aggiungere un Event Hub.

Diamo un nome all’Event Hub e scegliamo il numero di partizioni che vogliamo utilizzare, quindi clicchiamo su Create.

Realizziamo adesso una console application che farà da Event Producer. Creato il progetto, aggiungiamo il pacchetto Nuget Azure.Messaging.EventHubs. Come per l’esempio precedente, viene chiesto all’utente il numero di eventi che vuole inviare.

Per pubblicare gli eventi, istanziamo un EventHubProducerClient, che ha come parametri la stringa di connessione al namespace e il nome dell’Event Hub.
Costruiamo successivamente un batch di eventi, che verranno inviati in maniera asincrona verso l’Event Hub. Non avendo specificato nessuna partizione, questi verranno inviati alla partizione di default che viene creata automaticamente.

namespace EventHubsProducer
{
    class Program
    {
        const string EventHubsConnectionString = "MyConnectionString";
        const string EventHub = "eventhubdemo";

        public static async Task Main(string[] args)
        {
            Console.WriteLine("Type the number of events that you want to publish -> ");
            int numberOfEvent = int.Parse(Console.ReadLine());


            await using var eventHubProducerClient = new EventHubProducerClient(EventHubsConnectionString, EventHub);
            using EventDataBatch eventDataBatch = await eventHubProducerClient.CreateBatchAsync();

            for (int i = 0; i < numberOfEvent; i++)
            {
                string eventBody = $"Demo Message {i}";
                eventDataBatch.TryAdd(new Azure.Messaging.EventHubs.EventData(Encoding.UTF8.GetBytes(eventBody)));
            }

            await eventHubProducerClient.SendAsync(eventDataBatch);
        }
    }
}

Come possiamo vedere dalla sezione di metriche della overview del namespace EventHUbs-Demo, abbiamo effettivamente inviato cinque messaggi.

Creiamo adesso un applicazione che andrà a consumare tali eventi. Per fare ciò, oltre ad installare il pacchetto Nuget Azure.Messaging.EventHubs, bisogna aggiungere il pacchetto Azure.Messaging.EventHubs.Processor che ci espone un EventProcessorClient. Questo tipo di client necessita della presenza di uno storage per poter salvare dei checkpoints in base agli eventi elaborati all’interno di una partizione, in modo che l’elaborazione possa, se necessario, essere ripresa da quel punto.

Creiamo quindi un Azure Storage Account:

Una volta scelto il nome, possiamo proseguire con le impostazioni base che andranno a creare quindi lo Storage Account.

Creiamo successivamente un container dal menu Blob Service, un tipo di archiviazione per dati non strutturati, che per il nostro esempio chiameremo eventhubcontainer. Creiamo quindi un'istanza del container che verrà utilizzata dal EventProcessorClient insieme al ConsumerGroup (che in questo caso sarà quello di default), la stringa di connessione del namespace e il nome del Event Hub.

Prima di poter iniziare a processare gli eventi, bisogna registrare obbligatoriamente gli eventi ProcessEventAsync e ProcessErrorAsync: il primo è responsabile dell’elaborazione degli eventi provenienti dall’Event Hub, mentre il secondo si occupa degli errori.

Con StartProcessingAsync() iniziamo l’elaborazione, ritardandola di qualche secondo per evitare che il processo non completi correttamente l’elaborazione. Terminiamo l’elaborazione con il metodo StopProcessingAsync().

namespace EventHubsReceiver
{
    class Program
    {
        const string EventHubsConnectionString = "EventHubConnectionString";
        const string EventHub = "eventhubdemo";
        const string AzureBlobStorageConnectionString = "AzureBlobStorageConnectionString";
        const string AzureBlobStorageContainer = "eventhubcontainer";
        public static async Task Main(string[] args)
        {
            string eventHubConsumerClient = EventHubConsumerClient.DefaultConsumerGroupName;

            BlobContainerClient blobContainerClient = new BlobContainerClient(AzureBlobStorageConnectionString, AzureBlobStorageContainer);

            EventProcessorClient eventProcessorClient = new EventProcessorClient(blobContainerClient, eventHubConsumerClient, EventHubsConnectionString, EventHub);
            eventProcessorClient.ProcessEventAsync += ProcessingEvent;
            eventProcessorClient.ProcessErrorAsync += ProcessingError;

            await eventProcessorClient.StartProcessingAsync();

            await Task.Delay(TimeSpan.FromSeconds(10));

            await eventProcessorClient.StopProcessingAsync();

        }

        private static Task ProcessingEvent (ProcessEventArgs processEventArgs)
        {
            Console.WriteLine("Received -> " + Encoding.UTF8.GetString(processEventArgs.Data.Body.ToArray()));
            return Task.CompletedTask;
        }

        private static Task ProcessingError(ProcessErrorEventArgs processErrorEventArgs)
        {
            Console.WriteLine(processErrorEventArgs.Exception.Message);
            return Task.CompletedTask;
        }
    }
}

Il risultato che otteniamo è il seguente:

Abbiamo effettivamente consumato i messaggi prodotti in precedenza. Se guardiamo il Blob container creato in precedenza, possiamo notare il checkpoint relativo all’elaborazione degli eventi.

Azure Event Grid

Event Grid è un connettore di eventi che consente di creare architetture event-based. La comunicazione che implementa è di tipo publish/subscribe.
Può sottoscriversi a diverse risorse messe a disposizione da Azure, oppure a dei topic di eventi personalizzati (Event Sources). Tali eventi vengono poi inviati a degli event handlers che li gestiscono o li elaborano.

Oltre alle caratteristiche citate, può supportare un numero considerevole di subscribers e c’è la possibilità di filtrare gli eventi, ad esempio in base al proprio tipo.

Vediamo come realizzare una comunicazione publish/subscribe con Event Grid.
Dal portale Azure creiamo un nuovo NameSpace che chiameremo EventGridDemo.

A differenza degli altri due Servizi, per Event Grid partiremo dal Subscriber.
Il Subscriber sarà infatti una Azure Function, servizio di Azure che consente di eseguire codice senza doversi preoccupare dell’infrastruttura, creando quelle che vengono definite applicazioni Serverless. Inoltre, questo tipo di funzioni sono event-driven, in quanto vengono scatenate da trigger, generati in base al verificarsi di diversi tipi di eventi.

Creiamo quindi un nuovo progetto da Visual Studio e scegliamo come template Azure Function. Scelto il nome dell’applicazione, ci viene chiesto di scegliere il tipo di trigger che azionerà la funzione: in questo esempio abbiamo scelto di utilizzare un trigger HTTP.

Creiamo un EventGridSubscriber e un mapping con un topic personalizzato, Demo.Message, dal quale vogliamo leggere gli eventi di tipo DemoEvent. Utilizziamo poi il metodo DeserializeEventGridEvents() per de-serializzare gli eventi della Event Grid.

Successivamente, andiamo a valutare ogni singolo evento, quando incontriamo un evento del tipo DemoEvent ne stampiamo l’EventMessage. Una volta terminata l’elaborazione degli eventi, inviamo un 200 OK HTTP.

È importante creare una risposta SubscriptionValidationResponse per gli eventi del tipo SubscriptionValidationEventData per completare l’handshake di sottoscrizione agli eventi.

class DemoEvent
{
    [JsonProperty(PropertyName = "eventMessage")]
    public string EventMessage { get; set; }
}

namespace EventGridDemoSubscriber
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)]HttpRequestMessage req, TraceWriter log)
        {
            const string CustomTopicEvent = "Demo.Message";

            log.Info("C# HTTP trigger function processed a request.");

            string requestEvent = await req.Content.ReadAsStringAsync();

            EventGridSubscriber eventGridSubscriber = new EventGridSubscriber();
            eventGridSubscriber.AddOrUpdateCustomEventMapping(CustomTopicEvent, typeof(DemoEvent));
            EventGridEvent[] eventGridEvents = eventGridSubscriber.DeserializeEventGridEvents(requestEvent);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                if (eventGridEvent.Data is SubscriptionValidationEventData)
                {
                    var eventData = (SubscriptionValidationEventData)eventGridEvent.Data;

                    var responseData = new SubscriptionValidationResponse()
                    {
                        ValidationResponse = eventData.ValidationCode
                    };

                    return req.CreateResponse(HttpStatusCode.OK, responseData);
                }
                else if (eventGridEvent.Data is DemoEvent)
                {
                    var eventData = (DemoEvent)eventGridEvent.Data;
                    log.Info("Received -> " + eventData.EventMessage);
                }
            }

            return req.CreateResponse(HttpStatusCode.OK);
        }
    }
}

Andiamo sulla solution e, cliccando con il tasto destro sul progetto, scegliamo l’opzione Publish. In questo modo, potremo pubblicare la nostra Azure Function.

Pubblicata la function su Azure, andiamo sull’Event Grid Topic creato in precedenza e creiamo una Event Subscription.

Digitiamo un nome e scegliamo WebHook come endpoint type, ovvero un endpoint HTTP che nel nostro caso coincide con l’url della Azure Function pubblicata in precedenza.

Abbiamo così creato una subscription che possiamo visualizzare nella pagina di overview dell’Event Grid Topic.

Realizziamo adesso la console application che sarà il nostro publisher. Creaiamo un EventGridClient e utilizziamo il metodo PublishEventsAsync a cui passiamo l’indirizzo dell’Endpoint e un metodo che restituisce una lista di EventGridEvent:

namespace EventGridDemoPublisher
{
    class DemoEvent
    {
        [JsonProperty(PropertyName = "eventMessage")]
        public string EventMessage { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            string topicEndpoint = "EndpointAddress";
            string topicKey = "topickey";

            string topicEndpointHost = new Uri(topicEndpoint).Host;
            TopicCredentials topicCredentials = new TopicCredentials(topicKey);
            EventGridClient eventGridClient = new EventGridClient(topicCredentials);

            eventGridClient.PublishEventsAsync(topicEndpointHost, CreateEventList()).GetAwaiter().GetResult();
            Console.Write("Published events to Event Grid topic.");
            Console.ReadLine();
        }

        static IList<EventGridEvent> CreateEventList()
        {
            Console.WriteLine("Type the number of events that you want to publish -> ");
            int numberOfEvent = int.Parse(Console.ReadLine());

            List<EventGridEvent> eventGridEventsList = new List<EventGridEvent>();

            for (int i = 0; i < numberOfEvent; i++)
            {
                eventGridEventsList.Add(new EventGridEvent()
                {
                    Id = Guid.NewGuid().ToString(),
                    EventType = "Demo.Message",
                    Data = new DemoEvent()
                    {
                        EventMessage = $"Demo message {i}"
                    },
                    EventTime = DateTime.Now
                });
            }

            return eventGridEventsList;
        }
    }
}

Lanciamo il Publisher:

E, allo stesso modo, lanciamo la nostra Azure Function:

Come possiamo vedere, sono stati ricevuti i cinque messaggi pubblicati verso l’Event Grid Topic.

Conclusioni

Finisce qui la nostra panoramica su come gestire eventi in Azure. Abbiamo sicuramente solo scalfito la superficie, ma l’idea era quella di darvi le informazioni basilari per scegliere il servizio che meglio si adatta alle vostre esigenze e partire da lì. Vi consiglio di provare almeno una volta tutti e tre i servizi, magari con il codice utilizzato negli esempi che trovate qui: https://github.com/intersect88/AzureMessagingService

Alla prossima!

Autore

Servizi

Evolvi la tua azienda