首页 文章

气流 Worker 配置

提问于
浏览
3

我是Airflow的新手 . 我正在尝试使用Celery Executor设置分布式气流模式参考文章https://stlong0521.github.io/20161023%20-%20Airflow.html

在详细了解规范之前,我想确认 I've installed PostgreSQL on a seperate instance .

设置规范详述如下:

Airflow core/server computer

  • Python 3.5

  • 气流(AIRFLOW_HOME =〜/ airflow)

  • 芹菜

  • psycogp2

  • RabbitMQ

Configurations made in airflow.cfg

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

进行的测试:

RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)

Airflow worker computer

安装了以下内容:

  • Python 3.5 with

  • 气流(AIRFLOW_HOME =〜/ airflow)

  • 芹菜

  • psycogp2

airflow.cfg中的配置与服务器中的配置完全相同:

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

命令输出在工作机器上运行:

运行气流花时:

[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//

我正在传递 Airflow Core machine 中的dag,并且我已经将dag将处理的样本数据(Excel工作表)复制到同一个核心机器上 .

我的 Worker 日志 raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

Now my query is

1)我是否应该将dag文件夹复制到工作计算机

2)现在,我没有复制工作计算机上的dag文件夹,我无法看到工作进程接收任务 .

请指出我在哪里犯了错误,以及如何让 Worker 流程接受这个过程 .

2 回答

  • 1

    您的配置文件看起来没问题 . 如您所料,所有工作人员确实需要DAG文件夹的副本 . 您可以使用类似 git 之类的东西来保持它们同步和最新 .

  • 1

    Airflow的一些最大难点在于部署,并使DAG文件和插件在Airflow调度程序,Airflow网络服务器和Celery工作节点之间保持同步 .

    我们创建了一个名为Astronomer Open的开源项目,它可以自动化Dockerized Airflow,Celery和PostgreSQL以及其他一些好东西 . 该项目的动机是看到很多人遇到了同样的痛点,创建了一个非常相似的设置 .

    例如,这是Airflow Dockerfile:https://github.com/astronomerio/astronomer/blob/master/docker/platform/airflow/Dockerfile

    和文档:https://open.astronomer.io/

    完全披露:这是我在工作中贡献的项目 - 我们提供付费的enterprise edition以及在Kubernetes (docs)上运行 . 也就是说,Open Edition完全免费使用 .

相关问题