阅读完文档后,我'm having a hard time conceptualizing the change feed. Let'从下面的documentation获取代码 . 第二个更改源正在从上次通过检查点运行时获取更改 . 让's say it is being used to create summary data and there was an issue and it needed to be re-run from a prior time. I don'了解以下内容:
-
如何指定检查点应该启动的特定时间 . 我知道我可以保存检查点字典并将其用于每次运行,但是如何从X时间获得更改以重新运行某些摘要数据
-
其次,假设我们正在重新运行一些摘要数据,我们保存用于每个汇总数据的最后一个检查点,以便我们知道那个停止的位置 . 如何知道记录在该检查点之前或之前?
从集合开始到最后一个检查点运行的代码:
Dictionary < string, string > checkpoints = await GetChanges(client, collection, new Dictionary < string, string > ());
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000
});
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000
});
// Returns only the two documents created above.
checkpoints = await GetChanges(client, collection, checkpoints);
//
private async Task < Dictionary < string, string >> GetChanges(
DocumentClient client,
string collection,
Dictionary < string, string > checkpoints) {
List < PartitionKeyRange > partitionKeyRanges = new List < PartitionKeyRange > ();
FeedResponse < PartitionKeyRange > pkRangesResponse;
do {
pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(collection);
partitionKeyRanges.AddRange(pkRangesResponse);
}
while (pkRangesResponse.ResponseContinuation != null);
foreach(PartitionKeyRange pkRange in partitionKeyRanges) {
string continuation = null;
checkpoints.TryGetValue(pkRange.Id, out continuation);
IDocumentQuery < Document > query = client.CreateDocumentChangeFeedQuery(
collection,
new ChangeFeedOptions {
PartitionKeyRangeId = pkRange.Id,
StartFromBeginning = true,
RequestContinuation = continuation,
MaxItemCount = 1
});
while (query.HasMoreResults) {
FeedResponse < DeviceReading > readChangesResponse = query.ExecuteNextAsync < DeviceReading > ().Result;
foreach(DeviceReading changedDocument in readChangesResponse) {
Console.WriteLine(changedDocument.Id);
}
checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
}
}
return checkpoints;
}
2 回答
DocumentDB仅支持由服务器返回的逻辑时间戳进行检查 . 如果要从X分钟前检索所有更改,则必须"remember"与时钟时间对应的逻辑时间戳(在REST API中为集合返回
ETag
,在SDK中为ResponseContinuation
),然后使用它来检索更改 .更改源使用逻辑时间代替时钟时间,因为它可以在各种服务器/分区之间不同 . 如果您希望根据时间时间看到更改Feed支持(有一些关于偏斜的警告),请在https://feedback.azure.com/forums/263030-documentdb/建议/ upvote .
要保存每个分区键/文档的最后一个检查点,您只需保存上次看到的批处理的相应版本(在REST API中为集合返回
ETag
,在SDK中为ResponseContinuation
返回),就像Fred在答案中建议的那样 .您可以尝试提供逻辑版本/ ETag(例如
95488
),而不是提供null值作为ChangeFeedOptions的RequestContinuation属性 .