2 项目结构 3 代码 3.1 配置文件和Kafka服务器所需配置org.springframework.boot spring-boot-starter-weborg.springframework.kafka spring-kafkaorg.projectlombok lombok1.18.20 com.alibaba fastjson1.2.76 org.springframework.boot spring-boot-starter-testtest org.springframework.kafka spring-kafka-testtest
application.properties
server.port=8080 #制定kafka代理地址 spring.kafka.bootstrap-servers=8.131.57.161:9092 #消息发送失败重试次数 spring.kafka.producer.retries=0 #每次批量发送消息的数量 spring.kafka.producer.batch-size=16384 #每次批量发送消息的缓冲区大小 spring.kafka.producer.buffer-memory=335554432 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 指定默认消费者group id spring.kafka.consumer.group-id=user-log-group spring.kafka.consumer.bootstrap-servers=8.131.57.161:9092 spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Kafka服务器所需配置,server.properties文件
# 33行左右 0.0.0.0代表允许外部端口连接 listeners=PLAINTEXT://0.0.0.0:9092 # 36行左右 ip代表外部代理地址 advertised.listeners=PLAINTEXT://8.131.57.161:90923.2 生产者和实体类代码
Student.java
@Data @Accessors(chain = true) @NoArgsConstructor @AllArgsConstructor public class Student implements Serializable { private String id; private String name; private String context; }
StudentService.java
public interface StudentService { void stuSayHello(Student student); }
StudentServiceImpl.java
@Service public class StudentServiceImpl implements StudentService { @Autowired private KafkaTemplate kafkaTemplate; private static final String STU_TOPIC = "stu.sayHello"; @Override public void stuSayHello(Student student) { Student stu = new Student("1", "zs", "Hello Ls."); kafkaTemplate.send(STU_TOPIC, JSON.toJSONString(stu)); } }3.3 消费者代码
MyKafkaListener.java
@Component public class MyKafkaListener { private static final String STU_TOPIC = "stu.sayHello"; @KafkaListener(topics = {STU_TOPIC}) public void stuTopicConsumer(ConsumerRecord consumerRecord) { Optional kafkaMsg = Optional.ofNullable(consumerRecord.value()); if (kafkaMsg.isPresent()) { Object msg = kafkaMsg.get(); System.err.println(msg); } } }3.4 测试
@SpringBootTest class SpKafkaApplicationTests { @Autowired private StudentService studentService; @Test void contextLoads() throws Exception{ for (int i = 0; i < 900000; i++) { studentService.stuSayHello(new Student()); } } }
玩转Kafka—Golang整合Kafka
几个常见的Go整合Kafka客户端工具:我们本次使用的是Shopify
-
Shopify:https://github.com/Shopify/sarama
-
Big Data Open Source Security:https://github.com/stealthly/go_kafka_client
-
OptioPay:https://github.com/optiopay/kafka
https://github.com/nuance/kafka
https://github.com/jdamick/kafka.go
-
Confluent:https://github.com/confluentinc/confluent-kafka-go
Docs: http://docs.confluent.io/current/clients/index.html
-
Travis Bischel: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kgo
1 新建go modules 2 项目结构 3 生产者代码ps:配置go get代理(类似于Maven配置阿里云镜像)教程:
https://goproxy.io/zh/docs/getting-started.html
KakaProducer.go
package main import ( "fmt" "github.com/Shopify/sarama" "time" ) //消息生产者 func main() { //获取配置类 config := sarama.NewConfig() //配置类实例(指针类型) config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal) config.Producer.Partitioner = sarama.NewRandomPartitioner //生成用于选择要发送消息的分区的分区(默认为散列消息键)。 config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。 //获取客户端对象 client, err := sarama.NewSyncProducer([]string{"8.131.57.161:9092"}, config) if err != nil { //获取客户端失败 fmt.Println("producer close, err:", err) return } //延迟执行,类似于栈,等到其他代码都执行完毕后再执行 defer client.Close() //一直循环 for { //获取Message对象 msg := &sarama.ProducerMessage{} //设置topic msg.Topic = "go_kafka" //设置Message值 msg.Value = sarama.StringEncoder("this is a good test, my message is good") //发送消息,返回pid、片偏移 pid, offset, err := client.SendMessage(msg) //发送失败 if err != nil { fmt.Println("send message failed,", err) return } //打印返回结果 fmt.Printf("pid:%v offset:%vn", pid, offset) //线程休眠下 time.Sleep(10 * time.Second) } }4 消费者代码
KafkaConsumer.go
package main import ( "fmt" "github.com/Shopify/sarama" "strings" "sync" "time" ) var ( wg sync.WaitGroup //同步等待组 //在类型上,它是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。 //主goroutine调用了Add()方法来设置要等待的goroutine的数量。 //然后,每个goroutine都会执行并且执行完成后调用Done()这个方法。 //与此同时,可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。 ) func main() { //获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割 consumer, err := sarama.NewConsumer(strings.Split("8.131.57.161:9092", ","), nil) //获取失败 if err != nil { fmt.Println("Failed to start consumer: %s", err) return } //对该topic进行监听 partitionList, err := consumer.Partitions("go_kafka") if err != nil { fmt.Println("Failed to get the list of partitions: ", err) return } //打印分区 fmt.Println(partitionList) //获取分区和片偏移 for partition := range partitionList { pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("Failed to start consumer for partition %d: %sn", partition, err) return } //延迟执行 defer pc.AsyncClose() //启动多线程 go func(pc sarama.PartitionConsumer) { wg.Add(1) //获得message的信息 for msg := range pc.Messages() { fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) fmt.Println() } wg.Done() }(pc) } //线程休眠 time.Sleep(10 * time.Second) wg.Wait() consumer.Close() }5 测试
参考文章:https://www.cnblogs.com/angelyan/p/10800739.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)