首页 文章

DocumentDB更改Feed并保存检查点

提问于
浏览
0

阅读完文档后,我'm having a hard time conceptualizing the change feed. Let'从下面的documentation获取代码 . 第二个更改源正在从上次通过检查点运行时获取更改 . 让's say it is being used to create summary data and there was an issue and it needed to be re-run from a prior time. I don'了解以下内容:

  • 如何指定检查点应该启动的特定时间 . 我知道我可以保存检查点字典并将其用于每次运行,但是如何从X时间获得更改以重新运行某些摘要数据

  • 其次,假设我们正在重新运行一些摘要数据,我们保存用于每个汇总数据的最后一个检查点,以便我们知道那个停止的位置 . 如何知道记录在该检查点之前或之前?

从集合开始到最后一个检查点运行的代码:

Dictionary < string, string > checkpoints = await GetChanges(client, collection, new Dictionary < string, string > ());

  await client.CreateDocumentAsync(collection, new DeviceReading {
   DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000
  });
  await client.CreateDocumentAsync(collection, new DeviceReading {
   DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000
  });

  // Returns only the two documents created above.
  checkpoints = await GetChanges(client, collection, checkpoints);
  //

  private async Task < Dictionary < string, string >> GetChanges(
   DocumentClient client,
   string collection,
   Dictionary < string, string > checkpoints) {
   List < PartitionKeyRange > partitionKeyRanges = new List < PartitionKeyRange > ();
   FeedResponse < PartitionKeyRange > pkRangesResponse;

   do {
    pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(collection);
    partitionKeyRanges.AddRange(pkRangesResponse);
   }
   while (pkRangesResponse.ResponseContinuation != null);

   foreach(PartitionKeyRange pkRange in partitionKeyRanges) {
    string continuation = null;
    checkpoints.TryGetValue(pkRange.Id, out continuation);

    IDocumentQuery < Document > query = client.CreateDocumentChangeFeedQuery(
     collection,
     new ChangeFeedOptions {
      PartitionKeyRangeId = pkRange.Id,
       StartFromBeginning = true,
       RequestContinuation = continuation,
       MaxItemCount = 1
     });

    while (query.HasMoreResults) {
     FeedResponse < DeviceReading > readChangesResponse = query.ExecuteNextAsync < DeviceReading > ().Result;

     foreach(DeviceReading changedDocument in readChangesResponse) {
      Console.WriteLine(changedDocument.Id);
     }

     checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
    }
   }

   return checkpoints;
  }

2 回答

  • 1

    DocumentDB仅支持由服务器返回的逻辑时间戳进行检查 . 如果要从X分钟前检索所有更改,则必须"remember"与时钟时间对应的逻辑时间戳(在REST API中为集合返回 ETag ,在SDK中为 ResponseContinuation ),然后使用它来检索更改 .

    更改源使用逻辑时间代替时钟时间,因为它可以在各种服务器/分区之间不同 . 如果您希望根据时间时间看到更改Feed支持(有一些关于偏斜的警告),请在https://feedback.azure.com/forums/263030-documentdb/建议/ upvote .

    要保存每个分区键/文档的最后一个检查点,您只需保存上次看到的批处理的相应版本(在REST API中为集合返回 ETag ,在SDK中为 ResponseContinuation 返回),就像Fred在答案中建议的那样 .

  • 1

    如何指定检查点应该启动的特定时间 .

    您可以尝试提供逻辑版本/ ETag(例如 95488 ),而不是提供null值作为ChangeFeedOptions的RequestContinuation属性 .

相关问题