首页 文章

为气流中的日志设置s3

提问于
浏览
28

我正在使用docker-compose来设置可扩展的气流群集 . 我的解决方案是基于这个Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

我的问题是将日志设置为从s3写入/读取 . 当一个dag完成后,我得到这样的错误

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

我在 airflow.cfg 文件中设置了一个新的部分

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

然后在 airflow.cfg 中的远程日志部分中指定s3路径

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

我是否正确设置了这个并且有错误?这里有成功的秘诀吗?

  • 更新

我尝试以URI和JSON格式导出,似乎都不起作用 . 然后我导出了aws_access_key_id和aws_secret_access_key,然后气流开始捡起它 . 现在我在工作日志中得到了他的错误

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
  • 更新

我发现了这个链接https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

然后我进入我的一个工作机器(与Web服务器和调度程序分开)并在python中运行这段代码

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

我收到此错误 .

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

我尝试导出几种不同类型的 AIRFLOW_CONN_ envs,如连接部分https://airflow.incubator.apache.org/concepts.html中所述,以及此问题的其他答案 .

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

我还导出了AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY但没有成功 .

这些凭据存储在数据库中,因此一旦我在UI中添加它们,它们就应该被工作人员选中,但由于某些原因它们无法写入/读取日志 .

5 回答

  • 14

    您需要通过气流UI设置s3连接 . 为此,您需要转到气流UI上的Admin - > Connections选项卡,并为S3连接创建一个新行 .

    一个示例配置是:

    Conn Id:my_conn_S3 Conn类型:S3 Extra:{“aws_access_key_id”:“your_aws_key_id”,“aws_secret_access_key”:“your_aws_secret_key”}

  • 1

    UPDATE Airflow 1.10 makes logging a lot easier.

    对于s3日志记录,按照the above answer设置连接挂钩

    然后只需将以下内容添加到airflow.cfg中

    [core]
        # Airflow can store logs remotely in AWS S3. Users must supply a remote
        # location URL (starting with either 's3://...') and an Airflow connection
        # id that provides access to the storage location.
        remote_base_log_folder = s3://my-bucket/path/to/logs
        remote_log_conn_id = MyS3Conn
        # Use server-side encryption for logs stored in S3
        encrypt_s3_logs = False
    

    对于gcs日志记录,

    • 首先安装gcp_api软件包,如下所示:pip install apache-airflow [gcp_api] .

    • 根据the above answer设置连接挂钩

    • 将以下内容添加到airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn
    

    NOTE: As of Airflow 1.9 remote logging has been significantly altered. If you are using 1.9, read on.

    参考here

    完整说明:

    • 创建一个存储配置的目录并放置它,以便在PYTHONPATH中找到它 . 一个例子是$ AIRFLOW_HOME / config

    • 创建名为$ AIRFLOW_HOME / config / log_config.py和$ AIRFLOW_HOME / config / __ init__.py的空文件

    • airflow/config_templates/airflow_local_settings.py的内容复制到刚刚在上面的步骤中创建的log_config.py文件中 .

    • 自定义模板的以下部分:

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
    
    • 确保已在Airflow中定义了s3连接挂钩,如the above answer所示 . 钩子应具有对S3_LOG_FOLDER中上面定义的s3桶的读写访问权限 .

    • 更新$ AIRFLOW_HOME / airflow.cfg以包含:

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
    
    • 重新启动Airflow网络服务器和调度程序,并触发(或等待)新任务执行 .

    • 验证日志是否显示在您定义的存储桶中新执行的任务中 .

    • 验证s3存储查看器是否在UI中正常工作 . 拉出新执行的任务,并验证您是否看到类似的内容:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
    
  • 6

    如果您不想使用管理界面,这是一个解决方案 .

    我的部署过程是Dockerized,我从不接触管理UI . 我还喜欢在bash脚本中设置特定于Airflow的环境变量,该脚本会覆盖.cfg文件 .

    airflow[s3]

    首先,您需要安装 s3 子包来将Airflow日志写入S3 . ( boto3 适用于DAG中的Python作业,但 S3Hook 取决于s3子包 . )

    还有一个注意事项:conda install doesn't handle this yet,所以我必须做 pip install airflow[s3] .

    Environment variables

    在bash脚本中,我设置了这些 core 变量 . 从these instructions开始,但对环境变量使用命名约定 AIRFLOW__{SECTION}__{KEY} ,我这样做:

    export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
    export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
    export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
    

    S3 connection ID

    s3_uri 是我编写的连接ID . 在Airflow中,它对应于另一个环境变量 AIRFLOW_CONN_S3_URI . 它的值是您的S3路径,必须采用URI形式 . 那是

    s3://access_key:secret_key@bucket/key
    

    存储此,但您处理其他敏感的环境变量 .

    使用此配置,Airflow会将您的日志写入S3 . 他们将遵循 s3://bucket/key/dag/task_id 的道路 .

  • 26

    要使用最近的Airflow更新完成Arne的答案,您不需要将 task_log_reader 设置为另一个值而不是默认值: task

    好像你按照默认的日志记录模板airflow/config_templates/airflow_local_settings.py,您可以看到since this commit(注意处理程序的名称更改为 's3': {'task'... 而不是 s3.task ),远程文件夹( REMOTE_BASE_LOG_FOLDER )上的值将使用正确的文件夹替换处理程序:

    REMOTE_LOGGING = conf.get('core', 'remote_logging')
    
    if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
            DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
    elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
            DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
    elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
            DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
    elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
            DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
    

    有关如何从S3登录/读取的更多详细信息:https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3

  • 1

    对于遵循the above answer中非常有用的说明的任何人只需注意:如果您偶然发现此问题:"ModuleNotFoundError: No module named 'airflow.utils.log.logging_mixin.RedirectStdHandler'" as referenced here(使用airflow 1.9时会发生这种情况),修复很简单 - 请使用此基本模板:https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py(并按照所有其他说明进行操作)在the above answer

    master分支中存在的当前模板incubator-airflow/airflow/config_templates/airflow_local_settings.py包含对类"airflow.utils.log.s3_task_handler.S3TaskHandler"的引用,该类在apache-airflow == 1.9.0 python包中不存在 . 希望这可以帮助!

相关问题