-
1 votesanswersviews
从Spark Streaming DataFrame中删除(损坏的)不适合架构的行(来自Kafka的传入JSON数据)
我有一个火花结构的蒸汽应用程序,我正在从 Kafka 读书 . 这是我的代码的基本结构 . 我创建了Spark会话 . val spark = SparkSession .builder .appName("app_name") .getOrCreate() 然后我从流中读到 val data_stream = spark .readStream .for... -
2 votesanswersviews
为什么来自Kafka的读取流失败并且“无法找到存储在数据集中的类型的编码器”?
我正在尝试将Spark Structured Streaming与Kafka一起使用 . object StructuredStreaming { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: StructuredStreaming <... -
1 votesanswersviews
无法找到存储在数据集中的类型的编码器 . 在火花结构流
我正在尝试火花网站上给出的火花结构化流媒体的例子,但它正在抛出错误 1. Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits... -
1 votesanswersviews
结构化流错误py4j.protocol.Py4JNetworkError:来自Java端的答案为空
我正在尝试使用PySpark和Structured Streaming(Spark 2.3)在两个Kafka Stream之间 Build 左外连接 . import os import time from pyspark.sql.types import * from pyspark.sql.functions import from_json, col, struct, explode, g... -
2 votesanswersviews
在Streaming Dataset和DStream之间转换
可以将流式传输 o.a.s.sql.Dataset 转换为 DStream 吗?如果是这样,怎么样? 我知道如何将其转换为RDD,但它是在流式上下文中 . -
0 votesanswersviews
如何在编写镶木地板文件时避免空文件?
我正在使用Spark结构流从Kafka队列中读取 . 从Kafka阅读后,我正在对数据帧应用过滤器 . 这个过滤后的数据框我在镶木地板文件中说 . 这产生了许多空的镶木地板文件 . 有什么办法可以停止写一个空文件 . df = spark \ .readStream \ .format("kafka") \ .option("kafka.boo... -
0 votesanswersviews
具有文件源和文件接收器的Spark结构化流中出错
我的团队现在正在进入结构化流媒体领域 . 我对结构化流媒体比较陌生 . 我有一个要求 来源 - CSV接收器 - JSON 环境详情: 集群:Spark 2.2.1编程语言:Scala构建工具:Gradle 范围: 我已经实现了这个简单的代码 val schema = StructType( Array(StructField("customer_id", Stri... -
2 votesanswersviews
为什么流式查询失败“InvalidSchemaException:组类型不能为空(...)空组:spark_schema”写入镶木地板?
我使用Spark 2.2.1和Parquet 1.8.1 . 我想从Kafka读取JSON数据并进行一些转换然后将数据写入镶木地板文件,然后由Apache Hive加载 . 但是当writeStream到镶木地板时我遇到了以下错误 . Caused by: org.apache.parquet.schema.InvalidSchemaException: A group type can not ... -
1 votesanswersviews
如何从流式查询中编写镶木地板文件?
我正在使用Spark 2.2结构化流式传输来读取CSV文件 . 我将查询结果写入控制台是这样的: val consoleQuery = exceptions .withWatermark("time", "5 years") .groupBy(window($"time", "1 hour"), $"... -
2 votesanswersviews
使用唯一消息架构的Spark结构化流式传输多个Kafka主题
Current State: 今天我构建了一个Spark Structured Streaming应用程序,该应用程序使用包含JSON消息的单个Kafka主题 . 嵌入在Kafka主题中的值包含有关消息字段的源和模式的一些信息 . 消息的一个非常简化的版本看起来像这样: { "source": "Application A", "schem... -
0 votesanswersviews
JSON作为kafka生成者消息发送并通过spark结构化流式传输-parquet消费
我想知道如何使用scala函数将jSON字符串作为消息发送到kafka主题,并使用spark结构化流中的使用readstream()消费,另存为镶木地板格式 . 目前使用以下代码,但镶木地板文件没有被创建 . 请帮助获取带有数据的镶木地板文件 . 这也作为函数实现,并且需要在集成测试中调用这两个函数 . 发送给Kafka主题的JSON消息 - object kafkaProducer extend... -
0 votesanswersviews
基于kafka分区的结构化流式读取
我正在使用spark结构化Streaming来读取来自Kafka主题的传入消息并根据传入消息写入多个镶木桌面所以我创建了一个readStream,因为Kafka源是常见的,并且每个镶木地板表在循环中创建单独的写入流 . 这工作正常但读取流正在创建瓶颈,因为每个writeStream都会创建一个readStream,并且无法缓存已读取的数据帧 . val kafkaDf=spark.readStre... -
1 votesanswersviews
如何从Kafka读取JSON数据并使用Spark结构流存储到HDFS?
我正在尝试从Kafka读取JSON消息,并将它们存储在HDFS中,并使用Spark结构化流式传输 . 我按照示例here,当我的代码如下所示: df = spark \ .read \ .format("kafka") \ .option("kafka.bootstrap.servers", "host1:port1,host2:por... -
1 votesanswersviews
将Spark SQL与Spark Streaming一起使用
尝试了解Spark Structured Streaming方面的SparkSql .Spark Session从kafka主题中读取事件,将数据聚合到按不同列名分组的计数,并将其打印到控制台 .原始输入数据结构如下: +--------------+--------------------+----------+----------+-------+-------------------+---... -
0 votesanswersviews
Pyspark结构化流媒体和 Kafka 来源 - 没有发送精神
我正在使用PySpark读取一些数据,我正在尝试从Kafka源代理程序读取数据,但PySpark中没有打印结果 . 我意识到Kafka发送和接收方法正如预期和消费者所期望的那样工作 . 问题是读取经纪人在PySpark发送的数据 这是我的代码, events = spark.readStream.format("kafka").option("kafka.bootst... -
0 votesanswersviews
Spark结构化流 - python - org.apache.kafka.common.TopicPartition;类对反序列化无效
我正在尝试执行以下spark spark示例代码 . https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py 我在Spark版本2.0.2的AWS EMR集群上运行它 . 以下依赖项将添加到spark提交中 . spark-... -
0 votesanswersviews
Spark Streaming 标准示例中缺少必需的配置“bootstrap.servers”错误
我对 Scala 和 Spark 有些新意,所以请随意评判我,但不要太难。 我正在尝试启动标准的 DirectKafkaWordCount 示例(随 Spark2 安装提供)以测试 Spark Streaming 如何与 Kafka 一起使用。 这是示例的代码(也可以找到这里): /* * Licensed to the Apache Software Foundation (ASF) unde... -
0 votesanswersviews
Spark Structured Streaming(Kafka) - 闲置期后长时间延迟
我有一个长期运行的结构化流媒体任务,使用Kafka作为源和接收器 . 批次通常在一秒钟内完成,但有时可能需要超过20秒 . 我还没有确定一个精确的模式,但我相信这是在闲置期后发生的,而错误的来源是 生产环境 者 . 如果延迟超过默认的request.timeout.ms设置,我会得到以下错误 . 有谁知道延迟发生的原因和可能的解决方案? 18/11/07 10:45:54错误实用程序:中止任务o... -
1 votesanswersviews
PySpark流媒体:窗口和转换
我正在尝试从Spark流数据源读取数据,按事件时间窗口,然后在窗口数据上运行自定义Python函数(它使用非标准Python库) . 我的数据框看起来像这样: | Time | Value | | 2018-01-01 12:23:50.200 | 1234 | | 2018-01-01 12:23:51.200 | 33 | | 2018-01-01... -
-1 votesanswersviews
Kafka采用火花结构化流程
我们正在使用Spark结构化流媒体进行Kafka执行时我们遇到以下问题: Ivy Default Cache set to: /root/.ivy2/cache The jars for the packages stored in: /root/.ivy2/jars :: loading settings :: url = jar:file:/usr/hdp/2.6.3.0-235/spar... -
3 votesanswersviews
Spark结构化流写入流到Hive ORC分区外部表
我正在尝试使用Spark Structured Streaming - writeStream API来写入外部分区的Hive表 . CREATE EXTERNAL TABLE `XX`( `a` string, `b` string, `b` string, `happened` timestamp, `processed` timestamp, `d` string, `e` stri... -
1 votesanswersviews
如何在Spark Structured Streaming中将withtime功能添加到withWatermark
在我的Spark Structured Streaming代码中,我从Kafka接收用户操作,并且需要在Parquet(追加模式)中存储per_user_and_15_min_window的操作计数 . 事件时间戳与现实不符,它们可能属于过去,并且可能无序到达某一点 . 数据流是这样的:我可能会收到2017年2月的数据,之后可能是2016年11月的数据,但是一旦我收到了用户ID和期间(15分钟)的... -
5 votesanswersviews
结构化流媒体赢得了't write DF to file sink citing /_spark_metadata/9.compact doesn't存在
我正在EMR 5.11.1,Spark 2.2.1中构建一个Kafka摄取模块 . 我的目的是使用结构化流来消费Kafka主题,进行一些处理,并以镶木地板格式存储到EMRFS / S3 . 控制台接收器按预期工作,文件接收器不起作用 . 在 spark-shell : val event = spark.readStream.format("kafka") .option(&q... -
1 votesanswersviews
从kinesis到dynamodb的火花结构流
我在kinesis流源上运行了一个Spark结构化的流式查询 . 它从kinesis读取(没有时间触发),计算聚合,并使用自定义dynamodb接收器推送结果(outputMode“update”) . spark版本:2.1.1和kinesis源码库:https://github.com/maropu/spark-kinesis-sql-asl 我希望按秒处理1 500个事件(我使用Kinesi... -
1 votesanswersviews
在Spark结构化流媒体中,如何将完整聚合输出到外部源(如REST服务)
我尝试执行的任务是聚合DataFrame中维度(字段)的值计数,执行一些统计信息,如average,max,min等,然后通过进行API调用将聚合输出到外部系统 . 我使用的是30秒的水印,窗口大小为10秒 . 我将这些尺寸做得很小,以便我更容易测试和调试系统 . 我发现进行API调用的唯一方法是使用 ForeachWriter . 我的问题是 ForeachWriter 在分区级别执行,并且每... -
1 votesanswersviews
Spark结构化流媒体Kafka Microbatch计数
我使用Spark结构化流来读取Kafka主题的记录;我打算在Spark readstream 中计算每个'Micro batch'中收到的记录数 这是一个片段: val kafka_df = sparkSession .readStream .format("kafka") .option("kafka.bootstrap.servers", ... -
0 votesanswersviews
如何将结构化流写入S3?
我一直在搜索如何将结构化流结果写入s3但无法找到方法 . 我尝试了几个选项,但每个选项都产生了不同的错误 . 这是我想要做的: 阅读csv流 将其减少到包含组中平均值的2-3行结果 . 像这样: groupA, 15000 groupB, 10000 然后将这些结果写入单个文件中的s3存储桶,可能会在新输入流进入时覆盖 . 所以我尝试这样做: query.writeStream ... -
1 votesanswersviews
Spark Structured Streaming,Executor由于广播累积导致的内存不足故障
我们的ETL管道在存储到cassandra之前使用spark结构化流来丰富传入数据(与静态数据帧连接) . 目前,查找表是csv文件(在HDFS中),它们作为数据帧加载,并在每个触发器上与每批数据连接 . 似乎查找表Dataframe在每个触发器上广播并存储在Memory store中 . 这耗尽了执行者的记忆,最终执行者面对OOM并被Mesos杀死:Log of executor 从上面的链接中...