微服务设计指导-用“死信-延时”队列彻底解决业务超时补偿时发生的数据库“死锁”问题

微服务设计指导-用“死信-延时”队列彻底解决业务超时补偿时发生的数据库“死锁”问题,第1张

微服务设计指导-用“死信-延时”队列彻底解决业务超时补偿时发生的数据库“死锁”问题 简介

业务超时这种场景我们经常碰到。举例来说:支付在请求到支付网关后但支付网关那或者是因为第三方支付渠道问题、亦或是网络等问题导致这笔支付回调没有“成功”或者根本就没有回调请求来通知企业方相关的支付状态。对于此情况我们亲爱的程序员们我看了最多的设计就是喜欢用:每隔X分钟跑一个JOB,然后这个JOB去把所有的status=未支付成功的订单状态在数据库里改一下状态(以便于后续业务 *** 作)。

哎。。。问题出就出在这个跑JOB上面。这也是程序员或者相关研发团队缺少训练的典型事故。数据库死锁往往就是在这种情况下发生的。

试想一下如果一个中大型工程,有几百个这样的JOB。。。嘿嘿嘿,你公司的DBA得有多崩溃。不信,来看一个实例吧。

模拟死锁 准备工作
    我们写书一段mybatis dao,它会从数据库里捞出status为1(我们假设status=1为一直未响应的支付超时状态)并对它的状态进行变更成“2”的“业务补偿模拟”;相应的我们书一个service方法来调用这个dao方法;使用junit test case启动5个线程并且设成同等优先级每隔X秒跑一次(为了快速把死锁问题重现我们用1秒超时来模拟支付超时),因此每隔1秒会有5个线程(模拟5个payment的集群幅本)同时去update数据库中的状态;我们在数据库的payment表里造了884万条数据;
PaymentDao.xml
	
		update sky_payment set status='2' where status='1' limit 3000
	

相对应的PaymentDao.java的内容

 public int updatePaymentStatusByLimit();
PaymentService.java
@Resource
    private PaymentDao paymentDao;

    @Transactional(rollbackFor = Exception.class)
    public void updatePaymentStatusByLimit() throws Exception {
        try {
            int records = paymentDao.updatePaymentStatusByLimit();
            logger.info(">>>>>>updated {}", records);
        } catch (Exception e) {
            logger.error(">>>>>>updatePamentStatusByLimit error: " + e.getMessage(), e);
            throw new Exception(">>>>>>updatePamentStatusByLimit error: " + e.getMessage(), e);
        }
    }

很简单,书写代码没有压力、增删改查谁不会!

相关的单元测试中启动5个线程的代码
package org.mk.demo.skypayment.service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.annotation.Resource;

import org.junit.jupiter.api.Test;
import org.mk.demo.skypayment.SkyPaymentApp;
import org.mk.demo.skypayment.vo.PaymentBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;

import net.minidev.json.JSONArray;



@SpringBootTest(classes = {SkyPaymentApp.class})
public class PaymentServiceTest {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private int threadCnt = 5;
    private CountDownLatch latch = new CountDownLatch(threadCnt);

    @Resource
    private PaymentService updatePaymentStatusService;

    @Test
    public void updatePaymentStatusByLimit() throws Exception {
        for (int i = 0; i < threadCnt; i++) {
            (new Thread(new UpdatorRunner(), "JUNIT多线程测试")).start();
        }
        latch.await();
    }

    class UpdatorRunner implements Runnable {
        @Override
        public void run() {
            logger.info(">>>>>>[当前线程ID]:" + Thread.currentThread().getId());
            try {
                while (true) {
                    updatePaymentStatusService.updatePaymentStatusByLimit();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            latch.countDown(); // 执行完毕,计数器减1
        }
    }

}
造了847万条数据在数据库里
CREATE TABLE `sky_payment` (
  `pay_id` int(11) NOT NULL AUTO_INCREMENT,
  `status` tinyint(3) DEFAULT NULL,
  `transf_amount` varchar(45) DEFAULT NULL,
  `created_date` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_date` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`pay_id`)
) ENGINE=InnoDB AUTO_INCREMENT=9098509 DEFAULT CHARSET=utf8mb4;

 然后运行一下我们看到了什么?

哇。。。我可是用了两台512GB内存的数据库,CPU是128核的服务器安装成了mysql master slaver读写分理的模式啊,只用了1分钟就出现了以下介个玩意儿。。。

 大家知道吧,一般在生产上我们的单个微服务的集群幅本一般为10-20个左右幅本的量去支撑日常的9,000~1万左右的并发。因此不需要5分钟的,往往1-2分钟内如果两个幅本间有重复的update语句,整个生产db就会直接卡死。

此时如果你的数据库里数据量单表在>百万行数时,100%就会产生“DB主从延迟”,然后“一路向上卡”,进而你的整个商城也就“拜拜”了您哎。

这一切源于对数据库中的批量update(含delete)原理的不熟悉

不要只知道增、删、改、查。

批量的更新(含delete)动作,尤其是update...where或者是delete...where是锁数据的。一个tomcat/netty(反正就是一个spring boot的应用)就是一个线程,当>1个线程同时运行了更新...where且同时不同update/delete的where条件中有数据重复,就会产生死锁。

这个问题在单机上永不会发生,只有在集群环境中发生。

因为在生产环境,我们不可能单机运行一个业务模块的,我们要应对外部的流量就势必多机多集群运行来接受外部的流量。

这下好,你单机运行倒是保证了业务的正确性,当流量一大你要d出几台幅本时瞬间卡爆了整个商城应用。

这是典型的“不符合云原生设计”的案例。

说了难听点这叫“脖子细,胃小”,要么吃不下,要吃得下的话却把了个胃撑爆。。。你这样的设计、代码还让不让企业玩了哈?你对企业对团队有不满你说一声呢,何必用这种“手段”来玩死企业呢?

嘿嘿,以上开个玩笑而己哈。

下面来让我们看正确的设计

使用“延时”队列来解决业务补偿类跑批的设计

多说一句:

其实上述设计也可以改成把update...where改成限定死在每一条update语句是by主键,然后把几百个、上千个(依赖于数据库的性能)update by id串成mybatis的batch update去处理,当by主键去处理时数据库是永远不会发生死锁的。但是这也不符合云原生。因为你跑批始终是跑在单机上的,它并不会随着你的应用幅本数增加而同时增加并行计算、处理能力。

还有一个点,你的job是频率性跑补偿,在没有“业务需要补偿”时你的job实际上是在空转,极其消耗系统资源,这也是很“恐龙”级的设计、不合理的设计。

云原生的宗旨之一就是横向扩展集群幅本时你的系统的计算、处理能力也会随着扩展。

现在的RabbitMQ最新的版本如3.9已经拥有了“延时队列”了,Redis里也有延时队列的成熟特性。

不过我们这边还是利用RabbitMQ3.8.x,因为大多公司用的RabbitMQ还是是3.7-3.8版。如果为了用新特性对企业的底层架构这种改动显然风险过高。

在RabbitMQ3.8.x中有一种队列叫死信队列,这个死信队列就是可以用来作“延时 *** 作的”。

我们使用spring boot2.x结合RabbitTemplate对于RabbitMQ的死信队列的使用如下配置

application-local.yml
mysql:
  datasource:
    db:
      type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.jdbc.Driver
      minIdle: 50
      initialSize: 50
      maxActive: 300
      maxWait: 1000
      testOnBorrow: false
      testOnReturn: true
      testWhileIdle: true
      validationQuery: select 1
      validationQueryTimeout: 1
      timeBetweenEvictionRunsMillis: 5000
      ConnectionErrorRetryAttempts: 3
      NotFullTimeoutRetryCount: 3
      numTestsPerEvictionRun: 10
      minEvictableIdleTimeMillis: 480000
      maxEvictableIdleTimeMillis: 480000
      keepAliveBetweenTimeMillis: 480000
      keepalive: true
      poolPreparedStatements: true
      maxPoolPreparedStatementPerConnectionSize: 512
      maxOpenPreparedStatements: 512
    master: #master db
      type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.jdbc.Driver
      url: jdbc:mysql://localhost:3306/ecom?useUnicode=true&characterEncoding=utf-8&useSSL=false&useAffectedRows=true&autoReconnect=true
      username: root
      password: 111111
    slaver: #slaver db
      type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.jdbc.Driver
      url: jdbc:mysql://localhost:3307/ecom?useUnicode=true&characterEncoding=utf-8&useSSL=false&useAffectedRows=true&autoReconnect=true
      username: root
      password: 111111     
server:
  port: 9080
  tomcat:
    max-http-post-size: -1
  max-http-header-size: 10240000

spring:
  application:
    name: skypayment
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB
    context-path: /skypayment

  #配置rabbitMq 服务器
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /
    publisher-/confirm/i-type: CORRELATED
    listener:
       ## simple类型
       simple:
         #最小消费者数量
         concurrency: 32
         #最大的消费者数量
         maxConcurrency: 64
         #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
         prefetch: 32
         retry: 
           enabled: false
#rabbitmq的超时用于队列的超时使用
queue:
  expire: 1000
RabbitMqConfig.java
package org.mk.demo.skypayment.config.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;


@Component
public class RabbitMqConfig {
    
    public static final String PAYMENT_EXCHANGE = "payment.exchange";
    public static final String PAYMENT_DL_EXCHANGE = "payment.dl.exchange";
    public static final String PAYMENT_QUEUE = "payment.queue";
    public static final String PAYMENT_DEAD_QUEUE = "payment.queue.dead";

    public static final String PAYMENT_FANOUT_EXCHANGE = "paymentFanoutExchange";
    
    @Value("${queue.expire:5000}")
    private long queueExpire;

    
    @Bean
    public TopicExchange paymentExchange() {
        return (TopicExchange)ExchangeBuilder.topicExchange(PAYMENT_EXCHANGE).durable(true).build();
    }

    
    @Bean
    public TopicExchange paymentExchangeDl() {
        return (TopicExchange)ExchangeBuilder.topicExchange(PAYMENT_DL_EXCHANGE).durable(true).build();
    }

    
    @Bean
    public Queue paymentQueue() {
        return QueueBuilder.durable(PAYMENT_QUEUE).withArgument("x-dead-letter-exchange", PAYMENT_DL_EXCHANGE)// 设置死信交换机
            .withArgument("x-message-ttl", queueExpire).withArgument("x-dead-letter-routing-key", PAYMENT_DEAD_QUEUE)// 设置死信routingKey
            .build();
    }

    
    @Bean
    public Queue paymentDelayQueue() {
        return QueueBuilder.durable(PAYMENT_DEAD_QUEUE).build();
    }

    
    @Bean
    public Binding bindDeadBuilders() {
        return BindingBuilder.bind(paymentDelayQueue()).to(paymentExchangeDl()).with(PAYMENT_DEAD_QUEUE);
    }

    
    @Bean
    public Binding bindBuilders() {
        return BindingBuilder.bind(paymentQueue()).to(paymentExchange()).with(PAYMENT_QUEUE);
    }

    
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(PAYMENT_FANOUT_EXCHANGE);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
 RabbitMqListenerConfig.java
package org.mk.demo.skypayment.config.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;


@Configuration
public class RabbitMqListenerConfig implements RabbitListenerConfigurer {

    
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registor) {
        registor.setMessageHandlerMethodFactory(messageHandlerMethodFactory());

    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
}

当spring boot工程启动后它会在RabbitMQ里建立如下主要的内容

两个queue,一个正常的队列,一个以.dead结束的队列;

两个exchange,一个正常的exchange对应着正常的队列,一个.dl.exchange对应着.dead结束的队列。

注:

在运行我的项目例子时要记得给自己本机的rabbitmq的连接用户分配到可以建立queue和exchange的权限,如何给rabbitmq自带用户分配权限网上太多不在此作详细说明了。

上述中的payment.queue有一个过期时间,此处我们设置成了1秒,用来模拟正常支付回调超时。一旦这个queue到达了1秒的超时,它就会被转发到payment.dl.exchange里,然后此时如果你的应用通过"监听“对应着payment.dl.exchange中的payment.queue.dead时,你就可以得到这个消息。得到了这个消息后就是你正常的“业务超时补偿”的那些“业务代码处理了”,此处我们为update by id方法。如下代码块。

PaymentDao.xml



	
		
		
		
	

	
	
		update sky_payment set status='2' where status='1' limit 3000
	
	
	
		update sky_payment set status=#{status}  where pay_id=#{payId}
	

相对应的PaymentDao.java

package org.mk.demo.skypayment.dao;

import org.springframework.stereotype.Repository;

import java.util.List;

import org.mk.demo.skypayment.vo.PaymentBean;


@Repository
public interface PaymentDao {

    public int updatePaymentStatusByLimit();

    public int updatePaymentStatusById(PaymentBean payment);

}
Publisher.java

 用于产生正常支付请求到第三方支付渠道或者是支付网关的的请求,这个类就是正常提交支付请求的同时对这个队列设一个超时。

如果在超时内有响应那么这个队列就不会产生“相应的死信队列”;反过来就是“如果你在死信队列中接到了相应的订单流水,那么一定代表了这一条支付请求没有在业务允许的超时时间内被响应,因此就需要补偿了”;

为了模拟这个场景,我们不会去处理这条请求,那么这条请求永远必进死信队列

package org.mk.demo.skypayment.service;

import org.mk.demo.skypayment.config.mq.RabbitMqConfig;
import org.mk.demo.skypayment.vo.PaymentBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component

public class Publisher {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void publishPaymentStatusChange(PaymentBean payment) {
        try {
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.convertAndSend(RabbitMqConfig.PAYMENT_EXCHANGE, RabbitMqConfig.PAYMENT_QUEUE, payment);
        } catch (Exception ex) {
            logger.error(">>>>>>publish exception: " + ex.getMessage(), ex);
        }
    }
}
Subscriber.java

用于实现监听死信队列得到消息后的业务补偿 *** 作用。

package org.mk.demo.skypayment.service;

import javax.annotation.Resource;

import org.mk.demo.skypayment.config.mq.RabbitMqConfig;
import org.mk.demo.skypayment.vo.PaymentBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Subscriber {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private PaymentService paymentService;

    @RabbitListener(queues = RabbitMqConfig.PAYMENT_DEAD_QUEUE)
    public void receiveDL(PaymentBean payment) {
        try {
            if (payment != null) {
                logger.info(">>>>>>从死信队列拿到数据并开发更改:payId->{} status->{} transfAmount->{}", payment.getPayId(),
                    payment.getStatus(), payment.getTransfAmount());
                int records = paymentService.updatePaymentStatusById(payment);
                logger.info(">>>>>>修改: {}", records);
            }
        } catch (Exception ex) {
            logger.error(">>>>>>Subscriber from dead queue exception: " + ex.getMessage(), ex);
        }
    }
}
运行使用死信队列来实现业务补偿的例子

接下去我们先以两个幅本来运行,前端压入这么多请求每一个请求都会产生一条死信。

为此我们先把两个spring boot的幅本做成集群模式如下nginix设置:

nginix中对spring boot集群做代理的核心配置

    upstream skypayment-lb {
      server   localhost:9080 weight=1 fail_timeout=1s;
      server   localhost:9081 weight=1 fail_timeout=1s;
    }
    

    server {

        location /skypayment/ {
                port_in_redirect on;
                # 负载配置
                proxy_pass http://skypayment-lb/;
                proxy_redirect  off;
                proxy_set_header Host $host;
                proxy_set_header X-Real-IP $remote_addr;
                proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
                add_header backendIP $upstream_addr;
                add_header backendCode $upstream_status;
        }

运行了近3分钟了,没有任何的死锁。因为不可能产生死锁,就是因为每次的数据库里的update都是by id。

我们继续观察rabbitmq的控制台http://localhost:15672中队列的状态。

发觉有700-1,000个unack的队列有积压。这不是问题,因为前端的请求量太大了,你可以认为是并发的2万3千多个支付请求需要处理呢!

如果你的企业一天有这么多笔支付,恭喜你,你一年肯定是>3个亿的收入的。

为了解决这个不断的有unack消息的积压,我。。。多启动一个幅本,让应用从原来的两个幅本变成了三个幅本同时在运行。

 过了10秒不到,整个RabbitMQ城的unack队列瞬间减少到个位数。

这就是云原生中提到过的,你的应用不能受限于相应的资源限制并且随着应用幅本的横向d性扩容系统的计算能力也会随之扩大。

最终,当前端的2万3千多个请求结束后,RabbitMQ继续用了10秒左右把一些未处理完的队列中的支付状态变更完后整个系统归于平静。

0 error、0死锁、系统响应高、吐吞量大的不得 、微服务、云原生。

 附件 parent-pom.xml

	4.0.0
	org.mk.demo
	springboot-demo
	0.0.1
	pom
	
		1.8
		0.8.3
		0.0.1
		2.4.2
		
		
		3.4.13
		
		2020.0.1
		2.7.3
		4.0.1
		2.8.0
		
		1.2.6
		27.0.1-jre
		1.2.59
		2.7.3
		1.1.4
		
		5.1.46
		3.4.2
		1.8.13
		1.8.14-RELEASE
		1.0.0
		4.1.42.Final
		0.1.4
		1.16.22
		3.1.0
		2.1.0
		1.2.3
		1.3.10.RELEASE
		1.0.2
		4.0.0
		2.4.6
		2.9.2
		1.9.6
		1.5.23
		1.5.22
		1.5.22
		1.9.5
		0.0.1
		3.1.6
		2.11.1
		2.8.6
		2.5.8
		0.1.4
		1.7.25
		2.0-M2-groovy-2.5
		2.2.0
		3.10.0
		2.6
		5.0.0
		${java.version}
		${java.version}
		3.8.1
		3.2.3
		3.1.1
		2.2.3
		1.4.197
		3.4.14
		4.4.10
		4.5.6
		3.0.0
		UTF-8
		UTF-8
		2.0.22-RELEASE
		4.1.0
		4.1.0
		4.1.0
		1.6.1
		3.1.0
		3.10.0
		2.6
		5.0.0
		2.2.5.RELEASE
		2.2.1.RELEASE
		3.16.1
		2.17.1
		0.0.1
		0.0.1
		0.0.1
	
	
		
			
			
				org.redisson
				redisson-spring-boot-starter
				${redission.version}
				
			
			
				org.redisson
				redisson-spring-data-21
				${redission.version}
			
			
				com.alibaba.cloud
				spring-cloud-alibaba-dependencies
				${spring-cloud-alibaba.version}
				pom
				import
			
			
				com.alibaba.cloud
				spring-cloud-starter-alibaba-nacos-discovery
				
				${nacos-discovery.version}
			
			
				com.aldi.jdbc
				sharding
				${aldi-sharding.version}
			
			
			
				com.auth0
				java-jwt
				${java-jwt.version}
			
			
				cn.hutool
				hutool-crypto
				${hutool-crypto.version}
			
			
			
				org.apache.poi
				poi
				${poi.version}
			
			
				org.apache.poi
				poi-ooxml
				${poi-ooxml.version}
			
			
				org.apache.poi
				poi-ooxml
				${poi-ooxml.version}
			
			
				org.apache.poi
				poi-ooxml-schemas
				${poi-ooxml-schemas.version}
			
			
				dom4j
				dom4j
				${dom4j.version}
			
			
				org.apache.xmlbeans
				xmlbeans
				${xmlbeans.version}
			
			
			
				com.odianyun.architecture
				oseq-aldi
				${oseq-aldi.version}
			
			
				org.apache.httpcomponents
				httpcore
				${httpcore.version}
			

			
				org.apache.httpcomponents
				httpclient
				${httpclient.version}
			
			
				org.apache.zookeeper
				zookeeper
				${zkclient.version}
				
					
						log4j
						log4j
					
					
						org.slf4j
						slf4j-log4j12
					
				
			
			
			
				org.quartz-scheduler
				quartz
				${quartz.version}
			
			
				org.quartz-scheduler
				quartz-jobs
				${quartz.version}
			
			
			
				org.springframework.cloud
				spring-cloud-dependencies
				Hoxton.SR7
				pom
				import
			

			
			
				org.mockito
				mockito-core
				${mockito-core.version}
				test
			
			
			
				com.auth0
				java-jwt
				${java-jwt.version}
			
			
				cn.hutool
				hutool-crypto
				${hutool-crypto.version}
			

			
				org.springframework.boot
				spring-boot-starter-actuator
				${spring-boot.version}
			
			
			
				org.logback-extensions
				logback-ext-spring
				${logback-ext-spring.version}
			
			
				org.slf4j
				jcl-over-slf4j
				${jcl-over-slf4j.version}
			

			
			
				com.h2database
				h2
				${h2.version}
			

			
				org.apache.zookeeper
				zookeeper
				${zookeeper.version}
				
					
						org.slf4j
						slf4j-log4j12
					
					
						log4j
						log4j
					
				
			

			
			
				com.xuxueli
				xxl-job-core
				${xxljob.version}
			
			
				org.springframework.boot
				spring-boot-starter-test
				${spring-boot.version}
				test
				
					
						org.springframework.boot
						spring-boot-starter-logging
					
					
						org.slf4j
						slf4j-log4j12
					
				
			
			
				org.spockframework
				spock-core
				1.3-groovy-2.4
				test
			
			
				org.spockframework
				spock-spring
				1.3-RC1-groovy-2.4
				test
			
			
				org.codehaus.groovy
				groovy-all
				2.4.6
			

			
				com.google.code.gson
				gson
				${gson.version}
			
			
				com.fasterxml.jackson.core
				jackson-databind
				${jackson-databind.version}
			
			
				org.springframework.boot
				spring-boot-starter-web-services
				${spring-boot.version}
			
			
				org.apache.cxf
				cxf-rt-frontend-jaxws
				${cxf.version}
			
			
				org.apache.cxf
				cxf-rt-transports-http
				${cxf.version}
			
			
				org.springframework.boot
				spring-boot-starter-security
				${spring-boot.version}
			

			
				io.github.swagger2markup
				swagger2markup
				1.3.1
			
			
				io.springfox
				springfox-swagger2
				${swagger.version}
			

			
				io.springfox
				springfox-swagger-ui
				${swagger.version}
			

			
				com.github.xiaoymin
				swagger-bootstrap-ui
				${swagger-bootstrap-ui.version}
			

			
				io.swagger
				swagger-annotations
				${swagger-annotations.version}
			

			
				io.swagger
				swagger-models
				${swagger-models.version}
			
			
				org.sky
				sky-sharding-jdbc
				${sky-sharding-jdbc.version}
			
			
				com.googlecode.xmemcached
				xmemcached
				${xmemcached.version}
			
			
				org.apache.shardingsphere
				sharding-jdbc-core
				${shardingsphere.jdbc.version}
			
			
			
				org.springframework.kafka
				spring-kafka
				1.3.10.RELEASE
			
			
				org.mybatis.spring.boot
				mybatis-spring-boot-starter
				${mybatis.version}
			
			
				com.github.pagehelper
				pagehelper-spring-boot-starter
				${pagehelper-mybatis.version}
			
			
				org.springframework.boot
				spring-boot-starter-web
				${spring-boot.version}
				
					
						org.slf4j
						slf4j-log4j12
					
					
						org.springframework.boot
						spring-boot-starter-logging
					
				
			
			
				org.springframework.boot
				spring-boot-dependencies
				${spring-boot.version}
				pom
				import
			
			
				org.apache.dubbo
				dubbo-spring-boot-starter
				${dubbo.version}
				
					
						org.slf4j
						slf4j-log4j12
					
					
						org.springframework.boot
						spring-boot-starter-logging
					
				
			
			
				org.apache.dubbo
				dubbo
				${dubbo.version}
				
					
						javax.servlet
						servlet-api
					
				
			
			
				org.apache.curator
				curator-framework
				${curator-framework.version}
			

			
				org.apache.curator
				curator-recipes
				${curator-recipes.version}
			
			
				mysql
				mysql-connector-java
				${mysql-connector-java.version}
			
			
				com.alibaba
				druid
				${druid.version}
			
			
				com.alibaba
				druid-spring-boot-starter
				${druid.version}
			
			
				com.lmax
				disruptor
				${disruptor.version}
			
			
				com.google.guava
				guava
				${guava.version}
			
			
				com.alibaba
				fastjson
				${fastjson.version}
			
			
				org.apache.dubbo
				dubbo-registry-nacos
				${dubbo-registry-nacos.version}
			
			
				com.alibaba.nacos
				nacos-client
				${nacos-client.version}
			
			
				org.aspectj
				aspectjweaver
				${aspectj.version}
			
			
				org.springframework.boot
				spring-boot-starter-data-redis
				${spring-boot.version}
			
			
				io.seata
				seata-all
				${seata.version}
			
			
				io.netty
				netty-all
				${netty.version}
			
			
				org.projectlombok
				lombok
				${lombok.version}
			
			
			
				com.alibaba.boot
				nacos-config-spring-boot-starter
				${nacos.spring.version}
				
					
						nacos-client
						com.alibaba.nacos
					
				
			

			
				net.sourceforge.groboutils
				groboutils-core
				5
			
			
				commons-lang
				commons-lang
				${commons-lang.version}
			
		
	
	
		rabbitmq-demo
		redis-demo
		db-demo
		threadpool-demo
		oseq-demo
		osoa-demo
		ody-channel-web
		export-to-excel-demo
		ody-smartds
		import-to-db
		mvc1
		mvc2
		datastructure-demo
		sharding-jdbc-demo
		sm2-demo
		redis-common
		redis-doublebuffer-demo
		common-util
		db-common
		skypayment
	
 pom.xml

	4.0.0
	
		org.mk.demo
		springboot-demo
		0.0.1
	
	skypayment
	
		
		
			org.springframework.boot
			spring-boot-starter-amqp
			
				
					org.springframework.boot
					spring-boot-starter-logging
				
			
		
		
		
			org.springframework.boot
			spring-boot-starter-jdbc
			
				
					org.springframework.boot
					spring-boot-starter-logging
				
			
		
		
			mysql
			mysql-connector-java
		
		
			com.alibaba
			druid
		
		
		
			org.springframework.boot
			spring-boot-starter-data-redis
			
				
					org.springframework.boot
					spring-boot-starter-logging
				
				
					org.slf4j
					slf4j-log4j12
				
			
		
		
			org.mybatis.spring.boot
			mybatis-spring-boot-starter
		
		
		
			redis.clients
			jedis
		
		
		
			org.redisson
			redisson-spring-boot-starter
			3.13.6
			
				
					org.redisson
					redisson-spring-data-23
				
			
		
		
			org.apache.commons
			commons-lang3
		
		
		
			org.redisson
			redisson-spring-data-21
			3.13.1
		
		
			org.springframework.boot
			spring-boot-starter-test
			test
			
				
					org.springframework.boot
					spring-boot-starter-logging
				
				
					org.slf4j
					slf4j-log4j12
				
			
		
		
			org.springframework.boot
			spring-boot-starter-log4j2
			
				
					org.apache.logging.log4j
					log4j-core
				
				
					org.apache.logging.log4j
					log4j-api
				
			
		
		
		
			org.apache.logging.log4j
			log4j-api
			${log4j2.version}
		
		
			org.apache.logging.log4j
			log4j-core
			${log4j2.version}
		
		
		
			org.aspectj
			aspectjweaver
		
		
			com.lmax
			disruptor
		
		
			com.alibaba
			fastjson
		
		
			com.fasterxml.jackson.core
			jackson-databind
		
		
			com.google.guava
			guava
		
		
			com.alibaba
			fastjson
		
		
			com.fasterxml.jackson.core
			jackson-databind
		

		
		
		
			org.mk.demo
			common-util
			${common-util.version}
		
		
			org.mk.demo
			db-common
			${db-common.version}
		
	
SkyPaymentApp.java
package org.mk.demo.skypayment;

import org.mybatis.spring.annotation.MapperScan;
import org.redisson.spring.starter.RedissonAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;


@SpringBootApplication
@ComponentScan(basePackages = {"org.mk"})
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, RedisAutoConfiguration.class,
    RedissonAutoConfiguration.class, RedisRepositoriesAutoConfiguration.class})
@MapperScan("org.mk.demo.skypayment.dao")
public class SkyPaymentApp {

    
    public static void main(String[] args) {
        SpringApplication.run(SkyPaymentApp.class);

    }

}
PaymentController.java
package org.mk.demo.skypayment.controller;

import javax.annotation.Resource;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mk.demo.util.response.ResponseBean;
import org.mk.demo.util.response.ResponseCodeEnum;
import org.mk.demo.skypayment.service.Publisher;
import org.mk.demo.skypayment.vo.PaymentBean;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;



@RestController
public class PaymentController {
    private Logger logger = LogManager.getLogger(this.getClass());

    @Resource
    private Publisher publisher;

    @PostMapping(value = "/updateStatus", produces = "application/json")
    @ResponseBody
    public Mono updateStatus(@RequestBody PaymentBean payment) {
        ResponseBean resp = new ResponseBean();
        try {
            publisher.publishPaymentStatusChange(payment);
            resp = new ResponseBean(ResponseCodeEnum.SUCCESS.getCode(), "success");
        } catch (Exception e) {
            resp = new ResponseBean(ResponseCodeEnum.FAIL.getCode(), "system error");
            logger.error(">>>>>>updateStatus error: " + e.getMessage(), e);
        }
        return Mono.just(resp);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5721850.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存