- 本着傻瓜式配置,可重复搭建kafka的指导思想编写本文档
- 快速满足安全加固要求
Static hostname: localhost.localdomain
Icon name: computer-vm
Chassis: vm
Machine ID: c744f9429d54f945b3b38d3eb7f3a591
Boot ID: 3fc16fddef1543deb3c57a6bac71a670
Virtualization: kvm
Operating System: CentOS Linux 7 (Core)
CPE OS Name: cpe:/o:centos:centos:7
Kernel: Linux 3.10.0-1160.49.1.el7.x86_64
Architecture: x86-64
版本信息
- kafka_2.11-2.1.0
- zookeeper-3.4.13
# 上传到/home/kafka目录
scp kafka_2.11-2.1.0.tgz root@192.168.2.21:/home/kafka
scp zookeeper-3.4.13.tar.gz root@192.168.2.21:/home/kafka
cd /home
tar zxvf kafka_2.11-2.1.0.tgz
tar zxvf zookeeper-3.4.13.tar.gz
2. 添加SASL/PLAIN配置信息
为了满足测试,需要修改的文件如下
/home/kafka/kafka_2.11-2.1.0/config
.
├── consumer.properties
├── kafka_server_jaas.conf
├── producer.properties
├── server.properties
├── zk_server_jaas.conf
└── zookeeper.properties
kafka_server_jaas.conf
vi kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_test="testpass";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_test="testpass";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};
producer.properties末尾追加如下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="adminpass";
server.properties
# 注释
#listeners=PLAINTEXT://0.0.0.0:9092
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
# 参数 allow.everyone.if.no.acl.found
# 设置为 true,ACL 机制改为黑名单机制,只有黑名单中的用户无法访问
# 设置为 false,ACL 机制改为白名单机制,只有白名单中的用户可以访问,默认值为 false
auto.create.topics.enable=false
super.users=User:admin
zookeeper.set.acl=true
zk_server_jaas.conf
vi zk_server_jaas.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};
zookeeper.properties末尾追加如下配置
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
/home/kafka/kafka_2.11-2.1.0/bin
.
├── kafka-console-consumer.sh
├── kafka-console-producer.sh
├── kafka-server-start.sh
├── kafka-topics.sh
├── zookeeper-security-migration.sh
├── zookeeper-server-start.sh
kafka-console-consumer.sh、kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"
fi
kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
fi
kafka-topics.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" # 找到当前行,在上面追加如下内容
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
zookeeper-security-migration.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@" # 找到当前行,在上面追加如下内容
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@"
zookeeper-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M " # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/zk_server_jaas.conf"
fi
3. 启动服务
强烈建议准备全新安装之后的 *** 作系统环境,避免重复验证时出现干扰因素,浪费时间排查问题
cd /home/kafka/kafka_2.11-2.1.0/
# 新开终端,执行如下命令
./bin/kafka-server-start.sh config/server.properties
# 新开终端,执行如下命令
./bin/zookeeper-server-start.sh config/zookeeper.properties
# 新开终端,执行如下命令,创建成功后,再启动生产者、消费者
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 新开终端,执行如下命令
./bin/kafka-console-producer.sh --broker-list localhost:9092 --producer.config=cig/producer.properties --topic test
# 新开终端,执行如下命令
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/consumer.properties --topic test --from-beginning
参考文档
-
写的太好了=》给 Kafka 配置 SASL/PLAIN 认证 | Kyle’s Blog
本人找到这篇文章前,确实也遇到了相同的问题
定义用户的方式非常奇葩,很难理解。username 和 password 两字段定义 Kafka brokers 内部沟通的用户密码,user_用户名 配置的是 client 连接 broker 时用的用户、密码。据我尝试,必须定义一个 username 用户对应的 user_ 字段,否则连不上。就像上面,有个 username=”testuser” ,所以必须再定义一次 user_testuser 且密码保持一致。此外可以再添加新用户,如添加 user_alice=”alice-secret” 。 -
10.0 Zookeeper 权限控制 ACL | 菜鸟教程 (runoob.com)
-
Apache Kafka
-
Welcome — ZooNavigator Docs (elkozmon.com)
docker run \ --rm \ -e HTTP_PORT=9000 \ --name zoonavigator \ -p 9000:9000 \ elkozmon/zoonavigator:latest
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)