首页 文章

Flink Queryable状态不起作用

提问于
浏览
2

我正在从IDE运行flink . 在可查询中存储数据是有效的,但不知何故,当我查询它时,它会抛出异常

Exeception

Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)])

我的代码:

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")

@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure)
else {
  // At startup some failures are expected
  // due to races. Make sure that they don't
  // fail this test.
  return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
    @throws[Exception]
    def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey)
  })
}
}

  @SuppressWarnings(Array("unchecked"))
  private def getKvStateWithRetries(queryName: String,
                                keyHash: Int,
                                serializedKey: Array[Byte]): Future[Array[Byte]] = {

val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
kvState.recoverWith(recover(queryName, keyHash, serializedKey))
  }

def onSuccess = new OnSuccess[Array[Byte]]() {
@throws(classOf[Throwable])
override def onSuccess(result: Array[Byte]): Unit = {
  println("found record ")
  val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
  println(value)
 }
}


override def invoke(query: QueryMetaData): Unit = {
println("getting inside querystore"+query.record)
val serializedResult = flinkQuery.getResult(query.record, queryName)
serializedResult.onSuccess(onSuccess)

我不会产生新的迷你集群或集群提交,如https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query /QueryableStateITCase.java正如我想要的那样,在与使用env.execute运行的主应用程序相同的环境中的同一个集群中 . 这一步是否必要 .

从deault flink的文档中运行localhost:6123连接是否有问题 . 我是否需要在单独的群集中提交作业 .

1 回答

  • 2

    经过大量的谷歌搜索,我找到了解决方案 .

    我使用LocalStreamEnvironment并得到相同的错误,直到找到这个线程RemoteEnv connect failed . 描述的错误是针对不同的设置(不是本地),但用于测试的主题中包含的gist示例是创建LocalFlinkMiniCluster,参数"useSingleActorSystem"设置为 false .

    查看LocalStreamEnvironment的实现,创建MiniCluster时将"useSingleActorSystem"设置为 true .

    我只是创建了一个扩展LocalStreamEnvironment的类LocalQueryableStreamEnvironment,其中创建了迷你集群,"useSingleActorSystem"设置为 true ,一切都在IDE中运行 .

    现在我的代码如下:

    组态:

    Configuration config = new Configuration();
    config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6);
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue());
    config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT,JobManagerOptions.PORT.defaultValue());
    **config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);**
    

    注意:QueryableState仅适用于此配置LOCAL_NUMBER_TASK_MANAGER设置为多于1的值!

    实例化/执行环境:

    LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config);
    ...
    env.addSource(anySource)
       .keyby(anyAtribute)
       .flatmap(new UpdateMyStateToBeQueriedLaterMapper())
       .addSink(..); //etc
    ...
    env.execute("JobNameHere");
    

    并创建客户端:

    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());
    
    HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
        .createHighAvailabilityServices(
                       config, 
                       Executors.newSingleThreadScheduledExecutor(),
                       HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION
        );
    return new QueryableStateClient(config,highAvailabilityServices);
    

    有关更多信息访问:

    Queryable States in ApacheFlink - Implementation

    Queryable State Client with 1.3.0-rc0

    我的依赖:

    compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
    compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1'
    

相关问题