The Apache Kafka broker supports client authentication via SASL. SASL authentication can be enabled together with SSL encryption (in which case SSL client authentication is disabled).
Kafka supports several SASL mechanisms, including GSSAPI (Kerberos), PLAIN, SCRAM-SHA-256/512, and OAUTHBEARER.
Before the hands-on part, let's go over the configuration basics; the docker-compose.yml used in practice is shown later in this post.
Kafka uses the Java Authentication and Authorization Service (JAAS) for SASL configuration. A JAAS configuration file can be specified via the java.security.auth.login.config system property and passed to the JVM, for example:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
The contents of kafka_server_jaas.conf look like this:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice";
};
In the JAAS file, the section name used by the broker is KafkaServer. If multiple listeners need SASL, the KafkaServer section can be prefixed with the listener name, e.g. sasl_ssl.KafkaServer.
Besides a JAAS configuration file, SASL can also be configured through the broker property sasl.jaas.config; the property name must then carry the listener name and the SASL mechanism as prefixes:
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
For example:
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";
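The naming rule above (both the listener name and the mechanism appear lowercased in the property key) can be captured in a small helper. `jaas_config_property` is a hypothetical function for illustration only, not part of any Kafka API:

```python
def jaas_config_property(listener_name: str, sasl_mechanism: str) -> str:
    # Kafka expects both the listener name and the SASL mechanism
    # in lower case inside the property key.
    return (f"listener.name.{listener_name.lower()}"
            f".{sasl_mechanism.lower()}.sasl.jaas.config")

print(jaas_config_property("SASL_SSL", "SCRAM-SHA-256"))
# listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config
```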
The official documentation recommends this configuration method.
If JAAS is configured in more than one way at the same time, the order of precedence is:
1. the broker property listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
2. the {listenerName}.KafkaServer section of the static JAAS configuration
3. the KafkaServer section of the static JAAS configuration
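The precedence order amounts to a first-match lookup, which can be sketched in a few lines of Python; `effective_jaas_config` is a made-up name for illustration, not broker code:

```python
def effective_jaas_config(dynamic_prop, listener_section, default_section):
    # Return the first configured source, mirroring Kafka's documented
    # precedence: dynamic property > {listenerName}.KafkaServer > KafkaServer.
    for candidate in (dynamic_prop, listener_section, default_section):
        if candidate is not None:
            return candidate
    return None
```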
For clients, a JAAS configuration file can likewise be specified via java.security.auth.login.config and passed to the JVM, for example:
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
The contents of kafka_client_jaas.conf look like this:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice";
};
Alternatively, use the client configuration property sasl.jaas.config, for example:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="confluent" \
password="confluent-secret";
If both the configuration file and the configuration property are specified, the property wins, just as on the server side.
PLAIN (SASL/PLAIN) is a simple username/password authentication mechanism. It is typically combined with SSL encryption so that the password is never transmitted in clear text.
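To see why TLS matters here: the SASL/PLAIN exchange (RFC 4616) sends the credentials essentially as-is, NUL-separated and only base64-wrapped on the wire. A minimal sketch (the function name is mine, not from any library):

```python
import base64

def sasl_plain_initial_response(username, password, authzid=""):
    # RFC 4616 client message: [authzid] NUL authcid NUL passwd.
    # No hashing is involved, so anyone sniffing an unencrypted
    # connection recovers the password directly.
    return f"{authzid}\0{username}\0{password}".encode("utf-8")

msg = sasl_plain_initial_response("alice", "alice-secret")
print(base64.b64encode(msg).decode())  # trivially reversible encoding
```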
Enable SASL/PLAIN in each broker's server.properties:
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=PLAIN
# Specify one of the SASL mechanisms for inter-broker communication
sasl.mechanism.inter.broker.protocol=PLAIN
Configure the listener ports:
listeners=SASL_PLAINTEXT://kafka1:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
Configure the JAAS properties:
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";
Configure authorization:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
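With SimpleAclAuthorizer enabled, users other than the super user need explicit ACLs before they can produce or consume. A sketch using the stock kafka-acls tool (this authorizer stores ACLs in ZooKeeper, so the tool is pointed at the ZooKeeper ensemble; the address and topic name here are assumptions):

```shell
# Allow alice to read and write the topic "test"
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2182 \
  --add --allow-principal User:alice \
  --operation Read --operation Write --topic test

# Inspect the ACLs that were created
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2182 \
  --list --topic test
```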
Creating the certificates
Enabling SASL_SSL requires SSL certificates and keys. I have packaged the steps into a script and pushed it to GitHub:
git clone https://github.com/korimas/kafka-certs-create.git
cd kafka-certs-create
sh create.sh
The generated certificates end up in the target directory.
Configure server.properties (same as SASL_PLAIN):
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=PLAIN
# Specify one of the SASL mechanisms for inter-broker communication
sasl.mechanism.inter.broker.protocol=PLAIN
Configure JAAS (as for SASL_PLAIN, but note that the listener-name prefix in the property key must now be sasl_ssl to match the listener):
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";
Configure the inter-broker protocol:
security.inter.broker.protocol=SASL_SSL
Configure the listener ports:
listeners=SASL_SSL://kafka1:9093
advertised.listeners=SASL_SSL://localhost:9093
Configure the SSL certificates (place them in the corresponding directory beforehand):
ssl.key.password=hillstone
ssl.keystore.location=/etc/kafka/secrets/server.keystore.jks
ssl.keystore.password=hillstone
ssl.truststore.location=/etc/kafka/secrets/server.truststore.jks
ssl.truststore.password=hillstone
# Disable hostname verification of the certificate
ssl.endpoint.identification.algorithm=
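Setting ssl.endpoint.identification.algorithm to empty disables hostname verification on the broker side. A Python client achieves the equivalent by turning off check_hostname on its SSLContext; certificate-chain validation itself stays on. The CA path matching the cert script is shown commented out:

```python
import ssl

# Mirror the broker's empty ssl.endpoint.identification.algorithm:
# skip matching the certificate's CN/SAN against the hostname.
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.check_hostname = False
# ctx.load_verify_locations('./certs/target/cacert.pem')  # CA produced by the cert script

# The certificate chain is still verified against the trust store.
print(ctx.verify_mode == ssl.CERT_REQUIRED)  # True
```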
Configure authorization:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
Multiple SASL mechanisms can be enabled at once through the following server.properties option:
sasl.enabled.mechanisms=GSSAPI,PLAIN
The login module of each enabled mechanism must then be specified in the corresponding JAAS configuration:
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};
For the hands-on part I set up a three-node cluster using SASL/PLAIN over SSL (SASL_SSL). The certificates can be created as described above. Here is the docker-compose.yml:
version: '2'
services:
  zookeeper:
    container_name: sasl_ssl_zookeeper
    image: confluentinc/cp-zookeeper:5.1.2
    hostname: sasl_ssl_zookeeper
    restart: always
    ports:
      - 22182:2182
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_MAXCLIENTCNXNS: 0
      ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
      ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
      ZOOKEEPER_JAASLOGINRENEW: 3600000
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/zk_server_jaas.conf
    volumes:
      - ./secrets:/etc/kafka/secrets
  kafka01:
    image: wurstmeister/kafka:latest
    hostname: sasl_ssl_kafka01
    container_name: sasl_ssl_kafka01
    depends_on:
      - zookeeper
    ports:
      - 29093:9093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'sasl_ssl_zookeeper:2182'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_LISTENERS: SASL_SSL://:9093
      KAFKA_ADVERTISED_LISTENERS: SASL_SSL://10.182.51.86:29093
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/server.keystore.jks"
      KAFKA_SSL_KEYSTORE_PASSWORD: "hillstone"
      KAFKA_SSL_KEY_PASSWORD: "hillstone"
      KAFKA_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_PASSWORD: "hillstone"
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      KAFKA_SSL_CLIENT_AUTH: none
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
      KAFKA_SUPER_USERS: User:admin
    volumes:
      - ./secrets:/etc/kafka/secrets
  kafka02:
    image: wurstmeister/kafka:latest
    hostname: sasl_ssl_kafka02
    container_name: sasl_ssl_kafka02
    depends_on:
      - zookeeper
    ports:
      - 39093:9093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'sasl_ssl_zookeeper:2182'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_LISTENERS: SASL_SSL://:9093
      KAFKA_ADVERTISED_LISTENERS: SASL_SSL://10.182.51.86:39093
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/server.keystore.jks"
      KAFKA_SSL_KEYSTORE_PASSWORD: "hillstone"
      KAFKA_SSL_KEY_PASSWORD: "hillstone"
      KAFKA_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_PASSWORD: "hillstone"
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      KAFKA_SSL_CLIENT_AUTH: none
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
      KAFKA_SUPER_USERS: User:admin
    volumes:
      - ./secrets:/etc/kafka/secrets
  kafka03:
    image: wurstmeister/kafka:latest
    hostname: sasl_ssl_kafka03
    container_name: sasl_ssl_kafka03
    depends_on:
      - zookeeper
    ports:
      - 49093:9093
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: 'sasl_ssl_zookeeper:2182'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_LISTENERS: SASL_SSL://:9093
      KAFKA_ADVERTISED_LISTENERS: SASL_SSL://10.182.51.86:49093
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/server.keystore.jks"
      KAFKA_SSL_KEYSTORE_PASSWORD: "hillstone"
      KAFKA_SSL_KEY_PASSWORD: "hillstone"
      KAFKA_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_PASSWORD: "hillstone"
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      KAFKA_SSL_CLIENT_AUTH: none
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
      KAFKA_SUPER_USERS: User:admin
    volumes:
      - ./secrets:/etc/kafka/secrets
The kafka_server_jaas.conf referenced above:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret";
};
And zk_server_jaas.conf:
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
To start the containers, simply run:
docker-compose up -d
Note that the docker-compose.yml above pairs confluentinc's ZooKeeper image with wurstmeister's Kafka image. In theory each vendor provides both a ZooKeeper and a Kafka image, but in my experiments I never managed to get the cluster working with images from a single vendor; if you have a working single-vendor docker-compose.yml, please share it.
From Python, the third-party kafka-python library can be used to access the Kafka cluster.
from kafka import KafkaConsumer

BOOTSTRAP_SERVERS = '10.182.51.86:29093'
TOPIC = 'topic'

consumer = KafkaConsumer(TOPIC,
                         bootstrap_servers=BOOTSTRAP_SERVERS,
                         ssl_check_hostname=False,
                         ssl_cafile='./certs/target/cacert.pem',
                         auto_offset_reset='earliest',
                         security_protocol='SASL_SSL',
                         # ssl_context=context,
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='admin',
                         sasl_plain_password='admin-secret',
                         api_version=(0, 10),
                         receive_buffer_bytes=1024,
                         enable_auto_commit=False)  # a bool, not the string 'False'
                         # consumer_timeout_ms=1000

# Consume messages
for msg in consumer:
    print(msg)
A producer is configured in the same way:
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="10.182.51.86:29093",
    security_protocol="SASL_SSL",
    ssl_cafile='./certs/target/cacert.pem',
    ssl_check_hostname=False,
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
)
future = producer.send("topic", value="hello world".encode())
result = future.get(timeout=10)
print(result)