我想对改进我的火花应用设计有一些建议 . 我是新手,并且已经在网上阅读了关于性能的文档 . 当我在具有完全数据加载的集群上运行时,它非常慢 . 因此,请建议从开发人员的角度来看如何更好地调整应用程序 .
这就是我开发我的spark应用程序的方法:功能是读取HDFS中存在的文件,处理它并将数据存储在镶木地板蜂巢表中 . 使用spark和python开发 .
每个文件大小约为50Mbyte,大约有50个文件需要处理 . 它是一个3节点集群(2个slave 1 master) . 目前处理数据大约需要4个小时 . 在配置中分配有10个驱动器核,总 Actuator 核60, Actuator 存储器7G,驱动器存储器7G .
该脚本使用sc.TextFile读取所有文件并创建phytonRDD . python RDD被分配了一个模式,并使用Lambda函数逐行处理 . 逐行处理需要时间 . 加工后,它存放在镶木地板 .
我如何知道创建了多少RDD以及每个RDD占用了多少内存 . 怎么能更好地改善 .
谢谢你的建议 .
代码片段:
# Input: list of files along with metadata file.
# Files start with a number to identify which branch file and the branch number is also a value in the base table
# If there are 100 files, then I can specify to process 10 files
# checking the metadata table
field_lst = branch list
sql_stmt = 'from bd.tb select filename where field1 in ' + \
' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")' + \
' and ' + \
'filename like "%_yyyy_xxxxx%"'
newdf = hc.sql(sql_stmt)
# preparing the list of files that needs to be processed. This is a subset of input files.
files = newdf.toPandas().values.tolist()
newfiles = list(ny.array(files).flat)
# processing the input file
pyrdd = sc.textFile(','.join(newfiles), use_unicode=False)
rdd1 = pyrdd.map(lambda row: tbl_schema(*row.split('\t')))
rdd2 = rdd1.map(lambda row: None if (row == '0' or row == '') else row)
# input file has around 20 columns which is processed in Row
rdd3 = rdd2.map(lambda row: Row(
str(row.field1).lower.replace("'", "''").replace("\\", "\\\\").strip(),
row.field2,
row.field3,
datetime.datetime.now()
))
df2 = hc.createDataFrame(rdd3, SCHEMA_rdd3)
# reading from the base table where the rows does not have the branch list in field_lst
sql_stmt1 = 'select * from ' + base_table + ' where field1 not in ' + \
' (\"' + '\", \"'.join(["%s"] * int(field_lst.__len__())) % tuple(field_lst) + '\")'
df3 = hc.sql(sql_stmt1)
new_df = df2.unionAll(df3)
new_df.saveAsTable('tmp_tbl1, mode='overwrite')
df_new = hc.sql('select * from tmp_tbl1')
df_new.saveAsTable(base_table, mode='overwrite')
2 回答
我建议你用Spark History Server来更好地了解你的工作 . 它可以告诉你:
(如果你缓存你的RDD或者洗牌,否则他们不会消耗太多内存)
此外,历史记录服务器还可以显示作业的DAG,潜在的GC问题等 .
既然您已经知道这一点,您可能希望专注于使用单元测试和分析来调整函数 . 在您的问题中粘贴实际代码可以让人们提供帮助 .
谢谢 . 请在下面找到一个代码段: