Olá pessoal!

Mais um post sobre meu portfólio e desta vez, quero falar do processo de criação que desenvolvi para estruturar a dimensão Currency.

O processo em si foi bem simples, a dimensão não possui grandes transformações mas o que eu gostaria de chamar atenção aqui é para um novo tipo de controle de mudanças que elaborei utilizando o step add sequence do Pentaho.

Vamos ao post!

CRIANDO A DIMENSÃO

O primeiro passo foi criar a estrutura de job que vai coordenar o processo de transformação e carga deste data warehouse.

Essa é a estrutura base que forma meu ETL.

Como a transformação de stage é apenas uma carga simples sem tratamento, irei para a transformação da dimensão em si, que carrega o data warehouse.

Ainda que mencionado acima, a transformação é bem simples, precisando apenas adicionar o step add sequence para criar a surrogate key desta tabela.

Abaixo, o step da minha primeira carga de dados que vai alimentar a dimensão e o DW.

Como podemos ver, o processo é simples e estou utilizando o Insert/update para o Slowly changing dimension.

Abaixo, a configuração do step.

Como é a primeira carga, desabilitei os updates e, repare que a coluna CurrencyKey está presente, pois ela é a surrogate key e vai receber os valores do add sequence.

Uma vez que tenha feito a primeira carga, podemos avançar para o ponto que desenvolvi o controle para inserir, modificar ou alterar somente o necessário. 

Meu intuito é poupar processamento e recursos, pois, nenhum outro step após a comparação precisará ler e gerar valores para todas as linhas que passarem na stream.

Essa é a nova estrutura após a primeira carga.

CONTROLANDO AS SURROGATE KEYS – DESENVOLVENDO O ETL

Nosso primeiro passo após o insert inicial é controlar o valor das chaves da tabela, para isso, antes das próximas cargas, eu criei a transformação GET_MAX_ID_CURRENCY, que está no início do post.

Essa transformação possui uma conexão com a dimensão Currency pesquisando qual o maior valor da coluna CurrencyKey.

Veja a estrutura.

Ao pesquisar o valor, puxo o resultado da query com o step SUBSTITUI_NULL e armazeno na variável GRAVA_MAX_ID_VARIÁVEL.

Aqui, a query que utilizei no step SELECT_MAX_ID_DW. Não esqueça de somar 1 ao valor para evitar erro de integridade com a chave primária.

Já o SUBSTITUI_NULL é uma garantia de que se houver algum truncate, delete  ou por algum motivo o resultado retorne null, se tornará 1. Veja.

Agora, basta atribuir o valor da query na variável e puxá-la no próximo step: CARGA_BKP_DW_CURRENCY.

Agora temos a variável com criada recebendo o valor da query executada no table input.

CONFIGURANDO O MODELO DE CONTROLE

Antes de avançar, é importante deixar claro que uma das fontes continua sendo o arquivo gerado pela extração no job de stage.

Voltando a transformação de carga do DW adicionei as seguintes transformações para o processo:

  1. Table input.
  2. Sort
  3. Merge Diff
  4. Switch / Case
  5. Dummy

Para este procedimento, fiz uma consulta direta no DW, poderia programar uma batch também, mas como o acesso a este banco é menor, não vi problema. Avaliem.

Busquei apenas as colunas que estão tanto na área de stage quanto no data warehouse, se houver diferença, o Merge diff acusa erro.

A transformação de ordenamento foi para não causar erro no merge, ela não é obrigatória, mas diminui a chance de problemas quando vamos unir e comparar duas fontes.

Essa é a configuração do merge diff. Comparei apenas as colunas IDCurrency pois é ela quem vai me informar quais são novas, alteradas etc.

Já no step switch / case o que faremos é dar um destino para os valores, já que o merge diff categoriza cada uma deles.

Em vermelho, estão as três categorias possíveis do merge e seus steps de destino. O default são os dados que não mudaram nada desde a última carga.

Agora, mostrarei qual o último valor na minha coluna CurrencyKey na dimensão Currency, para confirmar que o step de capturar o id funciona.

Não esqueça de atribuir a variável na transformação add sequence. 

Confirmando através da query no banco o maior valor da coluna.

O último passo para o teste é alterar o arquivo fonte do stage adicionando alguns valores. Para meu teste adicionei alguns e fiz um update em outros.

Exemplo dos valores. Uma dica é copiar um bloco de linhas, alterar e salvar o arquivo fonte.

Em resumo, esta foi a nova estrutura adicionada a estrutura base.

Executando um preview das alterações feitas, perceba que o merge diff categorizou corretamente. 

Antes de executar o teste, remova o job de carga na stage, ele pode sobrescrever seu arquivo txt alterado, e aí, o teste irá falhar!

Execute somente as  transformações sublinhadas em vemelho.

Veja no log do job que a transformação insert/update escreveu apenas 16 linhas. Justamente as linhas adicionadas.

E consultando o banco de dados, veja o resultado resumido.

Vemos que a transformação, conferência e carga ocorreu perfeitamente, correto? Mas, e se por um acaso no seu processo de ETL, houver um arquivo para salvar a transformação como forma de agilizar o backup?!

Para solucionar, basta marcar a opção de append  na transformação e executar normalmente. Observe.

Adicionando mais duas linhas para teste.

Consulta ao banco para conferir se foi inserido corretamente. 

E para finalizar, o arquivo csv com as duas últimas linhas anexadas.

CONCLUSÃO

Para este post pretendi agregar maior valor no desenvolvimento do slowly changing em si do que na transformação, por julgá-la bem simples.

Ao final, consegui desenvolver um modelo de controle bem robusto que cobre todos os aspectos que detectei e que poderiam causar algum tipo de problema.

Ao final, também mostrei como solucionar uma possível sobrescrita no csv, caso crie um para conferência e backup.

Outro aspecto importante é somar o valor retornado pela query do max_id com 1; evite falhas por problema de integridade de chave primária.

Reforço que toda a estrutura de controle deve ser concluída após a primeira carga, para não ter erro.

Farei novos testes com essa transformação para solucionar algumas dúvidas e post em forma de snippet!

Para o controle de horário da carga com a coluna LoadTime, utilizei a transformação formula com a formula interna: NOW(). Pega a hora e data que a transformação está ocorrendo.

Outra solução seria criar uma constraint de default com hora e data puxada do sistema pela função em SQL sysdatetime().

Espero que tenham gostado, Deus os abençoe!

Saúde.