Configuring SASL/PLAIN authentication for kafka_2.11-2.1.0 in practice
  • This document is written in the spirit of a foolproof, repeatable Kafka setup
  • The goal is to quickly meet security hardening requirements
Virtual machine operating system information
   Static hostname: localhost.localdomain
         Icon name: computer-vm
           Chassis: vm
        Machine ID: c744f9429d54f945b3b38d3eb7f3a591
           Boot ID: 3fc16fddef1543deb3c57a6bac71a670
    Virtualization: kvm
  Operating System: CentOS Linux 7 (Core)
       CPE OS Name: cpe:/o:centos:centos:7
            Kernel: Linux 3.10.0-1160.49.1.el7.x86_64
      Architecture: x86-64
Version information
  • kafka_2.11-2.1.0
  • zookeeper-3.4.13
Steps 1. Extract the archives to the target directory; this guide assumes the files are placed under /home/kafka
# Upload to the /home/kafka directory
scp kafka_2.11-2.1.0.tgz root@192.168.2.21:/home/kafka
scp zookeeper-3.4.13.tar.gz root@192.168.2.21:/home/kafka

cd /home/kafka
tar zxvf kafka_2.11-2.1.0.tgz
tar zxvf zookeeper-3.4.13.tar.gz
2. Add the SASL/PLAIN configuration

For this test setup, the files that need to be modified are the following:

/home/kafka/kafka_2.11-2.1.0/config
.
├── consumer.properties
├── kafka_server_jaas.conf
├── producer.properties
├── server.properties
├── zk_server_jaas.conf
└── zookeeper.properties

kafka_server_jaas.conf
In this file, the KafkaServer section defines the broker's own credentials (username/password, used for inter-broker SASL) together with the accounts clients may authenticate as (the user_<name>="<password>" entries); the KafkaClient section is what a Kafka client process logs in with; the Client section is used by Kafka-side processes (the broker, kafka-topics.sh, the migration tool) when they connect to ZooKeeper.
vi kafka_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass"
    user_test="testpass";
};
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass"
    user_test="testpass";
};
Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass";
};
Server {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="adminpass"
    user_admin="adminpass";
};
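A side note not in the original: any account declared above via a user_<name> entry can be used by a client. A minimal client sketch for the test account would look like the properties below (the same pattern the next section uses for admin); because admin is the only super user, test would additionally need ACLs, see the kafka-acls.sh sketch after the server.properties section.

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="test" \
    password="testpass";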
Append the following to the end of producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="adminpass";
server.properties
# Comment out the original plaintext listener
#listeners=PLAINTEXT://0.0.0.0:9092
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
# About allow.everyone.if.no.acl.found:
# true:  a resource with no ACL is accessible to everyone (deny-list behaviour)
# false: a resource with no ACL is accessible only to super users (allow-list behaviour); false is the default
auto.create.topics.enable=false
super.users=User:admin
zookeeper.set.acl=true
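Note that the original never grants any ACLs, which only works because the examples in step 3 authenticate as the super user admin. If the non-super test account were used instead, it would need explicit ACLs first. A sketch using kafka-acls.sh (shipped in bin/; because zookeeper.set.acl=true it needs the same KAFKA_OPTS JAAS export that kafka-topics.sh gets below, and the group name here is just an example):

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"

# allow user "test" to produce to and consume from topic "test"
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:test \
  --operation Read --operation Write --topic test

# a consumer additionally needs Read on its consumer group
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:test \
  --operation Read --group test-group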
zk_server_jaas.conf
vi zk_server_jaas.conf
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="adminpass";
};
Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="adminpass";
};
Append the following to the end of zookeeper.properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
/home/kafka/kafka_2.11-2.1.0/bin
.
├── kafka-console-consumer.sh
├── kafka-console-producer.sh
├── kafka-server-start.sh
├── kafka-topics.sh
├── zookeeper-security-migration.sh
└── zookeeper-server-start.sh
kafka-console-consumer.sh and kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M" # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"
fi
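Editing the scripts is what the original does, but the same effect can be achieved without touching them: kafka-run-class.sh picks up the KAFKA_OPTS environment variable, so the JAAS file can be supplied per shell instead. A sketch of that alternative:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --producer.config config/producer.properties --topic test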
kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
fi
kafka-topics.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" # find this line and insert the export above it, as shown below
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
zookeeper-security-migration.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@" # find this line and insert the export above it, as shown below
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@"
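The original modifies zookeeper-security-migration.sh but never runs it. It is only needed when znodes were already created by an earlier, unsecured run; with zookeeper.set.acl=true it can retrofit secure ACLs onto the existing ZooKeeper tree. A usage sketch:

./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181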
zookeeper-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M " # 找到当前行,追加如下内容
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.1.0/config/zk_server_jaas.conf"
fi
3. Start the services

It is strongly recommended to do this on a freshly installed operating system, so that leftover state does not interfere and waste time on troubleshooting when the verification is repeated.

cd /home/kafka/kafka_2.11-2.1.0/

# In a new terminal, start ZooKeeper first
./bin/zookeeper-server-start.sh config/zookeeper.properties

# In a new terminal, start the Kafka broker
./bin/kafka-server-start.sh config/server.properties

# In a new terminal, create the topic; once it has been created, start the producer and consumer
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# In a new terminal, start a console producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --producer.config config/producer.properties --topic test

# In a new terminal, start a console consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/consumer.properties --topic test --from-beginning
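As a quick sanity check not in the original: a client with no SASL configuration should be rejected. Running the console producer without --producer.config, for example, makes a plain connection to the SASL_PLAINTEXT listener and is expected to fail to send (repeated disconnections/timeouts rather than a clean error message):

# expected to fail: no SASL credentials supplied
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test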
References
  • Very well written => 给 Kafka 配置 SASL/PLAIN 认证 | Kyle's Blog

    Before finding this article I had indeed run into the same problem.
    The way users are defined is quite unusual and hard to grasp at first. The username and password fields define the account the Kafka brokers use to talk to each other, while each user_<name> entry defines a user and password that clients may use when connecting to the broker. In my testing, there must be a user_ entry matching the username, otherwise the connection fails. In the article's example, because username="testuser" is set, user_testuser must also be defined with the same password. Additional users can then be added, e.g. user_alice="alice-secret".

  • 10.0 Zookeeper 权限控制 ACL | 菜鸟教程 (runoob.com)

  • Apache Kafka

  • Welcome — ZooNavigator Docs (elkozmon.com)

    docker run \
      --rm \
      -e HTTP_PORT=9000 \
      --name zoonavigator \
      -p 9000:9000 \
      elkozmon/zoonavigator:latest
    
