I know about How to store custom objects in Dataset?, but it is still not clear to me how to build such a custom encoder which properly serializes to multiple fields. Manually, I created some functions (https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala/myOrg/GeoSpark.scala#L122-L154) which, over and over again, map the Polygon objects back and forth to primitive types that Spark can handle, i.e. tuples of (String, Int) (edit: full code below).
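For reference, the payload type used by the second method below is not shown in the snippets; inferred from the field accesses (cur.lineString, cur.payload), it should look roughly like this case class:

// assumption: reconstructed from the cur.lineString / cur.payload accesses below
case class WKTGeometryWithPayload(lineString: String, payload: Int)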

For example, to go from a Polygon object to a tuple of (String, Int), I use the following:

import com.vividsolutions.jts.geom.Polygon
import com.vividsolutions.jts.io.WKTWriter

def writeSerializableWKT(iterator: Iterator[AnyRef]): Iterator[(String, Int)] = {
    // one writer per partition, reused for every element
    val writer = new WKTWriter()
    iterator.flatMap(cur => {
      val cPoly = cur.asInstanceOf[Polygon]
      // TODO is it efficient to create this collection? Is this a proper iterator 2 iterator transformation?
      List((writer.write(cPoly), cPoly.getUserData.asInstanceOf[Int])).iterator
    })
  }
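A hypothetical usage (spark: SparkSession and polygonRDD: RDD[Polygon] are assumed names, not from the repo) to get from the RDD to a Dataset via the tuple representation:

// sketch: RDD[Polygon] -> Dataset[WKTGeometryWithPayload] via (String, Int) tuples
import spark.implicits._
val geoDataset: Dataset[WKTGeometryWithPayload] = polygonRDD
  .mapPartitions(writeSerializableWKT)
  .toDF("lineString", "payload")
  .as[WKTGeometryWithPayload]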
import com.vividsolutions.jts.io.{ParseException, WKTReader}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset

// `logger` is assumed to be defined on the enclosing object (e.g. via slf4j)
def createSpatialRDDFromLinestringDataSet(geoDataset: Dataset[WKTGeometryWithPayload]): RDD[Polygon] = {
    geoDataset.rdd.mapPartitions(iterator => {
      // one reader per partition, reused for every row
      val reader = new WKTReader()
      iterator.flatMap(cur => {
        try {
          reader.read(cur.lineString) match {
            case p: Polygon =>
              // re-attach the payload that travelled alongside the WKT string
              p.setUserData(cur.payload)
              Iterator(p)
            case _ => throw new NotImplementedError("Multipolygon or others not supported")
          }
        } catch {
          case e: ParseException =>
            // skip unparsable rows instead of failing the whole partition
            logger.error("Could not parse", e)
            Iterator.empty
        }
      })
    })
  }
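The round trip back is then just a call to the second method (again using the assumed geoDataset from above):

// sketch: Dataset[WKTGeometryWithPayload] -> RDD[Polygon]
val restored: RDD[Polygon] = createSpatialRDDFromLinestringDataSet(geoDataset)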

I noticed that I already start to do a lot of this work twice (see the links to both methods). Now, wanting to be able to handle the result of

https://github.com/geoHeil/geoSparkScalaSample/blob/master/src/main/scala/myOrg/GeoSpark.scala#L82-L84 (full code below)
val joinResult = JoinQuery.SpatialJoinQuery(objectRDD, minimalPolygonCustom, true)
// joinResult.map()
val joinResultCounted = JoinQuery.SpatialJoinQueryCountByKey(objectRDD, minimalPolygonCustom, true)

which is a PairRDD[Polygon, HashSet[Polygon]], or respectively a PairRDD[Polygon, Int]. How can I specify my functions as an Encoder so that I no longer have to solve the same problem twice?
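For completeness: I am aware of the kryo fallback from the linked question, but as far as I understand it stores the whole object in a single binary column instead of multiple fields, roughly:

// minimal sketch of the kryo fallback; note it yields ONE binary column,
// not separate lineString/payload fields (spark and polygonRDD as assumed above)
import org.apache.spark.sql.{Encoder, Encoders}
implicit val polygonEncoder: Encoder[Polygon] = Encoders.kryo[Polygon]
val polygonDs: Dataset[Polygon] = spark.createDataset(polygonRDD)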