我在kinesis流源上运行了一个Spark结构化的流式查询 . 它从kinesis读取(没有时间触发),计算聚合,并使用自定义dynamodb接收器推送结果(outputMode“update”) .

spark版本:2.1.1和kinesis源码库:https://github.com/maropu/spark-kinesis-sql-asl

我希望按秒处理1 500个事件(我使用KinesisDataGenerator生成假事件:https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html

一切都在aws EMR上运行,通过火花壳(它是一个POC)

我做了一些调整并到达了这些参数:

  • 2个kinesis碎片

  • 2 000个dynamodb WCU

  • 1个单集群节点m4.4xlarge(16 cpu 64 go)

大约半小时内一切运行正常,似乎突然发生了一些延迟,每批都需要花费越来越多的时间才能继续,直到有了ProvisionedThroughputExceededException(读数超过限制)

我对dynamodb没有任何问题(WCU峰值在1500左右,2000配置,没有限制写请求)

Kinesis读取非常稳定,但只要进程变慢,就可以从kinesis同时读取越来越多的事件,并且它会崩溃 .

我添加了直到8个分片和相同的事情发生,我只有一点时间才能获得ProvisionedThroughputExceededException(nb:实例cpus计数>比kinesis分片的数量)

我将我的kinesis加载吞吐量从500改为1500事件/秒,同样的事情发生在大约一半的时间 .

使用更大的实例会产生相同的结果......我尝试了更少的事件(输入上有100个不同的随机事件,而600 000个不同的事件):同样的事情

关于dynamodb接收器:在查询流结果上计算第二个聚合(因为spark结构化流传输不允许多个聚合) .

端到端流程包括:“从kinesis读取 - >流式查询(聚合1) - > Dynamodb Sink(聚合2 dynamodb插入)” . 我使用“update”outputMode,以便只将从结果表更新的行发送到接收器 .

我在接收器的addBatch方法中添加了一些日志:

  • 输入行数(流式查询结果)

  • 插入请求数(第二次聚合后的流式查询结果,第二次计数总是等于或低于第一次)

  • 整个接收过程的已用时间 .

对于500次/秒的运动装载,我得到了类似的东西:

....
addBatch(4) -> input rows 3574 output insertRequests 3449                       
addBatch(4) -> Elapsed time: 6.983257993seconds
addBatch(5) -> input rows 4052 output insertRequests 3909                       
addBatch(5) -> Elapsed time: 5.83499691seconds
addBatch(6) -> input rows 2745 output insertRequests 2657                       
addBatch(6) -> Elapsed time: 4.879952608seconds
addBatch(7) -> input rows 3081 output insertRequests 2969                       
addBatch(7) -> Elapsed time: 3.939775103seconds
addBatch(8) -> input rows 2010 output insertRequests 1925                       
addBatch(8) -> Elapsed time: 5.151868592seconds
addBatch(9) -> input rows 2977 output insertRequests 2869                       
addBatch(9) -> Elapsed time: 4.722617963seconds
addBatch(10) -> input rows 2530 output insertRequests 2448                      
addBatch(10) -> Elapsed time: 5.562205997seconds
....
addBatch(100) -> input rows 2996 output insertRequests 2864                     
addBatch(100) -> Elapsed time: 4.873477912seconds
addBatch(101) -> input rows 2535 output insertRequests 2458                     
addBatch(101) -> Elapsed time: 4.284861452seconds
addBatch(102) -> input rows 2477 output insertRequests 2388                     
addBatch(102) -> Elapsed time: 5.174349386seconds
addBatch(103) -> input rows 3942 output insertRequests 3791                     
addBatch(103) -> Elapsed time: 4.957703028seconds
addBatch(104) -> input rows 3490 output insertRequests 3366                     
addBatch(104) -> Elapsed time: 8.761093424seconds
.....
addBatch(224) -> Elapsed time: 4.475522722seconds
addBatch(225) -> input rows 2508 output insertRequests 2406                     
addBatch(225) -> Elapsed time: 5.819129461seconds
addBatch(226) -> input rows 3009 output insertRequests 2914                     
addBatch(226) -> Elapsed time: 10.994768506seconds
addBatch(227) -> input rows 4755 output insertRequests 4557                     
addBatch(227) -> Elapsed time: 11.034992149seconds
addBatch(228) -> input rows 6739 output insertRequests 6397                     
addBatch(228) -> Elapsed time: 12.409657235seconds
addBatch(229) -> input rows 5049 output insertRequests 4888                     
addBatch(229) -> Elapsed time: 12.584365428seconds
addBatch(230) -> input rows 7758 output insertRequests 7485                     
addBatch(230) -> Elapsed time: 18.631239639seconds
addBatch(231) -> input rows 9554 output insertRequests 9030                     
addBatch(231) -> Elapsed time: 18.426685181seconds
addBatch(232) -> input rows 8822 output insertRequests 8382                     
addBatch(232) -> Elapsed time: 15.920548608seconds
addBatch(233) -> input rows 9251 output insertRequests 8759                     
addBatch(233) -> Elapsed time: 19.984515183seconds
addBatch(234) -> input rows 10336 output insertRequests 9923                    
addBatch(234) -> Elapsed time: 20.487669168seconds
....
addBatch(247) -> Elapsed time: 25.539402107seconds
addBatch(248) -> input rows 12863 output insertRequests 12269                   
addBatch(248) -> Elapsed time: 27.541254374seconds
addBatch(249) -> input rows 13744 output insertRequests 13069

对于1 000事件/秒的运动装载,我得到了类似的东西:

....
addBatch(8) -> input rows 5247 output insertRequests 5023                       
addBatch(8) -> Elapsed time: 5.162705257seconds
addBatch(9) -> input rows 7484 output insertRequests 7106                       
addBatch(9) -> Elapsed time: 5.829125017seconds
....
addBatch(19) -> input rows 13981 output insertRequests 13069                    
addBatch(19) -> Elapsed time: 8.643140194seconds
addBatch(20) -> input rows 14998 output insertRequests 13997                    
addBatch(20) -> Elapsed time: 8.930952027seconds
....
addBatch(70) -> input rows 13442 output insertRequests 12626                    
addBatch(70) -> Elapsed time: 7.891013506seconds
....
addBatch(146) -> Elapsed time: 11.865784296seconds
addBatch(147) -> input rows 19489 output insertRequests 17976                   
addBatch(147) -> Elapsed time: 11.57164558seconds
addBatch(148) -> input rows 18229 output insertRequests 16863                   
addBatch(148) -> Elapsed time: 12.319194939seconds
addBatch(149) -> input rows 20976 output insertRequests 19298                   
addBatch(149) -> Elapsed time: 13.426055377seconds
addBatch(150) -> input rows 21017 output insertRequests 19325                   
addBatch(150) -> Elapsed time: 12.750020974seconds
addBatch(151) -> input rows 19494 output insertRequests 17962                   
addBatch(151) -> Elapsed time: 11.652468707seconds
....
addBatch(180) -> input rows 22960 output insertRequests 21013                   
addBatch(180) -> Elapsed time: 15.388526621seconds
addBatch(181) -> input rows 22492 output insertRequests 20555                   
addBatch(181) -> Elapsed time: 14.175100191seconds
addBatch(182) -> input rows 22964 output insertRequests 21039                   
addBatch(182) -> Elapsed time: 16.682608754seconds

我找不到为什么我的流程被推迟了 . 为什么第一批次是相当稳定的,为什么它开始花费越来越多的时间,直到大约半小时崩溃,尽管测试参数的变化(实例大小,运动碎片的数量......) .

以下是关于spark结构化流媒体的一些问题:

  • 无限制的表在内存中不断增加吗?

  • 结果表是否相同?

  • 有没有办法限制kinesis流上的记录读取次数?我尝试了时间触发器,但同样的事情发生,当有延迟一切都被读取,直到"now" .

  • 增加的结果表是否会减慢进程?在其中搜索更新行需要花费越来越多的时间吗?

我错过了什么?

谢谢,

菲利普