首页 文章

Apache Flink:如何并行执行但保持消息顺序?

提问于
浏览
2

关于flink的并行性,我有几个问题 . 这是我的设置:

我有1个主节点和2个从属节点 . 在flink中,我创建了3个kafka消费者,每个消费者都使用不同的主题 .
由于元素的顺序对我很重要,每个主题只有一个分区,我有flink设置来使用事件时间 .

然后我在每个数据流上运行以下管道(伪代码):

source
.map(deserialize)
.window
.apply
.map(serialize)
.writeTo(sink)

到目前为止,我使用参数 -p 2 启动了我的flink程序,假设这将允许我使用我的两个节点 . 结果不是我所希望的,因为我的输出顺序有时搞砸了 .

在阅读了flink文档并试图更好地理解它之后,有人可以确认我的以下“学习”吗?

1.)传递 -p 2 仅配置任务并行性,即任务(例如 map(deserialize) )将被拆分的最大并行实例数 . 如果我想通过整个管道保持订单,我必须使用 -p 1 .

2.)这对我来说似乎是矛盾/混乱:即使并行性设置为1,仍然可以并行(同时)运行不同的任务 . 因此,如果我通过 -p 1 ,我的3个管道也将并行运行 .

并作为一个后续问题:有没有办法找出哪些任务映射到哪个任务槽,以便我自己确认并行执行?

我很感激任何输入!

Update

Here是flink的 -p 2 执行计划 .

3 回答

  • 0

    Apache Flink user email list上提出问题后,这里的答案是:

    1.) -p 选项定义每个作业的任务并行度 . 如果选择的并行度高于1并且数据被重新分配(例如,通过rebalance()或keyBy()),则无法保证顺序 .

    2.)将 -p 设置为1,仅使用1个任务槽,即1个CPU核心 . 因此,可能有多个线程同时在一个核心上运行但不并行运行 .

    至于我的要求:为了并行运行多个管道并仍保持顺序,我可以运行多个Flink作业,而不是在同一个Flink作业中运行所有管道 .

  • 0

    我会试着用我所知道的回答 .

    1)是的,使用CLI客户端,可以使用-p指定parallelism参数 . 你说这是最大并行实例数是对的 . 但是,我没有看到并行性和顺序之间的联系?据我所知,订单由Flink管理,其中包含事件中提供的时间戳或他自己的提取时间戳 . 如果您想维护不同数据源的顺序,对我来说似乎很复杂,或者您可能将这些不同的数据源合并为一个 .

    2)如果你将并行度设置为3,你的3个流水线可以并行运行 . 我认为这里并行意味着在不同的插槽上 .

    后续问题)您可以在http://localhost:8081检查哪些任务映射到JobManager的Web前端上的哪个任务槽 .

  • 1

    请在下面找到使用侧输出和插槽组进行本地扩展的示例 .

    package org.example
    
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    /**
      * This example shows an implementation of WordCount with data from a text socket.
      * To run the example make sure that the service providing the text data is already up and running.
      *
      * To start an example socket text stream on your local machine run netcat from a command line,
      * where the parameter specifies the port number:
      *
      * {{{
      *   nc -lk 9999
      * }}}
      *
      * Usage:
      * {{{
      *   SocketTextStreamWordCount <hostname> <port> <output path>
      * }}}
      *
      * This example shows how to:
      *
      *   - use StreamExecutionEnvironment.socketTextStream
      *   - write a simple Flink Streaming program in scala.
      *   - write and use user-defined functions.
      */
    object SocketTextStreamWordCount {
    
      def main(args: Array[String]) {
        if (args.length != 2) {
          System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
          return
        }
    
        val hostName = args(0)
        val port = args(1).toInt
        val outputTag1 = OutputTag[String]("side-1")
        val outputTag2 = OutputTag[String]("side-2")
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.getConfig.enableObjectReuse()
    
        //Create streams for names and ages by mapping the inputs to the corresponding objects
        val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")
        val counts = text.flatMap {
          _.toLowerCase.split("\\W+") filter {
            _.nonEmpty
          }
        }
          .process(new ProcessFunction[String, String] {
            override def processElement(
                                         value: String,
                                         ctx: ProcessFunction[String, String]#Context,
                                         out: Collector[String]): Unit = {
              if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))
              else ctx.output(outputTag2, String.valueOf(value))
            }
          })
    
        val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)
        val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)
    
        val output1 = sideOutputStream1.map {
          (_, 1)
        }.slotSharingGroup("map1")
          .keyBy(0)
          .sum(1)
    
        val output2 = sideOutputStream2.map {
          (_, 1)
        }.slotSharingGroup("map2")
          .keyBy(0)
          .sum(1)
    
        output1.print()
        output2.print()
    
        env.execute("Scala SocketTextStreamWordCount Example")
      }
    
    }
    

相关问题