listeners=SASL_PLAINTEXT://192.168.43.209:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf
KafkaServer{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkapswd"
user_ kafkaa(用户名)="kafkaapswd"(密码)
user_ kafkab(用户名)=" kafkabpswd"(密码)
user_ kafkac(用户名)=" kafkacpswd"(密码)
user_ kafkad(用户名)=" kafkadpswd"(密码)
}
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh
if ["x$KAFKA_OPTS" ]then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
fi
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'
if ["x$DAEMON_MODE" = "xtrue" ]then
nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" >"$CONSOLE_OUTPUT_FILE" 2>&1 </dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"
fi
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username=" kafkaa"
password=" kafkaapswd"
}
修改执行文件
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-producer.sh
if ["x$KAFKA_OPTS" ]then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
fi
运行jar包的服务器的指定路径下必须有kafka_ client_ jaas.conf文件
在程序中添加如下配置
System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf")
props.put("security.protocol","SASL_PLAINTEXT")
props.put("sasl.mechanism","PLAIN")
问题描述:发布消息、订阅消息时,出现如下错误,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :{test2=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
解决方法:各客户端的用户名设置为相同,多个客户端同时管理会产生冲突。
重启
kafka-manager 允许用户执行很多管理工作,如创建topic、preferred leader选举以及分区重分配等。不过在实际的生产环境中这可能是一把双刃剑,因为这意味着任何能够访问 kafka-manager 的用户都能执行这些 *** 作。对于运维人员来说,这往往是不能容忍的,特别是后面两种 *** 作对生产环境的冲击极大,一定要谨慎使用。
下面已常用的选项作说明
下面对画方框的三列做着重解释。
注意如下这种情况也是不计算作倾斜的。
此时,broker2 拥有 3 个 leader 分区,超过平均范围的 2 个,所以 broker2 就 Leader 分区倾斜了,倾斜率 1/5=20%。
用下图举例说明:
上面三个参数对于衡量 topic 的稳定性有重要的影响:
Broker Skew : 反映 broker 的 I/O 压力,broker 上有过多的副本时,相对于其他 broker ,该 broker 频繁的从 Leader 分区 fetch 抓取数据,磁盘 *** 作相对于其他 broker 要多,如果该指标过高,说明 topic 的分区均不不好,topic 的稳定性弱;
Broker Leader Skew :数据的生产和消费进程都至于 Leader 分区打交道,如果 broker 的 Leader 分区过多,该 broker 的数据流入和流出相对于其他 broker 均要大,该指标过高,说明 topic 的分流做的不够好;
Under Replicated : 该指标过高时,表明 topic 的数据容易丢失,数据没有复制到足够的 broker 上。
下面着重讲述红框部分:
上述是关于“优先副本”的相关描述,即在理想的状态下,分区的 leader 最好是 “优先副本”,这样有利于保证集群中 broker 的领导权比较均衡。重新均衡集群的 leadership 可采用 kafka manager 提供的工具:
一般而言,手动调整、系统自动分配分区和添加分区之后,都需要调用 Reassign Partition 。
kafka manager 能够获取到当前消费 kafka 集群消费者的相关信息。
参考
[1]: Kafka副本同步机制理解
[2]: kafka维护工具使用指南
[3]: kafka 中文文档
[4]: Replication tools
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)