-
1 votesanswersviews
将数据添加到存储在磁盘上的Spark / Parquet数据
我处于类似the one mentioned here的情况 . 问题没有得到满意的答复 . 此外,我处理的数据较少(每天约1G) . 我的情况:我已经有一定数量的数据(~500G)可用作镶木地板(这是商定的“存储格式”),我定期进行增量更新 . 我希望以后能够处理ETL部分以及分析部分 . 为了能够有效地生成某些“中间数据产品”的更新,我看到三个选项: 使用 append mode 保存,保... -
6 votesanswersviews
Spark:对数据进行排序和分区的最有效方法是将其写为镶木地板
我的数据原则上是一个表,除了其他'data'之外,它还包含一列 ID 和一列 GROUP_ID . 在第一步中,我将CSV读入Spark,进行一些处理以准备第二步的数据,并将数据写为镶木地板 . 第二步做了很多 groupBy('GROUP_ID') 和 Window.partitionBy('GROUP_ID').orderBy('ID') . 现在的目标是 - 为了避免第二步中的混乱 - ... -
3 votesanswersviews
如何将数据框中的数据写入HDFS中的单个.parquet文件(单个文件中的数据和元数据)?
如何将数据框中的数据写入HDFS中的单个.parquet文件(单个文件中的数据和元数据)? df.show() --> 2 rows +------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa|... -
0 votesanswersviews
无法在火花中本地读取镶木地板文件
我在本地运行Pyspark并尝试读取镶木地板文件并从笔记本加载到数据框中 . df = spark.read.parquet(“metastore_db / tmp / userdata1.parquet”) 我得到了这个例外 An error occurred while calling o738.parquet. : org.apache.spark.sql.AnalysisException... -
2 votesanswersviews
为每组pyspark RDD / dataframe选择随机列
我的数据帧有10,0000列,我必须对每个组应用一些逻辑(键是区域和部门) . 每组将使用10k列中的最多30列,30列列表来自第二个数据集列“colList” . 每组将有2-3百万行 . 我的方法是按键分组和调用函数,如下所示 . 但它失败了 - 1. shuffle和2.data组超过2G(可以通过重新分区来解决,但是它的成本很高),3 . 非常慢 def testfunc(iter): ... -
2 votesanswersviews
pyspark createdataframe:字符串被解释为时间戳,架构混合了列
我有一个非常奇怪的错误,火花数据帧导致一个字符串被评估为时间戳 . 这是我的设置代码: from datetime import datetime from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, StringType, TimestampType new_schema = Str... -
0 votesanswersviews
spark 2.x使用csv函数读取整数/双列作为字符串
我正在使用以下声明阅读spark中的csv . df = spark.read.csv('<CSV FILE>', header=True, inferSchema = True) 当我检查spark数据帧时,一些整数和双列存储为数据帧中的字符串列 . 但是,所有列都不是这种情况 . 我检查了特定列的值,所有值都是double类型,但仍然是spark推断为StringType . 由... -
2 votesanswersviews
pyspark在udf中使用数据框
我有两个数据帧 df1 +---+---+----------+ | n|val| distances| +---+---+----------+ | 1| 1|0.27308652| | 2| 1|0.24969208| | 3| 1|0.21314497| +---+---+----------+ 和 df2 +---+---+----------+ | x1| x2| ... -
1 votesanswersviews
通过检查字符串是否出现在列中来过滤PySpark DataFrame
我是Spark的新手并且正在玩过滤 . 我有一个通过读取json文件创建的pyspark.sql DataFrame . 架构的一部分如下所示: root |-- authors: array (nullable = true) | |-- element: string (containsNull = true) 我想过滤这个DataFrame,选择所有具有与特定作者相关的条目的行 ... -
1 votesanswersviews
Pyspark:计算行最小值,忽略零和空值
我想基于数据框中现有的列子集创建一个新列(v5) . 示例数据帧: +---+---+---+---+ | v1| v2| v3| v4| +---+---+---+---+ | 2| 4|7.0|4.0| | 99| 0|2.0|0.0| |189| 0|2.4|0.0| +---+---+---+---+ 提供示例数据帧的另一个视图: +---+---+---+---+ | v1| ... -
0 votesanswersviews
Spark 2.0与Zeppelin 0.6.1 - SQLContext不可用
我在Linux服务器上运行spark 2.0和zeppelin-0.6.1-bin-all . 默认的spark笔记本运行得很好,但是当我尝试使用sqlContext在pyspark中创建并运行一个新的笔记本时,我得到错误“py4j.Py4JException:Method createDataFrame([class java.util.ArrayList,class java.util.Arr... -
0 votesanswersviews
如何在PySpark中的大型Spark数据框中对每个行子集进行映射操作
我正在使用PySpark,我想要做的是以下内容: 一个大的Spark数据框df包含所有记录 . 我想对记录的每个子集进行并行计算,除以此df中的“id”列 . 我目前可以想到的方式如下:(我将用一个简单的例子来说明) dicts = [ {'id': 1, 'name': 'a', 'score': 100}, {'id': 1, 'name': 'b', 'score'... -
0 votesanswersviews
我试图计算使用spark sql从数据帧中取出的行数 . 但我收到以下错误
我试图计算使用spark sql从数据帧中取出的行数 . 但我收到以下错误 . 有人可以帮我解决这个问题吗? My sql query 碰撞是我的表名,我试图显示结果 . outputDataframe = spark.sql("select count(*) from collisions where YEAR(date) = 2016") outputDatafra... -
1 votesanswersviews
Spark:并行化创建多个DataFrame
我目前正在基于ID列表生成DataFrames - 每个基于一个ID的查询都会返回一个非常大的PostgreSQL表的可管理子集 . 然后我根据我需要写出的文件结构对输出进行分区 . 问题是我正在达到速度限制并且主要利用我的 Actuator 资源 . 我不确定这是否需要重新考虑我的架构,或者是否有一些简单的方法可以解决这个问题,但基本上我希望得到更多的任务并行化但是我没有让所有16位执行者都忙... -
2 votesanswersviews
在Pyspark中解析JSON文件
我对Pyspark很新 . 我尝试使用以下代码解析JSON文件 from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.json("file:///home/malwarehunter/Downloads/122116-path.json") df.printSche... -
1 votesanswersviews
如何在pyspark中将JSON字符串转换为JSON对象
我有一个列类型的数据框是字符串,但实际上它包含4个架构的json对象,其中很少有字段是常见的 . 我需要将其转换为jason对象 . 这是数据框架的架构: query.printSchema() root |-- test: string (nullable = true) DF的 Value 看起来像 query.show(10) +--------------------+ | ... -
1 votesanswersviews
如何在pyspark中为给定路径编写Dataframe时避免使用随机数?
通过执行下面的代码我有pyspark Dataframe我在给路径中保存了Dataframe . df.write.format("csv").options(header='false', inferschema='true',sep="|").option("codec", "org.apache.hadoop.io.com... -
-1 votesanswersviews
PySpark-如何使用Pyspark计算每个字段的最小值,最大值?
我试图找到sql语句产生的每个字段的最小值,最大值,并将其写入csv文件 . 我试图以下面的方式得到结果 . 能否请你帮忙 . 我已经用python编写了,但现在尝试将其转换为pyspark直接在hadoop集群中运行 from pyspark.sql.functions import max, min, mean, stddev from pyspark import SparkContext... -
1 votesanswersviews
Pyspark - 如何将镶木地板文件转换为带分隔符的文本文件
我有一个带有以下架构的镶木地板文件: |日期| ID | 我想将它转换为带有制表符分隔符的文本文件,如下所示: 20170403 15284503 我怎样才能在pyspark中这样做? -
3 votesanswersviews
将pyspark dataframe列从列表转换为字符串
我有这个pyspark数据帧 +-----------+--------------------+ |uuid | test_123 | +-----------+--------------------+ | 1 |[test, test2, test3]| | 2 |[test4, test, test6]| | ... -
0 votesanswersviews
如何在pyspark中使用两列的串联进行过滤
我已经阅读了一个镶木地板文件,我想用准备好的dict过滤行 . 数据框中有两列名为col1和col2,它们是string类型 . 我的字典中有一组字符串,我想要一些行,其中列col1和col2中字符串的串联在字典中 . 我试过了 df.filter((df['col1']+df['col2']) in my_dict) 但似乎 df['col1']+df['col2'] 不是字符串,即使这是列的... -
1 votesanswersviews
Pyspark:数据帧中每个id的最后一个非空记录(Spark Streaming)
我有一个带架构的数据框 - |-- record_id: integer (nullable = true) |-- Data1: string (nullable = true) |-- Data2: string (nullable = true) |-- Data3: string (nullable = true) |-- Time: timestamp (nullable = t... -
0 votesanswersviews
Pyspark - 在groupby和orderBy之后选择Column中的Distinct值
所以我的表看起来像这样: +-------------------+-------+----------+------------+ | trip_id|line_id| ef_ar_ts| station| +-------------------+-------+----------+------------+ |80:06____:17401:000| 17... -
0 votesanswersviews
如何在Spark 2.1上更新pyspark数据帧元数据?
我遇到了SparkML的OneHotEncoder问题,因为它读取数据帧元数据以确定它应为其创建的稀疏矢量对象分配的值范围 . 更具体地说,我使用包含0到23之间的所有单个值的训练集来编码“小时”字段 . 现在我使用Pipeline的“transform”方法对单行数据帧进行评分 . 不幸的是,这导致OneHotEncoder的不同编码的稀疏矢量对象 (24,[5],[1.0])vs.(11,[1... -
1 votesanswersviews
获取java.lang.UnsupportedOperationException:无法计算Pyspark中的表达式
在我的项目中间,我遇到了这个不支持的操作异常 . 这是我的场景,我创建了一个名为filter的udf并将其注册为fnGetChargeInd . 此函数将4个参数作为unicode时间戳,该时间戳已经从查询格式化为datetime类型,字符串频率,字符串begmonth和字符串currperiod . 通过它计算chargeAmt并返回一个Integer类型值 . 这是我的udf函数代码 def ... -
1 votesanswersviews
PySpark流媒体:窗口和转换
我正在尝试从Spark流数据源读取数据,按事件时间窗口,然后在窗口数据上运行自定义Python函数(它使用非标准Python库) . 我的数据框看起来像这样: | Time | Value | | 2018-01-01 12:23:50.200 | 1234 | | 2018-01-01 12:23:51.200 | 33 | | 2018-01-01... -
0 votesanswersviews
Spark 2.3 Dropping Temp Table [复制]
这个问题在这里已有答案: Remove Temporary Tables from Apache SQL Spark 3个答案 我试图在相应的临时表的使用结束后使用 dropTempTable() (为了下一次计算释放内存) . 较新的Spark会话不需要 sqlContext ,因此,我对如何使用该功能感到困惑 . 1)尝试,我用来注册临时表的DF相同 - DF.dropTempTable(... -
0 votesanswersviews
PySpark中的GeoText和UDF
我是PySpark中的noobie(Spark 2.1.0和python 3.5),我遇到了一个我无法通过的问题 . 我尝试在UDF中使用GeoText,这是我的代码: def countries(x): count = GeoText(x).countries w = '' if not count: return '' else: ... -
0 votesanswersviews
通过执行UNION创建配置单元表不会将表存储为pyspark中的镶木地板文件
我正在尝试使用Pyspark在hive中创建一个表 . 该表已成功创建,但不是PARQUET格式 . 我创建了一个示例数据集来重新创建我的问题 . from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession, HiveContext conf = (SparkConf() .se... -
1 votesanswersviews
如何序列化pyspark GroupedData对象?
我正在一个拥有数百万条记录的数据集上运行 groupBy() 并希望保存结果输出(pyspark GroupedData 对象),以便我可以在以后对其进行反序列化并从该点恢复(根据需要在其上运行聚合) . df.groupBy("geo_city") <pyspark.sql.group.GroupedData at 0x10503c5d0> 我想避免将Gro...