首页 文章

为Kafka客户端启用SSL

提问于
浏览
7

Kafka允许客户端通过SSL连接 . 默认情况下,SSL已禁用,但我通过引用以下链接启用了该功能 . http://docs.confluent.io/2.0.0/kafka/ssl.html

完成所有配置后,Producer / Consumer无法生成/使用该消息 .

[2016-02-29 09:20:49,189] ERROR Error when sending message to topic ssltopic with key: null, value: 2 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
sas
[2016-02-29 09:21:16,031] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Connection reset by peer

2 回答

  • 7

    supermonk的上述答案澄清了大多数要检查的地方 . 我遇到了与OP类似的问题,错误不在代理配置中,而是客户端配置 .
    在官方documentation中,虽然他们暗中提到将client.keystore创建为步骤1,但我错过了使用CA签署证书,就像为server.keystore所做的那样 . 这导致Kafka经纪人拒绝来自客户( 生产环境 者/消费者)的连接 .

    执行这两个步骤已经消除了我的问题 .

    keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    
    keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed
    

    这将使用CA证书对证书进行签名,并将CARoot和签名证书添加到client.keystore .

    参考:Confluent blog on securing Apache Kafka

  • 6

    以下是一些验证步骤 . 由于错误日志不详细,您是否可以尝试以下步骤来验证设置是否良好 .

    • 要验证服务器的 keystoretruststore 是否设置正确,您可以运行以下命令:

    openssl s_client -debug -connect localhost:9093 -tls1注意:TLSv1应列在ssl.enabled.protocols下 .

    在此命令的输出中,您应该看到服务器的证书:

    -----BEGIN CERTIFICATE-----
    {variable sized random bytes} 
    -----END CERTIFICATE----- 
    subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Joe Smith   issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
    

    如果证书未显示或者有任何其他错误消息,则表明 keystore 未正确设置 .

    • 检查server.properties
    echo "############################# Security  #############################" >>server.properties
    echo "listeners=SSL://:9093" >>server.properties
    echo "security.inter.broker.protocol=SSL" >> server.properties
    echo "ssl.client.auth=required" >> server.properties
    echo "ssl.keystore.location=/home/vagrant/securityDemo/kafka.server.keystore.jks" >> server.properties
    echo "ssl.keystore.password=test1234" >> server.properties
    echo "ssl.key.password=test1234" >> server.properties
    echo "ssl.truststore.location=/home/vagrant/securityDemo/kafka.server.truststore.jks" >> server.properties
    echo "ssl.truststore.password=test1234" >> server.properties
    echo "ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1" >> server.properties
    echo "authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer" >>, if acl enabled ** server.properties
    
    • 确保您只有一个CA根 .

    Reference 1.Reference 2.

相关问题