我的问题是关于StatefulNetworkWordCount示例:
Q1)stateDstream RDD由驱动程序或工作节点维护,或者每个工作节点是否有自己的完整状态rdd的本地副本?
Q2)为什么我们需要以下行中的HashPartitioner:
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
幕后发生了什么?
1 回答
回答你的两个问题:
1) 由
DStream
生成的RDD
分布在整个 Worker 身上 . 与非流式传输类似,这意味着DStream
生成的每个RDD
的记录都在群集中分布(这就是为什么分区在这里很重要的原因) .2) 在这种情况下,分区很重要,因为它可以解决每个
RDD
迭代中的记录被拆分的问题 . 特别是对于像updateStateByKey()这样的转换,您倾向于看到跨越不同批处理间隔的RDD
的键保持不变 . 所以在这里不言而喻,如果我们的每个区间的键都在相同的分区上排列,那么这个函数可以更有效地工作,并且可以为分区内的键更新状态 .举个例子,让我们来看看你链接的单词计数程序 . 让我们以两秒一的间隔考虑
RDD
(在t = 1时为rdd1
,在t = 2时为rdd2
) . 假设rdd1
生成的文本为"hello world"
,生成的rdd2
也会看到文本"hello I'm world"
. 如果没有分区,每个RDD
的记录可以发送到不同工作人员的各个分区(t = 1时"hello"
,t = 2时"hello"
可以发送到不同的位置) . 这意味着对计数状态的更新需要重新调整每次迭代的记录以获得更新的计数 . 通过定义分区器(并记住其中一个参数!),我们将在同一分区看到键"hello"
和"world"
,从而避免混乱,并创建更有效的更新 .此处还要注意,因为键可以更改,所以有一个参数可以切换是否记住分区程序 .