使用Azure Data Factory V2,通过门户网站
https://adf.azure.com
我创建了一个管道,用于从多个表增量复制数据,从一个Azure SQL数据库到另一个Azure SQL数据库 .
为了创建它,我根据自己的需要调整了以下示例:Incrementally load data from multiple tables
以下是与创建的管道相关的json文件:
{
"name": "IncrementalCopyPipeline",
"properties": {
"activities": [
{
"name": "IterateSQLTables",
"type": "ForEach",
"typeProperties": {
"items": {
"value": "@pipeline().parameters.tableList",
"type": "Expression"
},
"activities": [
{
"name": "LookupOldWaterMarkActivity",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "select * \nfrom watermarktable \nwhere TableName = '@{item().TABLE_NAME}'",
"type": "Expression"
}
},
"dataset": {
"referenceName": "WatermarkDataset",
"type": "DatasetReference"
}
}
},
{
"name": "LookupNewWaterMarkActivity",
"type": "Lookup",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue \nfrom @{item().TABLE_NAME}",
"type": "Expression"
}
},
"dataset": {
"referenceName": "SourceDataset",
"type": "DatasetReference"
}
}
},
{
"name": "IncrementalCopyActivity",
"type": "Copy",
"dependsOn": [
{
"activity": "LookupNewWaterMarkActivity",
"dependencyConditions": [
"Succeeded"
]
},
{
"activity": "LookupOldWaterMarkActivity",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "select * from @{item().TABLE_NAME} \nwhere @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
"type": "Expression"
}
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 10000,
"sqlWriterStoredProcedureName": {
"value": "@{item().StoredProcedureNameForMergeOperation}",
"type": "Expression"
},
"sqlWriterTableType": {
"value": "@{item().TableType}",
"type": "Expression"
}
},
"enableStaging": false,
"dataIntegrationUnits": 0
},
"inputs": [
{
"referenceName": "SourceDataset",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "SinkDataset",
"type": "DatasetReference",
"parameters": {
"SinkTableName": "@{item().TABLE_NAME}"
}
}
]
},
{
"name": "StoredProceduretoWriteWatermarkActivity",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "IncrementalCopyActivity",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"storedProcedureName": "[dbo].[sp_write_watermark]",
"storedProcedureParameters": {
"LastModifiedtime": {
"value": {
"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
"type": "Expression"
},
"type": "DateTime"
},
"TableName": {
"value": {
"value": "@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
"type": "Expression"
},
"type": "String"
}
}
},
"linkedServiceName": {
"referenceName": "SqlServerLinkedService_dest",
"type": "LinkedServiceReference"
}
}
]
}
}
],
"parameters": {
"tableList": {
"type": "Object",
"defaultValue": [
{
"TABLE_NAME": "customer_table",
"WaterMark_Column": "LastModifytime",
"TableType": "DataTypeforCustomerTable",
"StoredProcedureNameForMergeOperation": "sp_upsert_customer_table"
},
{
"TABLE_NAME": "project_table",
"WaterMark_Column": "Creationtime",
"TableType": "DataTypeforProjectTable",
"StoredProcedureNameForMergeOperation": "sp_upsert_project_table"
}
]
}
}
}
}
在我的表中,我有一个区分不同公司的列,因此我想在此管道中添加另一个参数 . 我有这样一张 table :
NAME LASTMODIFY COMPANY
John 2015-01-01 00:00:00.000 1
Mike 2016-02-02 01:23:00.000 2
Andy 2017-03-04 05:16:00.000 3
Annie 2018-09-08 00:00:00.000 1
有人会知道如何在管道中插入一个参数,以指定要复制哪个公司以及不复制哪个公司?
有什么建议吗?在此先感谢大家!
1 回答
不清楚你在问什么,如果我错过了标记,请道歉,但是:
复制允许您可以使用存储过程来解决您的问题 . 看看这个例子:https://docs.microsoft.com/en-us/azure/data-factory/connector-sql-server#invoking-stored-procedure-for-sql-sink
它使用存储过程来MERGE执行取决于JOIN匹配的UPDATE或INSERT . 它还允许传递参数 .
因此,如果您尝试仅根据参数复制某些情况,则MERGE连接可能会有所帮助 .