所以我们有一个拥有约400个数据集和~200个管道的工厂,而且它变得笨重 . 专注于从sql源复制到blob sink . 由于我们正在复制到blob,因此架构没有任何影响 . 我想为每个源提供一个数据集,为每个blob帐户提供一个数据集,为每个源/ blob帐户组合提供一个管道,从查找中动态提供配置 .
我们已经成功开发了一个管道,它使用虚拟数据集作为源和接收器 . 如果您提供查询,容器名称和文件夹名称,它可以工作 .
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "DynamicCopy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "select 1 a"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": "raw-test",
"folder": "test"
}
}
]
}
]
}
}
当我们在它之前放置一个查找并将其包装在foreach中时,它就会停止工作 . 没有那么有帮助
“errorCode”:“400”,
“消息”:“活动因内部活动失败而失败”,
“failureType”:“UserError”,
“目标”:“ForEach”
查询存储过程 [dbo].[adfdynamic]
实际上尚未在foreach中引用:
create proc adfdynamic as
select 'raw-test' container, 'test_a' folder, 'select 1 a, 2 b'
UNION ALL
select 'raw-test' container, 'test_b' folder, 'select 3 c, 2 d'
所以我想要的行为是:
-
一次blob in raw-test @ .. myblob ... / test_a / out.dsv内容
{'a,b','1,2'}
-
一次blob in raw-test @ .. myblob ... / test_b / out.dsv内容
{'c,d','3,2'}
sql数据集:
{
"name": "AzureSql",
"properties": {
"linkedServiceName": {
"referenceName": "Dest",
"type": "LinkedServiceReference"
},
"type": "AzureSqlTable",
"structure": [
{
"name": "CustomerKey",
"type": "Int32"
},
{
"name": "Name",
"type": "String"
}
],
"typeProperties": {
"tableName": "[dbo].[DimCustomer]"
}
}
}
blob数据集:
{
"name": "AzureBlob",
"properties": {
"linkedServiceName": {
"referenceName": "AzureStorage1",
"type": "LinkedServiceReference"
},
"parameters": {
"container": {
"type": "String"
},
"folder": {
"type": "String"
}
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "TextFormat",
"columnDelimiter": ",",
"treatEmptyAsNull": false,
"skipLineCount": 0,
"firstRowAsHeader": false
},
"fileName": {
"value": "@{dataset().folder}/out.dsv",
"type": "Expression"
},
"folderPath": {
"value": "@dataset().container",
"type": "Expression"
}
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
和非工作动态管道:
{
"name": "Copy",
"properties": {
"activities": [
{
"name": "ForEach",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup",
"dependencyConditions": [
"Succeeded"
]
}
],
"typeProperties": {
"items": {
"value": "@activity('Lookup').output.value",
"type": "Expression"
},
"activities": [
{
"name": "Copy",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "select 1 a, 2 b from dest",
"type": "Expression"
}
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "AzureSql",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob",
"type": "DatasetReference",
"parameters": {
"container": {
"value": "raw-test",
"type": "Expression"
},
"folder": {
"value": "folder",
"type": "Expression"
}
}
}
]
}
]
}
},
{
"name": "Lookup",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
}
}
]
}
}
抱歉格式化 . 一条消息中的代码太多了?
2 回答
在查找活动中,请检查您的firstRowOnly属性 . 它是假的还是真的?默认情况下,这是真的 .
在UI中,您可以设置断点来调试查找活动 . 然后你可以看到输出是否是你想要的 .
不完全是你的问题的答案,但我为使生活更简单所做的事情是创建一个名为GenericBlob的数据集 . 这有2个参数容器和路径 . 这可能有助于简化您正在做的事情 . 我过去常常有20个blob数据集,现在我有一个...(这是假设blob在同一个存储帐户中) .