我有一个 Java Spring 应用程序(在 Hadoop 集群之外的服务器上运行),它使用 KEYTAB 文件连接到 Kerberized Kafka 主题(由 Hadoop 集群上的 Kerberos 保护)并推送流数据。

我现在面临的问题是 TGT 每 24 小时重新生成一次,我的 Java 应用程序失去了与 Kafka 的连接。

这是错误日志:

2018-05-12 10:57:01 WARN  NetworkClient:241 - [Producer clientId=amqp7] Connection to node 1001 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
2018-05-12 10:57:01 WARN  Selector:246 - [Producer clientId=amqp7] Unexpected error from domain.com/1.2.3.4; closing connection
java.lang.IllegalStateException: This ticket is no longer valid
               at javax.security.auth.kerberos.KerberosTicket.toString(KerberosTicket.java:638)
               at java.lang.String.valueOf(String.java:2994)
               at java.lang.StringBuilder.append(StringBuilder.java:131)
               at sun.security.jgss.krb5.SubjectComber.findAux(SubjectComber.java:171)
               at sun.security.jgss.krb5.SubjectComber.find(SubjectComber.java:61)
               at sun.security.jgss.krb5.Krb5Util.getTicket(Krb5Util.java:153)
               at sun.security.jgss.krb5.Krb5InitCredential$1.run(Krb5InitCredential.java:335)
               at sun.security.jgss.krb5.Krb5InitCredential$1.run(Krb5InitCredential.java:331)
               at java.security.AccessController.doPrivileged(Native Method)
               at sun.security.jgss.krb5.Krb5InitCredential.getTgt(Krb5InitCredential.java:330)
               at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:145)
               at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
               at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
               at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
               at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
               at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
               at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
               at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:361)
               at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:359)
               at java.security.AccessController.doPrivileged(Native Method)
               at javax.security.auth.Subject.doAs(Subject.java:422)
               at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:359)
               at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:269)
               at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:206)
               at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:81)
               at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
               at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
               at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
               at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
               at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
               at java.lang.Thread.run(Thread.java:745)

如果我终止应用程序并再次运行它将成功进行身份验证并将数据推送到 Kerberized Kafka 而不会出现任何错误。

我在 Producer.send()方法中尝试了回调处理程序,但没有运气。

关于如何在 kerberos 机票更新代码中我的客户的任何想法?