We are living in the Airflow era. Almost all of us started our scheduling journey with cron jobs, and the transition to a workflow scheduler like Airflow has given us better handling of complex, inter-dependent pipelines, UI-based scheduling, retry mechanisms, alerts and more. AWS also recently announced Amazon Managed Workflows for Apache Airflow. These are truly exciting times, and Airflow has really changed the scheduling landscape by treating scheduling configuration as code.
Let’s dig deeper.
Now, coming to a use case where I really dug into Airflow's capabilities. In the description below, all references to tasks mean Airflow tasks.
The Use case
The DAG for this use case consists of the following operations:
- Start an AWS EMR cluster: EMR is AWS's managed big data environment. To understand the use case we don't need to deep dive into how AWS EMR works, but if you want to, you can read more about it here. Airflow task_id for this operation: EMR_start_cluster
- Submit an ETL job: This is done by adding a step to the EMR cluster, i.e. submitting a Spark job to it, using EmrAddStepsOperator. This task only handles job submission, so it will always succeed as long as the job is submitted to the cluster successfully. Airflow task_id for this operation: promo_etl / user_etl
- Wait for the ETL job to complete: This is the actual indicator of the status of the job submitted by the upstream ETL task (in the context of this DAG). Using EmrStepSensor tasks, we wait for the submitted job to complete, and the state of this task indicates whether the job is running, successful or failed. Airflow task_id for this operation: promo_etl_sensor / user_etl_sensor
- Load partitions: Adds partitions to the pipeline's output (deliverable) table. Airflow task_id for this operation: load_user_partitions / load_promo_partitions
- Terminate the EMR cluster: Once all the tasks have completed, shut down the EMR cluster.
So, in a nutshell, one task submits the Spark job to the cluster and its downstream sensor task polls the cluster to get the job's status. Since the pipeline spins up an EMR cluster, this is a little different from the scenario of an always-running cluster, where you could submit the Spark job directly from Airflow using spark-submit and a BashOperator, and a single task would indicate the job state. A rough sketch of the user_etl branch of such a DAG is shown below.
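To make the shape of the pipeline concrete, here is a rough skeleton of how the user_etl branch of such a DAG could be wired up. This is an illustrative sketch, not my original DAG: it assumes the Airflow 1.10 contrib EMR operators, and the step definition, S3 path, schedule, connection ids and the terminate task name are placeholders (the promo_etl branch would look the same).

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# Hypothetical spark-submit step; the real ETL step config is not shown in this post.
USER_ETL_STEP = [{
    'Name': 'user_etl',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['spark-submit', 's3://my-bucket/jobs/user_etl.py'],  # placeholder path
    },
}]

dag = DAG('user_promo_pipeline', schedule_interval='@hourly', start_date=days_ago(1))

# Spin up the EMR cluster (cluster config comes from the emr_default connection here).
EMR_start_cluster = EmrCreateJobFlowOperator(
    task_id='EMR_start_cluster',
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag,
)

# Submit the Spark job as an EMR step; this succeeds as soon as the step is submitted.
user_etl = EmrAddStepsOperator(
    task_id='user_etl',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='EMR_start_cluster', key='return_value') }}",
    steps=USER_ETL_STEP,
    aws_conn_id='aws_default',
    dag=dag,
)

# Poll the submitted step until it completes; this reflects the real job status.
user_etl_sensor = EmrStepSensor(
    task_id='user_etl_sensor',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='EMR_start_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='user_etl', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag,
)

# Placeholder for adding partitions to the pipeline's output table.
load_user_partitions = PythonOperator(
    task_id='load_user_partitions',
    python_callable=lambda: None,
    dag=dag,
)

# Shut the cluster down once everything is done.
terminate_cluster = EmrTerminateJobFlowOperator(
    task_id='terminate_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='EMR_start_cluster', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag,
)

EMR_start_cluster >> user_etl >> user_etl_sensor >> load_user_partitions >> terminate_cluster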
Now, coming to the catch. Here, my user_etl pipeline was dependent on the previous DAG run of the same pipeline. That is, with an hourly schedule, the job for 2020-12-25 12:00 uses the deliverables of the previous DAG run, i.e. 2020-12-25 11:00. So, if the 2020-12-25 11:00 run has failed, the user_etl task for the 2020-12-25 12:00 DAG run should not run.
I didn’t want to handle this in code or via markers, and was looking to take care of it in the DAG itself. Additionally, I also wanted to add a retry mechanism to the user_etl pipeline, so that if user_etl_sensor fails, user_etl (which is an upstream task) is retried.
What did I want? For a DAG run, if user_etl_sensor fails, user_etl is submitted again to the cluster n times as a retry mechanism, and if it still fails, then in the next DAG run the user_etl job is not submitted to the cluster.
For these “inter-dependent” task use cases, I could not simply use the Airflow parameters mentioned below directly:
- Add retry on user_etl_sensor: This would have retried the user_etl_sensor task on failure, but its upstream task (user_etl) is the one that actually submits the job to the cluster, not the sensor. So a plain retry on the sensor would have been useless here.
- Add depends_on_past=True on user_etl_sensor: This Airflow parameter, if set on a task, prevents the task from running in the current DAG run if its previous run failed. But in this case it would only stop user_etl_sensor from running; user_etl would already have submitted the job in the current DAG run by then, so it wouldn't really matter.
The solution
As a retry mechanism, I figured that if I could clear the state of the upstream task (user_etl) on user_etl_sensor failure, I might be able to submit the job to the cluster n times. That's what I implemented with on_retry_callback on the user_etl_sensor task, with 2 retries, programmatically clearing the state of the user_etl task on sensor failure.
So now: user_etl_sensor fails > goes into the retry state > the function set as on_retry_callback is called > the function clears the state of user_etl > the job gets submitted to the cluster again > the sensor goes into the running state again.
Now, coming to controlling the next scheduled run of user_etl if the previous run of user_etl_sensor has failed even after the retries. My idea was to programmatically mark user_etl as failed (in the same DAG run) whenever user_etl_sensor fails.
By doing so, I can set depends_on_past=True on user_etl, and the task won't be triggered in the next DAG run because user_etl was marked failed in the previous run. It behaves as if a single task controls both the submission of the job and its running state, and that's exactly what I did, using on_failure_callback.
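For illustration (reusing the placeholder names from the skeleton above; the exact arguments of my original DAG aren't reproduced here), the flag goes on the step-submission task rather than on the sensor:

user_etl = EmrAddStepsOperator(
    task_id='user_etl',
    depends_on_past=True,  # don't submit this run's job if the previous run's user_etl is marked failed
    job_flow_id="{{ task_instance.xcom_pull(task_ids='EMR_start_cluster', key='return_value') }}",
    steps=USER_ETL_STEP,   # placeholder step definition from the skeleton above
    aws_conn_id='aws_default',
    dag=dag,
)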
Code
The retry_upstream_task function is passed as on_retry_callback:
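Here is a sketch of what that callback can look like (not the exact gist; it assumes the Airflow 1.10-era TaskInstance, State and provide_session APIs, and the to_retry list passed via params further below). As described later in the post, it loops over the DAG's tasks to find the ones to clear:

from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def retry_upstream_task(context, session=None):
    """Clear the state of the upstream tasks listed in params['to_retry'],
    so the scheduler re-runs them (re-submitting the job) in the same DAG run."""
    to_retry = context['params'].get('to_retry', [])
    dag = context['dag']
    execution_date = context['execution_date']
    # Loop over all tasks of the DAG and clear the ones we want to retry.
    for task in dag.tasks:
        if task.task_id in to_retry:
            ti = TaskInstance(task, execution_date)
            ti.refresh_from_db(session=session)
            ti.state = State.NONE  # a cleared state makes the scheduler schedule it again
            session.merge(ti)
    session.commit()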
The mark_upstream_failed function is passed as on_failure_callback:
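And a similar sketch for the failure callback, which marks the tasks listed in params['mark_failed'] as failed so that depends_on_past kicks in on the next run (same assumptions as above):

@provide_session
def mark_upstream_failed(context, session=None):
    """Mark the upstream tasks listed in params['mark_failed'] as FAILED,
    so depends_on_past=True on user_etl blocks the next scheduled run."""
    to_fail = context['params'].get('mark_failed', [])
    dag = context['dag']
    execution_date = context['execution_date']
    for task in dag.tasks:
        if task.task_id in to_fail:
            ti = TaskInstance(task, execution_date)
            ti.refresh_from_db(session=session)
            ti.state = State.FAILED  # persist the failed state for the current DAG run
            session.merge(ti)
    session.commit()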
These callbacks can be added on user_etl_sensor as shown below:
user_etl_sensor = EmrStepSensor(
    task_id='user_etl_sensor',
    dag=dag,
    # job_flow_id / step_id arguments omitted here; they are pulled from the
    # cluster-creation and step-submission tasks as shown earlier
    retries=2,
    on_failure_callback=mark_upstream_failed,
    on_retry_callback=retry_upstream_task,
    params={'mark_failed': ['user_etl'], 'to_retry': ['user_etl']},
)
Pretty cool, isn’t it? That’s how I explored configuration as code. This “Pythonic” task state control can be applied to any Airflow sensor that inherits from BaseSensorOperator, not just EMR-based jobs, and really to any use case involving interdependent tasks.
This user_etl pipeline in my case was a Delta Lake based pipeline, and if the next scheduled run had gone ahead while the previous run had failed, it could have caused data inconsistencies. I really like the flexibility Airflow gave me to translate my logic into code and actually achieve this.
In the callbacks, I looped over all the tasks of the DAG to get the task object whose state I wanted to change, i.e. marking the task as failed or clearing it for the retry functionality. I am researching how to get the task object directly from its task_id instead of looping, and will edit the post if I find a better way to do it.
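One option worth checking (I haven't verified it against every Airflow version) is the DAG.get_task helper, which returns the task object for a given task_id without the loop:

# Instead of looping over dag.tasks:
task = context['dag'].get_task('user_etl')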