我在这里写一些关于如何使用spark-cassandra连接器提高分析工作中的读取性能的建议 . 我面临的是一些超时问题,同时在几个分区键上执行大量读取 .

在cassandra日志中,这样的超时类似于:

WARN  [ScheduledTasks:1] 2017-12-06 04:31:49,752 MonitoringTask.java:150 - 32 operations timed out in the last 20394 msecs, operation list available at debug log level
DEBUG [ScheduledTasks:1] 2017-12-06 04:31:49,753 MonitoringTask.java:155 - 32 operations timed out in the last 20394 msecs:
SELECT * FROM xyz WHERE ts = 2017-11-10 00:32+1100 LIMIT 5000: total time 15563 msec - timeout 5000 msec
SELECT * FROM xyz WHERE ts = 2017-10-20 00:31+1100 LIMIT 5000: total time 15597 msec - timeout 5000 msec
SELECT * FROM xyz WHERE ts = 2017-10-20 00:49+1100 LIMIT 5000: total time 15596 msec - timeout 5000 msec

集群

我正在使用此配置在群集中运行分析Spark作业:

  • DC1:2个节点,都有32个内核和64GB内存

  • DC2:2个节点,32个和24个内核,都有64GB的RAM

在所有节点上,我运行Cassandra和Spark . 作为火花工作,我有一个流媒体工作和分析工作(每天运行一次) . 复制因子是2,每个DC一个 . 这被配置为每个DC具有每个数据的一个副本 . 所有节点都有C * 3.7和Spark 2.1.1,连接器版本2.0.5 .

火花流作业负责集群中的写入(它从Flume代理接收数据) . 这意味着我的集群整天都有大量的写入工作负载,并且每天都有大量的读取工作负载 .

表格读了

我正在读取数据的表有这个模式:

CREATE TABLE xyz (
    ts timestamp,
    f1 text,
    f2 text,
    count counter,
    PRIMARY KEY ((ts), f1, f2)
) WITH CLUSTERING ORDER BY (f1 ASC, f2 ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

如您所见,分区键是ts,其中每个可能的值代表特定的分钟(例如,2017-12-05 17:42 0000)

分析工作

我运行分析工作,每个节点有2个执行程序,每个执行程序4个核心,每个执行程序5GB(总共32个核心和40GB内存) .

分析作业负责从一个C *表读取数据并写入另一个C *表(使用不同的分区键) .

分析作业在24个批次中读取12周的数据,每5040分钟读取一次,从而访问5040个分区键 . 对于每个批次,我使用5040密钥准备RDD,然后使用 repartitionByCassandraReplicajoinWithCassandraTable 来确保数据位置 .

nodetool tablestatsnodetool tablehistograms 我可以看到分区大小平均为422KB . 这意味着每批需要读取5040 * 422KB~ = 2GB的(未压缩?)数据 .

作业连接到主节点,因为我没有在C *连接器中指定主机 . 因此,默认情况下,数据只能从DC1读取 .

问题

默认的Cassandra Heap空间大小为8GB,由于C *中的许多读取超时错误,spark作业失败 . 由于FullGC,我能够将这些错误与大的GC暂停相关联 .

我调查了很多,我能够排除磁盘访问中的峰值(使用iostat命令) . 在CQLSH中跟踪相同类型的查询我能够看到一个合理的响应时间(10-50ms),同时完成了超过一千个请求 .

唯一的问题似乎是垃圾收集器 .

我试图将MAX_HEAP_MEMORY从8GB增加到12GB,然后从12GB增加到16GB . 它有所帮助,但仍然看到一些超时:

  • 如上所述,如果有太多的超时导致火花错误,那么作业就会停止

  • 使用12GB的作业在48分钟内完成,由于GC仅在Cassandra日志中可见,大约15-20次超时(对火花作业没有影响)

  • 16GB的工作在41分钟内完成, still with 由于GC的一些超时(大约10个案例)

First question: 覆盖连接器参数 spark.cassandra.connection.factory 以使用将使用来自两个DC的所有节点的策略是否有用,考虑到DC之间的延迟较低(<3ms)?

这将对DC1上的节点施加较小的压力,因为读取将分布在4个节点而不是2个节点上 .

Second question: 考虑到数据写在表上的方式,我可能决定将压缩策略更改为 DateTieredCompactionStrategy . 改变压实策略会有用吗?

这在 生产环境 集群中是一项代价高昂的操作,因此我希望能够合理地确定它将是有益的 . 有关信息,我可以看到在不同节点上访问的SSTable数量在50%百分位数的5到12之间,7%和14%用于99%,7-14再次作为最大值(来自 nodetool tablehistograms

Third question: 我还缺少另一个优化点吗?