Configuring SASL/PLAINTEXT with the SCRAM-SHA-512 mechanism
Kafka security: configuring SASL + ACL

I. SASL configuration

1. Edit the zoo.cfg configuration file to enable SASL authentication on ZooKeeper:

requireClientAuthScheme=sasl
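For ZooKeeper itself to accept these SASL connections, it also needs a server-side JAAS file whose users match the Client section of the Kafka JAAS file shown in the next step. A minimal sketch, assuming the kafka/kafka1234 account used in this article and an illustrative file name and path (zookeeper-jaas.conf), loaded through SERVER_JVMFLAGS in zkEnv.sh:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_kafka="kafka1234";
};

export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/path/to/zookeeper-jaas.conf $SERVER_JVMFLAGS"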
2. Create a kafka-broker-jaas.conf file to hold Kafka's authentication information. The username and password in the KafkaServer section are used for communication between brokers; the Client section holds the username and password the broker uses when connecting to ZooKeeper. Besides the configuration-file approach, accounts can also be created with a command (covered later).

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin1234";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="kafka1234";
};

3. Enable SASL authentication in Kafka's sasl_server.properties configuration file:
# make admin and kafka super users in this example
super.users=User:admin;User:kafka
# enable the SCRAM mechanism with the SCRAM-SHA-512 algorithm
sasl.enabled.mechanisms=SCRAM-SHA-512
# use SCRAM-SHA-512 for inter-broker communication
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# inter-broker traffic uses PLAINTEXT; SSL is not covered in this example
security.inter.broker.protocol=SASL_PLAINTEXT
# configure listeners to use SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.1.88:9092
# configure advertised.listeners
advertised.listeners=SASL_PLAINTEXT://192.168.1.88:9092

4. Modify the Kafka startup script so that it loads the specified properties file and reads the authentication configuration file; place this line at the very top of the startup script (kafka-server-start.sh):
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/impdatahd/kafka_2.12-2.4.0/config/kafka-broker-jaas.conf"
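Because inter-broker communication authenticates with SCRAM, the SCRAM credential for the admin user must already exist in ZooKeeper before the brokers are started, otherwise brokers cannot authenticate to each other. A hedged sketch, reusing the kafka-configs.sh tool and ZooKeeper chroot that appear in the ACL section below and the password from the JAAS file above:

sh kafka-configs.sh --zookeeper localhost:2181/kafka240 --alter --add-config 'SCRAM-SHA-512=[password=admin1234]' --entity-type users --entity-name admin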
5. Start the Kafka server with sh kafka-start.sh and verify that it starts successfully.

II. ACL configuration

1. Edit the zoo.cfg configuration file to enable SASL-based ACL authentication on ZooKeeper:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
2. Enable ACL authorization in Kafka's sasl_server.properties configuration file:

# enable ACLs
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

3. Commands to dynamically create users, assign consumer groups, and grant read/write permissions on topics via ACLs:
Create an account:
sh kafka-configs.sh --zookeeper localhost:2181/kafka240 --alter --add-config 'SCRAM-SHA-512=[password=sasl_user_1_pwd]' --entity-type users --entity-name sasl_user_1
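To confirm the SCRAM credential was actually written to ZooKeeper, it can be described with the same tool; a sketch using the user created above:

sh kafka-configs.sh --zookeeper localhost:2181/kafka240 --describe --entity-type users --entity-name sasl_user_1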
Grant the account write permission:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:sasl_user_1 --operation Write --topic kafka_sasl_2
Grant an account read permission:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:test3read --operation Read --topic kafka_sasl_6
Create a group (grant the account access to a consumer group):
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:sasl_user_1 --group kafka-acls-group

4. Verify that step 3 worked, using Kafka's built-in console producer and consumer commands.
4.1 Create the producer configuration file producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka1234";
4.2 Create the consumer configuration file consumer.properties:
bootstrap.servers=localhost:9092
group.id=kafka-acls-group
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test3read" password="test3read1234";
4.3 Produce and consume commands
Produce: kafka-console-producer.sh --broker-list loyx01:9092 --topic sasl_test_one --producer.config /home/impdatahd/kafka_2.12-2.4.0/config/p_sasl.properties
Consume: kafka-console-consumer.sh --bootstrap-server loyx01:9092 --topic behavior_log_andr_test --consumer.config /home/impdatahd/kafka_2.12-2.4.0/config/c_sasl.properties
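To double-check which ACLs are actually in effect for a topic, they can be listed; a sketch using the topic from the write-permission example above:

sh kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --list --topic kafka_sasl_2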
III. Common problems
- Kafka reports an error at startup: ERROR SASL authentication failed using login context 'Client' with exception: {}
Solution:
- Copying the conf file from Windows (for example by drag and drop) can introduce invisible line-ending characters, so the file is not parsed correctly and authentication fails. The same error also appears when the zookeeper_sasl.conf file is not loaded at startup. A way to detect and strip such characters is sketched below.
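A hedged sketch for finding and removing Windows line endings from the JAAS file used in this article (dos2unix could be used instead of sed if it is installed):

# show control characters; CRLF line endings appear as ^M$
cat -A /home/impdatahd/kafka_2.12-2.4.0/config/kafka-broker-jaas.conf
# remove carriage returns in place
sed -i 's/\r$//' /home/impdatahd/kafka_2.12-2.4.0/config/kafka-broker-jaas.conf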
IV. Collecting data with Flume

To collect data with Flume, make the following changes.
1) Create a JAAS file
It must be consistent with the Kafka configuration!
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="youradminusername"
    password="youradminpwd";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="yourusername"
    password="yourpwd";
};
2) Modify flume-env.sh
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/home/impdatahd/flume-1.9.0/conf/kafka-broker-jaas.conf"
3) Write the Flume agent configuration
# Name the components; a1 is the agent name
# a1.sources lists the sources configured for a1 (space separated if multiple)
# a1.sinks lists the sinks configured for a1 (space separated if multiple)
# a1.channels lists the channels configured for a1 (space separated if multiple)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.kafka.bootstrap.servers = 192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092
a1.sources.r1.kafka.topics = kafka_sasl_1
a1.sources.r1.kafka.consumer.group.id = kafka-acls-group
a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.r1.kafka.consumer.sasl.mechanism = SCRAM-SHA-512
a1.sources.r1.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="yourusername" password="yourpwd";
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 6912212

# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/test/sasl_test/%Y-%m-%d
# prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = sasl-
a1.sinks.k1.hdfs.batchSize = 7500
a1.sinks.k1.hdfs.minBlockReplicas = 1
# file rolling
# roll every 30 minutes
a1.sinks.k1.hdfs.rollInterval = 1800
# roughly 128 MB after compression
a1.sinks.k1.hdfs.rollSize = 1580484745
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream

# bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Then just start Flume; a sketch of the start command is shown below.
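A minimal sketch of starting the agent, assuming the configuration above is saved as conf/kafka_to_hdfs.conf (an illustrative file name) and that flume-env.sh already exports the JAAS setting from step 2:

nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/kafka_to_hdfs.conf -Dflume.root.logger=INFO,LOGFILE > flume.log 2>&1 &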
After collection, the data appears as shown in the figure.
V. Monitoring

1) exporter
Use an open-source project from GitHub to collect metrics, deployed on each node where Kafka runs.
The exporter open-source project, version v1.13.1 in this example.
Compile the source to obtain the executable, grant it execute permission, and then run the command below to start monitoring.
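A hedged sketch of producing the binary, assuming a local Go toolchain and the kafka_exporter sources checked out in the current directory (a prebuilt release binary works just as well):

go build -o kafka_exporter .
chmod +x kafka_exporter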
Grafana dashboard ID: 10736
The start and stop commands are shown below.
# --kafka.server=kafka_broker_address
# --kafka.version=kafka_version
# --log.level=log level

# start
nohup ./kafka_exporter --kafka.server=192.168.1.88:9092 --kafka.server=192.168.1.89:9092 --kafka.server=192.168.1.90:9092 --kafka.version=2.4.0 --sasl.enabled --sasl.mechanism=scram-sha512 --sasl.username=admin --sasl.password=admin1234 --tls.insecure-skip-tls-verify --web.listen-address=:29092 --log.level=info > kafka_exporter.log &

# stop
ps -ef | grep kafka_exporter | grep -v grep | awk '{print $2}' | xargs kill
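To verify the exporter is running and can reach the brokers, its metrics endpoint can be queried directly; kafka_brokers is one of the gauges kafka_exporter exposes and should report the number of brokers:

curl -s http://192.168.1.88:29092/metrics | grep '^kafka_brokers'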
2) Prometheus
Prometheus needs the following configuration.
global:
  scrape_interval: 60s
  evaluation_interval: 60s
scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']
        labels:
          instance: prometheus
  - job_name: 'linux'
    metrics_path: "/metrics"
    static_configs:
      - targets: ['192.168.1.88:9100','192.168.1.89:9100','192.168.1.90:9100','192.168.1.91:9100']
  - job_name: 'kafka_exporter'
    metrics_path: "/metrics"
    scrape_interval: 5s
    static_configs:
      - targets: ['192.168.1.88:29092','192.168.1.89:29092','192.168.1.90:29092']
  - job_name: 'flume_exporter'
    metrics_path: "/metrics"
    scrape_interval: 5s
    static_configs:
      - targets: ['192.168.1.89:9360','192.168.1.90:9360']

VI. Testing with a Java demo (key code)

application.yml
server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: 192.168.1.88:9092
      group-id: kafka_test
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: 192.168.1.88:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      retries: 3
      buffer-memory: 33554432
      batch-size: 16384
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: SCRAM-SHA-512
      ssl.endpoint.identification.algorithm: ""
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kafka' password='kafka1234';

Controller
package com.imprexion.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

Producer

package com.imprexion.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "kafka_sasl_1";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        for (int i = 0; i < 10000; i++) {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String format = simpleDateFormat.format(new Date());
            long time = System.currentTimeMillis();
            // build a sample JSON event; quotes are escaped because the payload is a Java string literal
            message = "{"
                    + "\"a\": \"" + format + "\","
                    + "\"app_version\": \"4.2.1\","
                    + "\"device_id\": \"1f43f912c2\","
                    + "\"e\": \"faceID_launch\","
                    + "\"faceImage\": \"\","
                    + "\"p\": {"
                    + "\"package\": \"com.imprexion.member\","
                    + "\"page\": \"com.Orbbec.MagicSalad2\","
                    + "\"valume\": \"" + i + "\""
                    + "},"
                    + "\"package_name\": \"com.imprexion.service.facerecognition\","
                    + "\"pre_login_id\": \"1f43f912c2_1626330120074\","
                    + "\"source_channel\": \"com.imprexion.aibar\","
                    + "\"t\": \"" + time + "\","
                    + "\"uid\": -1,"
                    + "\"v\": 1,"
                    + "\"st\": \"" + time + "\"}";
            this.kafkaTemplate.send(TOPIC, message);
        }
    }
}
package com.imprexion.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "kafka_sasl_1", groupId = "kafka-acls-group")
    public void consume(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }
}

Application

package com.imprexion.sendmsg;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = {"com.imprexion.test"})
public class SendmsgApplication {

    public static void main(String[] args) {
        SpringApplication.run(SendmsgApplication.class, args);
    }
}
Access URL:
http://localhost:9000/kafka/publish?message=hello
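Note that /kafka/publish is mapped with @PostMapping, so it must be called with an HTTP POST (opening the URL in a browser issues a GET and returns 405); a sketch:

curl -X POST "http://localhost:9000/kafka/publish?message=hello"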
Monitoring dashboards:
Before producing data
After producing data