Key Airflow Pipeline Deployment Lessons On Kubernetes

At Natural Intelligence we use Airflow extensively to build simple ETL flows as well as complex ML tasks that run on terabytes of data. However, with its great power, Airflow also brings great complexity and many pitfalls.

I found myself working on a couple of different pipeline implementations, solving the same hard problems over and over again. We realised that we should eliminate these redundant systems and build a generic pipeline creation mechanism that solves those hard problems once.

There are three important lessons we have learned along the way that I would like to share with you today:

  • How to make it easy to deploy Airflow DAGs
  • How to handle task processing at scale
  • How to handle DAG storage at scale

How to make it easy to deploy Airflow DAGs

We store DAG code in Git, and we want Airflow to pick up the latest code changes automatically, continuous delivery style.

There are several different ways to continuously deploy DAGs when running Airflow on Kubernetes:

  1. A Kubernetes CronJob that pulls changes from Git into the mounted DAGs directory.
  2. A deployment pipeline built with Jenkins that copies the DAGs.

These two approaches are fundamentally different, and the choice between them depends on the project's complexity. A cron job can detect Git changes and update the DAGs folder. A deployment pipeline, however, gives us rollback criteria, versioning, artifact storage, transactional deployments and more. The ability to release on demand is a critical competency. Once we release the pipeline, teams can work independently. An automated pipeline removes manual errors, provides standardised development feedback loops and enables fast product iterations. That's why we decided to use Jenkins to build a pipeline workflow.

To better understand why our pipeline looks the way it does, here is a short explanation of what our DAGs are doing. Some of our DAGs run Spark workloads on AWS EMR. Others run custom code packaged as Docker containers on our Kubernetes cluster, using Airflow's Kubernetes pod operator.
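
To make this concrete, here is a minimal sketch of those two task flavours, assuming Airflow 1.10-style contrib imports (in Airflow 2 these operators live in provider packages). The cluster ID, S3 paths and image names are placeholders, not our real values.

```python
# Minimal sketch of the two kinds of tasks described above.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG(dag_id="example_etl",
         start_date=datetime(2020, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:

    # Spark workload submitted as a step to an existing EMR cluster
    spark_step = EmrAddStepsOperator(
        task_id="run_spark_job",
        job_flow_id="j-XXXXXXXXXXXXX",  # placeholder EMR cluster id
        aws_conn_id="aws_default",
        steps=[{
            "Name": "transform",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", "s3://my-bucket/jars/etl.jar"],
            },
        }],
    )

    # Custom code packaged as a Docker image, run as a pod on the cluster
    custom_task = KubernetesPodOperator(
        task_id="run_custom_code",
        name="run-custom-code",
        namespace="airflow",
        image="my-registry/etl-task:latest",  # placeholder image
        cmds=["python", "main.py"],
        get_logs=True,
        is_delete_operator_pod=True,
    )

    spark_step >> custom_task
```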

And here is how we do it:

It all starts with Git detecting changes and triggering the build pipeline.

Build: Generate the DAGs from easy-to-use templates, create the JARs to be executed on EMR and build the Docker images.
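
The DAG generation part of this step can be pictured roughly like the sketch below, assuming a Jinja2 template plus one YAML descriptor per pipeline; the directory names, template name and config fields are illustrative, not our actual layout.

```python
# Render one DAG file per pipeline descriptor using a Jinja2 template.
from pathlib import Path

import yaml
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("templates"))
template = env.get_template("dag_template.py.j2")  # hypothetical template

out_dir = Path("build/dags")
out_dir.mkdir(parents=True, exist_ok=True)

for config_path in Path("pipelines").glob("*.yaml"):
    config = yaml.safe_load(config_path.read_text())
    dag_code = template.render(
        dag_id=config["dag_id"],
        schedule=config["schedule"],
        tasks=config["tasks"],
    )
    (out_dir / f"{config['dag_id']}.py").write_text(dag_code)
```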

Unit test: Run unit tests on our DAGs. These tests verify the preliminary correctness of the DAG code and allow us to get faster feedback.
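
A typical preliminary check loads the generated files into a DagBag and fails fast on import errors; this is a hedged example, and the build/dags path simply continues the assumption from the previous sketch.

```python
# pytest-style sanity checks for the generated DAG files.
from airflow.models import DagBag


def test_dags_load_without_errors():
    dag_bag = DagBag(dag_folder="build/dags", include_examples=False)
    # Any syntax or import problem in a DAG file shows up here.
    assert dag_bag.import_errors == {}, f"DAG import failures: {dag_bag.import_errors}"


def test_every_dag_has_tasks():
    dag_bag = DagBag(dag_folder="build/dags", include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert len(dag.tasks) > 0, f"{dag_id} has no tasks"
```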

Push: Pack, version and store the artifacts in S3.
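
As a rough illustration of this step, the generated DAGs could be zipped and uploaded under a version prefix derived from the Git commit; the bucket name and key layout here are assumptions, not our actual conventions.

```python
# Bundle the generated DAGs and push them to S3 under a version prefix.
import shutil
import subprocess

import boto3

# Use the short commit hash as the artifact version.
version = subprocess.check_output(
    ["git", "rev-parse", "--short", "HEAD"], text=True
).strip()

archive = shutil.make_archive(f"dags-{version}", "zip", root_dir="build/dags")

s3 = boto3.client("s3")
s3.upload_file(archive, "my-artifacts-bucket", f"airflow/dags/{version}/dags.zip")
```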

Deploy to Staging: Prepare the staging environment and deploy our previously created artifacts to this environment.

Test: Test the new artifacts on our staging environment. This actually involves executing the DAGs on a predefined dataset and verifying the ETL results.

Deploy to Production: Deploy our versioned and tested artifact into our production environment by copying the verified DAGs to our mounted EFS.

The lesson: Git DAG synchronisation isn’t trivial. You have to build your own process.


How to handle task processing at scale

Once you are able to deploy to production with trust, it's easier to get to the next stage: scaling with trust. The official Airflow Helm chart uses the Celery Executor by default to schedule tasks. The Celery mechanism requires a group of worker nodes (implemented as pods in a StatefulSet on Kubernetes). These workers might consume plenty of resources, which could starve the entire Kubernetes cluster. That's one of the reasons we chose to create a dedicated instance group in Kubernetes just for the workers, to prevent them from hogging resources needed elsewhere.

Here is how we configure it in the Airflow Helm chart:

https://gist.github.com/naturalett/6c085bb3b7e4bd31af8c3a762287f074

The Celery Executor doesn't allow automatic scale-out of worker pods when more workers are needed. This means we need to make sure we have enough worker slots to run all DAGs in parallel.

Airflow has a lot of concurrency parameters:

https://gist.github.com/naturalett/776301419aaad2913a683691f994f937

While working with Celery, your ultimate bottleneck becomes number_of_workers × celery.worker_concurrency, the maximum number of tasks that can run at once. For example, four workers with a worker_concurrency of 16 cap you at 64 concurrent tasks.

This requires always having a predefined number of worker nodes running, even when no DAGs are scheduled, which is of course not very cost-effective.

To allow automated scale-out and scale-in, we are looking at the Kubernetes Executor. The main problem it solves is dynamic resource allocation: whereas the Celery Executor requires a static pool of workers, the Kubernetes Executor launches a pod per task, so capacity expands and shrinks automatically with the workload. Additionally, when creating a new pod you can specify precisely the resources it needs, together with its dependencies.
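
As a hedged illustration, per-task resources can be declared through executor_config; this uses the Airflow 1.10-style dictionary format (Airflow 2 expresses the same idea via a pod_override), and the values are placeholders.

```python
# Per-task resource requests/limits under the Kubernetes Executor (Airflow 1.10 style).
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def heavy_transform():
    pass  # placeholder for custom processing code


with DAG(dag_id="k8s_executor_example",
         start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:

    PythonOperator(
        task_id="heavy_transform",
        python_callable=heavy_transform,
        executor_config={
            "KubernetesExecutor": {
                "request_cpu": "500m",
                "request_memory": "1Gi",
                "limit_cpu": "1",
                "limit_memory": "2Gi",
            }
        },
    )
```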

The lesson: Assigning dedicated nodes to the Celery Executor's workers will only take you a few steps ahead. If you want true autoscaling, look into the Kubernetes Executor.


How to handle DAG storage at scale

After taking care of performance, let's talk about storage.

All Airflow components require access to the same set of DAG files. When running on Kubernetes, each component is executed in a separate pod. Therefore, a shared storage solution is needed. When running a Kubernetes cluster on AWS (which is what we do), one has two main options for persistent shared storage:

  1. EFS (Elastic File System)
  2. EBS (Elastic Block Store)

An EBS volume can only be mounted on a single instance at a time, which limits the cluster scaling we discussed in the previous section. That's why we chose EFS. We use the stable Helm chart for the Kubernetes EFS Provisioner.

Here is how we define it in the Airflow Helm chart:

https://gist.github.com/naturalett/73512c7ee8b5ba84df961e86cfd7c3cc

The lesson: Shared storage is needed to allow Airflow to scale. The EFS Provisioner is the way to enable storage sharing on AWS-based Kubernetes.


To summarise

The three important Airflow lessons in this post are as follows:

  • Make DAG deployments continuous delivery style
  • Consider the Kubernetes Executor over the built-in Celery Executor for processing scale-out
  • Use EFS over EBS for handling storage at scale

What we do next…

There are, however, more lessons to discuss. In the next post I will share more of what we have learned, such as:

  1. Generated DAGs mechanism and why you should generate DAGs automatically
  2. CI/CD for DAGs in detail — unit tests, integration tests and deployment