首页 文章

Spark(SQL /结构化流)Cassandra - PreparedStatement

提问于
浏览
1

我实时使用Spark Structured Streaming进行机器学习,我希望在我的Cassandra集群中存储预测 .

由于我处于流式上下文中,每秒执行多次相同的请求,因此必须使用PreparedStatement进行强制优化 .

在cassandra spark驱动程序(https://github.com/datastax/spark-cassandra-connector)中没有办法使用PreparedStatement(在scala或python中,我不考虑java作为选项)

我应该使用scala(https://github.com/outworkers/phantom)/ python(https://github.com/datastax/python-driver)cassandra驱动程序吗?它是如何工作的,我的连接对象需要序列化才能传递给 Worker ?

如果有人可以帮助我!

谢谢 :)

1 回答

  • 2

    为了做一个准备好的声明,然后在使用结构化火花流处理流处理时在Cassandra中注册数据,您需要:

    • import com.datastax.driver.core.Session

    • import com.datastax.spark.connector.cql.CassandraConnector

    然后,构建您的连接器:

    val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
    

    既有 session 又有 connector ,你现在可以调用你在Statement scala类中编写的 prepared Statement 函数

    connector.withSessionDo { session =>
     Statements.PreparedStatement()
    

    }

    您最终可以通过使用下面的函数在Cassandra中编写数据来完成,cql是将变量绑定到准备好的Statement并执行它的函数:

    private def processRow(value: Commons.UserEvent) = {
      connector.withSessionDo { session =>
      session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream))
    }
    

    }

    当然你必须在foreach作家中调用这个函数( processRow

    // This Foreach sink writer writes the output to cassandra.
    import org.apache.spark.sql.ForeachWriter
    val writer = new ForeachWriter[Commons.UserEvent] {
      override def open(partitionId: Long, version: Long) = true
      override def process(value: Commons.UserEvent) = {
        processRow(value)
      }
      override def close(errorOrNull: Throwable) = {}
    }
    
    val query =
      ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start
    

相关问题