首页 文章

如何在Apache Flink中获取分区程序?

提问于
浏览
4

我们正在尝试为Apache Flink创建一个扩展,它使用自定义分区 . 对于某些运营商,我们要检查/检索使用过的分区程序 . 不幸的是,我找不到在给定DataSet上执行此操作的任何可能性 . 我错过了什么或者是否有其他解决方法?

我会从这样的事情开始:

class MyPartitioner[..](..) extends Partitioner[..] {..}
[..]
val myP = new MyPartitioner(...)
val ds = in.partitionCustom(myP, 0)

现在从另一个类我想访问分区器(如果已定义) . 在Spark中,我会按以下方式执行:

val myP = ds.partitioner.get.asInstanceOf[MyPartitioner]

然而,对于Flink我找不到这种可能性 .


Edit1:

suggestion of Fabian似乎有可能 . 但是,有两个限制:

(1)使用Scala时,必须先检索基础Java DataSet,然后将其转换为 PartitionOperator

(2)分区必须是最后一次操作 . 因此,在设置和获取分区程序之间不能使用其他操作 . 例如 . 以下是不可能的:

val in: DataSet[(String, Int)] = ???

val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val ds2 = ds.map(x => x)

val myP2 = ds2.asInstanceOf[PartitionOperator].getCustomPartitioner

谢谢你,最诚挚的问候,菲利普

1 回答

  • 0

    您可以将返回的 DataSet 强制转换为 PartitionOperator 并调用 PartitionOperator.getCustomPartitioner()

    val in: DataSet[(String, Int)] = ???
    
    val myP = new MyPartitioner()
    val ds = in.partitionCustom(myP, 0)
    
    val myP2 = ds.asInstanceOf[PartitionOperator].getCustomPartitioner
    

    注意

    • getCustomPartitioner() 是一种内部方法(即,不是公共API的一部分),可能会在Flink的未来版本中发生变化 .

    • PartitionOperator 也用于其他分区类型,例如 DataSet.partitionByHash() . 在这些情况下 getCustomPartitioner() 可能会返回 null .

相关问题