[root@CentOSA kafka-eagle]# ll 总用量 0 drwxr-xr-x. 2 root root 33 1月 2 22:26 bin drwxr-xr-x. 2 root root 62 1月 2 22:26 conf drwxr-xr-x. 2 root root 6 9月 13 01:12 db drwxr-xr-x. 2 root root 23 1月 2 22:26 font drwxr-xr-x. 9 root root 91 9月 13 01:12 kms drwxr-xr-x. 2 root root 6 9月 13 01:12 logs3.配置文件
[root@CentOSA conf]# cat system-config.properties ###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### efak.zk.cluster.alias=cluster1,cluster2 # 1.修改zookeeper 集群 cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181 # cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=32 ###################################### # EFAK webui port 2.服务端口 ###################################### efak.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### cluster1.efak.offset.storage=kafka # cluster2.efak.offset.storage=zk ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default 3.报表图 这个要开必须开启kafka jmx ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token 4.管理的密码 ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= #cluster2.efak.sasl.enable=false #cluster2.efak.sasl.protocol=SASL_PLAINTEXT #cluster2.efak.sasl.mechanism=PLAIN #cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle"; #cluster2.efak.sasl.client.id= #cluster2.efak.blacklist.topics= #cluster2.efak.sasl.cgroup.enable=false #cluster2.efak.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enable=false cluster3.efak.ssl.protocol=SSL cluster3.efak.ssl.truststore.location= cluster3.efak.ssl.truststore.password= cluster3.efak.ssl.keystore.location= cluster3.efak.ssl.keystore.password= cluster3.efak.ssl.key.password= cluster3.efak.ssl.endpoint.identification.algorithm=https cluster3.efak.blacklist.topics= cluster3.efak.ssl.cgroup.enable=false cluster3.efak.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### efak.driver=org.sqlite.JDBC efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db efak.username=root efak.password=www.kafka-eagle.org ###################################### # kafka mysql jdbc driver address 5.插件依赖的数据库 ###################################### efak.driver=com.mysql.cj.jdbc.Driver efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=root4.配置KE_HOME
因为启动的时候需要这个变量;
二. Springboot 项目集成 1.pom.xml2.配置文件4.0.0 com.sff springbootkafka1.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent2.2.6.RELEASE org.springframework.boot spring-boot-starterorg.springframework.kafka spring-kafkaorg.springframework.boot spring-boot-starter-testtest 8 8
spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092 #producer spring.kafka.producer.retries=5 spring.kafka.producer.acks=all spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 ## 开启事务 如果 1.代码里面使用事务的api 2.使用transaction 若无发送数据会报错 spring.kafka.producer.transaction-id-prefix=transaction-id- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.properties.enable.idempotence=true # consumer spring.kafka.consumer.group-id=springboot-kafka spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.properties.isolation.level=read_committed spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer ##批量消费 # 若没有配置 则每次消费一个 # 若配置 KafkaListeners 注解的方法 就不是一个对象了 而是一个List 使用一个对象会报错 spring.kafka.listener.type=batch spring.kafka.listener.poll-timeout=1000 spring.kafka.listener.concurrency=1 spring.kafka.consumer.max-poll-records=203.监听kafka代码 3.1 则每次消费一个
@KafkaListeners( value = { @KafkaListener(topics = {"topic06"}) } ) public void recevice3(ConsumerRecord3.2 每次消费多个record) { System.out.println("topic06: "+record.value()); }
@KafkaListeners( value = { @KafkaListener(topics = {"topic06"}) } ) public void recevice3(List4.发送kafka代码 4.1 发送没有事务> records) { System.out.println("consumer size:"+records.size()); for (ConsumerRecord record : records) { System.out.println("topic06: " + record.value()); } }
public void sendMessage(String topic ,String value) throws InterruptedException { ProducerRecord4.2 发送携带事务一record = new ProducerRecord<>(topic, value); kafkaTemplate.send(record); }
public void testSend(){ kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback4.3 发送携带事务二() { @Override public Object doInOperations(KafkaOperations operations) { for (int i = 0; i < 50; i++) { ProducerRecord record = new ProducerRecord<>("topic06", "testSend"+i); kafkaTemplate.send(record); } return null; } }); }
@Transactional public void sendMessage(String topic, String value) { ProducerRecordrecord = new ProducerRecord<>(topic, value); kafkaTemplate.send(record); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)