首页 文章

Spark流式传输计算平均值

提问于
浏览
0

我从格式中接收来自kafka的数据,其中null是关键 .

null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23

我现在映射了值以删除空键并使用以下代码形成新的键和值对 .

val topics = Array("kafka-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
    )
    streamingContext.checkpoint("hdfs:///hdfs/location")
    val record= stream.map(record=>record.value().toString)


    val rdds=record.transform
    {
    pps=>pps.flatMap(_.split(","))
    }

    val ppds= rdds.transform
`  `{
    pair=>pair.map(vals=>
(vals(2).toString(),Set(vals(1).toLong,vals(2),vals(3),vals(4),val(5),val(6),val(7)....val(23)
 }

where vals(2)a String will be the key and the remaining 22 values will be the values.

我现在试图在20秒的时间窗口内获得每个键的所有值的平均值,并将每个键的计算平均值连续推送到数据存储(HBASE) . . 在批处理模式下,我知道有一个允许你这样做的aggregatebykey()方法 .

在流媒体模式下,如何实现这一目标?

还有一些值是字符串的可能性我如何跳过字符串的值并计算仅数值类型的平均值,同时不断推送更新到HBASE?

1 回答

  • 0

    使用reduceByKeyAndWindow,

    // Reduce last 30 seconds of data, every 10 seconds
    
    val aggregateFunction = (a:Int,b:Int) => (a + b)
    val pairDStream = // DStream contains (word,1)
    val windowedWordCounts = pairDStream.reduceByKeyAndWindow(aggregateFunction, Seconds(30), Seconds(10))
    

    以上示例将用于计算窗口期间的字数,而不是像上面那样使用简单的加法函数,您可以编写更复杂的聚合函数并将其与reduceByKeyAndWindow一起使用

    欲获得更多信息
    https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/10%20Window%20Aggregations.html

相关问题