I am using PySpark to read streaming data from Kafka, and I want to sink that data into MongoDB. I have included all the required packages, but it throws the error
UnsupportedOperationException: Data source com.mongodb.spark.sql.DefaultSource does not support streamed writing
The following link is not related to my question
Here is the full error stack trace:
Traceback (most recent call last):
  File "/home/b3ds/kafka-spark.py", line 85, in <module>
    .option("com.mongodb.spark.sql.DefaultSource", "mongodb://localhost:27017/twitter.test")
  File "/home/b3ds/hdp/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 827, in start
  File "/home/b3ds/hdp/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/b3ds/hdp/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/b3ds/hdp/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o122.start.
: java.lang.UnsupportedOperationException: Data source com.mongodb.spark.sql.DefaultSource does not support streamed writing
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:272)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Here is my PySpark code:
from __future__ import print_function
import sys
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType
from pyspark.sql.types import *
import json
from pyspark.sql.functions import struct
from pyspark.sql.functions import *
import datetime
json_schema = StructType([
    StructField("twitterid", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("tweet", StringType(), True),
    StructField("screen_name", StringType(), True)
])
def parse_json(df):
    # Parse the raw Kafka value once and extract the fields for the schema
    data = json.loads(df[0])
    twitterid = data['id']
    created_at = data['created_at']
    tweet = data['text']
    screen_name = data['user']['screen_name']
    return [twitterid, created_at, tweet, screen_name]

def convert_twitter_date(timestamp_str):
    # Twitter's created_at looks like "Wed Aug 29 17:12:58 +0000 2018"
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ', ''), '%a %b %d %H:%M:%S %Y')
    return output_ts
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("StructuredNetworkWordCount")\
        .config("spark.mongodb.input.uri", "mongodb://192.168.1.16:27017/twitter.test")\
        .config("spark.mongodb.output.uri", "mongodb://192.168.1.16:27017/twitter.test")\
        .getOrCreate()
    events = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "localhost:9092")\
        .option("subscribe", "twitter")\
        .load()
    events = events.selectExpr("CAST(value as String)")
    udf_parse_json = udf(parse_json, json_schema)
    udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())
    jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
        .where(col("parsed_field").isNotNull()) \
        .withColumn("created_at", col("parsed_field.created_at")) \
        .withColumn("screen_name", col("parsed_field.screen_name")) \
        .withColumn("tweet", col("parsed_field.tweet")) \
        .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))
    windowedCounts = jsonoutput.groupBy(window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"), jsonoutput.screen_name)
    mongooutput = jsonoutput \
        .writeStream \
        .format("com.mongodb.spark.sql.DefaultSource")\
        .option("com.mongodb.spark.sql.DefaultSource", "mongodb://localhost:27017/twitter.test")\
        .start()
    mongooutput.awaitTermination()
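As an aside, the two UDF helpers can be checked locally, without any Spark running, before plugging them into the stream. The sample tweet below is made up; the `str()` cast on `id` is an assumption I added so the return value matches the `StringType` declared in `json_schema` (the code above returns an int there):

```python
import datetime
import json

def parse_json(df):
    # Parse the raw Kafka value once; cast id to str to match the StringType schema
    data = json.loads(df[0])
    return [str(data['id']), data['created_at'], data['text'], data['user']['screen_name']]

def convert_twitter_date(timestamp_str):
    # Drop the fixed "+0000 " zone token, then parse the remaining timestamp
    return datetime.datetime.strptime(timestamp_str.replace('+0000 ', ''), '%a %b %d %H:%M:%S %Y')

# Hypothetical sample payload shaped like the Twitter API output
sample = json.dumps({
    "id": 123,
    "created_at": "Wed Aug 29 17:12:58 +0000 2018",
    "text": "hello",
    "user": {"screen_name": "someone"}
})

row = parse_json([sample])
ts = convert_twitter_date(row[1])
# row -> ['123', 'Wed Aug 29 17:12:58 +0000 2018', 'hello', 'someone']
# ts  -> datetime.datetime(2018, 8, 29, 17, 12, 58)
```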
I have seen the MongoDB documentation, which says the connector supports a Spark-to-MongoDB sink
https://docs.mongodb.com/spark-connector/master/scala/streaming/
1 Answer
The documentation claims that you can write each RDD using the legacy Streaming (DStream) API together with the standard RDD API. It does not say that the connector supports Structured Streaming sinks, and it doesn't. Since you use PySpark, which has no access to the ForeachWriter, you will have to wait until (if ever) the MongoDB package is updated to support streaming operations.
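A possible workaround along the lines the answer describes is to fall back to the legacy DStream API and write each micro-batch's RDD through the connector's batch writer. This is only a sketch, not a tested solution: it assumes the spark-streaming-kafka-0-8 and mongo-spark-connector packages are on the classpath, reuses the broker, topic, and URI values from the question, and needs a live Kafka/MongoDB setup to actually run:

```python
# Sketch: sink each DStream micro-batch to MongoDB via the batch writer.
# Assumes mongo-spark-connector and spark-streaming-kafka packages are available;
# host names, topic, and database/collection mirror the question.
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaToMongoDStream")
ssc = StreamingContext(sc, 15)  # 15-second micro-batches

stream = KafkaUtils.createDirectStream(
    ssc, ["twitter"], {"metadata.broker.list": "localhost:9092"})

def write_rdd_to_mongo(rdd):
    # Each micro-batch arrives as an RDD of (key, value) pairs;
    # skip empty batches, then use the ordinary (non-streaming) DataFrame writer.
    if not rdd.isEmpty():
        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(rdd.map(lambda kv: kv[1]))
        df.write \
            .format("com.mongodb.spark.sql.DefaultSource") \
            .mode("append") \
            .option("uri", "mongodb://localhost:27017/twitter.test") \
            .save()

stream.foreachRDD(write_rdd_to_mongo)
ssc.start()
ssc.awaitTermination()
```

The key point is that `foreachRDD` hands you a plain RDD per batch, so the unsupported streaming sink is never involved; each batch goes through the batch `DataFrameWriter` instead.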