BlockingQueue时一个接口,又许多实现类
Kafka入门高吞吐量:Kafka是硬盘顺序读取:硬盘顺序读取高于内存的随机读取。
高可靠性:分布式的集群
高扩展性:加集群很方便
Broker:Kafka的服务器
Zookeeper:独立的软件,用来管理集群(Kafka里面内置有)
Topic:主题(消息队列实现的方式:点对点(每个数据只被一个消费者消费),发布订阅模式(消息可以被多个消费者读取))
Partition:对主题分区,调高效率,每一个分区顺序追加数据
offset:消息再分区内存放的索引,序列
Replica:副本,做备份,分布式的消息引擎,为了数据更可靠,提高容错率。分为Leader(可以处理请求)和Follower(只是备份不能响应)。主副本挂掉后,从众多的从副本里选一个新的主副本
进入Kafka解压后的文件目录
cd C:usersoftkafka_2.12-2.3.0
启动zokeeper
binwindowszookeeper-server-start.bat configzookeeper.properties
启动Kafka
binwindowskafka-server-start.bat configserver.properties
创建主题(代表位置,代表消息类别)
指定那个服务器,一个副本一个分区,主题的名字
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看所有主题
kafka-topics.bat --list --bootstrap-server localhost:9092
调用生产者发送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
在另起一个窗口消费者读取消息
消费者消费消息
从哪个服务器,读哪个主题
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
# 存放路径(看自己情况)
E:javawebworkkafka_2.12-2.8.0
# windows运行路径(看自己的情况)
E:javawebworkkafka_2.12-2.8.0binwindows
# 启动服务器 (先启动zookeeper服务器,再启动kafka) !!!千万不要手动暴力关闭,用下面的命令关闭
binwindowszookeeper-server-start.bat configzookeeper.properties``binwindowskafka-server-start.bat configserver.properties
# 创建主题
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 查看当前服务器的主题
kafka-topics.bat --list --bootstrap-server localhost:9092
# 创建生产者,往指定主题上发消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
# 消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
# 关闭zookeeper服务器
zookeeper-server-stop.bat
# 关闭kafka服务器
kafka-server-stop.batSpring整合Kafka
org.springframework.kafka spring-kafka
配置
#kafka spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=test-consumer-group #是否自动提交消费者的偏移量(读取消息按偏移量读取) spring.kafka.consumer.enable-auto-commit=true #自动提交的频率 spring.kafka.consumer.auto-commit-interval=3000
测试
@RunWith(SpringRunner.class) @SpringBootTest public class KafkaTest { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka(){ kafkaProducer.sendMessage("learn","学习"); kafkaProducer.sendMessage("learn","加油"); try { Thread.sleep(2000*10); } catch (InterruptedException e) { e.printStackTrace(); } } } //生产者,交给spring管理,发送消息需要我们手动去调用方法 @Component class KafkaProducer{ @Autowired private KafkaTemplate kafkaTemplate; //发消息方法,主题和内容 public void sendMessage(String topic,String content){ kafkaTemplate.send(topic,content); } } //消费者,消费者处理消息是被动的,当队列中有消息就去处理 @Component class KafkaConsumer{ //要监听的主题 @KafkaListener(topics = {"learn"}) public void handleMessage(ConsumerRecord record){ System.out.println(record.value()); } }发送系统通知
为了提高性能,需要用到消息队列,为什么需要消息队列呢?
评论、点赞、关注不同的事情,三个主题,包装扔到队列里,当前线程(消息的生产者)就可以继续去做别的事情。
生产者和消费者处理消息是可以并发的,称为异步。
业务角度时间为目标
系统通知,后台给用户发,假设后台id(from_id)为1,这个时候conversation_id改存为主题(评论、点赞、关注),不再是两个对话的id,内容存json字符串,里面包含页面上要显示的信息
在相应的事件(评论、点赞、关注)后调用生产者,发送消息,消费者不需要调
为了可以看到帖子详细页面还需要帖子id,因为没有帖子对应的属性,所以将帖子id存到content里面
判断评论的是实体是帖子还是评论,将其存到实体的作者里面
不管是什么根据实体id找到目标
event事件封装好之后,发送消息。之后线程继续向下执行,后序消息的发布有消息队列发布,并行异步,我在处理的同时,另外一个线程也在处理执行
点赞才通知,取消点赞不通知,所以要先判断一下
Kafka出现问题就把日志文件删除
消费者
@Component //消费者 public class EventConsumer implements CommunityConstant { //记日志 private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class); //往消息表里插数据 @Autowired private MessageService messageService; //一个方法,包含三个主题(定义常量引用主题) @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE}) public void handleCommentMessage(ConsumerRecord record){ //发了一个空消息 if(record==null || record.value()==null){ logger.error("发送消息为空!"); return; } //将json消息转为对象,指定字符串对应的具体类型 Event event = JSONObject.parseObject(record.value().toString(), Event.class); //转为对象之后再判断 if(event==null){ logger.error("消息格式错误!"); return; } //内容和格式都对之后,发送站内通知 Message message=new Message(); //设置消息的发送者,from_id系统用户规定为1(定义为常量) message.setFromId(SYSTEM_USER_ID); //设置消息的接收者(帖子的作者) message.setToId(event.getEntityUserId()); //存主题 message.setConversationId(event.getTopic()); //当前时间 message.setCreateTime(new Date()); //内容是json字符串,先存map,再转为json字符串存进入 Mapcontent=new HashMap<>(); //事件触发者 content.put("userId",event.getUserId()); //实体的类型(帖子、点赞、关注) content.put("entityType",event.getEntityType()); //实体的id content.put("entityId",event.getEntityId()); //不方便村的字段存到content //判断事件对象有没有值 if(!event.getData().isEmpty()){ //遍历事件对象的map,将其存到content里 for (Map.Entry entry : event.getData().entrySet()) { content.put(entry.getKey(),entry.getValue()); } } //将content转为json字符串 message.setContent(JSONObject.toJSONString(content)); //存消息 messageService.addLetter(message); } }
生产者
@Component public class EventProducer { @Autowired private KafkaTemplate kafkaTemplate; //生产者发送消息(处理事件) public void fireEvent(Event event){ //将事件发布到指定的主题(主题,字符串(事件当中所有的数据)json) kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event)); } }
别人评论之后触发事件
//封装触发评论事件,链接评论帖子/评论的详情 Event event=new Event() .setUserId(hostHolder.getUser().getId()) .setEntityType(comment.getEntityType()) .setEntityId(comment.getEntityId()) .setData("postId",discussPostId) .setTopic(TOPIC_COMMENT); if(comment.getEntityType()==ENTITY_TYPE_POST){ DiscussPost target = discussPostService.findDiscussDetail(comment.getEntityId()); event.setEntityUserId(target.getId()); }else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){ Comment target = commentService.findComment(comment.getEntityId()); event.setEntityUserId(target.getId()); } eventProducer.fireEvent(event);显示系统通知
通知存在message里面
每个请求都有消息链接,用拦截器处理,实现接口,在controller之后,模板之前post
判断有没有登录,mv有没有携带,查私信和通知,相加传给页面
拦截器写完之后要进行配置
@Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { //判断有没有登录,mv有没有携带数据 User user = hostHolder.getUser(); if (user!=null && modelAndView!=null){ int unreadCount = messageService.findUnreadCount(user.getId(), null); int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null); modelAndView.addObject("allUnreadCount",unreadCount+noticeUnreadCount); } } modelAndView) throws Exception { //判断有没有登录,mv有没有携带数据 User user = hostHolder.getUser(); if (user!=null && modelAndView!=null){ int unreadCount = messageService.findUnreadCount(user.getId(), null); int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null); modelAndView.addObject("allUnreadCount",unreadCount+noticeUnreadCount); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)