
PySpark: data doesn't always conform to the schema - logic to alter the data


I'm new to PySpark and I'm writing a script that reads from .csv files.

I've explicitly defined the schema below, and the script works perfectly... most of the time.

The problem is that values sometimes land in the files that don't conform to the schema. For example, '-' can turn up in an integer field, and as a result we get a type error, which is thrown when the script reaches df1.show().

I'm trying to find a way to say, effectively: if a value doesn't match the defined data type, replace it with ''.

Does anyone know if this might be possible? Any suggestions would be great!

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# create a session that supports Hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://serverip:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    dt_now = datetime.now()

    # long() does not exist in Python 3, and strftime('%s') is not portable;
    # datetime.timestamp() gives the Unix time directly
    today_unixtime = int(dt_now.timestamp())
    today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')

    twoday_unixtime = int(dt_now.timestamp()) - 24 * 60 * 60 * 2
    twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')

    # note: despite the name, this is four hours ago
    hourago = int(dt_now.timestamp()) - 60 * 60 * 4
    hrdate = datetime.fromtimestamp(hourago).strftime('%H')

    schema = [
        StructField('field1', StringType(), True),
        StructField('field2', StringType(), True),
        StructField('field3', IntegerType(), True),
    ]
    final_structure = StructType(schema)

    df1 = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://hdfspath/dt=%s/*/*/*' % today_date, schema=final_structure)

    usercatschema = [
        StructField('field1', StringType(), True),
        StructField('field2', StringType(), True),
        StructField('field3', StringType(), True),
    ]
    usercat_structure = StructType(usercatschema)

    df2 = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://hdfspath/v0/dt=%s/*' % twoday, schema=usercat_structure)

    df1.show()
    df2.show()
    df1.createOrReplaceTempView("dpi")
    df2.createOrReplaceTempView("usercat")

    finaldf = spark_session.sql('''
    SQL QUERY
''')
    finaldf.coalesce(10).write\
        .format("com.databricks.spark.csv")\
        .option("header", "true")\
        .option('sep', '\t')\
        .mode('append')\
        .save('hdfs://hdfs path')

1 Answer


Read the column in as String type, then cast it to int afterwards; a cast that can't parse a value produces null instead of raising an error.

df = df.withColumn("field3", df.field3.cast("int"))
    
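For illustration, a minimal self-contained sketch of this behaviour (the session and sample rows below are made up, not from the original post): the '-' value that broke the typed read simply becomes null after the cast.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("cast_demo").getOrCreate()

# hypothetical sample data: '-' stands in for a malformed integer value
df = spark.createDataFrame(
    [("a", "x", "10"), ("b", "y", "-")],
    ["field1", "field2", "field3"],
)

# cast() turns strings it cannot parse into null instead of raising an error
df = df.withColumn("field3", col("field3").cast("int"))
df.show()
# +------+------+------+
# |field1|field2|field3|
# +------+------+------+
# |     a|     x|    10|
# |     b|     y|  null|
# +------+------+------+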

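If the literal goal from the question (replace non-matching values with '') is wanted, the column has to stay a string, since an int column can't hold ''. A minimal sketch, assuming field3 was read as StringType and that the regex '^-?[0-9]+$' is an acceptable definition of a valid integer for this data:

from pyspark.sql.functions import col, when

# blank out anything in field3 that does not look like an integer;
# the regex is an assumption - adjust it to whatever "valid" means here
cleaned = df.withColumn(
    "field3",
    when(col("field3").rlike("^-?[0-9]+$"), col("field3")).otherwise(""),
)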