2. Producer

Maven dependencies (of these, only kafka-clients is actually used by the demo code below):

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.1.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
</dependencies>
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    private String clientid;

    public KafkaProducerTest(String topicName, String clientid) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
        this.topic = topicName;
        this.clientid = clientid;
    }

    @Override
    public void run() {
        int messageNo = 1;
        try {
            for (;;) {
                String messageStr = "Hello, this is message " + messageNo + ", clientid=" + clientid;
                producer.send(new ProducerRecord<>(topic, "Message", messageStr));
                // Print once every 100 messages produced
                if (messageNo % 100 == 0) {
                    System.out.println("Sent message: " + messageStr);
                }
                // Stop after producing 1000 messages
                if (messageNo == 1000) {
                    System.out.println("Successfully sent " + messageNo + " messages");
                    break;
                }
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String[] args) {
        KafkaProducerTest test1 = new KafkaProducerTest("logstash-08-04", "clientid1");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}
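Note that the loop above fires producer.send() and never checks the result; with retries=0, a request that fails is dropped silently. Below is a minimal sketch of the same send with a completion callback so failures become visible. The class name CallbackProducerSketch is made up for illustration; the broker addresses and topic are copied from the demo above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical variant of the send loop: same config, but each send gets a
// callback so delivery failures are visible instead of silently dropped.
public class CallbackProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767");
        props.put("acks", "all");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10; i++) {
                String value = "callback demo message " + i;
                producer.send(new ProducerRecord<>("logstash-08-04", "Message", value),
                        (metadata, exception) -> {
                            if (exception != null) {
                                // With retries=0 this is the only place a lost message shows up
                                System.err.println("send failed: " + exception.getMessage());
                            } else {
                                System.out.println("acked: partition=" + metadata.partition()
                                        + ", offset=" + metadata.offset());
                            }
                        });
            }
        } // close() flushes outstanding sends before returning
    }
}

Also worth noticing: every record is sent with the fixed key "Message", so all messages land in the same partition; use distinct keys (or a null key) if you want them spread across partitions.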
3. Consumer

package com.example.demo;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private String clientid;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName, String clientid) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
        this.clientid = clientid;
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("--------- start consuming ---------");
        try {
            poll:
            for (;;) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        // Print once every 100 messages consumed (the printed offsets
                        // will not necessarily follow this exact pattern)
                        if (messageNo % 100 == 0) {
                            System.out.println(messageNo + " ======= consumed: key = " + record.key()
                                    + ", value = " + record.value() + ", offset = " + record.offset());
                        }
                        // Stop after consuming 1000 messages; the labeled break exits the
                        // outer poll loop too (a plain break would only leave the record
                        // loop and the consumer would poll forever)
                        if (messageNo == 1000) {
                            break poll;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("logstash-08-04", "clientid1");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}
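The consumer above relies on enable.auto.commit=true, so offsets are committed on a timer whether or not a record was actually processed. If you need at-least-once behavior, a common variant is to disable auto-commit and call commitSync() after each batch has been handled. The sketch below illustrates that variant; the class name ManualCommitConsumerSketch is made up, the broker addresses, group, and topic are copied from the demo, and the loop runs until the process is killed.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical variant of KafkaConsumerTest with manual offset commits:
// offsets are committed only after a batch has been processed, giving
// at-least-once semantics instead of timer-based auto-commit.
public class ManualCommitConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.1.11.212:32765,10.1.11.212:32766,10.1.11.212:32767");
        props.put("group.id", "groupA");
        props.put("enable.auto.commit", "false"); // committed explicitly below
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("logstash-08-04"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset=" + record.offset() + ", value=" + record.value());
                }
                if (!records.isEmpty()) {
                    consumer.commitSync(); // commit only after the batch is processed
                }
            }
        }
    }
}

(The sketch also uses poll(Duration), which replaced the poll(long) overload that the demo above calls; poll(long) is deprecated as of kafka-clients 2.0 but still works.)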
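4. logback.xml (optional)

The Kafka clients log through SLF4J, so with logback-classic on the classpath a small logback.xml keeps their output readable while running the test. The file below is a generic minimal example assumed for that purpose, not the exact file from the original post:

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Quiet the chatty Kafka client internals; raise to DEBUG when troubleshooting -->
    <logger name="org.apache.kafka" level="WARN"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

Raising org.apache.kafka to DEBUG is handy when the producer or consumer cannot reach the brokers.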