这个问题是从[这一个](saving a list of rows to a Hive table in pyspark)脱离出来的 .

编辑请参阅本文底部的更新编辑

我已经使用Scala和现在的Pyspark来完成相同的任务,但是我遇到了将数据帧非常缓慢地保存到镶木地板或csv,或者将数据帧转换为列表或数组类型数据结构的问题 . 下面是相关的python / pyspark代码和信息:

#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()

for i in range(len(Table)):

    rows = sqlContext.sql(qry)
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count

#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;

    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)

    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
    #rows_list = rows.collect()
    #shuffle(rows_list)

我曾尝试在Scala中执行上述操作,但我遇到了类似的问题 . 我可以轻松加载配置单元表或hive表的查询,但需要随机shuffle或存储大型数据帧遇到内存问题 . 能够添加2个额外列也存在一些挑战 .

我要添加行的Hive表(hiveTemp)有5,570,000~550万行和120列 .

我在for循环中迭代的Hive表有5000行和3列 . 有25个唯一的 val1 (hiveTemp中的列),以及 val1val2 3000的组合.Val2可以是5列中的一列及其特定的单元格值 . 这意味着如果我调整了代码,那么我可以减少行的查找从5000添加到26,但是我必须检索,存储和随机shuffle的行数会非常大,因此存在内存问题(除非有人对此提出建议)

至于我需要添加到表中的总行数可能大约为100,000 .

最终目标是将5.5米行的原始表格附加到作为蜂巢或镶木 table 的100k行中 . 如果它更容易,我可以在自己的表中写入100k行,以后可以合并到5.5 mill表中

Scala或Python很好,虽然Scala更受欢迎 .

对此以及最佳选择的任何建议都会很棒 .

非常感谢!

编辑我对此问题有一些额外的想法:我使用散列分区程序将hive表分区为26个分区 . 这基于列值,其中有26个不同的列值 . 我想在for循环中执行的操作可以通用化,这样它只需要在每个分区上进行 . 话虽这么说,我怎么可以,或者我可以在线查看哪些指南能够编写scala代码来执行此操作,以及单独的执行程序在每个分区上执行这些循环中的每一个?我认为这会让事情变得更快 .
我知道如何使用多线程执行此类操作,但不知道如何使用scala / spark范例 .