Intent
我正在通过直接流从Kafka接收数据,并希望使用来自Cassandra的数据来丰富消息 . Kafka消息(Protobufs)被解码为DataFrames,然后与Cassandra的(假设预先过滤的)DF一起加入 . (Kafka)流批量大小与原始C *数据的关系是[几个流式消息到数百万个C *行],但是连接总是每个消息产生一个结果[1:1] . 在连接之后,最终的DF最终存储到另一个C *表中 .
Problem
即使我在完整的Cassandra主键上加入两个DF并将相应的过滤器推送到C *,似乎Spark在实际加入之前将整个C *数据集加载到内存中(我想阻止它)通过使用filter / predicate pushdown) . 这会导致大量的洗牌和任务产生,因此“简单”的连接需要永远......
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("test")
.set("spark.cassandra.connection.host", "xxx")
.set("spark.cassandra.connection.keep_alive_ms", "30000")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("INFO")
// Initialise Kafka
val kafkaTopics = Set[String]("xxx")
val kafkaParams = Map[String, String](
"metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
"auto.offset.reset" -> "smallest")
// Kafka stream
val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)
// Executed on the driver
messages.foreachRDD { rdd =>
// Create an instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
// Map MyMsg RDD
val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}
// Convert RDD[MyMsg] to DataFrame
val MyMsgDf = MyMsgRdd.toDF()
.select(
$"prim1Id" as 'prim1_id,
$"prim2Id" as 'prim2_id,
$...
)
// Load DataFrame from C* data-source
val base_data = base_data_df.getInstance(sqlContext)
// Left join on prim1Id and prim2Id
val joinedDf = MyMsgDf.join(base_data,
MyMsgDf("prim1_id") === base_data("prim1_id") &&
MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
.filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
&& base_data("prim2_id").isin(MyMsgDf("prim2_id")))
joinedDf.show()
joinedDf.printSchema()
// Select relevant fields
// Persist
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
Environment
-
Spark 1.6
-
Cassandra 2.1.12
-
Cassandra-Spark-Connector 1.5-RC1
-
Kafka 0.8.2.2
SOLUTION
来自对Apache Cassandra ML的DataStax Spark Connector的讨论
我学会了以下内容:
引用罗素斯皮策这不是谓词下推的情况 . 这是分区键列上的连接 . 目前只有joinWithCassandraTable支持这种直接类型的连接,尽管我们正在研究一些方法来尝试在Spark中自动完成此操作 . 可以从任何可以应用架构的RDD创建数据帧 . 最简单的方法是将您的joinedRDD [x,y]映射到Rdd [JoinedCaseClass],然后调用toDF(这将需要导入您的sqlContext含义 . )有关详细信息,请参阅此处的DataFrames文档 .
所以实际的实现现在类似于
// Join myMsg RDD with myCassandraTable
val joinedMsgRdd = myMsgRdd.joinWithCassandraTable(
"keyspace",
"myCassandraTable",
AllColumns,
SomeColumns(
"prim1_id",
"prim2_id"
)
).map{case (myMsg, cassandraRow) =>
JoinedMsg(
foo = myMsg.foo
bar = cassandraRow.bar
)
}
// Convert RDD[JoinedMsg] to DataFrame
val myJoinedDf = joinedMsgRdd.toDF()
1 回答
你试过joinWithCassandraTable吗?它应该下载到C *你正在寻找的所有键...