我在加入 2 个 kafka 流时遇到问题,从我的活动领域中提取日期。当我没有定义自定义 TimeStampExtractor 时,连接工作正常,但是当我这样做时,连接不再起作用。我的拓扑非常简单:
val builder = new StreamsBuilder()
val couponConsumedWith = Consumed.`with`(Serdes.String(),
getAvroCouponSerde(schemaRegistryHost, schemaRegistryPort))
val couponStream: KStream[String, Coupon] = builder.stream(couponInputTopic, couponConsumedWith)
val purchaseConsumedWith = Consumed.`with`(Serdes.String(),
getAvroPurchaseSerde(schemaRegistryHost, schemaRegistryPort))
val purchaseStream: KStream[String, Purchase] = builder.stream(purchaseInputTopic, purchaseConsumedWith)
val couponStreamKeyedByProductId: KStream[String, Coupon] = couponStream.selectKey(couponProductIdValueMapper)
val purchaseStreamKeyedByProductId: KStream[String, Purchase] = purchaseStream.selectKey(purchaseProductIdValueMapper)
val couponPurchaseValueJoiner = new ValueJoiner[Coupon, Purchase, Purchase]() {
@Override
def apply(coupon: Coupon, purchase: Purchase): Purchase = {
val discount = (purchase.getAmount * coupon.getDiscount) / 100
new Purchase(purchase.getTimestamp, purchase.getProductid, purchase.getProductdescription, purchase.getAmount - discount)
}
}
val fiveMinuteWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
val outputStream: KStream[String, Purchase] = couponStreamKeyedByProductId.join(purchaseStreamKeyedByProductId,
couponPurchaseValueJoiner,
fiveMinuteWindow
)
outputStream.to(outputTopic)
builder.build()
正如我所说,当我不使用自定义 TimeStampExtractor 时,这个代码就像魅力一样,但当我通过将 StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG 设置为我的自定义提取器类(我已经仔细检查过该类正在正确提取日期)时,连接不起作用了。
我正在通过运行单元测试并将以下事件传递给它来测试拓扑:
val coupon1 = new Coupon("Dec 05 2018 09:10:00.000 UTC", "1234", 10F)
// Purchase within the five minutes after the coupon - The discount should be applied
val purchase1 = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 25.00F)
val purchase1WithDiscount = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 22.50F)
val couponRecordFactory1 = couponRecordFactory.create(couponInputTopic, "c1", coupon1)
val purchaseRecordFactory1 = purchaseRecordFactory.create(purchaseInputTopic, "p1", purchase1)
testDriver.pipeInput(couponRecordFactory1)
testDriver.pipeInput(purchaseRecordFactory1)
val outputRecord1 = testDriver.readOutput(outputTopic,
new StringDeserializer(),
JoinTopologyBuilder.getAvroPurchaseSerde(
schemaRegistryHost,
schemaRegistryPort).deserializer())
OutputVerifier.compareKeyValue(outputRecord1, "1234", purchase1WithDiscount)
不确定选择新密钥的步骤是否摆脱了正确的日期。我已经测试了很多组合而没有运气:(
任何帮助将非常感激!
2 回答
我不确定,因为我不知道你测试了多少代码,但我的猜测是:
你的代码使用默认的时间戳提取器,因为它使用的是你将记录作为时间戳记录发送到管道中的时间,所以基本上它会起作用,因为在你的测试中你一个接一个地发送数据而没有暂停。
你正在使用
TopologyTestDriver
进行测试!请注意,它对于测试业务代码和拓扑作为一个单元非常有用(我有什么作为输入,什么是正确的相应输出)但是没有在测试中运行的 Kafka Stream 应用程序。在您的情况下,您可以使用
TopologyTestDriver
类中的方法advanceWallClockTime(long)
来模拟系统时间步行。如果要启动拓扑,则必须使用嵌入式 kafka 集群进行集成测试(kafka 库上有一个工作正常!)。
如果有帮助请告诉我:-)
谢谢你的回复。我昨天正在研究这个问题,我想我发现了这个问题。正如您所说我使用 TopologyTestDriver 来运行我的测试,当您初始化 TopologyTestDriver 类时,它使用 initialWallClockTime,如果您没有提供值,TopologyTestDriver 将获取 currentTimeMillis:
还有另一个构造函数允许你 pass-in 一个 initialWallClockTime。我一直在测试这种方法,但由于某种原因它对我不起作用。
总而言之,我的解决方案是使用当前时间戳创建购买和优惠券对象。我仍在使用我的自定义时间戳提取器,但我没有硬编码日期,而是总是得到当前的时间戳,这样连接工作正常。
对我的最终解决方案不满意,因为我不知道为什么 initialWallClockTime 对我不起作用,但至少现在测试工作正常。