- Zookeeper 非公平锁/公平锁/共享锁
- Zookeeper分布式锁实战
- 互斥锁
- 非公平锁
- 公平锁
- 读写锁
- 读写锁引起的问题
- 读写锁的原理
- 案例
- 案例场景分析
- 案例实现
- 案例演示
- 数据库表初始化
- jmeter安装(压力测试工具,模拟多线程测试)
- nginx安装(负载均衡,配置让其可以轮询调用集群实例)
- 项目启动,并启动压力测试
- 添加分布式锁的相关代码
- zookeeper分布式锁源代码解析
- Leader 选举在分布式场景中的应用
- 案例演示
- 源码简介
- Spring Cloud Zookeeper注册中心实战
- 注册中心场景分析
- 实战
- 创建用户服务
- 创建产品服务
- 案例演示
如上实现方式在并发问题比较严重的情况下,性能会下降的比较厉害,主要原因是,所有的连接 都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时 收到事件,再次并发竞争,这就是羊群效应。这种加锁方式是非公平锁的具体实现:如何避免呢,我们看下面这种方式。
如上借助于临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。这种实 现方式所有加锁请求都进行排队加锁,是公平锁的具体实现
前面这两种加锁方式有一个共同的特质,就是都是互斥锁,同一时间只能有一个请求占用,如果 是大量的并发上来,性能是会急剧下降的,所有的请求都得加锁,那是不是真的所有的请求都需要加锁呢?答案是否定的,比如如果数据没有进行任何修改的话,是不需要加锁的,但是如果读数据的请求还没读完,这个时候来了一个写请求,怎么办呢?有人已经在读数据了,这个时候是不能写数据的,不然数据就不正确了。直到前面读锁全部释放掉以后,写请求才能执行,所以需要给这个读请求加一个标识(读锁),让写请求知道,这个时候是不能修改数据的。不然数据就 不一致了。如果已经有人在写数据了,再来一个请求写数据,也是不允许的,这样也会导致数据的不一致,所以所有的写请求,都需要加一个写锁,是为了避免同时对共享数据进行写 *** 作。
读写锁引起的问题-
读写并发不一致
-
双写不一致情况
配置类
package com.tuling.distributedlock.zkcfg; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class CuratorCfg { @Bean(initMethod = "start") public Curatorframework curatorframework(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); Curatorframework client = CuratorframeworkFactory.newClient("192.168.1.104:2181", retryPolicy); return client; } }
Controller类
package com.tuling.distributedlock; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired private OrderService orderService; @Value("${server.port}") private String port; @Autowired Curatorframework curatorframework; @PostMapping("/stock/deduct") public Object reduceStock(Integer id) throws Exception { try { orderService.reduceStock(id); } catch (Exception e) { if (e instanceof RuntimeException) { throw e; } } return "ok:" + port; } }
Service类
package com.tuling.distributedlock; import com.tuling.distributedlock.entity.Order; import com.tuling.distributedlock.entity.Product; import com.tuling.distributedlock.mapper.OrderMapper; import com.tuling.distributedlock.mapper.ProductMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.UUID; import java.util.concurrent.TimeUnit; @Service public class OrderService { @Autowired private ProductMapper productMapper; @Autowired private OrderMapper orderMapper; @Transactional public void reduceStock(Integer id){ // 1. 获取库存 Product product = productMapper.getProduct(id); // 模拟耗时业务处理 sleep( 500); // 其他业务处理 if (product.getStock() <=0 ) { throw new RuntimeException("out of stock"); } // 2. 减库存 int i = productMapper.deductStock(id); if (i==1){ Order order = new Order(); order.setUserId(UUID.randomUUID().toString()); order.setPid(id); orderMapper.insert(order); }else{ throw new RuntimeException("deduct stock fail, retry."); } } public void sleep(long wait){ try { TimeUnit.MILLISECONDS.sleep( wait ); } catch (InterruptedException e) { e.printStackTrace(); } } }
全局异常处理类
package com.tuling.distributedlock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; import java.util.HashMap; import java.util.Map; @ControllerAdvice public class ExceptionHandlerController { @ExceptionHandler @ResponseStatus(value = HttpStatus.BAD_REQUEST) @ResponseBody public Object exceptionHandler(RuntimeException e){ Mapresult=new HashMap<>( ); result.put( "status","error" ); result.put( "message",e.getMessage() ); return result; } }
package com.tuling.distributedlock.mapper; import com.tuling.distributedlock.entity.Product; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; public interface ProductMapper { @Select(" select * from product where id=#{id} ") Product getProduct(@Param("id") Integer id); @Update(" update product set stock=stock-1 where id=#{id} ") int deductStock(@Param("id") Integer id); }
package com.tuling.distributedlock.mapper; import com.tuling.distributedlock.entity.Order; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; public interface OrderMapper { @Options(useGeneratedKeys = true,keyColumn = "id",keyProperty = "id") @Insert(" insert into `order`(user_id,pid) values(#{userId},#{pid}) ") int insert(Order order); }
package com.tuling.distributedlock.entity; public class Product { private Integer id; private String productName; private Integer stock; private Integer version; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getProductName() { return productName; } public void setProductName(String productName) { this.productName = productName; } public Integer getStock() { return stock; } public void setStock(Integer stock) { this.stock = stock; } public Integer getVersion() { return version; } public void setVersion(Integer version) { this.version = version; } }
package com.tuling.distributedlock.entity; public class Order { private Integer id; private Integer pid; private String userId; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public Integer getPid() { return pid; } public void setPid(Integer pid) { this.pid = pid; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } }
package com.tuling; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @MapperScan("com.tuling.distributedlock.mapper") @SpringBootApplication public class DistributedlockApplication { public static void main(String[] args) { SpringApplication.run(DistributedlockApplication.class, args); } }
server.port=8080 spring.datasource.url=jdbc:mysql://localhost:3306/pro?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true spring.datasource.username=root spring.datasource.password=1234 spring.datasource.driver-class-name=com.mysql.jdbc.Driver mybatis.configuration.map-underscore-to-camel-case=true
SET FOREIGN_KEY_CHECKS=0; -- ---------------------------- -- Table structure for order -- ---------------------------- DROP TABLE IF EXISTS `order`; CREATE TABLE `order` ( `id` int(11) NOT NULL AUTO_INCREMENT, `pid` int(11) DEFAULT NULL, `user_id` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1513 DEFAULT CHARSET=utf8mb4; -- ---------------------------- -- Table structure for product -- ---------------------------- DROP TABLE IF EXISTS `product`; CREATE TABLE `product` ( `id` int(11) NOT NULL AUTO_INCREMENT, `product_name` varchar(255) DEFAULT NULL, `stock` int(11) DEFAULT NULL, `version` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4; INSERT INTO `product` VALUES ('1', '元旦大礼包', '0', '0');案例演示 数据库表初始化
此时可以明显看出10个线程抢购5个库存商品。库存剩下-5个商品,存在10个订单。此处存在并发问题
mysql mysql-connector-java5.1.34 runtime org.springframework.boot spring-boot-starter-weborg.mybatis.spring.boot mybatis-spring-boot-starter2.1.1 org.springframework.boot spring-boot-starter-testtest org.junit.vintage junit-vintage-engineorg.apache.curator curator-recipes5.0.0 org.apache.zookeeper zookeeperorg.apache.zookeeper zookeeper3.5.8
@RestController public class TestController { @Autowired private OrderService orderService; @Value("${server.port}") private String port; @Autowired Curatorframework curatorframework; @PostMapping("/stock/deduct") public Object reduceStock(Integer id) throws Exception { InterProcessMutex interProcessMutex = new InterProcessMutex(curatorframework, "/product_" + id); try { // ... interProcessMutex.acquire(); orderService.reduceStock(id); } catch (Exception e) { if (e instanceof RuntimeException) { throw e; } } finally { interProcessMutex.release(); } return "ok:" + port; } }
org.apache.curator curator-recipes5.0.0 org.apache.zookeeper zookeeperorg.apache.zookeeper zookeeper3.5.8
package zookeeper.leaderselector; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class LeaderSelectorDemo { private static final String CONNECT_STR="192.168.1.104:2181"; private static RetryPolicy retryPolicy=new ExponentialBackoffRetry( 5*1000, 10 ); private static Curatorframework curatorframework; private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException { String appName = System.getProperty("appName"); Curatorframework curatorframework = CuratorframeworkFactory.newClient(CONNECT_STR, retryPolicy); LeaderSelectorDemo.curatorframework = curatorframework; curatorframework.start(); LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() { public void takeLeadership(Curatorframework client) throws Exception { System.out.println(" I' m leader now . i'm , "+appName); TimeUnit.SECONDS.sleep(15); } }; LeaderSelector selector = new LeaderSelector(curatorframework, "/cachePreHeat_leader", listener); selector.autoRequeue(); // not required, but this is behavior that you will probably expect selector.start(); countDownLatch.await(); } }
其底层源码也使用分布式锁的原理
什么是注册中心,让众多的服务都在Zookeeper中进行注册,把自己的一些服务信息,比如IP,端口,还有一些更加具体的服务信息,都写到 zookeeper节点上, 这样有需要的服务就可以直接从zookeeper上面去拿,怎么拿呢? 这时我们可以定义统一的名称,比如, User-Service, 那所有的用户服务在启动的时候,都在User-Service 这个节点下面创建一个子节点(临时节点),这个子节点保持唯一就好,代表了每个服务实例的唯一标识,有依赖用户服务 的比如Order-Service 就可以通过User-Service 这个父节点,就能获取所有的User-Service 子 节点,并且获取所有的子节点信息(IP,端口等信息),拿到子节点的数据后Order-Service可 以对其进行缓存,然后实现一个客户端的负载均衡,同时还可以对这个User-Service 目录进行 监听, 这样有新的节点加入,或者退出,Order-Service都能收到通知,这样Order-Service重 新获取所有子节点,且进行数据更新。
这个用户服务的子节点的类型为临时节点。Zookeeper中临时节点生命周期是和SESSION绑定的,如果SESSION超时了,对应的节点会被删除,被删除时,Zookeeper 会通知对该节点父节点进行监听的客户端, 这样对应的客户 端又可以刷新本地缓存了。当有新服务加入时,同样也会通知对应的客户端,刷新本地缓存,要达到这个目标需要客户端重复的注册对父节点的监听。这样就实现了服务的自动注册和自动退出。
Spring Cloud 生态也提供了Zookeeper注册中心的实现,这个项目叫 Spring Cloud Zookeeper
user-center : 用户服务
product-center: 产品服务
用户调用产品服务,且实现客户端的负载均衡,产品服务自动加入集群,自动退出服务。
4.0.0 org.springframework.boot spring-boot-starter-parent2.3.5.RELEASE com.example user-center0.0.1-SNAPSHOT user-center Demo project for Spring Boot 1.8 Hoxton.SR8 org.springframework.boot spring-boot-starter-weborg.springframework.cloud spring-cloud-starter-zookeeper-discoveryorg.projectlombok lomboktrue org.springframework.boot spring-boot-starter-testtest org.junit.vintage junit-vintage-engineorg.springframework.cloud spring-cloud-dependencies${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin
配置文件
spring.application.name=user-center #zookeeper 连接地址 spring.cloud.zookeeper.connect-string=192.168.1.104:2181
启动类,配置了负载均衡器@LoadBalanced,可以轮询调用多实例
package com.example.usercenter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.web.client.RestTemplate; @SpringBootApplication public class UserCenterApplication { public static void main(String[] args) { SpringApplication.run(UserCenterApplication.class, args); } @Bean @LoadBalanced public RestTemplate restTemplate() { RestTemplate restTemplate = new RestTemplate(); return restTemplate; } }
Controller类
package com.example.usercenter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; @RestController public class TestController { @Autowired private RestTemplate restTemplate; @Autowired private LoadBalancerClient loadBalancerClient; @GetMapping("/test") public String test() { return this.restTemplate.getForObject("http://product-center/getInfo", String.class); } @GetMapping("/lb") public String getLb(){ ServiceInstance choose = loadBalancerClient.choose("product-center"); String serviceId = choose.getServiceId(); int port = choose.getPort(); return serviceId + " : "+port; } }创建产品服务
4.0.0 org.springframework.boot spring-boot-starter-parent2.3.5.RELEASE com.example product-center0.0.1-SNAPSHOT product-center Demo project for Spring Boot 1.8 Hoxton.SR8 org.springframework.boot spring-boot-starter-weborg.springframework.cloud spring-cloud-starter-zookeeper-discoveryorg.projectlombok lomboktrue org.springframework.boot spring-boot-starter-testtest org.junit.vintage junit-vintage-engineorg.springframework.cloud spring-cloud-dependencies${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin
spring.application.name=product-center #zookeeper 连接地址 spring.cloud.zookeeper.connect-string=192.168.1.104:2181 #将本服务注册到zookeeper spring.cloud.zookeeper.discovery.register=true spring.cloud.zookeeper.session-timeout=30000
启动类,顺便也是Controller类
package com.example.productcenter; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @RestController public class ProductCenterApplication { @Value("${server.port}") private String port; @Value( "${spring.application.name}" ) private String name; @GetMapping("/getInfo") public String getServerPortAndName(){ return this.name +" : "+ this.port; } public static void main(String[] args) { SpringApplication.run(ProductCenterApplication.class, args); } }
package com.example.productcenter; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.client.discovery.event.HeartbeatEvent; import org.springframework.cloud.zookeeper.discovery.ZookeeperServiceWatch; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @Component @Slf4j public class HeartbeatEventListener implements ApplicationListener{ @Override public void onApplicationEvent(HeartbeatEvent event) { Object value = event.getValue(); ZookeeperServiceWatch source = (ZookeeperServiceWatch)event.getSource(); log.info(" event:source: {} ,event:value{}",source.getCache().getCurrentChildren("/services"),value.toString()); } }
package com.example.productcenter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.cloud.zookeeper.serviceregistry.ServiceInstanceRegistration; import org.springframework.cloud.zookeeper.serviceregistry.ZookeeperRegistration; import org.springframework.cloud.zookeeper.serviceregistry.ZookeeperServiceRegistry; import org.springframework.stereotype.Component; @Component public class ApplicationRunner1 implements ApplicationRunner{ @Autowired private ZookeeperServiceRegistry serviceRegistry; @Override public void run(ApplicationArguments args) throws Exception { ZookeeperRegistration registration = ServiceInstanceRegistration.builder() .defaultUriSpec() .address("anyUrl") .port(10) .name("/a/b/c/d/anotherservice") .build(); this.serviceRegistry.register(registration); } }案例演示
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)