我是Airflow的新手 . 我正在尝试使用Celery Executor设置分布式气流模式参考文章https://stlong0521.github.io/20161023%20-%20Airflow.html
在详细了解规范之前,我想确认 I've installed PostgreSQL on a seperate instance .
设置规范详述如下:
Airflow core/server computer
-
Python 3.5
-
气流(AIRFLOW_HOME =〜/ airflow)
-
芹菜
-
psycogp2
-
RabbitMQ
Configurations made in airflow.cfg :
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
进行的测试:
RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)
Airflow worker computer
安装了以下内容:
-
Python 3.5 with
-
气流(AIRFLOW_HOME =〜/ airflow)
-
芹菜
-
psycogp2
airflow.cfg中的配置与服务器中的配置完全相同:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
命令输出在工作机器上运行:
运行气流花时:
[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//
我正在传递 Airflow Core machine 中的dag,并且我已经将dag将处理的样本数据(Excel工作表)复制到同一个核心机器上 .
我的 Worker 日志 raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
Now my query is
1)我是否应该将dag文件夹复制到工作计算机
2)现在,我没有复制工作计算机上的dag文件夹,我无法看到工作进程接收任务 .
请指出我在哪里犯了错误,以及如何让 Worker 流程接受这个过程 .
2 回答
您的配置文件看起来没问题 . 如您所料,所有工作人员确实需要DAG文件夹的副本 . 您可以使用类似
git
之类的东西来保持它们同步和最新 .Airflow的一些最大难点在于部署,并使DAG文件和插件在Airflow调度程序,Airflow网络服务器和Celery工作节点之间保持同步 .
我们创建了一个名为Astronomer Open的开源项目,它可以自动化Dockerized Airflow,Celery和PostgreSQL以及其他一些好东西 . 该项目的动机是看到很多人遇到了同样的痛点,创建了一个非常相似的设置 .
例如,这是Airflow Dockerfile:https://github.com/astronomerio/astronomer/blob/master/docker/platform/airflow/Dockerfile
和文档:https://open.astronomer.io/
完全披露:这是我在工作中贡献的项目 - 我们提供付费的enterprise edition以及在Kubernetes (docs)上运行 . 也就是说,Open Edition完全免费使用 .