首页 文章

使用spark SQL中的相同名称替换带有calcutated列的列

提问于
浏览
1

我从我的datalake读取文件并将它们加载到数据框中由于kafka(here)中的强制转换问题,加载的数据有一些与源数据库中的类型不同的字段

因此,我使用错误的数据类型(bunary)从S3加载数据,并使用UDF函数将每个列转换为另一列

然后,我重命名新列以替换olders,以在源数据库和目标数据库中保持相同的结构

脚步 :

之前:

myTable
|
+-- myField1 (binary)
+-- myField2 (binary)
+-- myField3 (binary)

中间状态1(使用UDF功能):

myTable
|
+-- myField1 (binary)
+-- myField1_new (numeric)
+-- myField2 (binary)
+-- myField2_new (numeric)
+-- myField3 (binary)
+-- myField3_new (numeric)

中间国家2(下降旧栏目):

myTable
|
+-- myField1_new (numeric)
+-- myField2_new (numeric)
+-- myField3_new (numeric)

最终状态(重命名计算列):

myTable
|
+-- myField1 (numeric)
+-- myField1 (numeric)
+-- myField1 (numeric)

这是我使用的语法:

spark.sql('select *,
            MyUDF(myfield1) myfield1_new,
            MyUDF(myfield2) myfield2_new,
            MyUDF(myfield3) myfield3_new
            from my_table')
.drop('myfield1').withColumnRenamed('myfield1_new', 'myfield1')
.drop('myfield2').withColumnRenamed('myfield2_new', 'myfield2')
.drop('myfield3').withColumnRenamed('myfield3_new', 'myfield3')
.show(1, False)

我的问题是这个过程真的很慢,因为在真实的 生产环境 表中要计算439个字段(439 !!!)

有没有办法更快地完成它?在飞行中重命名还是其他什么?

谢谢你的帮助

1 回答

  • 0

    我看到了这个问题的previous主题 .

    扩展那个,假设你有 df

    +--------------------+
    |             myfield|
    +--------------------+
    |[00, 8F, 2B, 9C, 80]|
    |    [52, F4, 92, 80]|
    +--------------------+
    

    EDIT :由于列 myfield 的格式为 bytearray(b'\x00') ,转换方式如下(由@Ftagn指出) . 否则,如果是字符串列表,请使用 commented return .

    def func(val):
        return int.from_bytes(val, byteorder='big', signed=False) / 1000000
        # return int("".join(val), 16)/1000000
    func_udf = udf(lambda x: func(x), FloatType())
    

    并创建输出,使用

    df = df.withColumn("myfield1", func_udf("myfield"))
    

    这会产生,

    +--------------------+--------+
    |             myfield|myfield1|
    +--------------------+--------+
    |[00, 8F, 2B, 9C, 80]|  2402.0|
    |    [52, F4, 92, 80]| 1391.76|
    +--------------------+--------+
    

    相反,如果你使用,

    df = df.withColumn("myfield", func_udf("myfield"))
    

    你得到,

    +-------+
    |myfield|
    +-------+
    | 2402.0|
    |1391.76|
    +-------+
    

相关问题