I know about How to store custom objects in Dataset?, but it is still unclear to me how to build a custom encoder which properly serializes to multiple fields. Manually, I created some functions (https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala/myOrg/GeoSpark.scala#L122-L154) which map the polygons back and forth by converting the objects to primitive types Spark can handle, i.e. tuples like (String, Int) (edit: full code below).

For example, to go from a Polygon object to a (String, Int) tuple, I use the following:
def writeSerializableWKT(iterator: Iterator[AnyRef]): Iterator[(String, Int)] = {
  val writer = new WKTWriter()
  iterator.flatMap(cur => {
    val cPoly = cur.asInstanceOf[Polygon]
    // TODO is it efficient to create this collection? Is this a proper iterator-to-iterator transformation?
    List((writer.write(cPoly), cPoly.getUserData.asInstanceOf[Int])).iterator
  })
}
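Regarding the TODO above: a minimal sketch (independent of JTS, with an Int doubling standing in for the WKT conversion) suggests that when every input element yields exactly one output, `map` is the more direct iterator-to-iterator transform and avoids allocating a one-element `List` per record:

```scala
object IteratorTransformDemo {
  // Pattern used in writeSerializableWKT: wrap each result in a
  // single-element List and flatten. Lazy, but allocates per element.
  def viaFlatMap(it: Iterator[Int]): Iterator[Int] =
    it.flatMap(x => List(x * 2).iterator)

  // Equivalent output via map: still a lazy iterator-to-iterator
  // transformation, with no intermediate collection per element.
  def viaMap(it: Iterator[Int]): Iterator[Int] =
    it.map(_ * 2)
}
```

Both variants are lazy and produce identical elements, so `flatMap` is only needed when a record can expand to zero or several outputs.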
def createSpatialRDDFromLinestringDataSet(geoDataset: Dataset[WKTGeometryWithPayload]): RDD[Polygon] = {
  geoDataset.rdd.mapPartitions(iterator => {
    val reader = new WKTReader()
    iterator.flatMap(cur => {
      try {
        reader.read(cur.lineString) match {
          case p: Polygon =>
            p.setUserData(cur.payload)
            List(p).iterator
          case _ => throw new NotImplementedError("Multipolygon or others not supported")
        }
      } catch {
        case e: ParseException =>
          logger.error("Could not parse", e)
          Iterator.empty
      }
    })
  })
}
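The try/catch branch above can be expressed more compactly with `Try(...).toOption`. A minimal sketch of the same parse-or-drop behavior, with `toInt` standing in for `WKTReader.read` so it runs without JTS:

```scala
import scala.util.Try

object ParseOrDropDemo {
  // flatMap over Option: records that fail to parse are silently
  // dropped, mirroring the catch-and-return-empty branch in
  // createSpatialRDDFromLinestringDataSet (minus the logging).
  def parseAll(it: Iterator[String]): Iterator[Int] =
    it.flatMap(s => Try(s.toInt).toOption)
}
```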
I noticed that I have already started to do a lot of the work twice (compare the two linked methods). Now, I want to be able to handle the results of (full code below: https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala/myOrg/GeoSpark.scala#L82-L84)
val joinResult = JoinQuery.SpatialJoinQuery(objectRDD, minimalPolygonCustom, true)
// joinResult.map()
val joinResultCounted = JoinQuery.SpatialJoinQueryCountByKey(objectRDD, minimalPolygonCustom, true)
which is a PairRDD[Polygon, HashSet[Polygon]], or respectively a PairRDD[Polygon, Int]. How can I specify my functions as an encoder so that I do not have to solve the same problem twice?
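Even before a full custom Encoder, the duplicated field mapping could at least live in one place. A hypothetical sketch, with a plain case class `Poly` standing in for a JTS Polygon carrying its Int user data (in real code `encode`/`decode` would call `WKTWriter.write` / `WKTReader.read`):

```scala
// Hypothetical stand-in for a JTS Polygon plus its Int payload.
final case class Poly(wkt: String, payload: Int)

// One definition of the object-to-primitive-fields mapping, reusable by
// both the RDD-to-Dataset and Dataset-to-RDD directions.
trait FieldCodec[A, B] {
  def encode(a: A): B
  def decode(b: B): A
}

object PolyCodec extends FieldCodec[Poly, (String, Int)] {
  def encode(p: Poly): (String, Int) = (p.wkt, p.payload)
  def decode(t: (String, Int)): Poly = Poly(t._1, t._2)
}
```

With the round trip defined once, both `writeSerializableWKT` and `createSpatialRDDFromLinestringDataSet` could delegate to the same codec instead of each re-stating the (String, Int) mapping.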