Explorando o EventStore – Overview

Recentemente em dois vídeos onde abordei a técnica de event sourcing, fiz uso de uma espécie de base de dados que armazena eventos que ocorrem na aplicação, e que é chamada de Event Store. Naqueles exemplos, a ideia era armazenar os eventos gerados pelo domínio e que eram, mais tarde, utilizando para reconstruir o estado atual do objeto em questão.

Como lá abordamos o Event Store de forma bem superficial, pois o foco era no conceito e não na ferramenta, neste artigo vamos aprofundar um pouco mais para explorar a API e os recursos que este repositório pode oferecer para armazenarmos e extrairmos os eventos para uso em cenários de event sourcing ou também em cenários que demandam um service bus, onde podemos publicar conteúdo e os assinantes possam ser notificados e/ou interrogar a base para saber se algo ocorreu e tomar alguma decisão sobre isso.

O primeiro passo é fazer o download através do site oficial. Este pacote trará o executável que é a “parte servidor”, ou seja, a própria base onde os eventos serão armazenados. Além disso, há também uma console administrativa web, que é acessada através de um navegador e permitirá interagir com a base e realizar diversas configurações de funcionamento e monitoramento.

Para exemplificar o uso de alguns recursos o Event Store, vamos criar três aplicações: Loja, Nota Fiscal e Transportadora. Na primeira, o cliente irá postar o pedido de compra. A segunda, Nota Fiscal, irá interrogar periodicamente o servidor procurando por novos pedidos adicionados e, consequentemente, emitir a nota fiscal. Por fim, a Transportadora irá monitorar qualquer nova nota fiscal emitida, e irá imediatamente iniciar o processo de entrega do produto.

O primeiro passo é iniciar o servidor que passará a receber as solicitações. Basicamente depois de extraído o pacote do download, basta executar o comando abaixo no prompt de comando e ele passará a receber as requisições. E depois do servidor inicializado, você pode ir até a console web administrativa através do seguinte endereço: http://localhost:2113, login “admin” e a senha “changeit”.

C:EventStore.ClusterNode.exe

Depois do servidor rodando, precisamos começar a codificar as aplicações, e para isso, o projeto também fornece diversas APIs para diferentes tecnologias, incluindo para .NET que é chamada de EventStore.Client. Utilize o comando abaixo para baixar e instalar ele através do Nuget:

PM> Install-Package EventStore.Client

Depois das três aplicações estarem com o pacote do cliente devidamente instalado, chega o momento de estabelecermos a conexão com o servidor para postar o evento de novo pedido criado na loja. A classe que serve para estabelecer a ligação com o servidor é a EventStoreConnection, e pode (e deve) ser mantida uma por aplicação para melhor reutilização dos recursos que a mesma utiliza para executar suas atividades. É importante notar que os métodos expostos são assíncronos, mas como estou utilizando aplicações console para o exemplo, forçarei a execução síncrona de todos eles.

A conexão se dá através do protocolo TCP, onde você pode utilizar o método estático Create para informar o endereço a partir de uma URI, e a porta padrão para o servidor é a 1113. Como dito acima, estou abrindo a conexão explicitamente e aguardando que a mesma seja concluída para dar sequência na utilização.

using (var conn = EventStoreConnection.Create(new Uri(“tcp://localhost:1113”)))
{
    conn.ConnectAsync().Wait();
}

Voltando ao exemplo de testes, a primeira aplicação se resume a postar um novo evento informando que um novo pedido foi criado. Note através do código abaixo como isso é definido. Não vou me preocupar em colocar a definição das classes aqui porque são muito simples e não há qualquer informação relevante dentro delas; são classes POCOs com as propriedades que refletem nosso negócio.

var
novoPedido = new NovoPedidoCriado(new Pedido()
{
    NomeDoCliente = “Israel Aece”,
    ValorTotal = 1200
});

Depois do evento criado, precisamos postar o mesmo, mas antes precisamos entender um conceito do
Event Store, chamado de Stream. Como já suspeitamos, é a forma que temos de agrupar os eventos por qualquer regra ou seção da aplicação que desejamos. Para o nosso caso, vamos chamar o stream de “Ecommerce.Pedidos”, que irá concentrar todos os eventos relacionados aos pedidos realizados em nosso loja.

A classe de conexão fornece um método chamado 
AppendToStreamAsync, e além de especificarmos o nome do stream onde queremos armazenar, temos que passar o objeto (evento) que deve ser persistido. Para encapsular e descrever o evento do ponto de vista da infraestrutura, o Event Store possui uma classe chamada EventData. Essa classe além de ter algumas características relacionadas a infraestrutura, recebe também em seu construtor o Id do evento, o nome e o objeto que representa o mesmo, e que no nosso caso está armazenado na variável “novoPedido”.

A serialização pode ser binária, mas dependendo com quem vamos interagir/integrar, utilizar JSON pode ser uma opção bem mais interessante. E para isso, estou recorrendo ao
Json.NET (Newtonsoft) para serializar e deserializar os objetos.

conn.AppendToStreamAsync(
    “Ecommerce.Pedidos”,
    ExpectedVersion.Any,
    GerarEvento(novoPedido)).Wait();

private static
EventData GerarEvento(Evento evento)
{
    return new EventData(
        evento.Id,
        evento.GetType().Name,
        true,
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evento)), null);
}

Depois que o evento for postado, se acessarmos a console administrativa do
Event Store, veremos o stream e o evento criado, conforme é possível visualizar na imagem abaixo:



Na sequência vamos recorrer ao projeto de Nota Fiscal, que ficará interrogando periodicamente o servidor em busca de eventos do tipo
NovoPedidoCriado. Como os eventos são acumulativos do lado do servidor, nós, o cliente, devemos controlar os eventos que já foram processados pela aplicação, e por isso, estamos armazenando em uma variável o número do último evento, para que em uma próxima execução não corra o risco de gerar notas fiscais para os pedidos já gerados. E vale ressaltar que isso é um exemplo, e que em uma cenário real você precisa armazenar este contador em uma local seguro que resista a possíveis reinicializações da aplicação.

Da mesma forma que fazemos na criação do evento, para ler também precisamos criar a conexão com o servidor, e neste caso, vamos utilizar o método 
ReadStreamEventsForwardAsync, e como o próprio nome diz, lê os eventos na ordem em que eles foram postados.

var ultimoEventoProcessado = 0;

using
(var conn = EventStoreConnection.Create(new Uri(“tcp://localhost:1113”)))
{
    conn.ConnectAsync().Wait();

while ((Console.ReadLine() != null))
    {

        var itens = conn.ReadStreamEventsForwardAsync(
“Ecommerce.Pedidos”, 
ultimoEventoProcessado + 1, 200, false).Result;

if (itens.Events.Any())
        {
            foreach (var e in itens.Events)
                Processar(conn, e);
        }
        else
        {
            Log.Yellow(“Não há eventos para processar.”);
        }
    }
}

Os eventos extraídos são representados pela classe ResolvedEvent, que entre várias informações sobre o evento, temos o evento gerado e serializado pela nossa aplicação de novo pedido criado. Como podemos notar abaixo, o método Processar identifica que se trata de um evento deste tipo, deserializa o mesmo, o processa e depois disso, dispara um evento dizendo que a respectiva nota fiscal foi emitida, através de um novo evento.

internal static void Processar(
IEventStoreConnection conexao, 
ResolvedEvent dadosDoEvento)
{
    if (dadosDoEvento.Event.EventType == “NovoPedidoCriado”)
    {
        var novoPedido = Extrair<NovoPedidoCriado>(dadosDoEvento);

Log.Green(“NOVO PEDIDO”);
        Log.Green(“Id: ” + novoPedido.Id, 4);
        Log.Green(“Data: ” + novoPedido.Data, 4);
        Log.Green(“Cliente: ” + novoPedido.NomeDoCliente, 4);
        Log.Green(“Valor: ” + novoPedido.ValorTotal.ToString(“N2”), 4);
        Log.Yellow(“Emitindo a Nota Fiscal do Pedido”, 4);
        Log.NewLine();

conexao.AppendToStreamAsync(
            “Ecommerce.Pedidos”,
            ExpectedVersion.Any,
GerarEvento(

new NotaFiscalEmitida(
novoPedido.NomeDoCliente, 
novoPedido.ValorTotal, 
“0001.0292.2999-2881-9918.11.9999/99”))).Wait();

ultimoEventoProcessado = dadosDoEvento.Event.EventNumber;
    }
}

private static TEvento Extrair<TEvento>(ResolvedEvent dadosDoEvento) 
where TEvento : Evento
{
return JsonConvert.DeserializeObject<TEvento>(

Encoding.UTF8.GetString(dadosDoEvento.Event.Data));
}

E, para finalizar o processo, temos agora a aplicação da transportadora, que ao invés de periodicamente procurar por um evento de nota fiscal emitida, assina o stream e tão logo quando ele for incluído, a transportadora será automaticamente notificada do evento gerado. E da mesma forma que antes, abrimos a conexão, e agora utilizamos o método SubscribeToStreamAsync, informando o nome do stream que desejamos monitorar e qualquer novo evento, disparamos o método Processar, conforme pode ser visualizado abaixo:

using (var conn = EventStoreConnection.Create(new Uri(“tcp://localhost:1113”)))
{
    conn.ConnectAsync().Wait();

    conn.SubscribeToStreamAsync(
“Ecommerce.Pedidos”, 
false
(a, e) => Processar(e)).Wait();

Console.ReadLine();
}

internal static void Processar(ResolvedEvent dadosDoEvento)
{
    if (dadosDoEvento.Event.EventType == “NotaFiscalEmitida”)
    {
        var novoPedido = Extrair<NotaFiscalEmitida>(dadosDoEvento);

Log.Green(“NOVO PEDIDO – NOTA FISCAL EMITIDA”);
        Log.Green(“Id: ” + novoPedido.Id, 4);
        Log.Green(“Cliente: ” + novoPedido.NomeDoCliente, 4);
        Log.Green(“Valor: ” + novoPedido.ValorTotal.ToString(“N2”), 4);
        Log.Green(“NF-e: ” + novoPedido.ChaveDaNotaFiscalEletronica, 4);
        Log.NewLine();
    }
}

E agora, se colocarmos as três aplicações lado a lado, é possível visualizar o efeito do processamento nas três, onde uma gera o pedido, a segunda emite a nota fiscal e a terceira é notificada para iniciar o processo de transporte.

É claro que isso é um exemplo simplista, mas pode ser considerado algo parecido em uma escala maior, como por exemplo, utilizar este recurso para diálogo entre contextos do domínio, sincronização de bases de (escrita e leitura) em ambiente de CQRS ou para desempenhar qualquer outra tarefa em modo assíncrono.

Event Sourcing – Parte 2

Dando continuidade ao post anterior, por questões de simplicidade,  armazenávamos os eventos gerados para uma entidade em memória, algo que não é útil em ambiente de produção.

Como alternativa a isso, podemos recorrer à algum repositório que persista fisicamente as informações. O uso de uma base de dados relacional pode ser útil, porém é necessário que você utilize colunas que possam armazenar o objeto serializado (varbinary, varchar(max), etc.).

Isso é necessário porque é difícil prever o schema. Podem haver muitos eventos, novos eventos podem acontecer, novas informações podem ser propagadas; será difícil evoluir o schema na mesma velocidade.

Uma alternativa aqui é utilizar uma base no-sql. Apesar de alguns nomes já virem a cabeça, existe uma chamada GetEventStore. Ela foi desenhada para dar  suporte à cenários de event sourcing, e sua API também dispõe de métodos e facilitadores que permitem gravar e ler eventos sem dificuldades.

O GetEventStore também permite o acesso a leitura e gravação através de HTTP, possibilitando assim que outras tecnologias, incluindo JavaScript, possam também usufruir dela.

Por fim, ele também permite a gestão da base de eventos através de um interface web, onde podemos interagir, diagnosticar e monitorar os recursos que são necessários para o seu funcionamento.

O vídeo abaixo altera o projeto criado anteriormente para fazer uso do GetEventStore. E tudo o que precisamos alterar é a classe RepositorioDeEventos; o resto da aplicação se comporta da mesma forma, porém com os dados sendo persistidos fisicamente. Se desejar, pode baixar o projeto clicando aqui.

Event Sourcing – Parte 1

Todos temos uma conta bancária. Além das informações cadastrais, muito provavelmente ela também possui o saldo atual, que representa o quanto temos naquele exato momento, podendo ser positivo, negativo ou simplesmente zero.

O saldo, subindo ou descendo, queremos, obrigatoriamente, saber como ele chegou naquele valor. É importante saber o saldo que tínhamos em uma determinada data (retroativa) e o que aconteceu para ele atingir o saldo atual.

Felizmente todos os bancos dão aos seus correntistas o extrato de sua respectiva conta. É com ele que vamos conseguir enxergar o aumento ou diminuição do nosso dinheiro. Posicionado em uma data, identificando o saldo daquele dia, se viermos somando os valores (lembrando que pode ser positivo ou negativo), chegaremos ao saldo atual da conta. Claro, se nada estiver errado.

Tecnicamente falando, é provável que no banco de dados tenhamos uma estrutura de duas tabelas, onde uma teria os dados cadastrais da conta e a outra os lançamentos. A tabela pai provavelmente também irá armazenar o saldo atual da conta e outras características.

A medida em que os lançamentos são inseridos na segunda tabela, a coluna saldo da primeira vai sendo sensibilizado para refletir o saldo atual da conta. O valor atual do saldo vai sendo sobrescrito com o novo valor.

Já está claro que o importante para o dia a dia é o valor que temos disponível, ou seja, o saldo atual. Os lançamentos são necessários, até então, apenas para histórico; na maioria das vezes, eles são dispensáveis para operar a conta corrente (pagar contas, realizar saques, sacar dinheiro, etc.). Agora, considere o exemplo abaixo:

Repare que o saldo atual que temos não corresponde a soma dos lançamentos da segunda tabela. É provável que nosso sistema possua um bug, que afeta diretamente a atualização do saldo atual.

Apesar de, teoricamente, os lançamentos estarem corretos, é difícil diagnosticar o problema, já que não é possível, de uma forma fácil, reproduzir os mesmos eventos, exatamente da forma em que eles aconteceram, para só assim identificar onde e quando o problema de fato ocorreu.
Eis que entra em cena o Event Sourcing. Ao invés de armazenar o estado atual da entidade em colunas e linhas, o uso desta técnica determina que se armazene uma sequência (stream) de eventos, onde cada um deles informa a ação que foi executada sobre a entidade. Quando esses eventos são recarregados, eles são reaplicados contra a entidade, reconstruindo a mesma. Depois de todos eventos processados, a entidade terá, novamente, o seu estado (versão) atual. Neste modelo não fazemos a persistência física das propriedades na base de dados.
Versionamento
Uma das preocupações neste modelo é com o versionamento da entidade. É através de versão que vamos controlar o processamento e aplicação dos eventos. A versão irá garantir que um evento não seja aplicado à entidade depois que outro evento tenha se antecipado e alterado o estado da mesma em relação ao momento em que ela foi construída, ou melhor, carregada.
Isso é conhecido como concorrência otimista. É algo que também já fazemos com o banco de dados relacional, comparando o valor anterior com o atual que está na tabela antes de proceder com o comando de UPDATE ou de DELETE. O controle pode ser feito de várias maneiras, mas para ser simplista, podemos nos ater ao uso de um número inteiro, que vai sendo incrementado a medida em que um novo evento é aplicado.
Snapshots
Ao longo do tempo, mais e mais eventos vão sendo adicionados e, eventualmente, a performance pode ser degradada. Neste caso, uma possível solução é a criação de snapshots, que depois de carregado os eventos, reaplicados na entidade, ela pode disponibilizar métodos para expor o seu estado atual.
Os snapshots também são armazenados e quando a entidade for recarregada, primeiramente devemos avaliar a existência dele antes de reaplicar os eventos. Se houver um snapshot, restauramos o estado da entidade e reaplicaremos os eventos que aconteceram do momento do snapshot para frente, não havendo a necessidade de percorrer toda a sequência de eventos, ganhando assim em performance. Tão importante quanto o estado atual, o snapshot também deve armazenar a versão atual da entidade, pois esta informação é essencial para o correto funcionamento dos snapshots. Essa rotina de criação de snapshots pode ser feita em background, em uma janela de baixa atividade do sistema, podendo ser implementado através de uma tarefa do Windows que roda periodicamente ou até mesmo com um Windows Service. Para a criação de snapshots, podemos recorrer ao padrão Memento, que nos permite expor e restaurar o estado sem violar o encapsulamento do objeto.

Naturalmente o cenário de conta corrente que vimos acima quase se encaixa perfeitamente neste modelo, exceto pelo fato de termos o saldo atual sumarizado e armazenado fisicamente. Vamos então transformar uma simples classe com uma coleção de lançamentos em um modelo que suporte a técnica descrita pelo Event Sourcing. Vale lembrar que isso é “apenas” mais uma forma de persistência. Você pode ter situações que talvez esse modelo seja útil e necessário, já para outros ambientes, pode ser considerado overkill.