首页 文章

如何将pyspark脚本拆分为多个文件

提问于
浏览
-1

我有一个大型Pyspark程序,它执行以下步骤:

  • 常规导入和Spark对象初始化

  • 从目录中读取category1的数据 . 文件采用压缩格式 . 我正在使用sc.TextFile()一次性读取目录中的所有文件 .

  • define schema = StructType([..])
    通过对步骤2中的数据应用第3步中的模式,

  • createdataframe

  • 对另外3个类别重复步骤2,3,4

  • 每个类别分别有300,140,15和10列

  • 将所有类别中的数据加入所有类别中的两个公共列 . 使用left_outer使用内部和其他两个连接两个集合

  • 将结果存储在镶木地板中

  • 在镶木地板上创建一个蜂巢表

我正在使用Spark 2.0.0 / Python 3.4,我能够成功完成所有步骤 .

但是,我意识到我的脚本是一行冗长的1200行代码,将所有内容放在同一个脚本中并不是很整洁 .

我打算:

  • 将架构定义保存在单独的文件中

  • 常用模块,例如,在单独的文件中从dir读取数据,只需传递类别和目录即可读取 .

  • 想法是保持主脚本清洁和易于管理 .

我没有看到任何暗示如何维护大型火花代码的东西,并且一般的python指南可能不完全适用,因此问题 .

我正在寻找任何建议,指南,待办事项,非待办事项等,如果可能的话,还有一些示例代码 .

感谢您的时间和帮助 .

1 回答

  • 0

    定义单个参数化辅助函数

    read_data(sc, schema_file, data_dir):
      "return df"
      schema = read(schema_file)
      rdd = sc.textFile()
      sqlc = SQLContext(sc)
      df = sqlc.createDataFrame(rdd, schema=schema)
      return df
    

    那么主要的计划就是

    df1 = read_data(sc, sf1, dd1)
    df2 = read_data(sc, sf2, dd2)
    df3 = read_data(sc, sf3, dd3)
    df4 = read_data(sc, sf4, dd4)
    
    # assumes inner joins each with different join attrs
    net = (df1.join(df2, (df1["key1a"]==df2["key2a"] 
                          & df1["key1b"]==df2["key2b"]))
              .join(df3, (df1["key1c"]==df3["key3c"]
                          & df1["key1d"]==df3["key3d"]))
              .join(df4, (df1["key1e"]==df4["key4e"]
                          & df1["key1f"]==df4["key4f"])))
    
    # you may wish to remove some duplicated columns, esp. since you said there are many input columns
    filt = net.select(...) # or
    filt = net.drop(...)
    
    filt.write.parquet("out")
    

    我个人会把Hive表创建放在别处,而不是将它捆绑到脚本中 .

    您必须小心使用sc(或spark)作为全局,特别是如果脚本可以以交互方式和其他代码调用 . 我通常添加样板来有条件地创建sc在脚本a la底部的 __main__ 上下文中

    try:
        # noinspection PyUnboundLocalVariable                                                                                                                                    
        assert sc
    except (AssertionError, UnboundLocalError):
        spark_name = ("my app name")
        conf = SparkConf()
        conf.setAppName(spark_name)
        sc = SparkContext.getOrCreate(conf)
    

相关问题