首页 文章

加入 2 个 kafka 流的问题(使用自定义时间戳提取器)

提问于
浏览
0

我在加入 2 个 kafka 流时遇到问题,从我的活动领域中提取日期。当我没有定义自定义 TimeStampExtractor 时,连接工作正常,但是当我这样做时,连接不再起作用。我的拓扑非常简单:

val builder = new StreamsBuilder()

val couponConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroCouponSerde(schemaRegistryHost, schemaRegistryPort))
val couponStream: KStream[String, Coupon] = builder.stream(couponInputTopic, couponConsumedWith)

val purchaseConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroPurchaseSerde(schemaRegistryHost, schemaRegistryPort))
val purchaseStream: KStream[String, Purchase] = builder.stream(purchaseInputTopic, purchaseConsumedWith)

val couponStreamKeyedByProductId: KStream[String, Coupon] = couponStream.selectKey(couponProductIdValueMapper)
val purchaseStreamKeyedByProductId: KStream[String, Purchase] = purchaseStream.selectKey(purchaseProductIdValueMapper)

val couponPurchaseValueJoiner = new ValueJoiner[Coupon, Purchase, Purchase]() {

  @Override
  def apply(coupon: Coupon, purchase: Purchase): Purchase = {
      val discount = (purchase.getAmount * coupon.getDiscount) / 100
      new Purchase(purchase.getTimestamp, purchase.getProductid, purchase.getProductdescription, purchase.getAmount - discount)
  }
}

val fiveMinuteWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
val outputStream: KStream[String, Purchase] = couponStreamKeyedByProductId.join(purchaseStreamKeyedByProductId,
  couponPurchaseValueJoiner,
  fiveMinuteWindow
  )

outputStream.to(outputTopic)

builder.build()

正如我所说,当我不使用自定义 TimeStampExtractor 时,这个代码就像魅力一样,但当我通过将 StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG 设置为我的自定义提取器类(我已经仔细检查过该类正在正确提取日期)时,连接不起作用了。

我正在通过运行单元测试并将以下事件传递给它来测试拓扑:

val coupon1 = new Coupon("Dec 05 2018 09:10:00.000 UTC", "1234", 10F)
    // Purchase within the five minutes after the coupon - The discount should be applied
    val purchase1 = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 25.00F)
    val purchase1WithDiscount = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 22.50F)
    val couponRecordFactory1 = couponRecordFactory.create(couponInputTopic, "c1", coupon1)
    val purchaseRecordFactory1 = purchaseRecordFactory.create(purchaseInputTopic, "p1", purchase1)

    testDriver.pipeInput(couponRecordFactory1)
    testDriver.pipeInput(purchaseRecordFactory1)
    val outputRecord1 = testDriver.readOutput(outputTopic,
      new StringDeserializer(),
      JoinTopologyBuilder.getAvroPurchaseSerde(
        schemaRegistryHost,
        schemaRegistryPort).deserializer())
    OutputVerifier.compareKeyValue(outputRecord1, "1234", purchase1WithDiscount)

不确定选择新密钥的步骤是否摆脱了正确的日期。我已经测试了很多组合而没有运气:(

任何帮助将非常感激!

2 回答

  • 1

    我不确定,因为我不知道你测试了多少代码,但我的猜测是:

    1. 你的代码使用默认的时间戳提取器,因为它使用的是你将记录作为时间戳记录发送到管道中的时间,所以基本上它会起作用,因为在你的测试中你一个接一个地发送数据而没有暂停。

    2. 你正在使用TopologyTestDriver进行测试!请注意,它对于测试业务代码和拓扑作为一个单元非常有用(我有什么作为输入,什么是正确的相应输出)但是没有在测试中运行的 Kafka Stream 应用程序。

    在您的情况下,您可以使用TopologyTestDriver类中的方法advanceWallClockTime(long)来模拟系统时间步行。

    如果要启动拓扑,则必须使用嵌入式 kafka 集群进行集成测试(kafka 库上有一个工作正常!)。

    如果有帮助请告诉我:-)

  • 0

    谢谢你的回复。我昨天正在研究这个问题,我想我发现了这个问题。正如您所说我使用 TopologyTestDriver 来运行我的测试,当您初始化 TopologyTestDriver 类时,它使用 initialWallClockTime,如果您没有提供值,TopologyTestDriver 将获取 currentTimeMillis:

    public TopologyTestDriver(Topology topology, Properties config) {
        this(topology, config, System.currentTimeMillis());
    }
    

    还有另一个构造函数允许你 pass-in 一个 initialWallClockTime。我一直在测试这种方法,但由于某种原因它对我不起作用。

    总而言之,我的解决方案是使用当前时间戳创建购买和优惠券对象。我仍在使用我的自定义时间戳提取器,但我没有硬编码日期,而是总是得到当前的时间戳,这样连接工作正常。

    对我的最终解决方案不满意,因为我不知道为什么 initialWallClockTime 对我不起作用,但至少现在测试工作正常。

相关问题