首页 文章

在Spark Streaming中加入Kafka和Cassandra DataFrames会忽略C *谓词下推

提问于
浏览
1

Intent

我正在通过直接流从Kafka接收数据,并希望使用来自Cassandra的数据来丰富消息 . Kafka消息(Protobufs)被解码为DataFrames,然后与Cassandra的(假设预先过滤的)DF一起加入 . (Kafka)流批量大小与原始C *数据的关系是[几个流式消息到数百万个C *行],但是连接总是每个消息产生一个结果[1:1] . 在连接之后,最终的DF最终存储到另一个C *表中 .

Problem

即使我在完整的Cassandra主键上加入两个DF并将相应的过滤器推送到C *,似乎Spark在实际加入之前将整个C *数据集加载到内存中(我想阻止它)通过使用filter / predicate pushdown) . 这会导致大量的洗牌和任务产生,因此“简单”的连接需要永远......

def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("test")      
      .set("spark.cassandra.connection.host", "xxx")
      .set("spark.cassandra.connection.keep_alive_ms", "30000")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("INFO")

    // Initialise Kafka
    val kafkaTopics = Set[String]("xxx")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
      "auto.offset.reset" -> "smallest")

    // Kafka stream
    val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)      

    // Executed on the driver
    messages.foreachRDD { rdd =>

      // Create an instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Map MyMsg RDD
      val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}

      // Convert RDD[MyMsg] to DataFrame
      val MyMsgDf = MyMsgRdd.toDF()        
        .select(
            $"prim1Id" as 'prim1_id,
            $"prim2Id" as 'prim2_id,
            $...
      )

      // Load DataFrame from C* data-source
      val base_data = base_data_df.getInstance(sqlContext)    

      // Left join on prim1Id and prim2Id
      val joinedDf = MyMsgDf.join(base_data,
            MyMsgDf("prim1_id") === base_data("prim1_id") &&
            MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
            .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
                && base_data("prim2_id").isin(MyMsgDf("prim2_id")))          

      joinedDf.show()
      joinedDf.printSchema()

      // Select relevant fields

      // Persist
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
}

Environment

  • Spark 1.6

  • Cassandra 2.1.12

  • Cassandra-Spark-Connector 1.5-RC1

  • Kafka 0.8.2.2

SOLUTION

来自对Apache Cassandra ML的DataStax Spark Connector的讨论

我学会了以下内容:

引用罗素斯皮策这不是谓词下推的情况 . 这是分区键列上的连接 . 目前只有joinWithCassandraTable支持这种直接类型的连接,尽管我们正在研究一些方法来尝试在Spark中自动完成此操作 . 可以从任何可以应用架构的RDD创建数据帧 . 最简单的方法是将您的joinedRDD [x,y]映射到Rdd [JoinedCaseClass],然后调用toDF(这将需要导入您的sqlContext含义 . )有关详细信息,请参阅此处的DataFrames文档 .

所以实际的实现现在类似于

// Join myMsg RDD with myCassandraTable
val joinedMsgRdd = myMsgRdd.joinWithCassandraTable(
  "keyspace",
  "myCassandraTable",
  AllColumns,
  SomeColumns(
      "prim1_id",
      "prim2_id"
  )
).map{case (myMsg, cassandraRow) => 

  JoinedMsg(
    foo = myMsg.foo
    bar = cassandraRow.bar
  )
}

// Convert RDD[JoinedMsg] to DataFrame
val myJoinedDf = joinedMsgRdd.toDF()

1 回答

  • 1

    你试过joinWithCassandraTable吗?它应该下载到C *你正在寻找的所有键...

相关问题