首页 文章

动态Azure数据工厂v2管道

提问于
浏览
2

所以我们有一个拥有约400个数据集和~200个管道的工厂,而且它变得笨重 . 专注于从sql源复制到blob sink . 由于我们正在复制到blob,因此架构没有任何影响 . 我想为每个源提供一个数据集,为每个blob帐户提供一个数据集,为每个源/ blob帐户组合提供一个管道,从查找中动态提供配置 .

我们已经成功开发了一个管道,它使用虚拟数据集作为源和接收器 . 如果您提供查询,容器名称和文件夹名称,它可以工作 .

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "DynamicCopy",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select 1 a"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "dataIntegrationUnits": 0
                },
                "inputs": [
                    {
                        "referenceName": "AzureSql",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob",
                        "type": "DatasetReference",
                        "parameters": {
                            "container": "raw-test",
                            "folder": "test"
                        }
                    }
                ]
            }
        ]
    }
}

当我们在它之前放置一个查找并将其包装在foreach中时,它就会停止工作 . 没有那么有帮助

“errorCode”:“400”,
“消息”:“活动因内部活动失败而失败”,
“failureType”:“UserError”,
“目标”:“ForEach”

查询存储过程 [dbo].[adfdynamic] 实际上尚未在foreach中引用:

create proc adfdynamic as
select 'raw-test' container, 'test_a' folder, 'select 1 a, 2 b' 
UNION ALL    
select 'raw-test' container, 'test_b' folder, 'select 3 c, 2 d'

所以我想要的行为是:

  • 一次blob in raw-test @ .. myblob ... / test_a / out.dsv内容 {'a,b','1,2'}

  • 一次blob in raw-test @ .. myblob ... / test_b / out.dsv内容 {'c,d','3,2'}

sql数据集:

{
    "name": "AzureSql",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Dest",
            "type": "LinkedServiceReference"
        },
        "type": "AzureSqlTable",
        "structure": [
            {
                "name": "CustomerKey",
                "type": "Int32"
            },
            {
                "name": "Name",
                "type": "String"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[DimCustomer]"
        }
    }
}

blob数据集:

{
    "name": "AzureBlob",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureStorage1",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "container": {
                "type": "String"
            },
            "folder": {
                "type": "String"
            }
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ",",
                "treatEmptyAsNull": false,
                "skipLineCount": 0,
                "firstRowAsHeader": false
            },
            "fileName": {
                "value": "@{dataset().folder}/out.dsv",
                "type": "Expression"
            },
            "folderPath": {
                "value": "@dataset().container",
                "type": "Expression"
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

和非工作动态管道:

{
    "name": "Copy",
    "properties": {
        "activities": [
            {
                "name": "ForEach",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup').output.value",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Copy",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "SqlSource",
                                    "sqlReaderQuery": {
                                        "value": "select 1 a, 2 b from dest",
                                        "type": "Expression"
                                    }
                                },
                                "sink": {
                                    "type": "BlobSink"
                                },
                                "enableStaging": false,
                                "dataIntegrationUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "AzureSql",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "AzureBlob",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "container": {
                                            "value": "raw-test",
                                            "type": "Expression"
                                        },
                                        "folder": {
                                            "value": "folder",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            },
            {
                "name": "Lookup",
                "type": "Lookup",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                }
            }
        ]
    }
}

抱歉格式化 . 一条消息中的代码太多了?

2 回答

  • 0
    • 在查找活动中,请检查您的firstRowOnly属性 . 它是假的还是真的?默认情况下,这是真的 .

    • 在UI中,您可以设置断点来调试查找活动 . 然后你可以看到输出是否是你想要的 .

    enter image description here

  • 0

    不完全是你的问题的答案,但我为使生活更简单所做的事情是创建一个名为GenericBlob的数据集 . 这有2个参数容器和路径 . 这可能有助于简化您正在做的事情 . 我过去常常有20个blob数据集,现在我有一个...(这是假设blob在同一个存储帐户中) .

相关问题