I previously switched from the LocalExecutor to the CeleryExecutor, starting from the official demo docker-compose file and modifying it from there.
The docker-compose file looked like this:
---
version: '3.8'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.0-python3.8}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Shanghai
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: Asia/Shanghai
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__EMAIL__EMAIL_BACKEND: 'airflow.utils.email.send_email_smtp'
    AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'true'
    AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  # user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  user: "root"
  privileged: true
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    container_name: airflow-postgres
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
      TZ: Asia/Shanghai
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 5s
      retries: 5
    restart: always
    privileged: true

  redis:
    container_name: airflow-redis
    image: redis:latest
    environment:
      TZ: Asia/Shanghai
    ports:
      - 6379:6379
    healthcheck:
      test: [ "CMD", "redis-cli", "ping" ]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    privileged: true

  airflow-webserver:
    <<: *airflow-common
    container_name: airflow-webserver
    command: webserver
    # build: ./airflow-webserver
    ports:
      - 8080:8080
    healthcheck:
      test: [ "CMD", "curl", "--fail", "http://localhost:8080/health" ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflow-scheduler
    command: scheduler
    build: ./airflow-scheduler
    healthcheck:
      test: [ "CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"' ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

  airflow-worker:
    <<: *airflow-common
    container_name: airflow-worker
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

  airflow-init:
    <<: *airflow-common
    container_name: airflow-init
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    privileged: true

  flower:
    <<: *airflow-common
    container_name: airflow-flower
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: [ "CMD", "curl", "--fail", "http://localhost:5555/" ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

volumes:
  postgres-db-volume:
As you can see, the build is attached to the airflow-scheduler service. But I recently ran into a small problem (the details deserve a separate post), which made me question whether putting the build there was right.
Back when I used the LocalExecutor, building only the scheduler was deliberate: the point was to install the business Python environment inside the scheduler container. With the LocalExecutor, both scheduling and task execution happen in that one container; the only distinction is that scheduling uses the Python environment that ships with the image, while my tasks run in the business environment I installed myself.
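The Dockerfile under ./airflow-scheduler is not shown in this post. As a minimal sketch of what such a build typically contains (the base image tag and the requirements.txt name here are my placeholders, not the actual file):

# Hypothetical ./airflow-scheduler/Dockerfile; base tag and requirements
# file name are assumptions, not the real contents.
FROM apache/airflow:2.2.0-python3.8

# Install the business Python dependencies on top of the stock image.
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt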
The CeleryExecutor changes the picture: scheduling still happens on the scheduler, but tasks execute on the worker. Yet I kept building only the scheduler, and, oddly, both the scheduler and the worker ended up with a copy of the business environment??? (In hindsight this probably isn't odd at all: when a compose service has both build: and image:, docker-compose tags the image it builds with that image: name, which here is the shared ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.0-python3.8}, so every service that reuses the common anchor, the worker included, actually runs the custom image.) And since the worker had the business environment anyway, everything kept working.
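This is easy to confirm from the host. A sketch of the check, assuming the stack is up and the docker-compose CLI is available (not output from my machine):

# Show which image each compose service is actually running.
docker-compose images

# Or ask the two containers directly; both should report the same tag,
# because the scheduler's build re-tags the shared image name.
docker inspect --format '{{.Config.Image}}' airflow-scheduler airflow-worker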
So recently I tried moving the build target to the worker, and it didn't affect Airflow's operation in the slightest. I think this is the correct usage. The current file is below:
---
version: '3.8'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3-python3.8}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Shanghai
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: Asia/Shanghai
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__EMAIL__EMAIL_BACKEND: 'airflow.utils.email.send_email_smtp'
    AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'true'
    AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  # user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  user: "root"
  privileged: true
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    container_name: airflow-postgres
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
      TZ: Asia/Shanghai
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 5s
      retries: 5
    restart: always
    privileged: true

  redis:
    container_name: airflow-redis
    image: redis:latest
    environment:
      TZ: Asia/Shanghai
    ports:
      - 6379:6379
    healthcheck:
      test: [ "CMD", "redis-cli", "ping" ]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    privileged: true

  airflow-webserver:
    <<: *airflow-common
    container_name: airflow-webserver
    command: webserver
    # build: ./airflow-webserver
    ports:
      - 8080:8080
    healthcheck:
      test: [ "CMD", "curl", "--fail", "http://localhost:8080/health" ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflow-scheduler
    command: scheduler
    # build: ./airflow-scheduler
    healthcheck:
      test: [ "CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"' ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

  airflow-worker:
    <<: *airflow-common
    image: core.harbor.techfin.ai/library/apache/airflow-worker:2.2.0-python3.8
    build: ./airflow-scheduler
    container_name: airflow-worker
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

  airflow-init:
    <<: *airflow-common
    container_name: airflow-init
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    privileged: true

  flower:
    <<: *airflow-common
    container_name: airflow-flower
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: [ "CMD", "curl", "--fail", "http://localhost:5555/" ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    privileged: true

volumes:
  postgres-db-volume:
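With the build attached to the worker, refreshing the business environment now means rebuilding only the worker image and recreating that one container. Roughly (service name as defined in the file above):

# Rebuild the worker image (picks up Dockerfile/requirements changes),
# then recreate only the worker container.
docker-compose build airflow-worker
docker-compose up -d airflow-worker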