Airflow Kubernetes Executors on Google Kubernetes Engine

Introduction

In this post, I’ll document the use of the Kubernetes Executor on a relatively large Airflow cluster (over 200 data pipelines and growing). In my setup, the Kubernetes Executor solved or improved a number of Airflow operational drawbacks, which I’ll walk through below.

Airflow Executors

An executor is the abstraction of a task runner/worker; it executes the tasks defined in an Airflow DAG. Several executors are supported out of the box, including LocalExecutor, SequentialExecutor, CeleryExecutor, and KubernetesExecutor.

On a single-node Airflow production deployment, the LocalExecutor by default executes each task in a separate OS process. This setup scales relatively well, since the limit on the number of processes an OS can run is very high. (cat /proc/sys/kernel/pid_max for your curiosity)
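For reference, a minimal sketch of that configuration; the parallelism value shown is just the stock default and simply caps how many task processes run at once.

[core]
executor = LocalExecutor
# upper bound on concurrently running task instances
parallelism = 32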

However, most ETL DAGs run on a daily interval; the rest of the time those executor processes sit idle, wasting cloud resources (over-provisioning). Reducing the instance size reduces the waste, but tasks then queue up and suffer higher latency between queueing and execution.

What we want is to provision just enough compute to execute tasks as they are queued, and to release that compute once the tasks complete. The KubernetesExecutor (in combination with preemptible instances) does exactly that! Instead of executing each task on the same node as the scheduler, the executor sends a resource request to the Kubernetes master, which runs the task in its own Pod with the requested resources and deletes the Pod once the task is completed.

To release the compute resources after execution, simply apply a node affinity (details later) to the executor Pods so they are scheduled onto a node pool managed by the cluster autoscaler. When demand for resources rises (more tasks scheduled by Airflow), the node pool grows; once tasks complete, it shrinks back to the configured minimum (I’ve set the minimum to 0). The node pool creation command in the next section shows the relevant settings.

Preemptible VM Node Pool

Preemptible instances last a maximum of 24 hours and are subject to shutdown when capacity is needed elsewhere, but they offer around a 4x cost reduction. Since our workload typically finishes within a minute per task, we will leverage this for even greater savings.
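As a sketch, a preemptible, autoscaled node pool could be created as follows; the pool name, cluster, zone, and node counts are placeholders for your own values. GKE automatically labels preemptible nodes with cloud.google.com/gke-preemptible (which the node affinity below relies on), while the taint has to be added explicitly to match the toleration in the executor config.

gcloud container node-pools create airflow-workers \
  --cluster=<your_cluster> \
  --zone=<your_zone> \
  --machine-type=n1-standard-1 \
  --preemptible \
  --num-nodes=1 \
  --enable-autoscaling --min-nodes=0 --max-nodes=10 \
  --node-taints=instanceType=preemptible:NoSchedule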

Executor Setup on GKE

This setup should work on any Kubernetes cluster with proper RBAC configured. airflow.cfg:

[core]
executor = KubernetesExecutor
[kubernetes]
worker_container_repository = <your_image>
worker_container_tag = <your_image_tag>
worker_container_image_pull_policy = Always
# or your namespace
namespace = airflow
dags_in_image = True
affinity = {"nodeAffinity": {"requiredDuringSchedulingIgnoredDuringExecution": {"nodeSelectorTerms": [{"matchExpressions": [{"key": "cloud.google.com/gke-preemptible","operator": "Exists"}]}]}}}
tolerations = [{"key": "instanceType", "value": "preemptible", "operator": "Equal", "effect": "NoSchedule"}]

[kubernetes_labels]
app = airflow-worker
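As a quick sanity check (assuming the airflow namespace and the app=airflow-worker label above), you can watch executor Pods appear and disappear as tasks run:

# each running task shows up as a short-lived worker Pod
kubectl -n airflow get pods -l app=airflow-worker --watch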

Besides the above, there are two cluster-level settings which are very important.

First, by default the default service account in the namespace has almost no permissions to do anything, roughly the same level of access as an anonymous user. Either create a dedicated service account and have Airflow use it for managing Pods, or grant the default service account permissions to list/create/delete Pods, Secrets, ConfigMaps, and so on.
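For example, here is a minimal Role/RoleBinding sketch that grants the default service account in the airflow namespace the permissions described above; tighten the resources and verbs to what your setup actually needs.

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: airflow-executor
  namespace: airflow
rules:
# just enough to manage executor Pods and read mounted Secrets/ConfigMaps
- apiGroups: [""]
  resources: ["pods", "pods/log", "secrets", "configmaps"]
  verbs: ["get", "list", "watch", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-executor
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: default
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-executor
  apiGroup: rbac.authorization.k8s.io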

Second, Pods created by the KubernetesExecutor by default have no resource requests or limits applied. This means Kubernetes will schedule as many containers as possible onto a single node (subject to the per-node Pod limit, 110 by default). This will not work in practice; most Pods will crash due to OOM (out of memory). To fix this, either wait for that feature to land upstream or apply a namespace LimitRange which sets default resource requests and limits. Example below:

apiVersion: v1
kind: LimitRange
metadata:
  name: default-cpu-memory-limit-range
  namespace: airflow
spec:
  limits:
  # defaultRequest is applied as the resource request, default as the limit,
  # for any container in the namespace that doesn't set its own values
  - defaultRequest:
      cpu: 150m
      memory: 512Mi
    default:
      cpu: 750m
    type: Container
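To apply and verify the defaults (assuming the manifest is saved as limit-range.yaml):

kubectl apply -f limit-range.yaml
kubectl -n airflow describe limitrange default-cpu-memory-limit-range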

Other considerations

  1. Preemptible instances are subject to unavailability, but with smaller instance sizes (n1-standard-1) unavailability should be less of an issue. Feel free to skip the preemptible node pool, though; thanks to autoscaling, the cost is still low compared to running full capacity at all times.
  2. When specifying node placement, make sure executor Pods don’t land on a node pool running important workloads. This is usually not a problem since Kubernetes provides good resource isolation, but some resources are still shared, such as network and disk. A separate node pool keeps the impact on other workloads to a minimum.