airflow docs建议对DAG文件进行基本的健全性检查来解释它 . 即:
$ python ~/path/to/my/dag.py
我在 $AIRFLOW_HOME/plugins
下创建了一个插件 MordorOperator
:
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators import BaseOperator
from airflow.exceptions import AirflowException
import pika
import json
class MordorOperator(BaseOperator):
JOB_QUEUE_MAPPING = {"testing": "testing"}
@apply_defaults
def __init__(self, job, *args, **kwargs):
super().__init__(*args, **kwargs)
# stuff
def execute(self, context):
# stuff
class MordorPlugin(AirflowPlugin):
name = "MordorPlugin"
operators = [MordorOperator]
我可以导入插件并在示例DAG中看到它的工作原理:
from airflow import DAG
from airflow.operators import MordorOperator
from datetime import datetime
dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False)
hello_operator = MordorOperator(job="testing", task_id='run_single_task', dag=dag)
但是,当我尝试解释这个文件时,我得到的失败是我怀疑自插件成功运行后我不应该得到的 . 我怀疑这是因为在运行时发生了一些动态代码生成,当DAG被自己解释时是不可用的 . 我还发现PyCharm在导入插件时无法执行任何自动完成 .
(venv) 3:54PM /Users/paymahn/solvvy/scheduler mordor.operator ✱
❮❮❮ python dags/mordor_test.py
section/key [core/airflow-home] not found in config
Traceback (most recent call last):
File "dags/mordor_test.py", line 2, in
from airflow.operators import MordorOperator
ImportError: cannot import name 'MordorOperator'
使用插件的DAG如何进行健全性测试?是否有可能让PyCharm为自定义运算符提供自动完成功能?
1 回答
我在docker容器中运行气流并且有一个作为容器入口点运行的脚本 . 事实证明
plugins
文件夹没有't available to my container when I was running my tests. I had to add a symlink in the container as part of the setup script. The solution to my problem is highly specific to me and if someone else stumbles upon this I don' t对你的情况有一个很好的答案,除了:确保你的插件文件夹正确可用 .