- 1. 阻塞队列
- 2. Kafka入门
- 2.1 Kafka术语解释
- 2.2 Windows下修改配置
- 2.3 Windows下使用Kafka
- 3. Spring整合Kafka
- 4. 发送系统通知
- 4.1 封装事件对象
- 4.2 开发事件的生产者
- 4.3 开发事件的消费者
- 4.4 处理评论事件-CommentController
- 4.5 处理点赞事件-LikeController
- 4.6 处理关注事件-FollowController
- 4.7 测试
- 5. 显示系统通知
- 5.1 通知列表
- 5.1.1 数据层
- 5.1.2 服务层
- 5.1.3 视图层
- 5.2 通知详情
- 5.2.1 数据层
- 5.2.2 服务层
- 5.2.3 视图层
- 5.3 页面头部显示所有的未读消息数量
- BlockingQueue
- 解决线程通信问题
- 阻塞方法:put、take
- 生产者消费者模式
- 生产者:产生数据的线程
- 消费者:使用数据的线程
- 实现类
- ArrayBlockingQueue
- linkedBlocikingQueue
- PriorityBlockingQueue、SynchronousQueue、DelayQueue等
面试常问:写一个生产者消费者实现模式
package com.ateam.community; import sun.net.www.protocol.http.HttpURLConnection; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTests { public static void main(String[] args){ BlockingQueue queue = new ArrayBlockingQueue(10); new Thread(new Producer(queue)).start(); new Thread(new Customer(queue)).start(); new Thread(new Customer(queue)).start(); new Thread(new Customer(queue)).start(); new Thread(new Customer(queue)).start(); new Thread(new Customer(queue)).start(); } } class Producer implements Runnable{ private BlockingQueue2. Kafka入门queue; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { for (int i = 0; i < 100; i++) { Thread.sleep(20); queue.put(i); System.out.println(Thread.currentThread().getName() + "生产:" + queue.size()); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Customer implements Runnable{ private BlockingQueue queue; public Customer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Thread.sleep(new Random().nextInt(1000)); queue.take(); System.out.println(Thread.currentThread().getName() + "消费:" + queue.size()); } } catch (Exception e) { e.printStackTrace(); } } }
- Kafka简介
- Kafka是一个分布式的流媒体平台
- 应用:消息系统、日志收集、用户行为追踪、流式处理
- Kafkat特点
- 高吞吐量、消息持久化、高可靠性、高扩展性
- Kafka术语
- Broker、Zookeeper
- Topic、Partion、Offset
- Leader Replica、Follower Replica
- Broker:Kafka的服务器
- Zookeeper:管理集群
- Topic:点对点模式中每个消费者拿到的消息都不同,发布订阅模式中消费者可能拿到同一份消息。Kafka采用发布订阅模式,生产者把消息发布到的空间(位置)叫做Topic
- Partition:是对topic位置的分区
- Offset:就是消息存放在分区中的索引
- Leader Replica:主副本,可以处理请求
- Follower Replica:从副本,只是用来做备份
Kafka官网:官网
- Kafka下的目录结构
- 修改config目录下的zookeeper的配置文件zookeepe.properties
- 修改config目录下的Kafka的配置文件server.properties
- 启动Zookeeper
D:kafka_2.12-2.8.1>binwindowszookeeper-server-start.bat configzookeeper.properties - 启动Kafka
D:kafka_2.12-2.8.1>binwindowskafka-server-start.bat configserver.properties
Kafka启动以后,在刚才设置的目录,就会发现对应的文件
- 创建主题
查看所有主题,判断是否主题创建是否成功
- 生产者往主题上发送消息
发送两条消息:hello world - 消费者接受消息
说明Kafka已正常工作,可以进一步开发使用了。
- 引入依赖
- spring-kafka
- 配置Kafka
- 配置server、consumer
- 访问Kafka
- 生产者
kafkaTemplate.send(topic, data); - 消费者
@KafkaListener(topics = {“test”})
public void handleMessage(ConsumerRecord record) {}
- 生产者
- 引入依赖
org.springframework.kafka spring-kafka
- Kafka相关配置
spring整合Kafka,在application.properties中配置即可
# kafka # KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=3000
- 测试
在test包下,新建KafkaTests类,进行测试
package com.ateam.community; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class)//配置类 public class KafkaTests { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka(){ kafkaProducer.sendMessage("test","hello"); kafkaProducer.sendMessage("test","world"); try { Thread.sleep(1000 * 10); } catch (Exception e) { e.printStackTrace(); } } } @Component class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, String content) { kafkaTemplate.send(topic, content); } } @Component class KafkaConsumer { @KafkaListener(topics = {"test"}) public void handleMessage(ConsumerRecord record) { System.out.println(record.value()); } }
生产者主动发送消息,消费者被动接收消息。
4. 发送系统通知- 触发事件
- 评论后,发布通知
- 点赞后,发布通知
- 关注后,发布通知
- 处理事件
- 封装事件对象
- 开发事件的生产者
- 开发事件的消费者
在entity包下,新建一个类Event,用来封装事件
package com.ateam.community.entity; import java.util.HashMap; import java.util.Map; // 事件 public class Event { private String topic;// 主题,即事件的类型 private int userId;// 触发事件的用户id private int entityType;// 实体的类型 private int entityId;// 实体的id private int entityUserId;// 实体的拥有者id private Mapdata = new HashMap<>();// 其他的一些数据、 public String getTopic() { return topic; } // 改造set方法 public Event setTopic(String topic) { this.topic = topic; return this; } public int getUserId() { return userId; } public Event setUserId(int userId) { this.userId = userId; return this; } public int getEntityType() { return entityType; } public Event setEntityType(int entityType) { this.entityType = entityType; return this; } public int getEntityId() { return entityId; } public Event setEntityId(int entityId) { this.entityId = entityId; return this; } public int getEntityUserId() { return entityUserId; } public Event setEntityUserId(int entityUserId) { this.entityUserId = entityUserId; return this; } public Map getData() { return data; } public Event setData(String key, Object value){ this.data.put(key,value); return this; } }
- 注意set方法的修改,是为了方便后续的 *** 作,即可以类似sb.append("").append("wsh");
- 注意Map
data的set方法,又与其他属性不同
新建event包,在包内创建EventProducer类
@Component public class EventProducer { @Autowired private KafkaTemplate kafkaTemplate; // 处理事件,本质上就是发送消息 public void fireEvent(Event event) { // 将事件发布到指定的主题 kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event)); } }4.3 开发事件的消费者
先在CommunityConstant类中,增加一些常量:
String TOPIC_COMMENT = "comment"; String TOPIC_LIKE = "like"; String TOPIC_FOLLOW = "follow"; String TOPIC_PUBLISH = "publish"; String TOPIC_DELETe = "delete"; String TOPIC_SHARE = "share"; int SYSTEM_USER_ID = 1;
在event包下, 新建EventConsumer类
package com.ateam.community.event; @Component public class EventConsumer implements CommunityConstant { // 记录日志 private static final Logger logger = LoggerFactory.getLogger(Event.class); @Autowired private MessageService messageService; @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE}) public void handleEventMessage(ConsumerRecord record) { if (record == null || record.value() == null) { logger.error("消息的内容为空!"); return; } // 利用fastjson将json字符串转化为Event对象 Event event = JSONObject.parseObject(record.value().toString(), Event.class); if (event == null) { logger.error("消息格式错误!"); return; } // 发送站内通知,主要是构造Message对象 Message message = new Message(); // User表中的id为1的代表系统用户 message.setFromId(SYSTEM_USER_ID); message.setToId(event.getEntityUserId()); message.setConversationId(event.getTopic()); message.setCreateTime(new Date()); HashMap4.4 处理评论事件-CommentControllercontent = new HashMap<>(); content.put("userId",event.getUserId()); content.put("entityType",event.getEntityType()); content.put("entityId", event.getEntityId()); if (!event.getData().isEmpty()) { for (Map.Entry entry : event.getData().entrySet()) { content.put(entry.getKey(),entry.getValue()); } } message.setContent(JSONObject.toJSONString(content)); messageService.addMessage(message); } }
在addComment方法中,怎讲如下代码:
修改后,该方法完整代码如下:
@RequestMapping(value = "/add/{discussPostId}", method = RequestMethod.POST) @LoginRequired public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){ comment.setUserId(hostHolder.getUser().getId()); comment.setStatus(0); comment.setCreateTime(new Date()); commentService.addComment(comment); // 触发评论事件 Event event = new Event() .setTopic(TOPIC_COMMENT) .setUserId(hostHolder.getUser().getId()) .setEntityId(comment.getEntityId()) .setEntityType(comment.getEntityType()) .setData("postId",discussPostId); if (comment.getEntityType() == ENTITY_TYPE_POST) { DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId()); event.setEntityUserId(target.getUserId()); } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) { Comment target = commentService.findCommentById(comment.getEntityId()); event.setEntityUserId(target.getUserId()); } eventProducer.fireEvent(event); return "redirect:/discuss/detail/" + discussPostId; }4.5 处理点赞事件-LikeController
在LikeController类中like方法,增加如下代码:
修改后,该方法完整代码如下:
@RequestMapping(value = "/like", method = RequestMethod.POST) @ResponseBody public String like(int entityType, int entityId, int entityUserId, int postId) { User user = hostHolder.getUser(); // 点赞 likeService.like(user.getId(),entityType,entityId,entityUserId); // 获得点赞的数量 long likeCount = likeService.findEntityLikeCount(entityType, entityId); // 获得点赞的状态 int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId); // 封装返回结果 HashMapmap = new HashMap<>(); map.put("likeCount",likeCount); map.put("likeStatus",likeStatus); // 触发点赞事件,取消点赞就不发消息了 if (likeStatus == 1) { Event event = new Event() .setTopic(TOPIC_LIKE) .setUserId(hostHolder.getUser().getId()) .setEntityType(entityType) .setEntityId(entityId) .setEntityUserId(entityUserId) .setData("postId",postId); // 重构了like方法,添加了postId参数 eventProducer.fireEvent(event); } return CommunityUtil.getJSONString(0,null,map); }
为什么需要重构,因为我们需要postId这个参数
当系统给用户发通知时,比如:xxx用户赞了你的帖子,xxx用户赞了你的评论,你需要帖子的id,才能完成“点击查看”时,跳转到帖子页面。
因为重构了like方法,所以我们需要处理一些discuss.js中like方法传参问题:
修改discuss-detail页面
修改discusss.js
在FollowController类中follow方法,增加如下代码:
取消关注,不发消息,不触发事件。
以前写的AOP记录日志报错
空指针异常:
以前访问都是:前端页面 -> controller层 -> service层
现在,出现了Kafka的 消费者,也调用了service层,就出现了空指针异常,此时,没有请求到service层
解决方法:
点赞、关注、评论后,message表中有数据了
- 通知列表
- 显示评论、点赞、关注三种类型的通知
- 通知详情
- 分页显示某一类主题所包含的通知
- 未读消息
- 在页面头部显示所有的未读消息数量
- 在页面头部显示所有的未读消息数量
消息最终都存在message表中。
在dao下MessageMaper类中,新增几个方法
// 查询某个主题下 最新 的通知 Message selectLatestNotice(int userId, String topic); // 查询某个主题所包含的通知数量 int selectNoticeCount(int userId, String topic); // 查询未读的通知的数量 int selectNoticeUnreadCount(int userId, String topic);
在mapper包下的message-mapper.xml中,编写对应的SQL语句
5.1.2 服务层
在service包MessageService类中,添加对应的方法
public Message findLatestNotice(int userId, String topic) { return messageMapper.selectLatestNotice(userId,topic); } public int findNoticeCount(int userId, String topic) { return messageMapper.selectNoticeCount(userId,topic); } public int findNoticeUnreadCount(int userId, String topic) { return messageMapper.selectNoticeUnreadCount(userId, topic); }5.1.3 视图层
- 在controller包下MessageController类中,添加新方法
@RequestMapping(value = "/notice/list", method = RequestMethod.GET) public String getNoticeList(Model model) { User user = hostHolder.getUser(); // 查询评论类的通知 Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT); if (message != null) { MapmessageVO = new HashMap<>(); messageVO.put("message",message); // 将转义字符 转为 原本样式 String content = HtmlUtils.htmlUnescape(message.getContent()); HashMap data = JSONObject.parseObject(content, HashMap.class); messageVO.put("user", userService.findUserById((Integer) data.get("userId"))); messageVO.put("entityType",data.get("entityType")); messageVO.put("entityId",data.get("entityId")); messageVO.put("postId",data.get("postId")); int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT); messageVO.put("count",count); int unreadCount = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT); messageVO.put("unreadCount",unreadCount); model.addAttribute("commentNotice",messageVO); } // 查询点赞类的通知 message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE); if (message != null) { Map messageVO = new HashMap<>(); messageVO.put("message",message); // 将转义字符 转为 原本样式 String content = HtmlUtils.htmlUnescape(message.getContent()); HashMap data = JSONObject.parseObject(content, HashMap.class); messageVO.put("user", userService.findUserById((Integer) data.get("userId"))); messageVO.put("entityType",data.get("entityType")); messageVO.put("entityId",data.get("entityId")); messageVO.put("postId",data.get("postId")); int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE); messageVO.put("count",count); int unreadCount = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE); messageVO.put("unreadCount",unreadCount); model.addAttribute("likeNotice",messageVO); } // 查询关注类的通知 message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW); if (message != null) { Map messageVO = new HashMap<>(); messageVO.put("message",message); // 将转义字符 转为 原本样式 String content = HtmlUtils.htmlUnescape(message.getContent()); HashMap data = JSONObject.parseObject(content, HashMap.class); messageVO.put("user", userService.findUserById((Integer) data.get("userId"))); messageVO.put("entityType",data.get("entityType")); messageVO.put("entityId",data.get("entityId")); int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW); messageVO.put("count",count); int unreadCount = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW); messageVO.put("unreadCount",unreadCount); model.addAttribute("followNotice",messageVO); } // 查询未读消息数量 int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null); model.addAttribute("letterUnreadCount",letterUnreadCount); int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null); model.addAttribute("noticeUnreadCount",noticeUnreadCount); return "/site/notice"; }
- 在MessageController类的getLetterList方法中,增加显示通知数量的代码,因为letter.html页面也需要显示系统通知的数量
- 修改页面
修改notice.html页面
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)