I'm trying to back up my Cosmos DB storage using Azure Data Factory (v2). In general it does the job, but I want each document in a Cosmos collection to end up as its own JSON file in blob storage.
With the following copy parameters, I can copy all the documents of a collection into a single file in Azure blob storage:
{
    "name": "ForEach_mih",
    "type": "ForEach",
    "typeProperties": {
        "items": {
            "value": "@pipeline().parameters.cw_items",
            "type": "Expression"
        },
        "activities": [
            {
                "name": "Copy_mih",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false
                },
                "userProperties": [
                    {
                        "name": "Source",
                        "value": "@{item().source.collectionName}"
                    },
                    {
                        "name": "Destination",
                        "value": "cosmos-backup-v2/@{item().destination.fileName}"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "DocumentDbCollectionSource",
                        "nestingSeparator": "."
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "enableSkipIncompatibleRow": true,
                    "redirectIncompatibleRowSettings": {
                        "linkedServiceName": {
                            "referenceName": "Clear_Test_BlobStorage",
                            "type": "LinkedServiceReference"
                        },
                        "path": "cosmos-backup-logs"
                    },
                    "cloudDataMovementUnits": 0
                },
                "inputs": [
                    {
                        "referenceName": "SourceDataset_mih",
                        "type": "DatasetReference",
                        "parameters": {
                            "cw_collectionName": "@item().source.collectionName"
                        }
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "DestinationDataset_mih",
                        "type": "DatasetReference",
                        "parameters": {
                            "cw_fileName": "@item().destination.fileName"
                        }
                    }
                ]
            }
        ]
    }
}
How can I copy each Cosmos doc into a separate file and name it as - ?
UPD
Source dataset code:
{
    "name": "ClustersData",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Clear_Test_CosmosDb",
            "type": "LinkedServiceReference"
        },
        "type": "DocumentDbCollection",
        "typeProperties": {
            "collectionName": "directory-clusters"
        }
    }
}
Destination dataset code:
{
    "name": "OutputClusters",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Clear_Test_BlobStorage",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "arrayOfObjects"
            },
            "fileName": "",
            "folderPath": "cosmos-backup-logs"
        }
    }
}
Pipeline code:
{
    "name": "copy-clsts",
    "properties": {
        "activities": [
            {
                "name": "LookupClst",
                "type": "Lookup",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "DocumentDbCollectionSource",
                        "nestingSeparator": "."
                    },
                    "dataset": {
                        "referenceName": "ClustersData",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEachClst",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "LookupClst",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "typeProperties": {
                    "items": {
                        "value": "@activity('LookupClst').output.value",
                        "type": "Expression"
                    },
                    "batchCount": 8,
                    "activities": [
                        {
                            "name": "CpyClst",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "DocumentDbCollectionSource",
                                    "query": "select @{item()}",
                                    "nestingSeparator": "."
                                },
                                "sink": {
                                    "type": "BlobSink"
                                },
                                "enableStaging": false,
                                "enableSkipIncompatibleRow": true,
                                "cloudDataMovementUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "ClustersData",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "OutputClusters",
                                    "type": "DatasetReference"
                                }
                            ]
                        }
                    ]
                }
            }
        ]
    }
}
Example document from the input collection (all of them have the same shape):
{
    "$type": "Entities.ADCluster",
    "DisplayName": "TESTNetBIOS",
    "OrgId": "9b679d2a-42c5-4c9a-a2e2-3ce63c1c3506",
    "ClusterId": "ab2a242d-f1a5-62ed-b420-31b52e958586",
    "AllowLdapLifeCycleSynchronization": true,
    "DirectoryServers": [
        {
            "$type": "Entities.DirectoryServer",
            "AddressId": "e6a8edbb-ad56-4135-94af-fab50b774256",
            "Port": 389,
            "Host": "192.168.342.234"
        }
    ],
    "DomainNames": [
        "TESTNetBIOS"
    ],
    "BaseDn": null,
    "UseSsl": false,
    "RepositoryType": 1,
    "DirectoryCustomizations": null,
    "_etag": "\"140046f2-0000-0000-0000-5ac63a180000\"",
    "LastUpdateTime": "2018-04-05T15:00:40.243Z",
    "id": "ab2a242d-f1a5-62ed-b420-31b52e958586",
    "PartitionKey": "directory-clusters-9b679d2a-42c5-4c9a-a2e2-3ce63c1c3506",
    "_rid": "kpvxLAs6gkmsCQAAAAAAAA==",
    "_self": "dbs/kvpxAA==/colls/kpvxLAs6gkk=/docs/kvpxALs6kgmsCQAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1522940440
}
3 Answers
Since your Cosmos DB documents contain arrays, and ADF does not support serializing arrays from Cosmos DB, this is the workaround I can offer.
First, export all your documents as JSON files (to blob, ADLS, file system, or any file storage). I think you already know how to do that. This way, each collection will have one JSON file.
Second, process each JSON file, writing each line of the file out to an individual file.
I'm only providing the pipeline for step 2. You can chain step 1 and step 2 together with an Execute Pipeline activity, and you can even process all the collections in step 2 with a ForEach activity.
Pipeline JSON.
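A minimal sketch of what such a pipeline can look like, assuming the step-1 export is read through a blob dataset named ExportedJsonDataset and each document is written through a parameterized dataset named SingleDocDataset (all names here are illustrative):
{
    "name": "copy-docs-step2",
    "properties": {
        "activities": [
            {
                "name": "LookupDocs",
                "type": "Lookup",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "dataset": {
                        "referenceName": "ExportedJsonDataset",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEachDoc",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "LookupDocs",
                        "dependencyConditions": [ "Succeeded" ]
                    }
                ],
                "typeProperties": {
                    "items": {
                        "value": "@activity('LookupDocs').output.value",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "CopyOneDoc",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "DocumentDbCollectionSource",
                                    "query": "select @{item()}",
                                    "nestingSeparator": "."
                                },
                                "sink": {
                                    "type": "BlobSink"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "QueryHostDataset",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "SingleDocDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "folderPath": "cosmos-backup-v2",
                                        "fileName": "@concat(item().id, '.json')"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ]
    }
}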
Dataset JSON for the lookup.
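A minimal sketch, assuming step 1 exported directory-clusters.json with the arrayOfObjects pattern into the cosmos-backup-logs container (names illustrative):
{
    "name": "ExportedJsonDataset",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Clear_Test_BlobStorage",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "arrayOfObjects"
            },
            "fileName": "directory-clusters.json",
            "folderPath": "cosmos-backup-logs"
        }
    }
}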
Source dataset for the copy. Actually, this dataset is useless; it only exists to host the query (select @{item()}).
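A minimal sketch; any Cosmos DB collection dataset will do, since the query never actually reads from the collection (names illustrative):
{
    "name": "QueryHostDataset",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Clear_Test_CosmosDb",
            "type": "LinkedServiceReference"
        },
        "type": "DocumentDbCollection",
        "typeProperties": {
            "collectionName": "directory-clusters"
        }
    }
}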
Destination dataset. It has two parameters, which also addresses your file-naming requirement.
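A minimal sketch with folderPath and fileName parameters; the ForEach above can then pass something like @concat(item().id, '.json') as the file name (names illustrative):
{
    "name": "SingleDocDataset",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Clear_Test_BlobStorage",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "folderPath": { "type": "String" },
            "fileName": { "type": "String" }
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "setOfObjects"
            },
            "fileName": {
                "value": "@dataset().fileName",
                "type": "Expression"
            },
            "folderPath": {
                "value": "@dataset().folderPath",
                "type": "Expression"
            }
        }
    }
}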
Also note the limits of the Lookup activity: it only supports certain data sources, the maximum number of rows it can return is 5,000 (up to 2 MB), and currently the maximum duration of a Lookup activity before timeout is one hour.
Have you considered implementing this in a different way using Azure Functions? ADF is designed for moving data in bulk from one place to another, and it only generates a single file per collection.
You could have an Azure Function trigger whenever a document is added or updated in the collection, and have the function write that document out to blob storage. This should scale well and be relatively easy to implement.
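A minimal sketch of the trigger binding (function.json) for such a function, with illustrative database, collection, and connection-setting names; the function body would then write each document from the change feed to blob storage via the storage SDK:
{
    "bindings": [
        {
            "type": "cosmosDBTrigger",
            "name": "documents",
            "direction": "in",
            "connectionStringSetting": "CosmosDbConnection",
            "databaseName": "mydb",
            "collectionName": "directory-clusters",
            "leaseCollectionName": "leases",
            "createLeaseCollectionIfNotExists": true
        }
    ]
}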
Take one collection as an example.
Inside the ForEach:
Your Lookup and Copy activity source datasets reference the same Cosmos DB dataset.
If you want to copy 5 collections, you can put this pipeline inside an Execute Pipeline activity, and have a master pipeline with a ForEach activity that invokes it, as sketched below.
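A minimal sketch of such a master pipeline, assuming the single-collection pipeline above has been parameterized by collection name (names and parameters illustrative):
{
    "name": "master-copy-collections",
    "properties": {
        "activities": [
            {
                "name": "ForEachCollection",
                "type": "ForEach",
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.collectionNames",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "RunPerCollection",
                            "type": "ExecutePipeline",
                            "typeProperties": {
                                "pipeline": {
                                    "referenceName": "copy-docs-step2",
                                    "type": "PipelineReference"
                                },
                                "parameters": {
                                    "collectionName": "@item()"
                                },
                                "waitOnCompletion": true
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "collectionNames": {
                "type": "Array",
                "defaultValue": [ "directory-clusters" ]
            }
        }
    }
}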