我正在尝试使用spark来处理一个大的cassandra表(大约4.02亿个条目和84列)但是我得到了不一致的结果 . 最初的要求是将此表中的某些列复制到另一个表 . 复制数据后,我注意到新表中的某些条目丢失了 . 要验证我是否计算了大型源表,但每次都得到不同的值 . 我在一个较小的表(约700万条记录)上尝试了查询,结果很好 .
最初,我尝试使用pyspark进行计数 . 这是我的pyspark脚本:
spark = SparkSession.builder.appName("Datacopy App").getOrCreate()
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache()
df.createOrReplaceTempView("data")
query = ("select count(1) from data " )
vgDF = spark.sql(query)
vgDF.show(10)
Spark提交命令如下:
~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py
上述火花提交过程需要约90分钟才能完成 . 我跑了三次,这是我得到的计数:
-
Spark迭代1:402273852
-
Spark迭代2:402273884
-
Spark迭代3:402274209
Spark在整个过程中不会显示任何错误或异常 . 我在cqlsh中运行了相同的查询三次,并再次获得了不同的结果:
-
Cqlsh迭代1:402273598
-
Cqlsh迭代2:402273499
-
Cqlsh迭代3:402273515
我无法找出为什么我从同一个查询得到不同的结果 . Cassandra系统日志(/var/log/cassandra/system.log)仅显示以下错误消息:
ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
版本:
-
Cassandra 3.9 .
-
Spark 2.1.0 .
-
Datastax的spark-cassandra-connector 2.0.1
-
Scala版本2.11
簇:
-
Spark设置有3个worker和1个主节点 .
-
3个工作节点也安装了cassandra集群 .
-
每个工作节点有8个CPU核心和40 GB RAM .
任何帮助将不胜感激 .
1 回答
Spark Cassandra连接器默认读取一致性为“LOCAL_ONE”,默认写入一致性为“LOCAL_QUORUM”,因此可以在完全修复之前使用该默认值读取部分数据 . 对于无法写入数据的节点,您可以读取“ONE”,但这不是错误,因为其他2个副本成功 . 因此,您应该将两个级别设置为QUORUM,或者将其中一个设置为ALL
默认的CQL shell级别也是ONE,所以你也应该增加它: