socket包,用来做网络连接,socketio 在客户端和服务端之间建立的双向通信数据交换技术,底层使用EngineIO。SocketIO的的客户端使用Engine.IO-Client,服务端使用Engine.IO实现。github.com/dgrijalva/jwt-go
来解析用户token,得到用户信息github.com/joho/godotenv
配置信息存储到.env,该包用来获取配置信息,例如redis连接配置,es连接配置github.com/patrickmn/go-cache
go 缓存包,存储到内存中,用来存储用户 socket的连接信息github.com/nsqio/go-nsq
消息队列nsq包。使用nsq消息队列的原因是为了异步解耦,例如发送消息,消息发送成功后存储消息到es中。github.com/go-redis/redis
redis连接包。gopkg.in/olivere/elastic.v7
Elasticsearch 连接包 其他组件 redis
用来存储用户连接的socket 服务端信息,以及用户信息。nsq 消息队列
用来做业务逻辑的解耦,分布式socket实现的核心组件就是消息队列。Elasticsearch
存储消息数据,在用户端与管理后台展示消息,以及消息的搜索nginx
使用nginx来做反向代理,对客户端来说socket的连接地址统一到nginx,再由nginx来做分发。 实现逻辑 当socket服务端启动时,同步会启动nsq消息队列消费,该消费是用来监听消息下发。客户端发起链接时候会先到Nginx,然后在由Nginx 负载到 socket服务端 ,同时把消息体也一并转发到socket服务上,客户端建立链接时会携带token信息,首先对token进行解密(此处token是用jwt实现)从而得到用户的身份ID,其次获取到当前socket的服务器IP与socket的端口,存储到redis中,用来做下发消息的topic。客户端发送消息时,首先解析消息,获取到接收方用户身份ID,从redis获取到当前用户所在socket服务端的topic,把消息推送到该用户的所在的服务器topic上。消息队列消费该topic,获取到消息后解析消息,获取到接收方用户ID,在go-cache缓存中找到该用户的socket连接信息,从而进行消息下发。消息下发成功后,发送一个消息队列,此消息队列是用来存储消息数据到Elasticsearch,在另一个go协程中,消费此topic 解析消息数据存储数据到Elasticsearch中。 Nginx配置
upstream websocket {
server 192.168.10.56:9500;
server 192.168.10.55:9500;
server 192.168.10.57:9500;
server 192.168.10.58:9500;
}
server {
listen 80; # managed by Certbot
server_name im.tickeup.com;
#root /home/wwwroot;
index index.html index.htm;
access_log /var/log/nginx/im.tickeup.com.access.log;
location / {
proxy_pass http://websocket;
proxy_read_timeout 300s;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
结束语
至此分布式socket代码实现完成,思路就是这样,通过nginx、redis、nsq消息队列我们就可以实现websocket的分布式了,而且当在线用户数量多的时候,我们可以随时进行扩容,只需要在nginx处增加新的socket IP地址就可以了。
实现逻辑思路其实不难, 但是在具体实现上业务的逻辑比较多,此项目是我在2020年的时候在公司做的社交聊天的socket服务,具体在开发过程中还是要注意很多事项,在此不一一陈述,只讲核心的实现逻辑。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)