第五章:Kafka

第五章:Kafka,第1张

第五章:Kafka 系统消息和通知 阻塞队列

BlockingQueue时一个接口,又许多实现类

Kafka入门

高吞吐量:Kafka是硬盘顺序读取:硬盘顺序读取高于内存的随机读取。

高可靠性:分布式的集群

高扩展性:加集群很方便

Broker:Kafka的服务器

Zookeeper:独立的软件,用来管理集群(Kafka里面内置有)

Topic:主题(消息队列实现的方式:点对点(每个数据只被一个消费者消费),发布订阅模式(消息可以被多个消费者读取))

Partition:对主题分区,调高效率,每一个分区顺序追加数据

offset:消息再分区内存放的索引,序列

Replica:副本,做备份,分布式的消息引擎,为了数据更可靠,提高容错率。分为Leader(可以处理请求)和Follower(只是备份不能响应)。主副本挂掉后,从众多的从副本里选一个新的主副本

进入Kafka解压后的文件目录

cd C:usersoftkafka_2.12-2.3.0

启动zokeeper

binwindowszookeeper-server-start.bat configzookeeper.properties

启动Kafka

binwindowskafka-server-start.bat configserver.properties

创建主题(代表位置,代表消息类别)

指定那个服务器,一个副本一个分区,主题的名字

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1  --topic test  

查看所有主题

kafka-topics.bat --list --bootstrap-server localhost:9092 

调用生产者发送消息

kafka-console-producer.bat --broker-list localhost:9092 --topic test

在另起一个窗口消费者读取消息

消费者消费消息

从哪个服务器,读哪个主题

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning 

# 存放路径(看自己情况)

E:javawebworkkafka_2.12-2.8.0

# windows运行路径(看自己的情况)

E:javawebworkkafka_2.12-2.8.0binwindows

# 启动服务器 (先启动zookeeper服务器,再启动kafka) !!!千万不要手动暴力关闭,用下面的命令关闭

binwindowszookeeper-server-start.bat configzookeeper.properties``binwindowskafka-server-start.bat configserver.properties

# 创建主题

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

# 查看当前服务器的主题

kafka-topics.bat --list --bootstrap-server localhost:9092

# 创建生产者,往指定主题上发消息

kafka-console-producer.bat --broker-list localhost:9092 --topic test

# 消费者

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

# 关闭zookeeper服务器

zookeeper-server-stop.bat

# 关闭kafka服务器

kafka-server-stop.bat
Spring整合Kafka


    org.springframework.kafka
    spring-kafka

配置

#kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
#是否自动提交消费者的偏移量(读取消息按偏移量读取)
spring.kafka.consumer.enable-auto-commit=true
#自动提交的频率
spring.kafka.consumer.auto-commit-interval=3000

测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

    @Autowired
    private KafkaProducer kafkaProducer;
    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("learn","学习");
        kafkaProducer.sendMessage("learn","加油");
        try {
            Thread.sleep(2000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//生产者,交给spring管理,发送消息需要我们手动去调用方法
@Component
class KafkaProducer{

    @Autowired
    private KafkaTemplate kafkaTemplate;
    //发消息方法,主题和内容

    public void sendMessage(String topic,String content){
        kafkaTemplate.send(topic,content);
    }

}
//消费者,消费者处理消息是被动的,当队列中有消息就去处理
@Component
class KafkaConsumer{
    //要监听的主题
    @KafkaListener(topics = {"learn"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }

}
发送系统通知

为了提高性能,需要用到消息队列,为什么需要消息队列呢?

评论、点赞、关注不同的事情,三个主题,包装扔到队列里,当前线程(消息的生产者)就可以继续去做别的事情。

生产者和消费者处理消息是可以并发的,称为异步。

业务角度时间为目标

系统通知,后台给用户发,假设后台id(from_id)为1,这个时候conversation_id改存为主题(评论、点赞、关注),不再是两个对话的id,内容存json字符串,里面包含页面上要显示的信息

在相应的事件(评论、点赞、关注)后调用生产者,发送消息,消费者不需要调

为了可以看到帖子详细页面还需要帖子id,因为没有帖子对应的属性,所以将帖子id存到content里面

判断评论的是实体是帖子还是评论,将其存到实体的作者里面

不管是什么根据实体id找到目标

event事件封装好之后,发送消息。之后线程继续向下执行,后序消息的发布有消息队列发布,并行异步,我在处理的同时,另外一个线程也在处理执行

点赞才通知,取消点赞不通知,所以要先判断一下

Kafka出现问题就把日志文件删除

消费者

@Component   //消费者
public class EventConsumer implements CommunityConstant {
    //记日志
    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);

    //往消息表里插数据
    @Autowired
    private MessageService messageService;

    //一个方法,包含三个主题(定义常量引用主题)
    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE})
    public void handleCommentMessage(ConsumerRecord record){
        //发了一个空消息
        if(record==null || record.value()==null){
            logger.error("发送消息为空!");
            return;
        }
        //将json消息转为对象,指定字符串对应的具体类型
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        //转为对象之后再判断
        if(event==null){
            logger.error("消息格式错误!");
            return;
        }
        //内容和格式都对之后,发送站内通知
        Message message=new Message();
        //设置消息的发送者,from_id系统用户规定为1(定义为常量)
        message.setFromId(SYSTEM_USER_ID);
        //设置消息的接收者(帖子的作者)
        message.setToId(event.getEntityUserId());
        //存主题
        message.setConversationId(event.getTopic());
        //当前时间
        message.setCreateTime(new Date());
        //内容是json字符串,先存map,再转为json字符串存进入
        Map content=new HashMap<>();
        //事件触发者
        content.put("userId",event.getUserId());
        //实体的类型(帖子、点赞、关注)
        content.put("entityType",event.getEntityType());
        //实体的id
        content.put("entityId",event.getEntityId());
        //不方便村的字段存到content
        //判断事件对象有没有值
        if(!event.getData().isEmpty()){
            //遍历事件对象的map,将其存到content里
            for (Map.Entry entry : event.getData().entrySet()) {
                content.put(entry.getKey(),entry.getValue());
            }

        }
        //将content转为json字符串
        message.setContent(JSONObject.toJSONString(content));
        //存消息
        messageService.addLetter(message);
    }

}

生产者

@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    //生产者发送消息(处理事件)
    public void fireEvent(Event event){
        //将事件发布到指定的主题(主题,字符串(事件当中所有的数据)json)
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));

    }
}

别人评论之后触发事件

//封装触发评论事件,链接评论帖子/评论的详情
Event event=new Event()
        .setUserId(hostHolder.getUser().getId())
        .setEntityType(comment.getEntityType())
        .setEntityId(comment.getEntityId())
        .setData("postId",discussPostId)
        .setTopic(TOPIC_COMMENT);

if(comment.getEntityType()==ENTITY_TYPE_POST){
    DiscussPost target = discussPostService.findDiscussDetail(comment.getEntityId());
    event.setEntityUserId(target.getId());
}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
    Comment target = commentService.findComment(comment.getEntityId());
    event.setEntityUserId(target.getId());
}

eventProducer.fireEvent(event);
显示系统通知

通知存在message里面



每个请求都有消息链接,用拦截器处理,实现接口,在controller之后,模板之前post

判断有没有登录,mv有没有携带,查私信和通知,相加传给页面

拦截器写完之后要进行配置

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
    //判断有没有登录,mv有没有携带数据
    User user = hostHolder.getUser();
    if (user!=null && modelAndView!=null){
        int unreadCount = messageService.findUnreadCount(user.getId(), null);
        int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
        modelAndView.addObject("allUnreadCount",unreadCount+noticeUnreadCount);
    }
}
modelAndView) throws Exception {
    //判断有没有登录,mv有没有携带数据
    User user = hostHolder.getUser();
    if (user!=null && modelAndView!=null){
        int unreadCount = messageService.findUnreadCount(user.getId(), null);
        int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
        modelAndView.addObject("allUnreadCount",unreadCount+noticeUnreadCount);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688155.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存