ATeam社区(牛客网项目第五章)

ATeam社区(牛客网项目第五章),第1张

ATeam社区(牛客网项目第五章)

文章目录
  • 1. 阻塞队列
  • 2. Kafka入门
    • 2.1 Kafka术语解释
    • 2.2 Windows下修改配置
    • 2.3 Windows下使用Kafka
  • 3. Spring整合Kafka
  • 4. 发送系统通知
    • 4.1 封装事件对象
    • 4.2 开发事件的生产者
    • 4.3 开发事件的消费者
    • 4.4 处理评论事件-CommentController
    • 4.5 处理点赞事件-LikeController
    • 4.6 处理关注事件-FollowController
    • 4.7 测试
  • 5. 显示系统通知
    • 5.1 通知列表
      • 5.1.1 数据层
      • 5.1.2 服务层
      • 5.1.3 视图层
    • 5.2 通知详情
      • 5.2.1 数据层
      • 5.2.2 服务层
      • 5.2.3 视图层
    • 5.3 页面头部显示所有的未读消息数量

1. 阻塞队列
  • BlockingQueue
    • 解决线程通信问题
    • 阻塞方法:put、take
  • 生产者消费者模式
    • 生产者:产生数据的线程
    • 消费者:使用数据的线程
  • 实现类
    • ArrayBlockingQueue
    • linkedBlocikingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等

      面试常问:写一个生产者消费者实现模式
package com.ateam.community;

import sun.net.www.protocol.http.HttpURLConnection;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class BlockingQueueTests {

    public static void main(String[] args){
      BlockingQueue queue = new ArrayBlockingQueue(10);
      new Thread(new Producer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
    }
    


}


class Producer implements Runnable{

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}



class Customer implements Runnable{

    private BlockingQueue queue;

    public Customer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. Kafka入门
  • Kafka简介
    • Kafka是一个分布式的流媒体平台
    • 应用:消息系统、日志收集、用户行为追踪、流式处理
  • Kafkat特点
    • 高吞吐量、消息持久化、高可靠性、高扩展性
  • Kafka术语
    • Broker、Zookeeper
    • Topic、Partion、Offset
    • Leader Replica、Follower Replica
2.1 Kafka术语解释
  • Broker:Kafka的服务器
  • Zookeeper:管理集群
  • Topic:点对点模式中每个消费者拿到的消息都不同,发布订阅模式中消费者可能拿到同一份消息。Kafka采用发布订阅模式,生产者把消息发布到的空间(位置)叫做Topic
  • Partition:是对topic位置的分区
  • Offset:就是消息存放在分区中的索引
  • Leader Replica:主副本,可以处理请求
  • Follower Replica:从副本,只是用来做备份
    Kafka官网:官网
2.2 Windows下修改配置
  1. Kafka下的目录结构
  2. 修改config目录下的zookeeper的配置文件zookeepe.properties
  3. 修改config目录下的Kafka的配置文件server.properties
2.3 Windows下使用Kafka
  1. 启动Zookeeper
    D:kafka_2.12-2.8.1>binwindowszookeeper-server-start.bat configzookeeper.properties
  2. 启动Kafka
    D:kafka_2.12-2.8.1>binwindowskafka-server-start.bat configserver.properties
    Kafka启动以后,在刚才设置的目录,就会发现对应的文件
  3. 创建主题

    查看所有主题,判断是否主题创建是否成功
  4. 生产者往主题上发送消息

    发送两条消息:hello world
  5. 消费者接受消息

    说明Kafka已正常工作,可以进一步开发使用了。
3. Spring整合Kafka
  • 引入依赖
    • spring-kafka
  • 配置Kafka
    • 配置server、consumer
  • 访问Kafka
    • 生产者
      kafkaTemplate.send(topic, data);
    • 消费者
      @KafkaListener(topics = {“test”})
      public void handleMessage(ConsumerRecord record) {}

  1. 引入依赖

    org.springframework.kafka
    spring-kafka

  1. Kafka相关配置
    spring整合Kafka,在application.properties中配置即可
# kafka
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
  1. 测试
    在test包下,新建KafkaTests类,进行测试
package com.ateam.community;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)//配置类
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;


    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test","hello");
        kafkaProducer.sendMessage("test","world");
        try {
            Thread.sleep(1000 * 10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


@Component
class KafkaProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}

@Component
class KafkaConsumer {
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}

生产者主动发送消息,消费者被动接收消息。

4. 发送系统通知
  • 触发事件
    • 评论后,发布通知
    • 点赞后,发布通知
    • 关注后,发布通知
  • 处理事件
    • 封装事件对象
    • 开发事件的生产者
    • 开发事件的消费者
4.1 封装事件对象

在entity包下,新建一个类Event,用来封装事件

package com.ateam.community.entity;

import java.util.HashMap;
import java.util.Map;


// 事件
public class Event {

	
	
    private String topic;// 主题,即事件的类型
    private int userId;// 触发事件的用户id
    private int entityType;// 实体的类型
    private int entityId;// 实体的id
    private int entityUserId;// 实体的拥有者id
    private Map data = new HashMap<>();// 其他的一些数据、

    public String getTopic() {
        return topic;
    }

    // 改造set方法
    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map getData() {
        return data;
    }

    public Event setData(String key, Object value){
        this.data.put(key,value);
        return this;
    }
}

  • 注意set方法的修改,是为了方便后续的 *** 作,即可以类似sb.append("").append("wsh");
  • 注意Map data的set方法,又与其他属性不同
4.2 开发事件的生产者

新建event包,在包内创建EventProducer类

@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件,本质上就是发送消息
    public void fireEvent(Event event) {
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }

}
4.3 开发事件的消费者

先在CommunityConstant类中,增加一些常量:

    
    String TOPIC_COMMENT = "comment";

    
    String TOPIC_LIKE = "like";

    
    String TOPIC_FOLLOW = "follow";

    
    String TOPIC_PUBLISH = "publish";

    
    String TOPIC_DELETe = "delete";

    
    String TOPIC_SHARE = "share";

    
    int SYSTEM_USER_ID = 1;

在event包下, 新建EventConsumer类

package com.ateam.community.event;

@Component
public class EventConsumer implements CommunityConstant {
	// 记录日志
    private static final Logger logger = LoggerFactory.getLogger(Event.class);

    @Autowired
    private MessageService messageService;


    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE})
    public void handleEventMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }
        // 利用fastjson将json字符串转化为Event对象
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        // 发送站内通知,主要是构造Message对象
        Message message = new Message();
        // User表中的id为1的代表系统用户
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        HashMap content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId", event.getEntityId());

        if (!event.getData().isEmpty()) {
            for (Map.Entry entry : event.getData().entrySet()) {
                content.put(entry.getKey(),entry.getValue());
            }
        }

        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);

    }

}

4.4 处理评论事件-CommentController

在addComment方法中,怎讲如下代码:

修改后,该方法完整代码如下:

    @RequestMapping(value = "/add/{discussPostId}", method = RequestMethod.POST)
    @LoginRequired
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){
        comment.setUserId(hostHolder.getUser().getId());
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);

        // 触发评论事件
        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityId(comment.getEntityId())
                .setEntityType(comment.getEntityType())
                .setData("postId",discussPostId);



        if (comment.getEntityType() == ENTITY_TYPE_POST) {
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }

        eventProducer.fireEvent(event);


        return "redirect:/discuss/detail/" + discussPostId;
    }
4.5 处理点赞事件-LikeController

在LikeController类中like方法,增加如下代码:

修改后,该方法完整代码如下:

   @RequestMapping(value = "/like", method = RequestMethod.POST)
    @ResponseBody
    public String like(int entityType, int entityId, int entityUserId, int postId) {
        User user = hostHolder.getUser();

        // 点赞
        likeService.like(user.getId(),entityType,entityId,entityUserId);
        // 获得点赞的数量
        long likeCount = likeService.findEntityLikeCount(entityType, entityId);
        // 获得点赞的状态
        int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);

        // 封装返回结果
        HashMap map = new HashMap<>();
        map.put("likeCount",likeCount);
        map.put("likeStatus",likeStatus);

        // 触发点赞事件,取消点赞就不发消息了
       if (likeStatus == 1) {
           Event event = new Event()
                   .setTopic(TOPIC_LIKE)
                   .setUserId(hostHolder.getUser().getId())
                   .setEntityType(entityType)
                   .setEntityId(entityId)
                   .setEntityUserId(entityUserId)
                   .setData("postId",postId); // 重构了like方法,添加了postId参数
            eventProducer.fireEvent(event);
       }
        return CommunityUtil.getJSONString(0,null,map);
    }

为什么需要重构,因为我们需要postId这个参数

当系统给用户发通知时,比如:xxx用户赞了你的帖子,xxx用户赞了你的评论,你需要帖子的id,才能完成“点击查看”时,跳转到帖子页面。
因为重构了like方法,所以我们需要处理一些discuss.js中like方法传参问题:
修改discuss-detail页面


修改discusss.js

4.6 处理关注事件-FollowController

在FollowController类中follow方法,增加如下代码:

取消关注,不发消息,不触发事件。

4.7 测试

以前写的AOP记录日志报错

空指针异常:

以前访问都是:前端页面 -> controller层 -> service层
现在,出现了Kafka的 消费者,也调用了service层,就出现了空指针异常,此时,没有请求到service层
解决方法:

点赞、关注、评论后,message表中有数据了

5. 显示系统通知
  • 通知列表
    • 显示评论、点赞、关注三种类型的通知
  • 通知详情
    • 分页显示某一类主题所包含的通知
  • 未读消息
    • 在页面头部显示所有的未读消息数量
5.1 通知列表 5.1.1 数据层

消息最终都存在message表中。
在dao下MessageMaper类中,新增几个方法

   

    // 查询某个主题下 最新 的通知
    Message selectLatestNotice(int userId, String topic);

    // 查询某个主题所包含的通知数量
    int selectNoticeCount(int userId, String topic);

    // 查询未读的通知的数量
    int selectNoticeUnreadCount(int userId, String topic);

在mapper包下的message-mapper.xml中,编写对应的SQL语句

    
        select count(id) from message
        where status != 2
        and from_id = 1
        and to_id = #{userId}
        and conversation_id = #{topic}