我们正在尝试为Apache Flink创建一个扩展,它使用自定义分区 . 对于某些运营商,我们要检查/检索使用过的分区程序 . 不幸的是,我找不到在给定DataSet上执行此操作的任何可能性 . 我错过了什么或者是否有其他解决方法?
我会从这样的事情开始:
class MyPartitioner[..](..) extends Partitioner[..] {..}
[..]
val myP = new MyPartitioner(...)
val ds = in.partitionCustom(myP, 0)
现在从另一个类我想访问分区器(如果已定义) . 在Spark中,我会按以下方式执行:
val myP = ds.partitioner.get.asInstanceOf[MyPartitioner]
然而,对于Flink我找不到这种可能性 .
Edit1:
suggestion of Fabian似乎有可能 . 但是,有两个限制:
(1)使用Scala时,必须先检索基础Java DataSet,然后将其转换为 PartitionOperator
(2)分区必须是最后一次操作 . 因此,在设置和获取分区程序之间不能使用其他操作 . 例如 . 以下是不可能的:
val in: DataSet[(String, Int)] = ???
val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val ds2 = ds.map(x => x)
val myP2 = ds2.asInstanceOf[PartitionOperator].getCustomPartitioner
谢谢你,最诚挚的问候,菲利普
1 回答
您可以将返回的
DataSet
强制转换为PartitionOperator
并调用PartitionOperator.getCustomPartitioner()
:注意
getCustomPartitioner()
是一种内部方法(即,不是公共API的一部分),可能会在Flink的未来版本中发生变化 .PartitionOperator
也用于其他分区类型,例如DataSet.partitionByHash()
. 在这些情况下getCustomPartitioner()
可能会返回null
.