rabbmitmq连接池[已过生产]

rabbmitmq连接池[已过生产],第1张


源码地址https://gitee.com/tym_hmm/rabbitmq-pool-go

rabbitmq 连接池channel复用

开发语言 golang 依赖库

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

已在线上生产环镜运行, 5200W请求 qbs 3000 时, 连接池显示无压力
rabbitmq部署为线上集群

功能说明 自定义连接池大小及最大处理channel数消费者底层断线自动重连底层使用轮循方式复用tcp生产者每个tcp对应一个channel,防止channel写入阻塞造成内存使用过量支持rabbitmq exchangeType默认值
名称说明
tcp最大连接数5
生产者消费发送失败最大重试次数5
消费者最大channel信道数(每个连接自动平分)100(每个tcp10个)
使用 初始化
var oncePool sync.Once
var instanceRPool *kelleyRabbimqPool.RabbitPool
func initrabbitmq() *kelleyRabbimqPool.RabbitPool {
  oncePool.Do(func() {
  //初始化生产者
  instanceRPool = kelleyRabbimqPool.NewProductPool()
  //初始化消费者
  instanceConsumePool = kelleyRabbimqPool.NewConsumePool()
    err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest")
    if err != nil {
      fmt.Println(err)
    }
  })
  return instanceRPool
}
生产者
var wg sync.WaitGroup
for i:=0;i<100000; i++ {
  wg.Add(1)
  go func(num int) {
    defer wg.Done()
    data:=kelleyRabbimqPool.GetRabbitMqDataFormat(
      "testChange5", 
      kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC, 
      "textQueue5", 
      "/", 
      fmt.Sprintf("这里是数据%d", num)
    )
    _=instanceRPool.Push(data)
  }(i)
}
wg.Wait()
消费者

可定义多个消息者事件, 不通交换机, 队列, 路由

每个事件独立

nomrl := &rabbitmq.ConsumeReceive{
#定义消费者事件
	ExchangeName: "testChange31",//队列名称
        ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT,
        Route:        "",
        QueueName:    "testQueue31",
        IsTry:true,//是否重试
        MaxReTry: 5,//最大重试次数
        EventFail: func(code int, e error, data []byte) {
        	fmt.Printf("error:%s", e)
        },
        /***
         * 参数说明
         * @param data []byte 接收的rabbitmq数据
         * @param header map[string]interface{} 原rabbitmq header
         * @param retryClient RabbitmqPool.RetryClientInterface 自定义重试数据接口,重试需return true 防止数据重复提交
         ***/
        EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {//如果返回true 则无需重试
        	fmt.Printf("data:%s\n", string(data))
        	return true
        },
}
instanceConsumePool.RegisterConsumeReceive(nomrl)

err := instanceConsumePool.RunConsume()
if err != nil {
  fmt.Println(err)
}
错误码说明

错误码为

生产者push时返回的 *RabbitMqError消费者事件监听回返的 code
错误码说明
501生产者发送超过最大重试次数
502获取信道失败, 一般为认道队列数用尽
503交换机/队列/绑定失败
504连接失败
506信道创建失败
507超过最大重试次数

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994481.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存