top of page

Coffee and Tips Newsletter

Inscreva-se na nossa newsletter semanal

Nos vemos em breve!

Airflow Sensor: Monitorando Condições em Workflows de Dados

Foto do escritor: JPJP

Apache Airflow é uma ferramenta poderosa para orquestração de workflows de dados, permitindo a automação e monitoramento de pipelines complexos. Dentro do Airflow, Sensors são operadores especializados que aguardam por uma condição específica antes de permitir a execução da próxima tarefa. Neste artigo, exploramos em detalhes como os Sensors funcionam e mostramos exemplos práticos.



Airflow Sensor
Airflow Sensor

O que é um Airflow Sensor?


O Airflow Sensor é um operador no Airflow que verifica periodicamente se uma condição foi satisfeita antes de liberar o fluxo de execução. Ele é especialmente útil para aguardar eventos externos, como:


  • Presença de um arquivo em um bucket S3

  • Disponibilidade de dados em um banco de dados

  • Conclusão de um processo em outra ferramenta


Os Sensors podem funcionar em diferentes modos de execução:


  • poke: verifica periodicamente uma condição e dorme entre as tentativas.

  • reschedule: libera o worker e reagenda a verificação para o futuro, otimizando o uso de recursos.



Exemplo 1: FileSensor


Caso de Uso:


O FileSensor é utilizado em workflows que precisam aguardar a chegada de um arquivo antes de prosseguir com o processamento. Por exemplo:


  • Monitorar diretórios de entrada para iniciar o processamento de novos arquivos.

  • Garantir que um relatório seja exportado antes de iniciar uma análise de dados.


Código:



Entendendo a DAG


Este DAG monitora um diretório local para verificar a existência de um arquivo antes de permitir a execução da próxima tarefa.


  1. O DAG file_sensor_example é configurado para rodar diariamente.

  2. A tarefa wait_for_file usa FileSensor para aguardar o arquivo no caminho especificado.

  3. O poke_interval=60 define que a verificação será feita a cada 60 segundos.

  4. O timeout=600 limita o tempo total de espera a 10 minutos.

  5. Assim que o arquivo for detectado, a tarefa process_file será executada.


 

Exemplo 2: S3KeySensor


Caso de Uso


O Airflow Sensor possui um operador chamado S3KeySensor,  essencial para pipelines de ingestão de dados que dependem de arquivos armazenados na AWS S3. Exemplos:


  • Aguardar arquivos CSV antes de iniciar uma carga para um Data Warehouse.

  • Sincronizar dados entre diferentes sistemas utilizando arquivos no S3.


Código:



Entendendo a DAG


Este DAG monitora a presença de um arquivo em um bucket AWS S3 antes de continuar o fluxo de execução.


  1. O S3KeySensor aguarda até que o arquivo input.csv seja encontrado no bucket meu-bucket.

  2. O parâmetro mode='reschedule' economiza recursos reagendando a verificação ao invés de manter a tarefa ativa constantemente.

  3. O DAG só continuará quando o arquivo estiver disponível.


 

Exemplo 3: SQLSensor


Caso de Uso


O SQLSensor é utilizado para aguardar registros específicos em um banco de dados antes de prosseguir com um fluxo de trabalho. Exemplos:


  • Esperar a chegada de novos pedidos antes de gerar relatórios de vendas.

  • Garantir que uma tabela tenha sido preenchida antes de iniciar o processamento de dados.


Código:


Entendendo a DAG


Este DAG aguarda a presença de novos registros em uma tabela do banco de dados antes de executar a próxima etapa.


  1. O SqlSensor executa a query para verificar se há registros com status 'pendente'.

  2. Se a contagem for maior que zero, a próxima tarefa poderá ser executada.

  3. O poke_interval=120 define que a verificação será feita a cada 120 segundos.

  4. O timeout=1800 define que o sensor pode esperar por até 30 minutos.


 

Exemplo 4: HttpSensor


Caso de Uso


O HttpSensor é utilizado para aguardar uma API externa estar disponível antes de continuar a execução do DAG. Exemplos:


  • Garantir que um serviço de machine learning esteja ativo antes de enviar solicitações de inferência.

  • Aguardar que um sistema de terceiros conclua um processamento antes de buscar os resultados.


Código:



Entendendo a DAG


Este DAG aguarda até que um serviço externo esteja pronto antes de continuar o processamento.


  1. O DAG http_sensor_example é configurado para rodar a cada hora.

  2. A tarefa wait_for_api usa HttpSensor para verificar o status de uma API externa.

  3. O parâmetro response_check avalia a resposta JSON para garantir que a API retorne ready antes de liberar a próxima etapa.

  4. Assim que a API estiver pronta, a tarefa process_data será executada.


 

Exemplo 5: ExternalTaskSensor


Caso de Uso


O ExternalTaskSensor é útil para coordenar workflows entre diferentes DAGs. Exemplos:


  • Esperar que um pipeline de transformação finalize antes de iniciar uma análise de dados.

  • Garantir que um processamento de logs seja concluído antes de acionar relatórios.



Código:



Entendendo a DAG


Este DAG sincroniza a execução entre dois DAGs distintos, garantindo que um processo externo seja concluído antes de iniciar o próximo.


  1. O DAG external_task_sensor_example monitora a finalização de uma tarefa em outro DAG.

  2. A tarefa wait_for_external_task aguarda a conclusão da tarefa_concluida no DAG outra_dag.

  3. O poke_interval=300 define que a verificação será feita a cada 5 minutos.

  4. Assim que a tarefa externa for concluída, a próxima etapa do DAG será liberada.


 

Considerações Finais


Os Sensors do Airflow são essenciais para sincronizar workflows com eventos externos. No entanto, eles podem consumir muitos recursos se não forem configurados corretamente. O uso do modo reschedule, quando possível, é uma boa prática para evitar sobrecarga no ambiente.


 

Quer Aprender Mais? Inscreva-se na nossa Newsletter semanal!


Não perca nossas dicas exclusivas de Tech e Data!



Receba semanalmente:

  • Tutoriais práticos e diretos sobre Engenharia de Software e Dados

  • Insights de tecnologia e notícias da semana

 

Comentarios


bottom of page