我试图使用scala从火花流数据帧中提取值,其代码如下:
var txs = spark.readStream
.format("kafka") .option("kafka.bootstrap.servers",KAFKABS)
.option("subscribe", "txs")
.load()
txs = txs.selectExpr("CAST(value AS STRING)")
val schema = StructType(Seq(
StructField("from",StringType,true),
StructField("to", StringType, true),
StructField("timestamp", TimestampType, true),
StructField("hash", StringType, true),
StructField("value", StringType, true)
))
txs = txs.selectExpr("cast (value as string) as json")
.select(from_json($"json", schema).as("data"))
.select("data.*")
.selectExpr("from","to","cast(timestamp as timestamp) as timestamp","hash","value")
val newDataFrame = txs
.flatMap(row => {
val to = row.getString(0)
val from = row.getString(1)
// val timestamp = row.getTimestamp??
//do stuff
})
我想知道Timestamps是否有一个等效的类型get方法?为了增加我的困惑,似乎在我为结构化流定义的SQL类型与我通过 flatMap
功能访问它们时的变量的实际类型之间存在某种隐藏映射(至少对我隐藏) . 我查看了文档,事实确实如此 . 根据文件:
返回位置i的值 . 如果值为null,则返回null . 以下是Spark SQL类型和返回类型之间的映射:BooleanType - > java.lang.Boolean ByteType - > java.lang.Byte ShortType - > java.lang.Short IntegerType - > java.lang.Integer FloatType - > java . lang.Float DoubleType - > java.lang.Double StringType - > String DecimalType - > java.math.BigDecimal DateType - > java.sql.Date TimestampType - > java.sql.Timestamp BinaryType - > byte array ArrayType - > scala.collection .Seq(对java.util.List使用getList)MapType - > scala.collection.Map(对java.util.Map使用getJavaMap)StructType - > org.apache.spark.sql.Row
鉴于此,我原本预计这个映射会更正式地作为它实现的接口被烘焙到 Row
类中,但显然情况并非如此:(似乎在TimestampType / java.sql的情况下 . 时间戳,我必须放弃我的时间戳类型的其他东西?有人请解释为什么我现在只使用scala和spark 3-4个月了 .
- 保罗
1 回答
您已正确推断出
TimestampType
列的Scala类型为java.sql.Timestamp
.截至 V1.5.0 .
org.apache.spark.sql.Row
has一个getTimestamp(i: Int)
方法,所以你可以调用它并得到java.sql.Timestamp
:即使你使用早期版本,也没有必要放弃这种类型,你只需使用
getAs[T](i: Int)
和java.sql.Timestamp
: