首页 文章

如何在cassandra表之间复制数据?

提问于
浏览
1

任何人都可以在从一张 table 读取数据并在cassandra中将其写入另一张 table 时解释spark的内部工作 .

这是我的用例:

我通过kafka主题将从IOT平台进入cassandra的数据提取出来 . 我有一个小的python脚本解析来自kafka的每个消息以获取它所属的表名,准备一个查询并使用datastax的cassandra-driver为python将其写入cassandra . 通过该脚本,我可以将 300000 records per min 摄入cassandra . 然而,我的传入数据速率是 510000 records per minute 所以kafka消费者滞后继续增加 .

Python脚本已经对cassandra进行了并发调用 . 如果我增加python执行程序的数量,cassandra-driver会因为cassandra节点不可用而开始失败 . 我觉得我每秒钟都有一个cassandra通话限制 . 这是我得到的错误消息:

ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})"

最近,我运行了一个pyspark作业,将数据从一个表中的几列复制到另一个表中 . 该表中有大约1.68亿条记录 . Pyspark工作在大约5个小时内完成 . 所以它在 550000 records per min 处理 .

这是我正在使用的pyspark代码:

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()

df.createOrReplaceTempView("data")

query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value  from data  " )

vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()

Versions:

  • Cassandra 3.9 .

  • Spark 2.1.0 .

  • Datastax的spark-cassandra-connector 2.0.1

  • Scala版本2.11

Cluster:

  • Spark设置有3个worker和1个主节点 .

  • 3个工作节点也安装了cassandra集群 . (每个cassandra节点有一个spark worker节点)

  • 每个 Worker 被允许10 GB ram和3个核心 .

所以我想知道:

  • spark首先从cassandra读取所有数据然后将其写入新表,还是在spark cassandra连接器中进行某种优化,允许它在cassandra表中移动数据而不读取所有记录?

  • 如果我用一个火花流工作替换我的python脚本,我在其中解析数据包以获取cassandra的表名,这会帮助我更快地将数据提取到cassandra中吗?

1 回答

  • 2

    Spark连接器经过优化,因为它可以并行处理和读取/插入数据到拥有数据的节点 . 使用Cassandra Spark Connector可以获得更好的吞吐量,但这需要更多资源 .

    谈论你的任务 - 300000次插入/分钟是5000 /秒,坦率地说这不是很大的数字 - 你可以通过进行不同的优化来增加吞吐量:

    • 使用asynchronous calls提交请求 . 您只需确保提交更多可由一个连接处理的请求(但您也可以增加此数量 - 我不知道如何在Python中执行此操作,但请检查Java driver doc以获取建议) .

    • 使用正确的一致性级别( LOCAL_ONE 应该会给你非常好的表现)

    • 使用正确load balancing policy

    • 您可以并行运行脚本的多个副本,确保它们都在同一个Kafka使用者组中 .

相关问题