首页 文章

Spark从一个Data框架创建多个Data框架

提问于
浏览
0

我使用Spark 2.1和Cassandra(3.9)作为数据源 . C *有一个包含50列的大表,这对我的用例来说不是一个好的数据模型 . 所以我为每个传感器创建了拆分表以及分区键和聚类键cols .

All sensor table
-----------------------------------------------------
| Device |   Time     | Sensor1 | Sensor2 | Sensor3 |
|  dev1  | 1507436000 |  50.3   |    1    |    1    |
|  dev2  | 1507436100 |  90.2   |    0    |    1    |
|  dev1  | 1507436100 |  28.1   |    1    |    1    |
-----------------------------------------------------
Sensor1 table
-------------------------------
| Device |   Time     | value |
|  dev1  | 1507436000 | 50.3  |
|  dev2  | 1507436100 | 90.2  |
|  dev1  | 1507436100 | 28.1  |
-------------------------------

现在我使用spark将数据从旧表复制到新表 .

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="allsensortables", keyspace="dataks")\
    .load().cache()
df.createOrReplaceTempView("data")
query = ('''select device,time,sensor1 as value from data  ''' )
vgDF = spark.sql(query)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="sensor1", keyspace="dataks")\
    .save()

逐个复制数据对于单个表需要花费大量时间(2.1)小时 . 有什么方法我可以 select * 并为每个传感器创建多个df并立即保存? (或甚至顺序) .

1 回答

  • 0

    代码中的一个问题是缓存

    df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="allsensortables", keyspace="dataks")\
    .load().cache()
    

    在这里,我没有看到除了保存之外df多次被使用 . 所以这里缓存是适得其反的 . 您正在读取数据,对其进行过滤并将其保存到单独的cassandra表中 . 现在,数据帧上发生的唯一操作是保存而不是其他任何操作 .

    因此,在此缓存数据没有任何好处 . 删除缓存可以加快速度 .

    按顺序创建多个表 . 我建议使用partitionBy并将数据首先写入HDFS作为分区数据w.r.t传感器,然后将其写回cassandra .

相关问题