-
20 votesanswersviews
查看Spark Dataframe列的内容
我正在使用Spark 1.3.1 . 我试图在Python中查看Spark数据帧列的值 . 使用Spark数据帧,我可以执行 df.collect() 来查看数据帧的内容,但是对于Spark数据帧列没有这样的方法,就像我看到的那样 . 例如,数据框 df 包含名为 'zip_code' 的列 . 所以我可以做 df['zip_code'] 并且它变成 pyspark.sql.dataframe.C... -
3 votesanswersviews
Pyspark DataFrame - 如何使用变量进行连接?
我在python上使用Spark数据帧在两个数据帧上进行连接时遇到了一些麻烦 . 我有两个数据框,我必须更改列的名称,以使它们对每个数据框唯一,所以稍后我可以告诉哪个列是哪个 . 我这样做是为了重命名列(firstDf和secondDf是使用createDataFrame函数创建的Spark DataFrames): oldColumns = firstDf.schema.names newCol... -
1 votesanswersviews
Spark DataFrame中向量的访问元素[关闭]
我想访问下面显示的spark数据框中的向量元素: - name_cv (262144,[88783,143375,220659,228248],[1.0,1.0,1.0,1.0]) (262144,[220659],[1.0]) (262144,[75742],[1.0]) (262144,[68369,95745,107911,224494],[1.0,1.0,1.0,1.0]) &am... -
0 votesanswersviews
在spark上运行wordcount >>> lines = sc.textFile(“README.md”)>>> lines.count()
Py4JJavaError:调用z:org.apache.spark.api.python.PythonRDD.collectAndServe时发生错误 . :org.apache.hadoop.mapred.InvalidInputException:输入路径不存在:org.apache中的文件:/home/shubhranshu/Documents/spark/spark-1.6.1-bi... -
4 votesanswersviews
Spark Scala:任务不可序列化错误
我正在使用带有Scala插件和spark库的IntelliJ社区版 . 我还在学习Spark并且正在使用Scala工作表 . 我写了下面的代码,删除字符串中的标点符号: def removePunctuation(text: String): String = { val punctPattern = "[^a-zA-Z0-9\\s]".r punctPattern.r... -
2 votesanswersviews
无法运行ALS.train,错误:java.lang.IllegalArgumentException
我正在尝试运行ALS of PySpark . 我复制并粘贴了链接中提供的示例代码 . 但是,错误 java.lang.IllegalArgumentException 出现在以下行: model = ALS.train(ratings, rank, numIterations) 请问在这里我需要调查哪些可能的问题? 我的Spark版本是2.2.1,我的Java版本是9.0.4 . 但是,我不确定... -
0 votesanswersviews
ML管道上的火花驱动器内存问题
我正在运行logisticregression管道,并在这一行: model = pipeline.fit(train_data) 我在RDDLossFunction阶段重复得到以下错误: 文件“/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/ml/base.py”,第132行,在适合文件中“/usr/spark-2.3.0/python/lib... -
0 votesanswersviews
容器以非零退出代码137退出
回溯(最近一次调用最后一次):文件"/home/hdp-credit/yinzhichao/analysis_data/src/imei_mate_mobile.py",第93行,在main()文件"/home/hdp-credit/yinzhichao/analysis_data/src/imei_mate_mobile.py",第89行,在main co... -
0 votesanswersviews
描述PySpark 2中的功能
我在使用PySpark的 .describe() 函数中收到以下错误 . 似乎每件事都在正确的道路上,但它不起作用 . 任何帮助表示赞赏! 追踪: py4j.protocol.Py4JJavaError:调用o31.describe时发生错误 . :位于org.apache.xbean.asm5.ClassReader的org.apache.xbean.asm5.ClassReader . (... -
1 votesanswersviews
如何为PySpark设置Window函数的分区?
我正在运行PySpark作业,我收到以下消息: WARN org.apache.spark.sql.execution.Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 消... -
3 votesanswersviews
在Pyspark上模拟UDAF以进行封装
我正在使用PySpark学习Spark,并且在尝试使事情更清洁时碰到了一堵墙 . 假设有一个看起来像这样的数据帧 . (当然,有更多的列和行) A | B | C --+---+------ a | 1 | 1.300 a | 2 | 2.500 a | 3 | 1.000 b | 1 | 120.0 b | 4 | 34.20 c | 2 | 3.442 我想在它上运行一堆 groupb... -
5 votesanswersviews
PySpark如何将CSV读入Dataframe并进行操作
我是pyspark的新手,我正在尝试用它来处理一个保存为csv文件的大型数据集 . 我想将CSV文件读入spark数据帧,删除一些列,然后添加新列 . 我该怎么做? 我无法将此数据转换为数据帧 . 这是我到目前为止的精简版: def make_dataframe(data_portion, schema, sql): fields = data_portion.split(",&... -
2 votesanswersviews
创建合并两个其他列的Pyspark DataFrame列,为什么我得到'unicode'对象的错误没有属性isNull?
我在使用Pyspark Dataframe时遇到了一些麻烦 . 具体来说,我正在尝试为数据帧创建一个列,这是合并数据帧的两列的结果 . 例如 . this_dataframe = this_dataframe.withColumn('new_max_price', coalesce(this_dataframe['max_price'],this_dataframe['avg(max_price)... -
0 votesanswersviews
你能在pyspark中拥有一列数据帧吗?
我对pyspark / bigdata有点新意,所以这可能是一个坏主意,但我有大约一百万个单独的CSV文件,每个文件都与一些元数据相关联 . 我想要一个包含所有元数据字段列的pyspark数据框,但也有一个列,其条目是与每组元数据相关联的(整个)CSV文件 . 我现在不在工作,但我记得几乎确切的代码 . 我尝试了类似的玩具示例 outer_pandas_df = pd.DataFrame.from... -
0 votesanswersviews
Pyspark自动增量为交替的值组
我正在尝试使用Pyspark在Spark DataFrame中创建一个新列,它表示基于交替布尔值组的自动增量(或ID) . 可以说我有以下DataFrame: df.show() +-----+------------+-------------+ |id |par_id |is_on | +-----+------------+-------------+ |4000... -
0 votesanswersviews
Pyspark groupBy Pivot Transformation
我很难构建以下Pyspark数据帧操作 . 基本上我试图按类别分组,然后转动/取消融合子类别并添加新列 . 我已经尝试了很多方法,但它们非常慢并且没有利用Spark的并行性 . 这是我现有的(慢速,详细)代码: from pyspark.sql.functions import lit df = sqlContext.table('Table') #loop over category li... -
0 votesanswersviews
Pyspark:将UDF的结果迭代地写回数据帧并不会产生预期的结果
我仍然是pyspark的新手,我正在尝试评估函数并在UDF的帮助下迭代创建列 . 以下是功能: def get_temp(df): l=['temp1','temp2','temp3'] s=[0] pt = [0] start = [0] end = [0] cummulative_stat = [0] for p in xrange(1,... -
0 votesanswersviews
PySpark聚合和分组依据
我看过多个帖子,但聚合是在多个列上完成的,但我希望基于col OPTION_CD进行聚合,基于以下 condition: If have conditions attached to the dataframe query, which is giving me the error 'DataFrame' object has no attribute '_get_object_id' IF ... -
0 votesanswersviews
将用户功能应用于整个Spark DataFrame列
Spark DataFrame架构: In [177]: testtbl.printSchema() root |-- Date: long (nullable = true) |-- Close: double (nullable = true) |-- Volume: double (nullable = true) 我希望将一个标量值函数应用于 testtbl 列 . 假设我想计算'... -
-1 votesanswersviews
稀疏矩阵RDD中的值的标准化
我想在产品推荐中使用矩阵分解(通过在MLlib中使用ALS),我没有评级,但使用购买数量(隐式信息,稀疏矩阵) . 在训练之前,我想按客户标准化数据集(矩阵中的行,其中列是产品,交集是金额) . Matrix有几百万行和几万列,所以我想尽可能多地使用RDD . 我的数据存储在元组列表中: (int, int, int) [(Client1, Product1, amount) (Client1... -
4 votesanswersviews
PySpark PCA:如何将数据帧行从多列转换为单列DenseVector?
我想使用PySpark(Spark 1.6.2)对Hive表中存在的数值数据执行主成分分析(PCA) . 我能够将Hive表导入Spark数据帧: >>> from pyspark.sql import HiveContext >>> hiveContext = HiveContext(sc) >>> dataframe = hiveConte... -
0 votesanswersviews
Spark Optimization - 为什么一个节点会承受所有压力?
我在4节点集群上提交了一个作业,我看到,大多数操作发生在其中一个工作节点上,而其他两个操作只是放松了 . 下面的图片说明了这一点 - 如何正确分配负载? 我的集群conf(4节点集群[1个驱动程序; 3个从属]) - 核心 - 6 RAM - 12 GB HDD - 60 GB 我的Spark提交命令如下 - spark-submit --master spark://192.168... -
0 votesanswersviews
在spark数据帧上应用操作时出错
我是Spark框架的新手,并在我的本地机器上处理一些(〜)小任务来练习 . 我的任务如下:我在S3中存储了365个压缩的csv文件,其中包含每日日志 . 我想构建一整年的数据框架 . 我的方法是从存储桶中检索密钥,构建每日数据帧,将它们统一到月份数据帧中,为它们执行相同操作,并获得一整年的数据帧作为回报 . 这对我检索的一些样本数据起了作用 . 在构建DataFrame之前,我解压缩了文件,将未压... -
0 votesanswersviews
从JSon File动态生成模式
需要你的帮助来定义一个动态模式,其中包含来自输入元数据JSon文件的字段和数据类型.Below是JSon文件 [ { "trim": true, "name": "id", "nullable": true, "id": null, "posit... -
0 votesanswersviews
pyspark使用模式将csv文件加载到数据框中
我是pyspark的新手,在Spark版本2.2.0和Python版本2.7.12上使用pyspark 我试图将2个.csv文件(具有多于1个 Headers 行)读入具有已知模式的2个不同数据帧并执行比较操作 . 我不确定是否有任何最佳/更好的方法来创建模式文件(包括列名,数据类型,可空性)并在pyspark程序中引用它以加载到数据帧中 . 我为第一个文件编码如下: 创建一个yaml文件来存... -
12 votesanswersviews
遇到丢失的功能时,Apache Spark会抛出NullPointerException
在为要素中的字符串列编制索引时,我对PySpark有一个奇怪的问题 . 这是我的tmp.csv文件: x0,x1,x2,x3 asd2s,1e1e,1.1,0 asd2s,1e1e,0.1,0 ,1e3e,1.2,0 bd34t,1e1e,5.1,1 asd2s,1e3e,0.2,0 bd34t,1e2e,4.3,1 我在'x0'有一个缺失值 . 首先,我正在使用pyspark_csv将数据从... -
5 votesanswersviews
使用PySpark在数据框上应用sklearn训练的模型
我使用Python训练了一个随机森林算法,并希望将它应用于PySpark的大数据集 . 我首先加载了训练有素的sklearn RF模型(使用joblib),将包含这些功能的数据加载到Spark数据帧中,然后添加一个包含预测的列,并使用用户定义的函数: def predictClass(features): return rf.predict(features) udfFunction = ... -
3 votesanswersviews
从DB获取每行DataFrame Pyspark的数据
我在流式上下文中使用Pyspark Dataframe API,我已经在我的spark流应用程序中将RDD转换为DF foreach DStream(我使用的是kafka接收器)这是我在我的进程RDD函数中所做的: rowRdd = data_lined_parameters.map( lambda x: Row(SYS=x[0], METRIC='temp', SEN=x[1],... -
22 votesanswersviews
如何将Vector拆分为列 - 使用PySpark
Context: 我有一个包含2列的 DataFrame :word和vector . "vector"的列类型是 VectorUDT . 一个例子: word | vector assert | [435,323,324,212...] 我希望得到这个: word | v1 | v2 | v3 | v4 | v5 | v6 ...... asser... -
3 votesanswersviews
为什么PCA在pyspark内存不足?
当我在pyspark中运行PCA时,我的内存不足 . 这是pyspark 1.6.3,并且执行环境是齐柏林飞艇笔记本 . 这是一个例子 . 设 df 为pyspark DataFrame,其中'vectors'是所需的输入列(包含数据的SparseVector) . from pyspark.ml.feature import PCA pca = PCA(k = 100, inputCol=&qu...