-
0 votesanswersviews
Spark Streaming不读所有Kafka唱片
我们从kafka发送了15条记录到SparkStreaming,但是spark只收到了11条记录 . 我使用的是spark 2.1.0和kafka_2.12-0.10.2.0 . 码 import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.kafka.client... -
0 votesanswersviews
PySpark Softmax回归梯度体面
我有一个softmax文档分类器的实现,编写为spark应用程序 . 我有一套培训文件,一套培训文件的标签和一套测试文件 . 我的任务是使用在培训文档上训练的softmax分类器来预测测试文档的标签 . 我发现this是关于softmax回归和梯度体面过程的非常有用的教程 . 那是's what I'米的工作 . 训练数据是一个文件,每行文件一个文档 . 训练标签是一个文件,在文件的每一行上具有逗... -
7 votesanswersviews
在pyspark中调用scala代码进行XSLT转换
这可能是一个很长的镜头,但认为它不能试图在pyspark中使用Elsevier's open-sourced spark-xml-utils package来使用XSLT转换一些XML记录 . 我通过一些探索性代码获得转换工作取得了一些成功: # open XSLT processor from spark's jvm context with open('/tmp/foo.xsl', 'r') ... -
20 votesanswersviews
将Spark数据帧保存为Hive中的动态分区表
我有一个示例应用程序正在从csv文件读取数据帧 . 可以使用方法 df.saveAsTable(tablename,mode) 将数据帧以镶木地板格式存储到Hive表中 . 上面的代码工作正常,但我每天都有如此多的数据,我想根据creationdate(表中的列)动态分区hive表 . 有没有办法动态分区数据帧并将其存储到配置单元仓库 . 想要避免使用 hivesqlcontext.sql(ins... -
9 votesanswersviews
将Spark数据框保存到Hive:table不可读,因为“镶木地板不是SequenceFile”
我想使用PySpark将Spark(v 1.3.0)数据框中的数据保存到Hive表中 . documentation州: “spark.sql.hive.convertMetastoreParquet:当设置为false时,Spark SQL将使用Hive SerDe作为镶木桌而不是内置支持 . ” 看看Spark tutorial,似乎可以设置此属性: from pyspark.sql im... -
1 votesanswersviews
如何将Spark数据框存储为Parquet格式的动态分区Hive表?
当前的原始数据在Hive上 . 我想连接几个分区的TB级Hive表,然后将结果作为Parquet格式的分区Hive表输出 . 我正在考虑将Hive表的所有分区加载为Spark数据帧 . 然后加入,分组等等 . 这是正确的方法吗? 最后我需要保存数据,我们可以将Spark数据帧保存为Parquet格式的动态分区Hive表吗?如何处理元数据? -
0 votesanswersviews
将hive分区表加载到Spark Dataframe
我正在使用 Spark 1.4.1 版本 . 我正在尝试将分区的Hive表加载到DataFrame中,其中Hive表由 year_week 分区,在一个场景中我可能有104个分区 . 但我可以看到DataFrame正在将数据加载到200个分区中,我知道这是由于 spark.sql.shuffle.partitions 默认设置为200 . 我想知道是否有任何好的方法可以将我的Hive表加载到具有1... -
4 votesanswersviews
Spark将数据写入分区的Hive表非常慢
我想以正常的可读文本格式将Spark数据帧存储到Hive表中 . 为此我先做了 sqlContext.sql("SET spark.sql.hive.convertMetastoreParquet=false") 我的DataFrame就像: final_data1_df = sqlContext.sql("select a, b from final_data&q... -
3 votesanswersviews
Hive on Spark列出特定配置单元表的所有分区并添加分区
我正在使用spark 2.0,我想知道,是否有可能列出特定蜂巢表的所有文件?如果是这样,我可以使用spark sc.textFile("file.orc") 直接逐步更新这些文件 . 如何在hive表中添加新分区?我可以从火花中使用蜂巢状的Metast? 有没有办法获得映射数据帧的内部hive函数 row => partition_path 我的主要推理是表的增量更新 ... -
2 votesanswersviews
使用pyspark更改配置单元表后的模式错误
我在hive中有一个表,名为 test ,列 id 和 name 现在我在hive中有另一个名为mysql的表,列为 id , name 和 city . 现在我想比较两个表的模式并将列差异添加到hive表 test . hive_df= sqlContext.table("testing.test") mysql_df= sqlContext.table("t... -
1 votesanswersviews
来自RDD的每个密钥的PySpark Distinct列表
我确信这很简单,但我一直有问题 . 我有一个带键值对的RDD . 我想要一个独特的键列表 . 我将分享代码和示例 . 先感谢您! RDD示例 >>> rdd4.take(3) [[(u'11394071', 1), (u'11052103', 1), (u'11052101', 1)], [(u'11847272', 10), (u'999999', 1), (u'1184727... -
4 votesanswersviews
为什么我的Spark DataFrame比RDD慢得多?
我有一个非常简单的Spark DataFrame,当运行DataFrame groupby时,性能非常糟糕 - 比(在我脑中)等效的RDD reduceByKey慢约8倍... 我的缓存DF只有两列,客户和名称只有5万行: == Physical Plan == InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelatio... -
4 votesanswersviews
Apache Spark遇到了一个非常奇怪的数据偏差
环境:Spark 1.6.3,火花,150个 Actuator * 2个核心,每个6 GB(内存40%),python . 我有一个带有3列的spark-Dataframe:{int('userId'),longInt('productId'),double('CatgResult')},数据帧的长度大约为10亿 . 数据具有一个特征,即当'userId' - 'productId'对作为关键字时... -
0 votesanswersviews
从使用火花与scala的蜂巢获取空表
我想使用spark编写scala代码来从hive服务器获取数据帧 . 我使用以下代码 - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.UserGroupInformation import scala.util.Properties import org.apache.spark.sql... -
0 votesanswersviews
使用Scala在RDD中基于多个键列对值进行分组的最快方法是什么? [重复]
这个问题在这里已有答案: Spark groupByKey alternative 1回答 我的数据是一个包含超过200万行员工记录的文件 . 每行有15个员工功能字段,包括名称,DOB,ssn等 . 示例: ID|name|DOB|address|SSN|... 1|James Bond|10/01/1990|1000 Stanford Ave|123456789|... 2|Jason B... -
0 votesanswersviews
PySpark和Scala [重复]
这个问题在这里已有答案: Spark performance for Scala vs Python 1回答 我希望我不会对这个问题投反对票 . 多个火花版本令人困惑,所以无论如何我都要问它 . 请注意,这个问题纯粹是从性能角度而非开发人员 生产环境 力/技能角度来看 . 而且我是新手,很多人想从2017年的角度了解最新状态 . 我知道python的JIT问题,这不是问题 . 它纯粹来自PyS... -
0 votesanswersviews
试图从Pyspark调用Java函数会出现“TypeError:'Column' object is not callable”[duplicate]
这个问题在这里已有答案: Spark: How to map Python with Scala or Java User Defined Functions? 1回答 Wrapping a java function in pyspark 1回答 我在Amazon EMR上使用PySpark,部分过程基本上是ETL步骤 . 我有一个包含多个列的数据框,其中一个名为“Report”,是... -
0 votesanswersviews
PySpark 1.5 Groupby Sum为Dataframe中的新列
我正在尝试使用groupBy和sum(使用PySpark 1.5)在Spark Dataframe中创建一个新列(“newaggCol”) . 我的数字列已经转换为Long或Double . 用于形成groupBy的列是String和Timestamp . 我的代码如下 df= df.withColumn("newaggCol",(df.groupBy([df.strCol,d... -
-1 votesanswersviews
IndexedRowMatrix的pyspark包装multiply()
IndexedRowMatrix 类的PySpark包装器不包含在它包含的Java实现中 . 我尝试将其手动添加到PySpark / MlLib / inalg / distributed.py,如下所示: def multiply(self, other): other_java_matrix = other._java_matrix_wrapper._java_model ja... -
12 votesanswersviews
PySpark投掷错误方法__getnewargs __([])不存在
我有一组文件 . 文件的路径保存在文件中,例如“all_files.txt” . 使用apache spark,我需要对所有文件进行操作并对结果进行处理 . 我想要做的步骤是: 通过阅读"all_files.txt"创建RDD 对于"all_files.txt"中的每一行(每行是某个文件的路径),将每个文件的内容读入单个RDD 然后对所有内容进行... -
0 votesanswersviews
Pyspark aboutQuantile投掷错误
我已经将一个csv文件加载到我的spark数据帧中,之后如果我尝试使用 approxQuantile 方法计算,这会给我一个错误 . 尝试使用不同的数据集和不同的列,概率和relativeError . 帮助我了解正在发生的事情 . df.approxQuantile("column_name", [0.2,0.3,0.6,1.0], 0) 我收到以下错误: py4j.pr... -
2 votesanswersviews
根据某些键值(pyspark)从RDD创建多个Spark DataFrames
我有一些包含JSON对象的文本文件(每行一个对象) . 例: {"a": 1, "b": 2, "table": "foo"} {"c": 3, "d": 4, "table": "bar"} {"a": 5, &quo... -
1 votesanswersviews
在PySpark中展平嵌套词典列表[重复]
这个问题在这里已有答案: Merge list of lists in pySpark RDD 2个答案 我需要使用PySpark来展平包含嵌套dicts的以下RDD,示例如下: x = [{1: {345: 2}, 2: {33: 9}}, {5: {3: 2}, 2: {45, 9}}, {2: {33:5}}] 在我的实际数据中,每个嵌套的dict可能具有不同的长度和项目数 . x... -
9 votesanswersviews
等效的IF然后是ELSE
我早些时候在这里看过这个问题,并从中吸取了教训 . 但是,当我觉得它应该有效时,我不确定为什么会出现错误 . 我想通过一些规则在现有的Spark DataFrame 中创建一个新列 . 这是我写的 . iris_spark是具有分类变量iris_spark的数据框,具有三个不同的类别 . from pyspark.sql import functions as F iris_spark_df... -
3 votesanswersviews
Pyspark和Pandas是否经过认证可以合作? [关闭]
我面临很多问题,将Pyspark数据帧集成/添加到现有的Pandas代码中 . 1)如果我将Pandas数据帧转换为Pyspark数据帧,则多个操作无法很好地转换,因为Pyspark数据帧似乎不像Pandas数据帧那样丰富 . 2)如果我选择使用Pyspark数据帧和Pandas来处理同一代码中的不同数据集,那么当通过map调用的函数包含任何pandas数据帧时,Pyspark转换(如map)似乎... -
19 votesanswersviews
对spark数据帧的同一列进行多次聚合操作
我有三个字符串类型的数组包含以下信息: groupBy数组:包含我想要对数据进行分组的列的名称 . aggregate array:包含我想要聚合的列的名称 . operations数组:包含我想要执行的聚合操作 我正在尝试使用spark数据帧来实现这一目标 . Spark数据框提供了agg(),您可以在其中传递Map [String,String](列名和相应的聚合操作)作为输... -
1 votesanswersviews
PySpark:用一列来索引另一列(两列的udf?)
(Edited Feb 14th) 假设我有一个带有以下模式的Spark(PySpark)数据帧: root |-- myarray: array (nullable = true) | |-- element: string (containsNull = true) |-- myindices: array (nullable = true) | |-- element: ... -
37 votesanswersviews
如何使用Spark查找中值和分位数
如何使用分布式方法,IPython和Spark找到 RDD 整数的中位数? RDD 大约有700,000个元素,因此太大而无法收集并找到中位数 . 这个问题与这个问题类似 . 但是,问题的答案是使用Scala,我不知道 . How can I calculate exact median with Apache Spark? 使用Scala答案的思考,我试图在Python中编写类似的答案 . 我... -
1 votesanswersviews
SPARK中的自定义分区程序(pyspark)
我正在尝试使用PySpark在spark作业中创建一个自定义分区器,比方说,我有一些整数列表 [10,20,30,40,50,10,20,35] . 现在我想要一个场景,我有两个分区,如 p1 和 p2 . p1 包含所有列表元素<30和 p2 包含30以上的所有元素 . elements = sc.parallelize([10,20,30,40,50,10,20,35]).map... -
1 votesanswersviews
Spark Executor在将数据框写入镶木地板时表现不佳
Spark版本:2.3 hadoop dist:azure Hdinsight 2.6.5平台:Azure存储:BLOB 集群中的节点:6个执行程序实例:每个执行程序6个核心:每个执行程序3个内存:8GB 尝试通过同一存储帐户上的火花数据框将天蓝色blob(wasb)中的csv文件(大小4.5g - 280 col,2.8 mil行)加载到镶木地板格式 . 我已经重新划分了不同大小的文件,即2...