Hey team, I have a few questions about Confluent security.
I’ve set up SSL using self-signed certs and created client certs signed by my own CA to securely consume from Kafka topics. Now I’m trying to import multiple CA certs into the broker truststore so clients with certs signed by different CAs can connect — but it’s not working as expected.
Does Kafka support multiple CAs in the broker truststore?
Are there any limitations I should be aware of?
Also, is it possible to use both self-signed and procured certs in the same Kafka cluster?
We are currently using Confluent Platform 7.6.0 (Community edition).
Hi Siva, thanks for replying.
Certificate and Keystore Setup
1. Create the Certificate Authority (CA) and Private Key
a. Create the CA configuration file:
```sudo nano openssl-ca.cnf```
b. Generate the CA certificate and private key:
```openssl req -new -x509 -keyout cakey.pem -out cacert.pem -days 365 \
-config openssl-ca.cnf -subj "/CN=Kafka-CA"```
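The commands above reference openssl-ca.cnf and, later, a signing_req extensions section, but the file itself isn't shown. A minimal sketch of what it might contain (section contents are assumptions based on typical OpenSSL CA configs, not your actual file):

```ini
[ req ]
distinguished_name = req_distinguished_name
x509_extensions    = v3_ca
prompt             = no

[ req_distinguished_name ]
CN = Kafka-CA

[ v3_ca ]
# Extensions for the CA cert itself
basicConstraints = critical, CA:TRUE
keyUsage         = keyCertSign, cRLSign

[ signing_req ]
# Extensions applied when signing broker/client CSRs (-extensions signing_req)
basicConstraints = CA:FALSE
keyUsage         = digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
```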
2. Create Kafka Broker Keystore
```keytool -keystore kafka1.keystore.p12 -alias kafka-broker-1 -validity 365 \
-genkey -keyalg RSA -storetype pkcs12 \
-ext "SAN=server_hostname"```
3. Generate Certificate Signing Request (CSR)
```keytool -keystore kafka1.keystore.p12 -alias kafka-broker-1 \
-certreq -file kafka1.csr```
4. Sign the CSR with CA
```openssl x509 -req -CA cacert.pem -CAkey cakey.pem -CAcreateserial \
-in kafka1.csr -out kafka-cert-1.pem -days 365 -sha256 \
-extfile openssl-ca.cnf -extensions signing_req```
5. Generate Truststore and Import CA
```keytool -keystore kafka.truststore.p12 -alias CARoot -import -file cacert.pem -noprompt```
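On the multiple-CA question: a keytool truststore can hold any number of trusted CA certs, as long as each import uses a unique alias (repeating step 5 for a second CA with -alias CARoot would fail with "alias already exists"; use e.g. -alias CARoot2). On the librdkafka client side, ssl.ca.location can point to a single PEM file with several CA certs concatenated. A self-contained sketch of the bundle idea using throwaway OpenSSL certs (names like ca1/ca2 are illustrative, not from your setup):

```shell
set -e
tmp=$(mktemp -d); cd "$tmp"

# Two independent CAs (stand-ins for your self-signed CA and a procured CA)
for ca in ca1 ca2; do
  openssl req -new -x509 -newkey rsa:2048 -nodes \
    -keyout "$ca.key" -out "$ca.pem" -days 1 -subj "/CN=$ca"
done

# One leaf cert signed by each CA
for ca in ca1 ca2; do
  openssl req -new -newkey rsa:2048 -nodes \
    -keyout "$ca-leaf.key" -out "$ca-leaf.csr" -subj "/CN=$ca-client"
  openssl x509 -req -in "$ca-leaf.csr" -CA "$ca.pem" -CAkey "$ca.key" \
    -CAcreateserial -out "$ca-leaf.crt" -days 1 -sha256
done

# A bundle containing both CA certs trusts leaves from either CA
cat ca1.pem ca2.pem > ca-bundle.pem
openssl verify -CAfile ca-bundle.pem ca1-leaf.crt ca2-leaf.crt
```

The final verify should report both leaf certs as OK, which is the same trust model the broker truststore gets from holding two CA entries under distinct aliases.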
For Debezium Client (or any client)
6. Create Client Keystore
```keytool -keystore clientdeb.keystore.p12 -alias clientdeb -validity 365 \
-genkey -keyalg RSA -storetype pkcs12 \
-ext "SAN=DNS:debezium-client"```
7. Generate Client CSR
```keytool -keystore clientdeb.keystore.p12 -alias clientdeb -certreq -file clientdeb.csr```
8. Sign Client CSR with CA
```openssl x509 -req -CA cacert.pem -CAkey cakey.pem -CAcreateserial \
-in clientdeb.csr -out clientdeb-cert.pem -days 365 -sha256 \
-extfile openssl-ca.cnf -extensions signing_req```
9. Import CA and Signed Cert into Client Keystore
```keytool -keystore clientdeb.keystore.p12 -alias CARoot -import -file cacert.pem -noprompt
keytool -keystore clientdeb.keystore.p12 -alias clientdeb -import -file clientdeb-cert.pem```
10. Import CA and Kafka Broker Cert into Client Truststore
```keytool -keystore clientdeb.truststore.p12 -alias CARoot -import -file cacert.pem -noprompt
keytool -keystore clientdeb.truststore.p12 -alias kafka-broker-1 -import -file kafka-cert-1.pem```
11. Import Client Cert into Broker Truststore
```keytool -keystore kafka.truststore.p12 -alias clientdeb -import -file clientdeb-cert.pem```
On the Python Side
1. Create a Client Key
```openssl genrsa -out client.key 2048```
2. Client CSR
```openssl req -new -key client.key -out client.csr -subj "/CN=client"```
3. Getting the Client CSR Signed
```sudo openssl x509 -req -in client.csr -CA SSLcomDVCA_2.crt -CAkey cakey.pem -CAcreateserial -out client.crt -days 365 -sha256```
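Before wiring certs into the consumer config, it can save time to check the chain offline with openssl verify: the leaf cert must verify against the exact CA file you plan to use as ssl.ca.location. A self-contained sketch with a throwaway CA and client cert (not your real files):

```shell
set -e
tmp=$(mktemp -d); cd "$tmp"

# Throwaway CA, standing in for cacert.pem / cakey.pem
openssl req -new -x509 -newkey rsa:2048 -nodes \
  -keyout cakey.pem -out cacert.pem -days 1 -subj "/CN=Kafka-CA"

# Client key + CSR, signed by that CA (mirrors the three steps above)
openssl genrsa -out client.key 2048
openssl req -new -key client.key -out client.csr -subj "/CN=client"
openssl x509 -req -in client.csr -CA cacert.pem -CAkey cakey.pem \
  -CAcreateserial -out client.crt -days 1 -sha256

# The signed cert must verify against the CA that signed it
openssl verify -CAfile cacert.pem client.crt
```

Note that -CA and -CAkey must be a matching pair: mixing one CA's certificate with a different CA's private key (as in the SSLcomDVCA_2.crt + cakey.pem command above) is rejected by openssl.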
Python Code Snippet
```from confluent_kafka import Consumer
import json

def configure_kafka_consumer(bootstrap_server, group_id, offset_reset):
    consumer_config = {
        'bootstrap.servers': bootstrap_server,
        'group.id': group_id,
        'auto.offset.reset': offset_reset,
        'max.poll.interval.ms': 86400000,
        'enable.auto.commit': False,
        'security.protocol': 'SSL',  # Enable SSL
        'ssl.ca.location': '<path-to-cacert.pem>',  # CA certificate
        'ssl.certificate.location': '<path-to-client.crt>',  # Client certificate
        'ssl.key.location': '<path-to-client.key>',
        'ssl.endpoint.identification.algorithm': 'https',
    }
    return Consumer(consumer_config)

consumer = configure_kafka_consumer(
    bootstrap_server='<bootstrap_server>',
    group_id='<Group_id>',
    offset_reset='earliest',
)

try:
    # Subscribe to the topic
    topic_name = '<Topic_name>'
    consumer.subscribe([topic_name])
    print("Consumer is now subscribed and waiting for messages...")
    while True:
        consumer_batch = consumer.consume(num_messages=100000, timeout=3)
        for msg in consumer_batch:
            if msg.error() is not None:
                print(msg.error())
            else:
                message = json.loads(msg.value().decode())
                print(message)
        if consumer_batch:
            print(f'Length of received messages: {len(consumer_batch)}')
            consumer.commit()
except KeyboardInterrupt:
    print("Consumer interrupted by user.")
except Exception as e:
    print(f"Error processing messages: {str(e)}")
finally:
    print("Closing consumer connection...")
    consumer.close()```
This approach works until I attempt to import a second, procured certificate. Is there an alternative method for importing procured certificates or using them within my Kafka cluster?
I am encountering the following error when I add the procured certificate details to the consumer configuration:
```[thrd:ssl://FQDN:9092/bootstrap]: ssl://FQDN:9092/bootstrap: SSL handshake failed: error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 12ms in state SSL_HANDSHAKE)```
Hi Charan, I read through your setup in detail. In your original message you mentioned -
I’ve set up SSL using self-signed certs and created client certs signed by my own CA to securely consume from Kafka topics. Now I’m trying to import multiple CA certs into the broker truststore so clients with certs signed by different CAs can connect — but it’s not working as expected.
But the error you are getting is related to your client not trusting your broker certificate -
certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package)
Given the list of commands, your broker certificate seems to be signed by the Kafka-CA certificate, and you are pointing to it correctly (ssl.ca.location: <path-to-cacert.pem>). Please double-check your Kafka broker certificate.
I think I got the problem. You are not importing the CA-signed cert (kafka-cert-1.pem) back into the broker keystore (kafka1.keystore.p12). Your broker keystore still has the self-signed certificate.
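This failure mode can be reproduced offline: a self-signed broker cert does not verify against the CA that the client trusts, while the CA-signed version of the same key does. A throwaway sketch (file names are illustrative, not your real files):

```shell
set -e
tmp=$(mktemp -d); cd "$tmp"

# The CA the client trusts via ssl.ca.location
openssl req -new -x509 -newkey rsa:2048 -nodes \
  -keyout cakey.pem -out cacert.pem -days 1 -subj "/CN=Kafka-CA"

# A self-signed broker cert: what the broker presents if the CA reply
# was never imported back into its keystore
openssl req -new -x509 -newkey rsa:2048 -nodes \
  -keyout broker.key -out broker-selfsigned.pem -days 1 -subj "/CN=broker"
openssl verify -CAfile cacert.pem broker-selfsigned.pem \
  || echo "self-signed broker cert rejected, as in the handshake error"

# The CA-signed cert for the same key verifies fine
openssl req -new -key broker.key -out broker.csr -subj "/CN=broker"
openssl x509 -req -in broker.csr -CA cacert.pem -CAkey cakey.pem \
  -CAcreateserial -out broker-signed.pem -days 1 -sha256
openssl verify -CAfile cacert.pem broker-signed.pem
```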
Apologies, it seems I missed those out. I do import them into my keystore:
```keytool -keystore kafka1.keystore.p12 -alias CARoot -import -file cacert.pem -noprompt
keytool -keystore kafka1.keystore.p12 -alias kafka-broker-1 -import -file kafka-cert-1.pem```
Thanks a lot for helping me out, Siva. I believe I found the solution to my issue.
Aim - Consume data from a Kafka cluster that has SSL enabled, using client certs signed by a different CA (one that has not signed the broker's cert).
Process
- Kafka side - Imported the new CA that signed the client-side certs into the Kafka broker truststore.
- Airflow side -
ssl.ca.location - pointing to the CA that signed the broker
ssl.certificate.location - pointing to the new client cert signed by the new CA
ssl.key.location - pointing to the new client key
- ACL - Granted access to the username.
Observation - Able to consume data.
Let me know if this is the right way of doing things; your advice would be of great help to me and my fellow peers.
Thanks.
Charan
Yes, keytool -keystore kafka1.keystore.p12 -alias kafka-broker-1 -import -file kafka-cert-1.pem is required. This imports the CA reply into your keystore, so the broker presents the CA-signed cert to the client. With this it should work.
The process you listed is also correct for mutual TLS authentication. The Kafka broker truststore should contain the CA cert that signed the client's certificate, and ssl.ca.location on the client side should point to the CA cert that signed the broker's cert.
Glad that your issue is resolved.
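For anyone following along, the broker side of this mutual-TLS setup is typically wired up along these lines in server.properties (paths, port, and passwords are placeholders; ssl.client.auth=required is what forces clients to present a certificate):

```properties
listeners=SSL://0.0.0.0:9092
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/to/kafka1.keystore.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=<keystore-password>
ssl.truststore.location=/path/to/kafka.truststore.p12
ssl.truststore.type=PKCS12
ssl.truststore.password=<truststore-password>
ssl.client.auth=required
```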