我不知道第一次犯了什么错误,在再次犯下的错误之后,它仍然可以正常工作。
首先授予所有访问主题的权限:
bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181
创建jass文件:kafka-jaas.conf
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required donotprompt=true useTicketCache=true principal="ctadmin@HSCALE.COM" useKeyTab=true serviceName="kafka" keyTab="/etc/security/keytabs/ctadmin.keytab" client=true;};
Java程序:
package com.ct.test.kafka;import java.util.Date;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class KafkaProducer { public static void main(String[] args) { String topic = args[0]; Properties props = new Properties(); props.put("metadata.broker.list", "{Hostname}:6667"); props.put("serializer.class", "kafka.serializer.StringEnprer"); props.put("request.required.acks", "1"); props.put("security.protocol", "PLAINTEXTSASL"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (int i = 0; i < 10; i++){ producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date())); } }}
运行应用程序:
java -Djava.security.auth.login.config = / home / ctadmin / kafka-jaas.conf
-Djava.security.krb5.conf = / etc / krb5.conf
-Djavax.security.auth.useSubjectCredsonly = true -cp kafka -testing-0.0.1-jar-
with-dependencies.jar com.ct.test.kafka.KafkaProducer
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)