首页 文章

卡夫卡 TOPIC_AUTHORIZATION_FAILED

提问于
浏览
2

我实际上正在使用 SASL 纯文本设置简单的 Kafka 身份验证并添加 ACL 授权。但是当我尝试使用数据时,我遇到了一个问题。

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 2 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 3 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 4 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 5 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 6 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 7 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 8 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 9 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 10 : {test-topic=TOPIC_AUTHORIZATION_FAILED}

接下来,您可以看到我的配置文件。

server.properties

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=localhost:9092
compression.type=none

consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group

kafka_server_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};

环境变量:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"

命令

设置 ACL:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation All --group test-consumer-group --topic test-topic

启动 Kafka Server:

./bin/kafka-server-start.sh config/server.properties

开始制片人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties

开始消费者:

bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092

当我尝试启动消费者时,我遇到了上述问题。另外,在 kafka 日志中,我有:

[2016-10-22 20:17:14,091] ERROR [KafkaApi-0] Error when handling request {group_id=test-consumer-group} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at scala.Option.getOrElse(Option.scala:121)
    at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657)
    at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)

我怎样才能解决这个问题?

3 回答

  • 2

    通过分离 jaas 客户端和 jaas 服务器修复问题。

    kafka_server_jaas.conf

    KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="admin-secret"
      user_admin="admin-secret"
      user_alice="alice-secret";
    };
    

    kafka_client_jaas.conf

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="alice"
      password="alice-secret";
    };
    

    在同一个终端上,导出 jaas 服务器 conf 文件并启动 kafka broker:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"
    $ ./bin/kafka-server-start.sh config/server.properties
    

    在客户端终端上,导出客户端 jaas conf 文件并启动使用者:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
    $ ./bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092
    

    如果您还想生产,请在另一个终端窗口上执行此操作:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
    $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties
    
  • 0

    似乎您创建了一个复制因子为 3 的主题,但您只运行了 1 个代理。尝试使用“--replication-factor 1”创建主题。如果要自动创建主题,您可能还希望将默认复制因子更改为 1(config/server.properties in config/server.properties)。

  • 0

    我在 Kafka v.0.10 中使用 ACL 时遇到了类似的问题。我觉得这个讨论很有帮助。特别是启用授权日志以检查请求的传入用户名是什么以及 ACL 中指定的内容。

    首先检查服务器主管理员是否提供了所需的所有授权。需要允许服务器主体对所有主题,组和群集执行所有类型的授权。最好在 server.properties server.properties 文件中声明管理员。如果这不能解决问题,那么您可以启用授权日志以找出正在为哪种操作定义哪个样本。

    可以通过修改 config 文件夹中的 log4j.properties 来启用授权日志。在 log4j.properties 文件中,将WARN更改为DEBUG并重新启动 kafka-servers。

    log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
    

    这有助于我解决我的问题。希望有所帮助。

    PS:生成的授权日志非常冗长,占用大量空间。所以,记得在完成调试时关闭它。

相关问题