我想过滤来自kafka服务器的传入json,并根据我需要分离和解析它们的某些版本类型,并分别对它们进行某些计算 . 但是当我使用if语句时iam使用if语句中的print语句获取错误“没有输出操作注册”作为iam . 什么必须是运行代码的替代方案?提前致谢 .
enter code here
if(packetType.equals("P300")) {
val three300s=dstream.map(parser300)
three300s.print()
}
else if(packetType.equals("P30")) {
val thirty30s=dstream.map(parser30)
thirty30s.print()
}
else if(packetType.equals("P6")) {
val six6s=dstream.map(parser6)
six6s.print()
}
ssc.start()
ssc.awaitTermination()
}
}
我得到“线程中的异常”主“org.apache.spark.SparkException:任务不可序列化” . 我观察到序列化的一些问题,但无法准确找出 .
enter code here
pType.map(rdd => {
val pkt= rdd.toString()
if(pkt.equals("P300")) {
val t300=dstream.map(par300)
t300.print()
}else if(pkt.equals("P30")) {
val t30=dstream.map(par30)
t30.print()
}else if(pkt.equals("P6")) {
val t6=dstream.map(par6)
t6.print()
}
})
1 回答
似乎
packetType
与您提供的任何案例都不匹配 . 您是否还可以添加else
语句,如下所示 .它肯定会消除你得到的错误 . 或者你需要确保
packetType
与任何一个案例相匹配 .希望这可以帮助!