Explorando o EventStore – Overview


Recentemente em dois vídeos onde abordei a técnica de event sourcing, fiz uso de uma espécie de base de dados que armazena eventos que ocorrem na aplicação, e que é chamada de Event Store. Naqueles exemplos, a ideia era armazenar os eventos gerados pelo domínio e que eram, mais tarde, utilizando para reconstruir o estado atual do objeto em questão.

Como lá abordamos o Event Store de forma bem superficial, pois o foco era no conceito e não na ferramenta, neste artigo vamos aprofundar um pouco mais para explorar a API e os recursos que este repositório pode oferecer para armazenarmos e extrairmos os eventos para uso em cenários de event sourcing ou também em cenários que demandam um service bus, onde podemos publicar conteúdo e os assinantes possam ser notificados e/ou interrogar a base para saber se algo ocorreu e tomar alguma decisão sobre isso.

O primeiro passo é fazer o download através do site oficial. Este pacote trará o executável que é a “parte servidor”, ou seja, a própria base onde os eventos serão armazenados. Além disso, há também uma console administrativa web, que é acessada através de um navegador e permitirá interagir com a base e realizar diversas configurações de funcionamento e monitoramento.

Para exemplificar o uso de alguns recursos o Event Store, vamos criar três aplicações: Loja, Nota Fiscal e Transportadora. Na primeira, o cliente irá postar o pedido de compra. A segunda, Nota Fiscal, irá interrogar periodicamente o servidor procurando por novos pedidos adicionados e, consequentemente, emitir a nota fiscal. Por fim, a Transportadora irá monitorar qualquer nova nota fiscal emitida, e irá imediatamente iniciar o processo de entrega do produto.

O primeiro passo é iniciar o servidor que passará a receber as solicitações. Basicamente depois de extraído o pacote do download, basta executar o comando abaixo no prompt de comando e ele passará a receber as requisições. E depois do servidor inicializado, você pode ir até a console web administrativa através do seguinte endereço: http://localhost:2113, login “admin” e a senha “changeit”.

C:EventStore.ClusterNode.exe

Depois do servidor rodando, precisamos começar a codificar as aplicações, e para isso, o projeto também fornece diversas APIs para diferentes tecnologias, incluindo para .NET que é chamada de EventStore.Client. Utilize o comando abaixo para baixar e instalar ele através do Nuget:

PM> Install-Package EventStore.Client

Depois das três aplicações estarem com o pacote do cliente devidamente instalado, chega o momento de estabelecermos a conexão com o servidor para postar o evento de novo pedido criado na loja. A classe que serve para estabelecer a ligação com o servidor é a EventStoreConnection, e pode (e deve) ser mantida uma por aplicação para melhor reutilização dos recursos que a mesma utiliza para executar suas atividades. É importante notar que os métodos expostos são assíncronos, mas como estou utilizando aplicações console para o exemplo, forçarei a execução síncrona de todos eles.

A conexão se dá através do protocolo TCP, onde você pode utilizar o método estático Create para informar o endereço a partir de uma URI, e a porta padrão para o servidor é a 1113. Como dito acima, estou abrindo a conexão explicitamente e aguardando que a mesma seja concluída para dar sequência na utilização.

using (var conn = EventStoreConnection.Create(new Uri(“tcp://localhost:1113”)))
{
    conn.ConnectAsync().Wait();
}

Voltando ao exemplo de testes, a primeira aplicação se resume a postar um novo evento informando que um novo pedido foi criado. Note através do código abaixo como isso é definido. Não vou me preocupar em colocar a definição das classes aqui porque são muito simples e não há qualquer informação relevante dentro delas; são classes POCOs com as propriedades que refletem nosso negócio.

var
novoPedido = new NovoPedidoCriado(new Pedido()
{
    NomeDoCliente = “Israel Aece”,
    ValorTotal = 1200
});

Depois do evento criado, precisamos postar o mesmo, mas antes precisamos entender um conceito do
Event Store, chamado de Stream. Como já suspeitamos, é a forma que temos de agrupar os eventos por qualquer regra ou seção da aplicação que desejamos. Para o nosso caso, vamos chamar o stream de “Ecommerce.Pedidos”, que irá concentrar todos os eventos relacionados aos pedidos realizados em nosso loja.

A classe de conexão fornece um método chamado 
AppendToStreamAsync, e além de especificarmos o nome do stream onde queremos armazenar, temos que passar o objeto (evento) que deve ser persistido. Para encapsular e descrever o evento do ponto de vista da infraestrutura, o Event Store possui uma classe chamada EventData. Essa classe além de ter algumas características relacionadas a infraestrutura, recebe também em seu construtor o Id do evento, o nome e o objeto que representa o mesmo, e que no nosso caso está armazenado na variável “novoPedido”.

A serialização pode ser binária, mas dependendo com quem vamos interagir/integrar, utilizar JSON pode ser uma opção bem mais interessante. E para isso, estou recorrendo ao
Json.NET (Newtonsoft) para serializar e deserializar os objetos.

conn.AppendToStreamAsync(
    “Ecommerce.Pedidos”,
    ExpectedVersion.Any,
    GerarEvento(novoPedido)).Wait();

private static
EventData GerarEvento(Evento evento)
{
    return new EventData(
        evento.Id,
        evento.GetType().Name,
        true,
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evento)), null);
}

Depois que o evento for postado, se acessarmos a console administrativa do
Event Store, veremos o stream e o evento criado, conforme é possível visualizar na imagem abaixo:



Na sequência vamos recorrer ao projeto de Nota Fiscal, que ficará interrogando periodicamente o servidor em busca de eventos do tipo
NovoPedidoCriado. Como os eventos são acumulativos do lado do servidor, nós, o cliente, devemos controlar os eventos que já foram processados pela aplicação, e por isso, estamos armazenando em uma variável o número do último evento, para que em uma próxima execução não corra o risco de gerar notas fiscais para os pedidos já gerados. E vale ressaltar que isso é um exemplo, e que em uma cenário real você precisa armazenar este contador em uma local seguro que resista a possíveis reinicializações da aplicação.

Da mesma forma que fazemos na criação do evento, para ler também precisamos criar a conexão com o servidor, e neste caso, vamos utilizar o método 
ReadStreamEventsForwardAsync, e como o próprio nome diz, lê os eventos na ordem em que eles foram postados.

var ultimoEventoProcessado = 0;

using
(var conn = EventStoreConnection.Create(new Uri(“tcp://localhost:1113”)))
{
    conn.ConnectAsync().Wait();

while ((Console.ReadLine() != null))
    {

        var itens = conn.ReadStreamEventsForwardAsync(
“Ecommerce.Pedidos”, 
ultimoEventoProcessado + 1, 200, false).Result;

if (itens.Events.Any())
        {
            foreach (var e in itens.Events)
                Processar(conn, e);
        }
        else
        {
            Log.Yellow(“Não há eventos para processar.”);
        }
    }
}

Os eventos extraídos são representados pela classe ResolvedEvent, que entre várias informações sobre o evento, temos o evento gerado e serializado pela nossa aplicação de novo pedido criado. Como podemos notar abaixo, o método Processar identifica que se trata de um evento deste tipo, deserializa o mesmo, o processa e depois disso, dispara um evento dizendo que a respectiva nota fiscal foi emitida, através de um novo evento.

internal static void Processar(
IEventStoreConnection conexao, 
ResolvedEvent dadosDoEvento)
{
    if (dadosDoEvento.Event.EventType == “NovoPedidoCriado”)
    {
        var novoPedido = Extrair<NovoPedidoCriado>(dadosDoEvento);

Log.Green(“NOVO PEDIDO”);
        Log.Green(“Id: ” + novoPedido.Id, 4);
        Log.Green(“Data: ” + novoPedido.Data, 4);
        Log.Green(“Cliente: ” + novoPedido.NomeDoCliente, 4);
        Log.Green(“Valor: ” + novoPedido.ValorTotal.ToString(“N2”), 4);
        Log.Yellow(“Emitindo a Nota Fiscal do Pedido”, 4);
        Log.NewLine();

conexao.AppendToStreamAsync(
            “Ecommerce.Pedidos”,
            ExpectedVersion.Any,
GerarEvento(

new NotaFiscalEmitida(
novoPedido.NomeDoCliente, 
novoPedido.ValorTotal, 
“0001.0292.2999-2881-9918.11.9999/99”))).Wait();

ultimoEventoProcessado = dadosDoEvento.Event.EventNumber;
    }
}

private static TEvento Extrair<TEvento>(ResolvedEvent dadosDoEvento) 
where TEvento : Evento
{
return JsonConvert.DeserializeObject<TEvento>(

Encoding.UTF8.GetString(dadosDoEvento.Event.Data));
}

E, para finalizar o processo, temos agora a aplicação da transportadora, que ao invés de periodicamente procurar por um evento de nota fiscal emitida, assina o stream e tão logo quando ele for incluído, a transportadora será automaticamente notificada do evento gerado. E da mesma forma que antes, abrimos a conexão, e agora utilizamos o método SubscribeToStreamAsync, informando o nome do stream que desejamos monitorar e qualquer novo evento, disparamos o método Processar, conforme pode ser visualizado abaixo:

using (var conn = EventStoreConnection.Create(new Uri(“tcp://localhost:1113”)))
{
    conn.ConnectAsync().Wait();

    conn.SubscribeToStreamAsync(
“Ecommerce.Pedidos”, 
false
(a, e) => Processar(e)).Wait();

Console.ReadLine();
}

internal static void Processar(ResolvedEvent dadosDoEvento)
{
    if (dadosDoEvento.Event.EventType == “NotaFiscalEmitida”)
    {
        var novoPedido = Extrair<NotaFiscalEmitida>(dadosDoEvento);

Log.Green(“NOVO PEDIDO – NOTA FISCAL EMITIDA”);
        Log.Green(“Id: ” + novoPedido.Id, 4);
        Log.Green(“Cliente: ” + novoPedido.NomeDoCliente, 4);
        Log.Green(“Valor: ” + novoPedido.ValorTotal.ToString(“N2”), 4);
        Log.Green(“NF-e: ” + novoPedido.ChaveDaNotaFiscalEletronica, 4);
        Log.NewLine();
    }
}

E agora, se colocarmos as três aplicações lado a lado, é possível visualizar o efeito do processamento nas três, onde uma gera o pedido, a segunda emite a nota fiscal e a terceira é notificada para iniciar o processo de transporte.

É claro que isso é um exemplo simplista, mas pode ser considerado algo parecido em uma escala maior, como por exemplo, utilizar este recurso para diálogo entre contextos do domínio, sincronização de bases de (escrita e leitura) em ambiente de CQRS ou para desempenhar qualquer outra tarefa em modo assíncrono.

Anúncios

Deixe uma resposta

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair / Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair / Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair / Alterar )

Foto do Google+

Você está comentando utilizando sua conta Google+. Sair / Alterar )

Conectando a %s