
Configuring SSL and ACLs for kafka-console-consumer


I followed the instructions here and here to add SSL security to my Confluent-3.0.1 Kafka cluster.

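In the Linux transcript snippets below, I have replaced my server names with myserverA, myserverB, and myserverC. I have also masked the passwords. This is my first post on a message board, so I apologize for any poor formatting.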

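My questions:

Which ACL controls access to the offset fetch shown below? Do I need to change my configuration or my SSL keys?

Thank you very much for any help you can offer.

I can produce data with kafka-console-producer over SSL, but I cannot read it back with kafka-console-consumer. I get the following error: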

[kafka@myserverA confluent-3.0.1]$ /kafka/confluent-3.0.1/bin/kafka-console-consumer --bootstrap-server myserverA:9093 --zookeeper myserverA:2181/kafka --topic ssl-test --from-beginning --new-consumer --consumer.config /kafka/data/client/ssl/client.properties
[2017-06-27 13:11:50,462] WARN Attempt to fetch offsets for partition ssl-test-0 failed due to: Not authorized to access topics: [Topic authorization failed.] (org.apache.kafka.clients.consumer.internals.Fetcher)
[2017-06-27 13:11:50,473] WARN Error while fetching metadata with correlation id 6 : {ssl-test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-06-27 13:11:50,476] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [ssl-test]

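It is not clear to me whether the problem lies in the client configuration or in the inter-broker configuration.

The server.properties file on each of my three brokers includes the following: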

###################### SSL Configuration ################
#
ssl.keystore.location=/kafka/data/ssl/keystore/kafka.keystore.jks
ssl.keystore.password=<hidden for this posting>
ssl.key.password=<hidden for this posting>
ssl.truststore.location=/kafka/data/ssl/truststore/kafka.truststore.jks
ssl.truststore.password=<hidden for this posting>

ssl.client.auth=requested
#ssl.cipher.suites=
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type = JKS
ssl.truststore.type = JKS

security.inter.broker.protocol=ssl

# #### Enable ACLs ####
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

super.users=User:CN=myserverA,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US;User:myserverB,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US;User:CN=myserverC,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US

I use the same client.properties for both producer.config and consumer.config. It contains the following:

###################### SSL Configuration ################
#
security.protocol=ssl

ssl.keystore.location=/kafka/data/client/ssl/keystore/kafka.client.keystore.jks
ssl.keystore.password=<hidden for this posting>
ssl.key.password=<hidden for this posting>
ssl.truststore.location=/kafka/data/client/ssl/truststore/kafka.client.truststore.jks
ssl.truststore.password=<hidden for this posting>

#ssl.provider=
#ssl.cipher.suites=
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type = JKS
ssl.truststore.type = JKS

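I have granted a large number of ACLs on the ssl-test topic. I tried: 1) the certificate distinguished name (DN) with spaces after the commas, 2) the DN without spaces after the commas, and 3) the common name (CN) of the broker certificates.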

[root@myserverA ~]# /kafka/confluent-3.0.1/bin/kafka-acls --authorizer-properties zookeeper.connect=myserverA:2181/kafka --list --topic ssl-test
Current ACLs for resource `Topic:ssl-test`:
User:CN=Test Client,OU=Test Client Unit,O=Test Client Org,L=LA,ST=CA,C=US has Allow permission for operations: Read from hosts: *
User:CN=Test Client, OU=Test Client Unit, O=Test Client Org, L=LA, ST=CA, C=US has Allow permission for operations: Read from hosts: *
User:myserverA has Allow permission for operations: Write from hosts: *
User:myserverC has Allow permission for operations: Read from hosts: *
User:CN=myserverB,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US has Allow permission for operations: Write from hosts: *
User:CN=myserverA,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US has Allow permission for operations: Read from hosts: *
User:Test Client has Allow permission for operations: Read from hosts: *
User:Test Client has Allow permission for operations: Write from hosts: *
User:myserverB has Allow permission for operations: Write from hosts: *
User:CN=Test Client,OU=Test Client Unit,O=Test Client Org,L=LA,ST=CA,C=US has Allow permission for operations: Write from hosts: *
User:CN=myserverC,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US has Allow permission for operations: Read from hosts: *
User:CN=myserverA,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US has Allow permission for operations: Write from hosts: *
User:CN=myserverB,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US has Allow permission for operations: Read from hosts: *
User:myserverB has Allow permission for operations: Read from hosts: *
User:myserverA has Allow permission for operations: Read from hosts: *
User:CN=Test Client, OU=Test Client Unit, O=Test Client Org, L=LA, ST=CA, C=US has Allow permission for operations: Write from hosts: *
User:myserverC has Allow permission for operations: Write from hosts: *
User:CN=myserverC,OU=NBCUniversal,O=NBCUniversal,L=NY,ST=NY,C=US has Allow permission for operations: Write from hosts: *

kafka-console-producer runs normally over SSL:

[kafka@myserverA confluent-3.0.1]$ bin/kafka-console-producer --broker-list myserverA:9093 --topic ssl-test --producer.config /kafka/data/client/ssl/client.properties
j
k
<Ctrl-D>

2 Answers

  • 0

    According to the documentation, a consumer needs READ and DESCRIBE on the topic, plus READ on the consumer group. The --consumer option conveniently sets all of these at once; using their example:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
      --add \
      --allow-principal User:Bob \
      --consumer \
      --topic Test-topic \
      --group Group-1
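
Applied to the cluster in the question, the same grant might look like the sketch below. The kafka-acls path, the ZooKeeper address, the topic, and the client DN are copied from the question. The group name ssl-test-group is a hypothetical placeholder: it has to match a group.id set in client.properties, because the console consumer otherwise generates its own group name. The sketch also assumes the default SSL principal is the certificate subject written without spaces after the commas, which is worth confirming against the broker's authorizer log. The command is only printed here (DRY_RUN), not executed.

```shell
#!/bin/sh
# Sketch only. Paths and host names come from the question;
# GROUP is a hypothetical name and must match group.id in client.properties.
KAFKA_ACLS=/kafka/confluent-3.0.1/bin/kafka-acls
ZK_CONNECT=myserverA:2181/kafka
GROUP=ssl-test-group

# Turn a DN as printed by keytool ("CN=..., OU=...") into
# "User:CN=...,OU=..." with no spaces after the commas (assumed principal form).
to_kafka_principal() {
  printf 'User:%s\n' "$(printf '%s' "$1" | sed 's/, */,/g')"
}

# Grant the consumer ACLs (topic READ/DESCRIBE and group READ) in one call.
# With DRY_RUN set, the command is printed instead of executed.
grant_consumer_acl() {
  ${DRY_RUN:+echo} "$KAFKA_ACLS" \
    --authorizer-properties "zookeeper.connect=$ZK_CONNECT" \
    --add \
    --allow-principal "$1" \
    --consumer \
    --topic "$2" \
    --group "$3"
}

PRINCIPAL=$(to_kafka_principal 'CN=Test Client, OU=Test Client Unit, O=Test Client Org, L=LA, ST=CA, C=US')
DRY_RUN=1 grant_consumer_acl "$PRINCIPAL" ssl-test "$GROUP"
```

Calling grant_consumer_acl without DRY_RUN set runs the real command. Passing the principal as a single quoted argument matters because the DN contains spaces.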
    
  • 0

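    There were several problems in my Kafka SSL configuration. However, the specific error seen when running kafka-console-consumer, "WARN Attempt to fetch offsets for partition ssl-test-0 failed ...", was caused by the client certificate not being included in the truststores of Kafka nodes B and C.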
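
A fix along those lines could be sketched with keytool as follows. The truststore path is the one from server.properties in the question. The certificate file, the alias, and the STOREPASS variable are hypothetical names chosen for illustration. The functions are only defined here; they would be run on myserverB and myserverC, and the brokers in this Kafka version most likely need a restart to pick up the changed truststore.

```shell
#!/bin/sh
# Sketch only. BROKER_TRUSTSTORE is taken from server.properties in the question;
# CLIENT_CERT and ALIAS are hypothetical, and STOREPASS must be supplied by the caller.
BROKER_TRUSTSTORE=/kafka/data/ssl/truststore/kafka.truststore.jks
CLIENT_CERT=/tmp/test-client.crt   # hypothetical: exported client (or signing CA) certificate
ALIAS=test-client                  # hypothetical alias for the imported entry

# Import a certificate into a truststore without the interactive trust prompt.
# $1 = truststore, $2 = alias, $3 = certificate file
import_cert() {
  keytool -importcert -noprompt \
    -alias "$2" -file "$3" \
    -keystore "$1" -storepass "$STOREPASS"
}

# Succeeds only if the alias is present in the truststore.
# $1 = truststore, $2 = alias
has_alias() {
  keytool -list -keystore "$1" -storepass "$STOREPASS" -alias "$2" >/dev/null 2>&1
}

# Example, to be run on each broker that is missing the certificate:
#   STOREPASS=... import_cert "$BROKER_TRUSTSTORE" "$ALIAS" "$CLIENT_CERT"
#   STOREPASS=... has_alias "$BROKER_TRUSTSTORE" "$ALIAS" && echo "client cert trusted"
```

If the client certificate is signed by a CA, importing the CA certificate instead should let the brokers trust every client certificate that CA issues.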
