Tecnologia

Seus dados em um único lugar com Airflow

Por: , novembro 6, 2017

Nas arquiteturas atuais de software nos deparamos com os nossos amigos microserviços, ou como são mais conhecidos, pelo seu nome em inglês: microservices. Apesar de todas as vantagens que os microservices nos trazem, logo quando olhamos para a arquitetura pensamos:

E agora, como faremos para ter relatórios com vários dados distribuídos, como bancos de dados, arquivos de terceiros (CSVs, TXTs, etc), LOGs, APIs de terceiros?

Vamos analisar o cenário abaixo:

Temos várias APIs, como: passageiros, faturamento, motoristas, pagamentos, notificações e corridas.

Modelagem Usando Micro Services

 

Como faço pra ter um relatório de motoristas versus pagamentos?

A resposta é consolidar os dados, e para isso, precisamos de uma ferramenta que pegue parte por parte dos dados que queremos e os consolide em um único lugar. Aqui que entra o Airflow.

Mas o que é Airflow?

Airflow é uma plataforma que foi criada pelo Airbnb e hoje é mantido pela fundação Apache.

Na página inicial já podemos encontrar a definição da plataforma.

Airflow is a platform to programmatically author, schedule and monitor workflows.

Airflow é uma plataforma para programar, agendar e monitorar fluxos de trabalho.

Vamos para prática.

 

Para pegar nossos pedaços de dados e consolidar em uma base. O Airflow usa o conceito de DAG (directed acyclic graphs) que em português significa: grafo acíclico dirigido.

Na prática, podemos representar uma DAG com um script Python que tem operadores que irão executar em uma sequência definida.

Vamos ver como ficaria nossa DAG de motoristas.

from datetime import datetime, timedelta
from airflow import DAG
from operators.data_transfer import DataTransfer
from operators.insert_non_duplicate import InsertNonDuplicate
interval = timedelta(hours=24)
extract_params = {'interval': str(interval)}
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 11, 1, 0, 0, 0),
    'email': ['seu-email.com.br'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('drivers',
          default_args=default_args,
          schedule_interval='@daily'
          )
extract_drivers = DataTransfer(
    task_id='extrac_drivers',
    source_conn_id='drivers',
    destination_conn_id='consolidate_database',
    destination_table='drivers_stage',
    preoperator="TRUNCATE TABLE drivers_stage",
    sql='extract_drivers.sql',
    params=extract_params,
    conflict_action=None,
    commit_every=5000,
    dag=dag
)
merger_drivers = InsertNonDuplicate(
    task_id='merger_drivers',
    conn_id='consolidate_database',
    origin_table='drivers_stage',
    destination_table='drivers',
    key_field=['driver_id'],
    truncate_on_end=True,
    dag=dag
)
merger_drivers.set_upstream(extract_drivers)

 

O script acima irá gerar uma DAG que podemos acessar através da interface do Airflow:

Interface Web Airflow

 

O primeiro operator dessa DAG irá na origem dos dados que é a nossa conexão: drivers, executará o SQL extract_drivers.sql, em seguida transferirá 5000 registros por vez para a nossa tabela de destino drivers_stage que está na conexão consolidate_database.

-- extract_drivers.sql
SELECT *
FROM   drivers
WHERE  created_at >= Timestamp('{{ ts }}', '-{{ params.interval }}')
       AND rd.created_at < Timestamp('{{ ts }}')

 

Reparem que estamos pegando os registros por data com um intervalo retroativo, ou seja essa consulta pega todos os motoristas que foram criados com data maior ou igual que a de execução da DAG subtraindo 24 horas e, menor que a data de execução da DAG. Então se executarmos essa DAG no dia 01/11/2017, essa consulta trará registros maiores ou iguais 31/10/2017 00:00:00 e menores que 01/11/2017, logo, trazendo todos os registros do dia 31/10/2017.

O Segundo operador dessa DAG fará uma transferência da tabela stage para a tabela final verificando se não há dados repetidos.


Agora o que devemos fazer é ter uma outra DAG para pegar os dados de pagamentos.

Tendo essa DAG e os dados dos dois bancos de origens diferentes consolidados em um só, para termos uma relatório de motoristas versus pagamentos bastaríamos fazer uma consulta que apresente os dados unidos (join) entre essas tabelas.

Os dois operator citados no texto (DataTransfer, InsertNonDuplicate) são iniciativas da 99 taxis e estarão sendo enviados via PR para o projeto do Airflow em breve.

Aproveite para ler mais um ótimo conteúdo em português escrito por Gustavo Amigo.

Conclusão

O intuito desse pequeno tutorial é demonstrar rapidamente como podemos consolidar dados em um único lugar, para podermos usufruir dos dados consolidados de N maneiras, como fazer relatórios ou usar ferramentas de B.I.

Se você quiser entender mais sobre os conceitos do Airflow, segue abaixo alguns links bem legais:

Tutoriais Airflow

Fontes

  • Receba nosso conteúdo em primeira mão.