首页 文章

将Google Cloud 存储桶配置为Airflow Log文件夹

提问于
浏览
0

我们刚刚开始在我们的数据管道项目中使用Apache airflow . 在探索这些功能后,我们开始了解如何将远程文件夹配置为气流中的日志目标 . 为此我们

创建了一个谷歌 Cloud 桶 . 从Airflow UI创建了一个新的GS连接

enter image description here

我无法理解所有字段 . 我刚从谷歌控制台在我的项目下创建了一个示例GS Bucket,并将该项目ID提供给此Connection.Left密钥文件路径和范围为空白 . 然后编辑airflow.cfg文件如下

remote_base_log_folder = gs://my_test_bucket/
remote_log_conn_id = test_gs

在此更改之后重新启动了Web服务器和调度程序 . 但是我的Dags仍然没有将日志写入GS存储桶 . 我能够看到在base_log_folder中创建日志的日志 . 但是我的存储桶中没有创建任何内容 . 我的身边是否需要额外的配置才能让它正常工作

注意:使用Airflow 1.8 . (我也面临与AmazonS3相同的问题 . )

2017年9月20日更新

尝试附加屏幕截图的GS方法

enter image description here
我仍然没有收到日志

谢谢Anoop R.

3 回答

  • 1

    未将日志写入存储桶的原因可能与服务帐户有关,而不是与气流本身配置有关 . 确保它可以访问上述存储桶 . 我过去也遇到过同样的问题 .

    为服务帐户添加更多慷慨的权限,例如甚至是项目范围内的编辑器,然后缩小范围 . 您也可以尝试使用该密钥的gs客户端,看看是否可以写入存储桶 .

    对我个人而言,此范围适用于编写日志:“https://www.googleapis.com/auth/cloud-platform

  • 0

    我建议您使用DAG将气流连接到GCP而不是UI .

    首先,在GCP上创建一个服务帐户并下载json密钥 .

    然后执行此DAG(您可以修改访问范围):

    from airflow import DAG
    from datetime import datetime
    from airflow.operators.python_operator import PythonOperator
    
    def add_gcp_connection(ds, **kwargs):
    
          """Add a airflow connection for GCP"""
    
         new_conn = Connection(
               conn_id='gcp_connection_id',
               conn_type='google_cloud_platform',
         )
         scopes = [
              "https://www.googleapis.com/auth/pubsub",
              "https://www.googleapis.com/auth/datastore",
              "https://www.googleapis.com/auth/bigquery",
              "https://www.googleapis.com/auth/devstorage.read_write",
              "https://www.googleapis.com/auth/logging.write",
              "https://www.googleapis.com/auth/cloud-platform",
         ]
         conn_extra = {
              "extra__google_cloud_platform__scope": ",".join(scopes),
              "extra__google_cloud_platform__project": "<name_of_your_project>",
              "extra__google_cloud_platform__key_path": '<path_to_your_json_key>'
    }
         conn_extra_json = json.dumps(conn_extra)
         new_conn.set_extra(conn_extra_json)
         session = settings.Session()
         if not (session.query(Connection).filter(Connection.conn_id == 
          new_conn.conn_id).first()):
             session.add(new_conn)
             session.commit()
        else:
             msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
             msg = msg.format(conn_id=new_conn.conn_id)
             print(msg)
    
     dag = DAG('add_gcp_connection', start_date=datetime(2016,1,1), schedule_interval='@once')
    
    # Task to add a connection
    AddGCPCreds = PythonOperator(
           dag=dag,
           task_id='add_gcp_connection_python',
           python_callable=add_gcp_connection,
           provide_context=True)
    

    感谢Yu Ishikawa的this code .

  • 0

    是的,您需要为S3和GCP连接提供其他信息 .

    S3

    配置通过额外字段作为JSON传递 . 您只能提供 Profiles

    {"profile": "xxx"}
    

    或凭证

    {"profile": "xxx", "aws_access_key_id": "xxx", "aws_secret_access_key": "xxx"}
    

    或配置文件的路径

    {"profile": "xxx", "s3_config_file": "xxx", "s3_config_format": "xxx"}
    

    如果是第一个选项,boto将尝试检测您的凭据 .

    源代码 - airflow/hooks/S3_hook.py:107

    GCP

    您可以提供 key_pathscope (请参阅Service account credentials),也可以按以下顺序从您的环境中提取凭据:

    • 环境变量GOOGLE_APPLICATION_CREDENTIALS指向具有存储凭据信息的文件 .

    • 存储"well known"与 gcloud 命令行工具关联的文件 .

    • Google App Engine(制作和测试)

    • Google Compute Engine 生产环境 环境 .

    源代码 - airflow/contrib/hooks/gcp_api_base_hook.py:68

相关问题