Advice on tuning my Spark application in Python

I would like some advice on improving the design of my Spark application. I am new to Spark and have read the performance documentation online. When I run the job on the cluster with the full data load it is very slow, so please suggest how to tune the application better from a developer's point of view.

This is how I developed my Spark application: the job reads files from HDFS, processes them, and stores the data in a Parquet Hive table. It is developed with Spark and Python.

Each file is about 50 MB, and there are roughly 50 files to process. It is a 3-node cluster (2 slaves, 1 master). Processing the data currently takes about 4 hours. The configuration allocates 10 driver cores, 60 total executor cores, 7 GB of executor memory, and 7 GB of driver memory.
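For reference, resources like these are typically set when submitting the job; a hedged sketch of the submit command (the script name and master are placeholders, and the values simply mirror the ones quoted above, not a recommendation):

```
spark-submit \
  --master yarn \
  --driver-cores 10 \
  --driver-memory 7g \
  --num-executors 12 \
  --executor-cores 5 \
  --executor-memory 7g \
  process_files.py
```

12 executors × 5 cores gives the 60 total executor cores mentioned; on a 2-worker cluster this is almost certainly oversubscribed, which is worth checking first.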

The script reads all the files with sc.textFile and creates a Python RDD. The RDD is given a schema and is processed line by line with lambda functions. The line-by-line processing takes time. After processing, the data is stored in Parquet.

How do I find out how many RDDs were created and how much memory each RDD takes? How can this be improved?

Thanks for your suggestions.

Code snippet:

    # Input: a list of files along with a metadata file.
    # Files start with a number identifying the branch; the branch number is also a value in the base table.
    # If there are 100 files, I can specify that only, say, 10 of them be processed.

    # Assumed imports for this snippet:
    import datetime
    from pyspark.sql import Row

    # Check the metadata table
    field_lst = branch_list  # list of branch identifiers to process
    sql_stmt = ('from bd.tb select filename where field1 in ' +
                ' ("' + '", "'.join(["%s"] * len(field_lst)) % tuple(field_lst) + '")' +
                ' and filename like "%_yyyy_xxxxx%"')
    newdf = hc.sql(sql_stmt)

    # Prepare the list of files that need to be processed (a subset of the input files)
    files = newdf.toPandas().values.tolist()
    newfiles = [f for sub in files for f in sub]  # flatten the nested list

    # Process the input files
    pyrdd = sc.textFile(','.join(newfiles), use_unicode=False)
    rdd1 = pyrdd.map(lambda row: tbl_schema(*row.split('\t')))
    # Drop empty or placeholder rows (the original mapped them to None,
    # which would break the next stage; a filter drops them instead)
    rdd2 = rdd1.filter(lambda row: row.field1 not in ('0', ''))
    # The input file has around 20 columns, which are processed into a Row
    rdd3 = rdd2.map(lambda row: Row(
        str(row.field1).lower().replace("'", "''").replace("\\", "\\\\").strip(),
        row.field2,
        row.field3,
        datetime.datetime.now()
    ))
    df2 = hc.createDataFrame(rdd3, SCHEMA_rdd3)

    # Read from the base table the rows whose field1 is not in field_lst
    sql_stmt1 = ('select * from ' + base_table + ' where field1 not in ' +
                 ' ("' + '", "'.join(["%s"] * len(field_lst)) % tuple(field_lst) + '")')
    df3 = hc.sql(sql_stmt1)
    new_df = df2.unionAll(df3)
    new_df.saveAsTable('tmp_tbl1', mode='overwrite')
    df_new = hc.sql('select * from tmp_tbl1')
    df_new.saveAsTable(base_table, mode='overwrite')
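The repeated-`%s` splicing used to build the two IN clauses above works, but it is hard to read and easy to get wrong. A small helper is clearer; this is a sketch that assumes the values are trusted literals (with user-supplied input, proper parameter binding should be used instead):

```python
def in_clause(values):
    """Render a list of values as a double-quoted SQL IN list,
    e.g. ["br1", "br2"] -> '("br1", "br2")'."""
    return '("' + '", "'.join(str(v) for v in values) + '")'

# Usage in the snippet would look like:
#   sql_stmt = 'from bd.tb select filename where field1 in ' + in_clause(field_lst) + ...
```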

Answers (2)

2 years ago

I suggest you use the Spark History Server to get a better picture of your job. It can answer this part of your question:

"How do I find out how many RDDs were created and how much memory each RDD takes?"

(RDDs will not consume much memory unless you cache them or they are shuffled.)

In addition, the History Server can show you the job's DAG, potential GC problems, and more.
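If the History Server is not already set up, event logging has to be enabled first; a minimal sketch of the relevant settings (the HDFS path is a placeholder):

```
# conf/spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-logs
spark.history.fs.logDirectory    hdfs:///spark-logs
```

Then start it with `sbin/start-history-server.sh` and open port 18080 in a browser.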

"The line-by-line processing takes time."

Since you already know this, you may want to focus on tuning that function, using unit tests and profiling. Pasting the actual code into your question makes it possible for people to help.
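To illustrate that advice with the snippet above: pulling the inline lambda logic out into a named function makes it testable without a SparkContext (`clean_field` is a hypothetical name; the body mirrors the `field1` transformation in the question):

```python
def clean_field(value):
    """Lower-case, double single quotes for SQL, escape backslashes,
    and strip whitespace from a raw field value."""
    return str(value).lower().replace("'", "''").replace("\\", "\\\\").strip()

# A plain unit test, runnable with no cluster:
assert clean_field("  O'Brien ") == "o''brien"
```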

2 years ago

Thanks. Please see the code snippet added to the question above.