Bet you didn’t know this about Airflow!

 

We are living in the Airflow era. Most of us started our scheduling journey with cron jobs, and moving to a workflow scheduler like Airflow has given us better handling of complex, inter-dependent pipelines, UI-based scheduling, retry mechanisms, alerts and more. AWS also recently announced Amazon Managed Workflows for Apache Airflow (MWAA). These are truly exciting times: with scheduling configuration as code, Airflow has really changed the scheduling landscape.

Let’s dig deeper.

Now, coming to a use case where I really dug into Airflow's capabilities. In the use case below, every reference to a task means an Airflow task.

The Use case

The above DAG consists of the following operations:

  • Start an AWS EMR cluster: EMR is an AWS-based big data environment. To understand the use case we don’t need to deep dive into how AWS EMR works. But if you want to, you can read more about it here.
    Airflow task_id for this operation: EMR_start_cluster
  • Submit an ETL job: This is done by adding a step to the EMR cluster, which basically submits a Spark job on the cluster. This is only the job submission part, so the task succeeds whenever it manages to submit the job to the cluster, regardless of how the job itself ends. This is done using EmrAddStepsOperator.
    Airflow task_id for this operation: promo_etl/user_etl
  • Wait for the ETL job to complete: This is the actual indicator of the status of the job submitted by the upstream ETL task (in the context of the above DAG). Using EmrStepSensor tasks, we wait for the submitted job to complete, and the state of this task indicates whether the job is running, successful or failed.
    Airflow task_id for this operation: promo_etl_sensor/user_etl_sensor
  • Load partitions: Adds partitions to the deliverable table of the pipeline.
    Airflow task_id for this operation: load_user_partitions/load_promo_partitions
  • Terminate the EMR cluster: Once all the tasks have completed, shut down the EMR cluster.
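
The task ordering described above can be sketched as a plain dependency map, without Airflow itself. This is only an illustration: the fan-out into two parallel branches is inferred from the task ids, and EMR_terminate_cluster is a hypothetical task id, since the terminate task is not named here.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task id to the set of tasks it waits on (its upstream tasks).
# Assumption: the promo and user branches run in parallel between cluster
# start and termination; "EMR_terminate_cluster" is a made-up task id.
UPSTREAM = {
    "EMR_start_cluster": set(),
    "promo_etl": {"EMR_start_cluster"},
    "user_etl": {"EMR_start_cluster"},
    "promo_etl_sensor": {"promo_etl"},
    "user_etl_sensor": {"user_etl"},
    "load_promo_partitions": {"promo_etl_sensor"},
    "load_user_partitions": {"user_etl_sensor"},
    "EMR_terminate_cluster": {"load_promo_partitions", "load_user_partitions"},
}


def execution_order(upstream):
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(upstream).static_order())


order = execution_order(UPSTREAM)
print(order)
```

Any order it prints starts with the cluster start, ends with the termination, and keeps each sensor between its ETL task and its partition load.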

So, in a nutshell, one task submits the Spark job to the cluster, and its downstream sensor task polls the state of that submitted job to report its status. Because the pipeline runs on EMR, this differs from the setup with an always-running cluster, where you can submit the Spark job directly from Airflow using spark-submit in a BashOperator and a single task reflects the job state.

Now, coming to the catch. My user_etl pipeline depended on its own previous DAG run. That is, with an hourly schedule, the job for 2020-12-25 12:00 uses the deliverables of the previous DAG run, i.e. 2020-12-25 11:00. So, if the 2020-12-25 11:00 run has failed, the user_etl task for the 2020-12-25 12:00 DAG run should not run.

I didn’t want to handle this in code or via markers; I wanted to take care of it in the DAG itself. I also wanted to add a retry mechanism to the user_etl pipeline, so that if user_etl_sensor fails, user_etl (its upstream task) is retried.

What did I want? Within a DAG run, if user_etl_sensor fails, user_etl is submitted to the cluster again, up to n times, as a retry mechanism; if it still fails, the user_etl job is not submitted to the cluster in the next DAG run.

For these “inter-dependent” task use cases, I could not simply use the Airflow parameters below directly.

  • Add retries on user_etl_sensor: This would have retried the user_etl_sensor task on failure, but it is the upstream task (user_etl) that actually submits the job to the cluster, not the sensor. Retrying the sensor alone would have been useless here.
  • Add depends_on_past=True on user_etl_sensor: When set on a task, this Airflow parameter prevents the task from running in the current DAG run if its previous run failed. Here, it would stop user_etl_sensor from running after a failed previous run, but by then user_etl would already have submitted the job in the current DAG run, so it would not really help.

The solution

As a retry mechanism, I figured that if I could clear the state of the upstream task (user_etl) when user_etl_sensor fails, I would be able to submit the job to the cluster n times. That is what I implemented: I set on_retry_callback on the user_etl_sensor task, gave it 2 retries, and programmatically cleared the state of the user_etl task on sensor failure.

So now, user_etl_sensor fails > goes into retry state > the function passed as on_retry_callback is called > the function clears the state of user_etl > the job gets submitted to the cluster again > the sensor goes into running state again.
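
To make that loop concrete, here is a toy model in plain Python, with no Airflow involved. The FakeCluster class and the run_with_retries helper are hypothetical stand-ins for EMR and the scheduler; only the idea (re-submitting the upstream job each time the sensor is retried) comes from the description above.

```python
class FakeCluster:
    """Hypothetical stand-in for EMR: a job fails a fixed number of times."""

    def __init__(self, failures_before_success):
        self.failures_left = failures_before_success
        self.submissions = 0

    def submit(self):
        # Plays the role of user_etl: submitting always "succeeds".
        self.submissions += 1

    def job_succeeded(self):
        # Plays the role of user_etl_sensor: reports the job's real outcome.
        if self.failures_left > 0:
            self.failures_left -= 1
            return False
        return True


def run_with_retries(cluster, retries):
    """Submit once, then re-submit on every sensor retry (at most `retries`)."""
    cluster.submit()                      # first run of user_etl
    for attempt in range(retries + 1):    # first sensor try + retries
        if cluster.job_succeeded():
            return "success"
        if attempt < retries:
            # on_retry_callback: clear user_etl so the job is submitted again
            cluster.submit()
    return "failed"
```

With 2 retries, a job that fails twice succeeds on its third submission, while a job that fails three times ends as failed after the same three submissions.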

Now, coming to controlling the next scheduled run of user_etl if the previous run of user_etl_sensor has failed even after retries. I thought of programmatically marking user_etl as failed (in the same DAG run) if user_etl_sensor fails.

By doing so, I can use depends_on_past=True on user_etl, and the task won’t be triggered in the next DAG run because user_etl was marked failed in the previous run. It behaves as if a single task controls both the submission and the running state of the job. That is what I did, using on_failure_callback.
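
The effect of combining the failure callback with depends_on_past=True can be illustrated with another small plain-Python model. The functions and the dictionary layout are hypothetical and much simpler than Airflow's real scheduling logic; the sketch only shows why marking user_etl as failed holds back the next run.

```python
def can_run(task_id, previous_run_states, depends_on_past):
    """Simplified depends_on_past check for a single task.

    previous_run_states maps task ids to their final state in the previous
    DAG run, or is None for the very first run. (This toy version accepts
    only "success" as a passing previous state.)
    """
    if not depends_on_past or previous_run_states is None:
        return True
    return previous_run_states.get(task_id) == "success"


def on_sensor_failure(run_states, mark_failed):
    """Mimic on_failure_callback: overwrite the state of the listed tasks."""
    for task_id in mark_failed:
        run_states[task_id] = "failed"


# 11:00 run: the job was submitted fine, but the sensor saw it fail.
run_1100 = {"user_etl": "success", "user_etl_sensor": "failed"}

# Without the callback, user_etl looks successful, so the 12:00 run proceeds.
blocked_before = not can_run("user_etl", run_1100, depends_on_past=True)

# With the callback, user_etl is marked failed and the 12:00 run is held back.
on_sensor_failure(run_1100, mark_failed=["user_etl"])
blocked_after = not can_run("user_etl", run_1100, depends_on_past=True)
```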

Code

retry_upstream_task function is passed as on_retry_callback:

from airflow.models.taskinstance import clear_task_instances
from airflow.utils.db import provide_session

@provide_session
def retry_upstream_task(context, session=None) -> None:
    # Task instances of the current DAG run
    tasks = context["dag_run"].get_task_instances(session=session)
    # task_ids to re-run, passed through the task's params
    to_retry = context["params"].get("to_retry", [])

    task_to_retry = [ti for ti in tasks if ti.task_id in to_retry]

    # Clearing the state makes the scheduler run these tasks again,
    # which re-submits the job to the cluster
    clear_task_instances(task_to_retry, session, dag=context["dag"])

mark_upstream_failed function is passed as on_failure_callback:

from airflow.utils.state import State
from airflow.utils.db import provide_session

@provide_session
def execute(task, session=None):
    # Persist the modified task instance; provide_session commits on exit
    session.merge(task)

def mark_upstream_failed(context) -> None:
    # Task instances of the current DAG run
    tasks = context["dag_run"].get_task_instances()
    # task_ids to mark as failed, passed through the task's params
    mark_failed = context["params"].get("mark_failed", [])

    task_to_mark = [ti for ti in tasks if ti.task_id in mark_failed]

    for task in task_to_mark:
        task.state = State.FAILED
        execute(task)

These callbacks can be added on user_etl_sensor as shown below:

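user_etl_sensor = EmrStepSensor(
    task_id='user_etl_sensor',
    dag=dag,
    # job_flow_id, step_id and other sensor arguments are omitted here
    retries=2,  # on_retry_callback only fires while retries are left
    on_failure_callback=mark_upstream_failed,
    on_retry_callback=retry_upstream_task,
    params={'mark_failed': ['user_etl'], 'to_retry': ['user_etl']}
)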

Pretty cool, isn’t it? That’s how I explored configuration as code. This “Pythonic” task state control is not limited to EMR-based jobs: it can be applied to any Airflow sensor that inherits from BaseSensorOperator, or to basically any use case involving interdependent tasks.

This user_etl pipeline was, in my case, a Delta Lake based pipeline, and if the next scheduled run had run after a failed previous run, it might have caused data inconsistencies. I really like that Airflow gave me the flexibility to turn my logic into code and actually achieve it.

In the callbacks, I loop over all the task instances of the DAG run to find the ones whose state I want to change, i.e. to mark them as failed or to clear them for the retry functionality. I am still researching how to get the task object directly from its task_id instead of looping; I will edit the post if I find a better way.
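
One possible way to avoid the loop is DagRun.get_task_instance(task_id), which looks up a single task instance by its id. The helper below is a sketch under that assumption; find_task_instances is a hypothetical name, and the behaviour should be checked against the Airflow version in use.

```python
def find_task_instances(dag_run, task_ids):
    """Fetch task instances by id instead of scanning the whole DAG run.

    Assumes dag_run offers get_task_instance(task_id), as Airflow's DagRun
    does, returning None when no matching task instance exists.
    """
    found = []
    for task_id in task_ids:
        ti = dag_run.get_task_instance(task_id)
        if ti is not None:
            found.append(ti)
    return found
```

Inside a callback this could be called as find_task_instances(context["dag_run"], context["params"].get("mark_failed", [])), replacing the list comprehension over all task instances.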

