-
0 votesanswersviews
将Spark Dataframe保存到couchbase
我想将spark数据帧数据写入couchbase . 为此,我试图按如下方式进行: - double[] val=new double[3]; SparkContext sc = new SparkContext(new SparkConf().setAppName("sql").setMaster("local").set("com.couch... -
0 votesanswersviews
我们是否需要在客户端计算机上运行hiveserver2来访问hive Metastore?
我正在使用spark-java来访问hive Metastore . 在我的机器上只安装了火花而没有别的 . 我没有hadoop目录或Hive文件夹 . 我在 spark/conf 目录中创建了 hive-site.xml , hdfs-site.xml , core-site.xml 和 yarn-site.xml . 我的hive Metastore是在另一台机器上设置的,它是hadoop集... -
1 votesanswersviews
如何使用SPARK在HDFS中编写大数据(大约800 GB)作为hive orc表?
我在最近3-4个月和最近工作在 Spark Project . 我正在使用巨大的历史文件(800 GB)和一个小的增量文件(3 GB)进行一些计算 . 计算使用 hqlContext & dataframe 很快发生火花,但是当我试图写的计算结果与 orc 格式的 hive table 其中将包含近20十亿的记录有近800 GB的数据大小花费过多时间(超过2小时,最后失败) . 我的群集详细信息... -
2 votesanswersviews
集成SQL和Spark Streaming时不可序列化的异常
除了Not Serializable exception when integrating Spark SQL and Spark Streaming 我的源代码 public static void main(String args[]) { SparkConf sparkConf = new SparkConf().setAppName("NumberCount")... -
0 votesanswersviews
Spark结构化流 Kafka 依赖无法解决
我试过了 ./spark-2.3.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 test.py 在我自己的电脑里,一切都很好 . 但是在我在学校的服务器上尝试之后,它有以下消息和错误 . 我在谷歌搜索了很长时间并且不知道 . 谁能帮我? 常 Spring ... -
3 votesanswersviews
如何将 Spark DataFrame 插入 Hive 内部表?
在追加模式下将 DF 插入 Hive 内部表的正确方法是什么。看来我们可以使用“ saveAsTable”方法直接将 DF 写入 Hive 或将 DF 存储到 temp 表,然后使用查询。 df.write().mode("append").saveAsTable("tableName") 要么 df.registerTempTable("tem... -
10 votesanswersviews
如何指定 saveAsTable 将文件保存到的路径?
我正在尝试使用 DataFrameWriter 在 Spark1.4 的 pyspark 中将 DataFrame 保存到 S3 df = sqlContext.read.format("json").load("s3a://somefile") df_writer = pyspark.sql.DataFrameWriter(df) df_writer.pa... -
2 votesanswersviews
Hive 分区表的 Spark SQL 分区修剪
当我运行一些查询火花似乎不是谓词下推到特定的配置单元表的分区。 将“ spark.sql.orc.filterPushdown”设置为“ true”并没有帮助。 Spark 版本为 1.6,蜂巢版本为 1.2. 蜂巢表按日期分割为 ORC 格式。 val sc = new SparkContext(new SparkConf()) var hql = new org.apache.spark.sq... -
1 votesanswersviews
DataFrame —连接/ groupBy-agg-分区
我可能对 join/groupBy-agg 有一个幼稚的问题。在 RDD 期间,每当我想要执行一个。 groupBy-agg,我曾经说过(PairRDDFunctions 的)reduceByKey 和可选的 Partition-Strategy(具有分区或 Partitioner 的数量)b。 Join(PairRDDFunctions)及其变体,我曾经有一种方法来提供多个分区 在 DataFr... -
0 votesanswersviews
从 Spark 替换配置单元分区
有没有办法可以从 Spark 程序替换(现有)配置单元分区?仅替换最新的分区,其余分区保持不变。 以下是我要尝试的想法, 我们每分钟都会从 RDBMS 系统获取进入 HDFS 的跨国数据。将有一个 spark 程序(每 5 或 10 分钟运行一次),该程序读取数据,执行 ETL 并将输出写入 Hive 表。由于覆盖整个配置单元表将是巨大的,因此我们只想覆盖当前分区的配置单元表。一天结束时,源分区和... -
19 votesanswersviews
如何在 Spark SQL 中控制分区大小
我需要使用 Spark SQL HiveContext从 Hive 表中加载数据并加载到 HDFS 中。默认情况下,SQL 输出中的DataFrame具有 2 个分区。为了获得更多的并行性,我需要在 SQL 中增加分区。 HiveContex t 中没有重载方法来获取分区数参数。 RDD 的重新分区会导致改组并导致更多的处理时间。 val 结果= sqlContext.sql(“从 bt_... -
46 votesanswersviews
在 Spark DataFrame 写入方法中覆盖特定分区
我想覆盖特定的分区,而不是全部覆盖。我正在尝试以下命令: df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4') 其中 df 是具有要覆盖的增量数据的数据帧。 hdfs-base-path 包含主数据。 当我尝试上述命令时,它将删除所有分区,并在 hdfs 路径中的 df 中插入这些分区。 我的要求是只覆盖指... -
2 votesanswersviews
Pyspark 数据帧连接需要很长时间
我在 pyspark 中有 2 个数据框,是使用 2 个 sparksql 查询从 hive 数据库中加载的。 当我尝试使用df1.join(df2,df1.id_1=df2.id_2)联接两个数据框时,需要花费很长时间。当我调用 JOIN 时,Spark 是否会重新执行 df1 和 df2 的 SQL? 基础数据库是 HIVE -
2 votesanswersviews
带有 groupBy 的 DataFrame 与带有 reduceByKey 的 RDD
我有一个 csv 文件:(customerId,orderId,花)。我使用两种方法计算每个客户的总支出: 方法 1:使用 DataFrame 和 groupBy val df = ss.read .option("header", false) .option("inferSchema", true) .csv("data.csv") ... -
0 votesanswersviews
在 Spark 中保存具有非常大值的数据框
使用 Spark 数据帧,我正在执行 groupBy 操作,以将与键关联的所有值收集到列表中。所收集值的大小可能相差很大。实际上,我正在尝试通过合并用于后期处理的组合键的值来生成“文档”。 为了说明这一点,df 是具有 3 个字符串列 A,B,C 的数据帧。 df.groupBy(concat($"A", lit("-"), $"B").... -
-2 votesanswersviews
将数据倾斜到少数执行者
我在具有 21 个执行程序的独立模式下运行 spark,并且当我使用 sqlContext 加载我的第一个 SQL 表时,我以某种方式对它进行分区,以使数据可以通过在连续整数列上进行分区而在所有块之间完美分配: val brDF = sqlContext.load("jdbc", Map("url" -> srcurl, "dbtable&q... -
4 votesanswersviews
Spark 数据帧:加入后偏斜的分区
我有两个数据框,df1有 2200 万条记录,df2有 200 万条记录。我正在对email_address做正确的连接作为关键。 test_join = df2.join(df1, "email_address", how = 'right').cache() 两个数据框中几乎没有重复的电子邮件(如果有)。联接后,我尝试使用以下代码查找所得数据帧test_join的分区大小... -
0 votesanswersviews
识别为何 Spark 中的数据偏斜
我正在研究一个 Spark SQL 作业(Spark 1.6.0),该作业由于 200 个分区中的数据严重不正确而表现不佳,大多数数据位于 1 个分区中:我想知道 is...is Spark UI 中有什么可以帮助我找到有关数据如何分区的更多信息?从这个角度看,我不知道数据帧在哪列上分区。我如何找到答案? (除了查看代码外-我想知道日志 and/or UI 中是否有什么可以帮助我)? 其他详细信息... -
0 votesanswersviews
Spark 请求的阵列大小超过 BufferHolder.grow 的 VM 限制
我在混合 scala-python 应用程序(类似于 Zeppelin)上的 Hadoop 集群上运行的 Spark 2.1 上遇到此错误: 18/04/09 08:19:34 ERROR Utils: Uncaught exception in thread stdout writer for /x/python/miniconda/bin/python java.lang.OutOfMemor... -
1 votesanswersviews
如何从 PySpark 中的字符串获取列表
在 PySpark 中是否有类似eval的功能。 我正在尝试将 Python 代码转换为 PySpark 我正在查询一个数据框,并且其中一列具有数据,如下所示,但采用字符串格式。 [{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.co... -
1 votesanswersviews
PySpark Dataframes:如何使用紧凑代码在多种条件下进行过滤?
如果我有一个列名列表,并且如果这些列的值大于零,我想对行进行过滤,是否可以执行类似的操作? columns = ['colA','colB','colC','colD','colE','colF'] new_df = df.filter(any([df[c]>0 for c in columns])) 返回: ValueError:无法将列转换为布尔值:请对“和”,“ |”使用“&”构建... -
-2 votesanswersviews
Spark和scala仅扫描过滤后的数据
让我以简单的方式说明我使用spark从Cassandra表中读取并做一些选择并写入另一个Cassandra表但是写入另一个Cassandra表,为我拍摄了近15分钟的150万条记录,我该如何微调它 . 我正在尝试扫描Cassandra的过滤数据,因为我正在使用它 Spark.read.format( “org.apache.spark.sql.cassandra”) . 选项(密钥空间,表).lo... -
1 votesanswersviews
使用spark SQL中的相同名称替换带有calcutated列的列
我从我的datalake读取文件并将它们加载到数据框中由于kafka(here)中的强制转换问题,加载的数据有一些与源数据库中的类型不同的字段 因此,我使用错误的数据类型(bunary)从S3加载数据,并使用UDF函数将每个列转换为另一列 然后,我重命名新列以替换olders,以在源数据库和目标数据库中保持相同的结构 脚步 : 之前: myTable | +-- myField1 (binary)... -
0 votesanswersviews
Pyspark:如何汇总Pyspark列表中所有元素的数据? [重复]
这个问题在这里已有答案: Calculating percentage of total count for groupBy using pyspark 1回答 我将所有字符串字段存储在列表对象中 . 然后,我正在传递for循环内的每个字段来计算聚合计数 . 我正在寻找一种方法来获取所有字符串列的聚合计数 . 请帮忙 . 样本数据: Dataframe(Input_Data)具有这些记录 No... -
0 votesanswersviews
火花存储器分数和 Actuator 内存开销? [等候接听]
什么是“spark.executor.memoryOverhead”和“spark.memory.fraction”?什么是默认属性 -
-1 votesanswersviews
如何使用joinType“cross”在spark中进行交叉连接?
我已经知道了两种方法来进行交叉连接 - 使用crossJoin或者在没有条件的情况下进行连接(然后通过sql上下文启用交叉连接) . 然而,我只是为什么他们提供了一个明确的joinType“交叉”而感到困惑 - 每当我尝试使用它时,它最终都等同于“内部” . 也许有人可以提供一个例子,或者至少解释一下如果没有用途,给出joinType背后的想法? -
36 votesanswersviews
Spark Dataframe区分具有重复名称的列
正如我在Spark Dataframe中所知,多列的名称可以与下面的数据帧快照中显示的名称相同: [ Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), Ro... -
5 votesanswersviews
Pyspark圆形功能的问题
在pyspark中使用圆形函数时遇到一些麻烦 - 我有下面的代码块,我试图将 new_bid 列舍入到2位小数,然后将列重命名为 bid - 我导入 pyspark.sql.functions AS func 以供参考,并使用其中包含的 round 函数: output = output.select(col("ad").alias("ad_id"), ... -
0 votesanswersviews
最有效的方法是根据列中的最大值过滤spark数据帧中的行
我有一个名为flightData2015的spark数据帧,格式如下: +--------------------------+---------------------+-------+ | Destination_country_name | Origin_country_name | count | +--------------------------+------------------... -
0 votesanswersviews
在Spark2 DataFrame中提供显式架构
我需要从csv文件中读取数据并根据显式模式对其进行验证,如果模式验证失败,则抛出错误 . 为此,我做了以下工作:1)定义了架构 public static StructField[] schema ={ new StructField("name", DataTypes.StringType, false, Metadata.empty()), ...