首页 文章

Spark:不断读取Cassandra的数据

提问于
浏览
0

我已通过Reading from Cassandra using Spark Streaming并通过tutorial-1tutorial-2链接 .

可以公平地说,Cassandra-Spark集成目前没有提供任何开箱即用的功能,可以不断从Cassandra获取更新并将其流式传输到HDFS等其他系统吗?

通过连续,我的意思是只获取自上次获取Spark以来已更改(插入或更新)的表中的那些行 . 如果有太多这样的行,应该有一个限制行数的选项,随后的spark fetch应该从它停止的地方开始 . 至少一次保证是可以的,但确切一次将是一个巨大的欢迎 .

如果它不受支持,支持它的一种方法可能是在每个cassandra表中有一个辅助列 updated_time ,需要通过storm查询,然后使用该列进行查询 . 或者每个表的辅助表,其中包含要更改的行的ID,时间戳 . 有人曾尝试过这个吗?

2 回答

  • 0

    我不认为Apache Cassandra具有开箱即用的功能 . 在内部[在一段时间内]它以顺序方式将所有操作存储在数据上,但它是每个节点并最终被压缩(以节省空间) . 坦率地说,Cassandra(与大多数其他数据库一样)的承诺是提供最新的数据视图(这在分布式环境中本身可能非常棘手),但不是数据如何变化的完整历史记录 .

    因此,如果您仍希望在Cassandra中获得此类信息(并在Spark中处理它),您将不得不自己做一些额外的工作:设计专用表(或添加合成列),处理分区,保存偏移跟踪进度等

    Cassandra可以用于时间序列数据,但在你的情况下我会考虑使用流式解决方案(如Kafka)而不是发明它 .

  • 0

    我同意Ralkie的观点,但是如果你在这个用例中与C *绑定,我想提出一个更多的解决方案 . 此解决方案假设您可以完全控制架构并进行摄取 . 这不是一个流媒体解决方案,虽然它可能会被笨拙地分成一个 .

    您是否考虑过使用由timebucket和 murmur_hash_of_one_or_more_clustering_columnssome_int_designed_limit_row_width 组成的复合键?通过这种方式,您可以将时间设置为1分钟,5分钟,1小时等,具体取决于您需要分析/存档数据的方式 . 需要基于一个或多个聚类列的杂音散列来帮助定位C *集群中的数据(如果您经常查找特定的聚类列,这是一个非常糟糕的解决方案) .

    例如,采用IoT用例,其中传感器每分钟报告一次,并且有一些传感器读数可以表示为整数 .

    create table if not exists iottable {
      timebucket bigint,
      sensorbucket int,
      sensorid varchar,
      sensorvalue int,
      primary key ((timebucket, sensorbucket), sensorid)
    } with caching = 'none'
       and compaction = { 'class': 'com.jeffjirsa.cassandra.db.compaction.TimeWindowedCompaction' };
    

    注意TimeWindowedCompaction的使用 . 我不确定你使用的是什么版本的C *;但是对于2.x系列,我会远离DateTieredCompaction . 我不能说它在3.x中的表现如何 . 无论如何,您应该在确定架构和压缩策略之前进行广泛的测试和基准测试 .

    另请注意,此架构可能会导致热点,因为它比其他传感器更容易报告 . 同样,不知道用例很难提供完美的解决方案 - 这只是一个例子 . 如果您不关心为特定传感器(或列)读取C *,则根本不必使用聚类列,您只需使用timeUUID或随机的杂音哈希桶 .

    无论您如何决定对数据进行分区,这样的模式都允许您使用 repartitionByCassandraReplicajoinWithCassandraTable 来提取在给定的timebucket期间写入的数据 .

相关问题