Using CeleryExecutor and KubernetesExecutor in the Same DAG
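Airflow version: 2.2.5
Set the environment variable AIRFLOW__CORE__EXECUTOR: "CeleryKubernetesExecutor"
To run a task on the KubernetesExecutor, set queue='kubernetes' on that task; tasks with any other queue run on the CeleryExecutor.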
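The routing rule above can be sketched in a few lines of plain Python. This is a simplified illustration, not Airflow's source code: the function name `pick_executor` is hypothetical, and the queue name `kubernetes` is assumed to be the value of the `kubernetes_queue` option in the `[celery_kubernetes_executor]` section.

```python
# Simplified illustration of queue-based routing; NOT Airflow's actual source.
# Assumption: the Kubernetes queue name is "kubernetes"
# ([celery_kubernetes_executor] kubernetes_queue).
KUBERNETES_QUEUE = "kubernetes"


def pick_executor(task_queue: str, kubernetes_queue: str = KUBERNETES_QUEUE) -> str:
    """Return the name of the executor that would run a task with this queue."""
    if task_queue == kubernetes_queue:
        return "KubernetesExecutor"
    # Every other queue name, including Celery's default queue, goes to Celery workers.
    return "CeleryExecutor"


if __name__ == "__main__":
    for queue in ("kubernetes", "default"):
        print(f"queue={queue!r} -> {pick_executor(queue)}")
```

Because the decision depends only on the task's queue, a single DAG can mix both kinds of task without any other per-task setting.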
Example DAG
import os
import time

from airflow import DAG
from airflow.decorators import task
from airflow.settings import AIRFLOW_HOME
from airflow.utils.dates import days_ago

with DAG(
    dag_id="example_kubernetes_executor",
    schedule_interval="33 * * * *",
    start_date=days_ago(2),
    catchup=False,
    tags=["example"],
) as dag:
    # Pod settings for the tasks that run on the KubernetesExecutor.
    executor_config = {
        "pod_template_file": os.path.join(AIRFLOW_HOME, "base.yaml"),
        # Optional override; needs `from kubernetes.client import models as k8s`.
        # "pod_override": k8s.V1Pod(
        #     metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
        # ),
    }

    # queue='kubernetes' routes these tasks to the KubernetesExecutor.
    for i in range(2):

        @task(task_id=f'kubernetes_task_{i}', executor_config=executor_config, queue='kubernetes')
        def test11():
            print('------------- start ----------------')
            for n in range(10):
                print(f'{n}' * 10)
                time.sleep(3)
            print('-------------- end ------------------')

        test_task = test11()

    # No queue is set here, so these tasks run on the CeleryExecutor.
    for i in range(2):

        @task(task_id=f'celery_task_{i}')
        def test222():
            print('------------- start ----------------')
            for n in range(10):
                print(f'{n}' * 10)
                time.sleep(3)
            print('-------------- end ------------------')

        test_task2 = test222()
Example pod template file
- base.yaml
---
apiVersion: v1
kind: Pod
metadata:
  name: testname
  namespace: cmbchina
spec:
  containers:
    - name: base
      image: 'aiflow:xxxxx'
      imagePullPolicy: IfNotPresent
      env:
        - name: test111
          value: ttttttt
      envFrom:
        - configMapRef:
            name: airflow-conf
      volumeMounts:
        - name: airflow-home
          mountPath: /root/airflow
  restartPolicy: Never
  volumes:
    - name: airflow-home
      hostPath:
        path: /xxxxx/airflow
        type: DirectoryOrCreate
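A template like this can be sanity-checked before it is deployed. The sketch below is a hypothetical helper, not part of Airflow: `check_pod_template` takes the template already parsed into a dict (for example by a YAML loader) and checks that the first container is named `base`, which Airflow expects of a pod template file, and that every volume mount refers to a declared volume. The `template` dict mirrors base.yaml above, with the placeholder values left as they are.

```python
from typing import Any, Dict, List


def check_pod_template(pod: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a parsed pod template (empty if none)."""
    problems: List[str] = []
    if pod.get("kind") != "Pod":
        problems.append("kind must be 'Pod'")

    spec = pod.get("spec") or {}
    containers = spec.get("containers") or []
    if not containers:
        problems.append("spec.containers is empty")
    elif containers[0].get("name") != "base":
        problems.append("first container must be named 'base'")

    # Every volumeMount should refer to a volume declared under spec.volumes.
    declared = {volume.get("name") for volume in spec.get("volumes") or []}
    for container in containers:
        for mount in container.get("volumeMounts") or []:
            if mount.get("name") not in declared:
                problems.append(f"volumeMount {mount.get('name')!r} has no matching volume")
    return problems


# Parsed form of base.yaml (placeholder values kept as in the template).
template = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "testname", "namespace": "cmbchina"},
    "spec": {
        "containers": [
            {
                "name": "base",
                "image": "aiflow:xxxxx",
                "imagePullPolicy": "IfNotPresent",
                "env": [{"name": "test111", "value": "ttttttt"}],
                "envFrom": [{"configMapRef": {"name": "airflow-conf"}}],
                "volumeMounts": [{"name": "airflow-home", "mountPath": "/root/airflow"}],
            }
        ],
        "restartPolicy": "Never",
        "volumes": [
            {
                "name": "airflow-home",
                "hostPath": {"path": "/xxxxx/airflow", "type": "DirectoryOrCreate"},
            }
        ],
    },
}

if __name__ == "__main__":
    print(check_pod_template(template))
```

The helper only covers the two checks named above; it does not validate the template against the Kubernetes API schema.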