- 写在前面
- 需求说明
- 阶段1需求
- 阶段2需求
- 阶段3需求
- 阶段4需求
- 阶段5需求
- 阶段6需求
- 实现
- 数据库设计
- 接口编写
- jmeter测试
- 使用@cacheable注解
- 改为纯Redis
- 改为kafka
- jwt方案
- nginx使用
- ES学习
- 改为ES
本文是我毕业后入职的第一家公司的第一周入职培训所布置的任务,主要是考察一些java的基础知识,以及一些数据库的基本使用,文章过于入门,请大佬走开。
需求说明 阶段1需求- 纯后台,不需要前端
- 增加接口,增加商品(商品名称、商品型号、生产厂家、添加时间、商品编号(主键)、商品状态[未售出、已售出]),其中商品编号用雪花算法生成
- 增加接口,商品查询(查询条件为 商品名称[商品型号][生产厂家]或根据商品编号直接查询),考虑复合索引
- 增加接口,售出商品(商品名称、商品型号、添加时间),编辑商品状态即可
- 使用jmeter测试效果,录入10W个商品,同时售出5W个商品
- 使用springboot+mybatis+mysql8+swagger
阶段1主要是springboot对mysql数据库的基础 *** 作吧,这部分还是比较简单。
阶段2需求- 纯后台,不需要前端将商品查询接口改为redis缓存,加入@cacheable注解
- 了解redis的五种数据结构及常用命令
阶段2就加几个注解,背会儿redis相关的东西吧
阶段3需求- 将之前的系统改造为纯redis实现
- 引入lambda stream
阶段3就是由对mysql数据库 *** 作改为对redis数据库的 *** 作
阶段4需求- 学习什么是MQ,什么是kafka、生产者、消费者组、消费者、topic、partition、单播广播,及kafka原理
- 增加商品改为kafka生产,多线程入库
- 售出商品改为kafka消费,多线程出库
这个阶段就是改成使用kafka来当做"数据库"了吧,除此之外经理还让我下去了解以下这几个问题,也记录上吧。。。
问题1:什么是MQ?
问题2:什么是kafka
问题3:生产者
问题4:消费者组
问题5:消费者
问题6:topic
问题7:partition
问题8:单播广播
问题9:kafka原理。
- 了解分布式系统中session的解决方案(客户端或服务端)
- 增加登录接口,jwt方案
- 部署2-3个程序,前置nginx,验证jwt,给出验证方案
这个阶段我就属实看不懂了,在学校也都单机没研究过分布式问题。。。
阶段6需求- 学习Elasticsearch
- 程序全改用ES,入10w卖5W
又改数据库了吧。。不过最后还是都了解到了redis,kafka主要作为中间件,作为缓存、暂时存放的数据库,ES作为主要用来查询的数据库。同样针对这个阶段的需求1,经理提出了以下问题
问题1:shard分片 replicas副本 index template索引模板 常见数据类型
问题2:luence 与 ES的关系
问题3:ELK是什么以及三个的关系
问题4:ES存储 字符串 有哪两种数据类型,这两种的区别以及原理【重要】
问题5:ES检索(query)与过滤(filter)的区别是什么(本质)(问题3关联)
问题6:DSL基本语法(K)–查询仓库中所有状态为已售出的数据10W(分页 浅?深?)
问题7:ES分页 浅分页 深分页 区别是什么,怎么用
问题8:bulkbulk processor是什么,什么时候用)(刁钻问题 bulk和bulk processor的区别)
问题9:官方JAVA CLIENT(highLevel client/lowLevel client 区别)
第一天,开始整吧,首先设计数据库,需求中方括号指的是可选条件。
一些基本类型,状态是枚举,主键使用雪花算法。之后是根据查询条件建立联合索引。
DROp TABLE IF EXISTSmy_stock`;
CREATE TABLE my_stock (
id bigint NOT NULL COMMENT ‘商品编号’,
goods_name varchar(100) DEFAULT NULL COMMENT ‘商品名称’,
goods_type varchar(100) DEFAULT NULL COMMENT ‘商品型号’,
manufacturer varchar(100) DEFAULT NULL COMMENT ‘生产厂家’,
add_date datetime DEFAULT NULL COMMENT ‘添加时间’,
status tinyint DEFAULT NULL COMMENT ‘商品状态’,
PRIMARY KEY (id),
UNIQUE KEY goods_unique_indeies (goods_name,goods_type,add_date) USING BTREE,
KEY goods_indeies (goods_name,goods_type,manufacturer) USING BTREE,
KEY goods_indeies1 (goods_name,manufacturer) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;`
之后是编写实体类
@Data @AllArgsConstructor @NoArgsConstructor public class Goods implements Serializable { private Long id; private String goodsName; private String goodsType; private String manufacturer; private Status status; private Date addDate; }
商品状态是0未售出,1已售出的枚举,过于简单就不放代码了
不过主要是要实现TypeHandler来让数据库更方便的储存枚举(mybatisplus直接使用一个注解就好了)
@MappedJdbcTypes(JdbcType.TINYINT) public class StatusHandle extends baseTypeHandler{ @Override public void setNonNullParameter(PreparedStatement ps,int i,Status param,JdbcType jt) throws SQLException { ps.setInt(i,param.getStateCode()); } private Status getState(int code){ if(code==0||code==1){ return Status.getStateByCode(code); } return null; } @Override public Status getNullableResult(ResultSet rs,String columnName) throws SQLException{ int statuscode=rs.getInt(columnName); return getState(statuscode); } @Override public Status getNullableResult(ResultSet rs,int columnName) throws SQLException{ int statuscode=rs.getInt(columnName); return getState(statuscode); } @Override public Status getNullableResult(CallableStatement cs, int columnName) throws SQLException{ int statuscode=cs.getInt(columnName); return getState(statuscode); } }
其次是最好是类名与字段名与数据库的一致
最后是雪花算法,说实话当时看到需求根本不认识雪花算法,心里还咯噔一下。
之后了解到雪花算法其实就是指使用64位bit作为唯一id,因为在分布式模式中全局唯一id是非常重要的
其中第一位没有意义,统一为0,之后41位是时间戳,之后10位分别是机房的5位id和机器的5位id。然后一个机房的一个机器在一毫秒内可能生成多个id,所以最后12位就区分这种情况。
既然知道了原理,之后我们从网上copy一个雪花算法工具类就行了。。。。
基本的增删改查。
首先是添加:
请求类DTO为
@Data @ApiModel(description = "商品") public class GoodsDTO implements { @ApiModelProperty(value = "商品名称") @NotBlank(message = "商品名称不能为空") private String goodsName; @ApiModelProperty(value = "商品类型") private String goodsType; @ApiModelProperty(value = "生产厂家") private String manufacturer; }
id由内部雪花算法生成,添加时间为调用接口时间,售出状态刚添加肯定为未售出。如果有枚举作为入参的需求可以看这篇 枚举反序列化
之后是mapper的insert语句
INSERT INTO my_stock VALUES (#{id}, #{goodsName}, #{goodsType}, #{manufacturer}, #{addDate}, #{status})
然后是查询商品,这个阶段最难的地方,主要是可以通过各种条件进行查询,所以当时写mapper还整了挺久的
最后是售出商品,只是把售出状态改为已售出而已
jmeter测试UPDATE my_stock set status = 1 where goods_name = #{goodsName} and goods_type = #{goodsType} and add_date = #{addDate}
jmeter是啥???我直接问经理,经理告诉我就只是个压力测试工具,很简单的!(经理的口头禅),之前那个实习生10分钟就学会啦。
好吧,先康康咋用的
新建一个http请求,输入地址端口路径和参数,然后回到Thread Group选择线程数和循环数,需求是录入10w,那我就500个线程循环200次吧,之后打开navicat查询select count来看数量有10条,成功。
售出的话其他 *** 作大差不差,但是问题是jmeter里id怎么给5w个?百度:使用csv。好吧,先用navicat查出5w个id并保存为csv,之后将csv存入jmeter(新建CSV Data Set Config,并设置csv文件路径和参数)之后就ok拉。
第二天,加入redis的缓存机制,在查询的方法上加入
@Cacheable(value = "goodsCache",key="'goods'+#goodsDTO.goodsName")
以及在启动类上加入@EnableCacheing
就实现第一次查询时会同时保存至redis数据库,之后就会先去redis里查数据了,redis作为缓存中间件。
最后经理说,写的不戳,可是你的redis配置在哪呢?
我才突然想起来我忘记配置redis了,可是数据确确实实保存到了redis了啊,我跟经理说:我不写配置就是默认使用本机地址和默认端口。经理说这个我知道,可是你redis依赖都没有呢。
对哦,那为啥会保存到redis了?经理叫我下去研究,我至今都不知道为啥,如果有人知道希望评论告诉我。
至于redis的常用命令建议看redis菜鸟教程
第三天。。
增删改查大差不差,就拿查询作为例子吧,简简单单的添加,可需求的联合索引查询条件咋办呢,redis有像mysql这样的联合索引吗,我是没有找到,所以我用了最笨的方法,插入一条数据时是对redis里插入了6条数据(分别对应6个查询条件)
代码:
@Override public int addGoods(GoodsDTO goods){ HashMapgoodsmap=new HashMap<>(); goodsmap.put("id",goods.getId().toString()); goodsmap.put("goodsName",goods.getGoodsName()); goodsmap.put("goodsType",goods.getGoodsType()); goodsmap.put("manufacturer",goods.getManufacturer()); goodsmap.put("addDate",goods.getAddDate()); goodsmap.put("Status",goods.getStatus().getStateString()); try { Jedis jedis = new Jedis("localhost", 6379); Transaction transaction = jedis.multi(); transaction.hmset(RedisKey.getGoodsKey(goods.getId().toString()), goodsmap); transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName()), goodsmap); transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getGoodsType()), goodsmap); transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getManufacturer()), goodsmap); transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getGoodsType() + goods.getManufacturer()), goodsmap); transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getGoodsType() + goods.getAddDate()), goodsmap); transaction.exec(); log.info("添加完成"); jedis.close(); return 1; } catch (Exception e){ log.error("添加事务执行失败",e); return 0; } }
记得使用事务,得保证同时插入6条。
查询是方便了,后果是售出商品的话,如果我售出一个商品,那么需要对6个redis的key修改为已售出。。。。
lambda学是学了,但没找到可以用的地方。
第四天。。
现在不需要查询商品了,首先是生产者添加商品
编写生产者工具类
@Component @Slf4j public class ProducerUtil { private final KafkaProducerkafkaProducer; public ProducerUtil(){ Properties properties=new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); kafkaProducer=new KafkaProducer<>(properties); } public void produce(String topic,String date){ kafkaProducer.send(new ProducerRecord<>(topic,date)); log.info("发送成功"); } }
只需要在调接口时,调用工具类的produce方法传入topic和数据就行了
之后是消费者售出商品,经理是让我使用jemeter添加10w条,然后再启一个程序运行来消费5w条,我直接用测试写了
@SpringBootTest class StockManageApplicationTests { @Test void contextLoads() { String topic = "mytest"; Properties p = new Properties(); p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); p.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_test"); p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumerkafkaConsumer = new KafkaConsumer<>(p); kafkaConsumer.subscribe(Arrays.asList(topic));// 订阅消息 int count = 1; while (true) { ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { record.value(); System.out.println("已消费" + count + "条"); count++; } } } }
注意topic要保持一致。
关于提的问题:
问题1:什么是MQ?
回答1:消息队列
问题2:什么是kafka
回答2:分布式发布消息订阅系统
问题3:生产者
回答3:产生消息的个体
问题4:消费者组
回答4:多个消费者实例的组成
问题5:消费者
回答5:订阅消息的个体
问题6:topic
回答6:存放消息,指定消息被放入某一个队列里
问题7:partition
回答7:topic的数据分割为一个或多个的分区
问题8:单播广播
回答8:广播即将消息发送给所有消费者,单播即发送给某一个消费者
问题9:kafka原理。
回答9:生产者发布消息topic到broker上,每条信息追加顺序写入到不同分区,kafka将数据持久化到磁盘,消费者订阅某个topic的消息进行消费。
第五天。。。
首先是了解了分布式系统怎么统一session的问题的解决方案。
第一个是session复制:既一个用户发起请求,每台服务器都保存一个session,不过这样做服务器压力很大。
第二个是存储在客户端的cookie中,这样就可以不用存在服务器啦,不过这个方案缺点很严重,不仅需要用户宽带资源保存,cookie还有大小限制,而且还不安全。
第三个是四层代理:通过反向代理用户ip来作hash,以此保证每个用户落在同一个服务器上。不过缺点是如果这台服务器拉闸了就需要用户重新登录了。
第四个是保存到数据库里。利用中间件redis作为缓存。缺点是增加一次网络请求。
最后一个就是本次要使用的方案,用客户端保存session,每次请求的时候传递session到服务端,服务端来验证session是否正确有效。
jwt只有该方案的一种实现方式,而token就是客户端用来登录的凭证。
token由三部分组成
header:定义token的类型以及签名signature的生成算法。
payload:除了jwt声明的exp(token有效期)等其他已经声明的信息外,还可以自己添加一些key,比如用户名,以此来获得发起请求的用户名。
signature:token的签名,请求时以此来判断token是否被修改,登录是否有效。
流程是,用户发起登录请求,服务端返回token给客户端,之后客户端再次发起请求时就会附带token而服务端会验证token判断登录。实际上验证token跟生成token是一个样的,都是生成一个token,只是验证时会判断附带的生成的是否一致。
了解完这些开始敲代码吧
首先是登录接口,就是简单的判断一下用户名密码和token
这是我写的token工具类
@Slf4j public class TokenUtil { //设置过期时间 private static final long EXPIRE_DATE=10000000; //token秘钥 private static final String TOKEN_SECRET = "ZCfasfhuaUUHufguGuwu2021BQWE"; public static String token (String userAccount){ String token = ""; //过期时间 Date date = new Date(System.currentTimeMillis()+EXPIRE_DATE); //秘钥及加密算法 Algorithm algorithm = Algorithm.HMAC256(TOKEN_SECRET); //设置头部信息 Mapheader = new HashMap<>(); header.put("typ","JWT"); header.put("alg","HS256"); //携带userId,username,password信息,生成签名 token = JWT.create() .withHeader(header) // .withClaim("userId",userId) .withClaim("userAccount",userAccount) // .withClaim("userName",userName) .withExpiresAt(date) .sign(algorithm); return token; } public static String verify(String token) { try { JWTVerifier jwtVerifier = JWT.require(Algorithm.HMAC256(TOKEN_SECRET)).build(); DecodedJWT decodedJWT = jwtVerifier.verify(token); return decodedJWT.getClaim("userAccount").asString(); } catch (Exception e) { log.error("token验证方法错误",e); return ""; } } }
而在Service层就可以验证一下token
public ApiResultDTOnginx使用verify(String token){ log.info(token); if(token==null|| "".equals(token)){ return ApiResultDTO.error("用户未登录"); } else { String userAccount= TokenUtil.verify(token); return ApiResultDTO.success("现在的登录用户帐号为:"+userAccount); } }
来试验分布式案例,nginx作为均衡负载的服务器,分布式环境下还是得学的。
主要就是写nginx的配置文件
upstream api{ #定义自己需要代理的服务器 server localhost:8080; server localhost:8081; } server { listen 80; #端口 server_name localhost; #地址 location / { proxy_pass http://api/; #转向自己定义的api服务器列表 }
配置好 后,启动nginx,打开自己配置的代理服务器,再启动2个springboot,端口为8080和8081,每次登录成功后就返回端口号,后面验证端口号一直在变,说明nginx代理成功了。
ES学习第二周的第一天。这个需求给了我2天时间,还算充裕吧。首先是学习Elasticsearch,所以先把问题回答了吧。
问题1:shard分片 replicas副本 index template索引模板 常见数据类型
回答:分片是es中的最小数据单元,由一个index拆分为一个或多个分片,每一个分片都是一个luence索引实例。
副本是每一个主分片的复制。
通过模板定义好了mapping,只要index的名称被模板匹配到,那么该index的mapping就按照模板中定义的mapping自动创建。
text,keyword,integer,long,short,boolean,double,flaot,date
问题2:luence 与 ES的关系
回答:luence是信息检索工具jar包,而ES是基于lucene的搜索引擎。
问题3:ELK是什么以及三个的关系
回答:E是elasticsearch。
L是Logstash,作为ELK的中央数据流引擎,从各个目标收集不同格式的数据,经过过滤后输出到各个目地。
K是kibana,可以将es的数据可视化展示,并提供了分析的功能。
问题4:ES存储 字符串 有哪两种数据类型,这两种的区别以及原理【重要】
text,存储数据的时候会自动分词并生成索引,不支持排序、聚合等。
keyword,存储时不会分词,直接建立索引,支持排序和聚合。
问题5:ES检索(query)与过滤(filter)的区别是什么(本质)(问题3关联)
query在查询时不仅会判断是否包含查询条件,还会判断结果相关匹配程度,_score越高搜索时排名越靠前
而filter只会查询到是或不是满足查询条件的结果。
问题6:DSL基本语法(K)–查询仓库中所有状态为已售出的数据10W(分页 浅?深?)
POST /test/_search?scroll=3m { "query": { "term": { "Status.keyword": { "value": "已售出" } } } }
问题7:ES分页 浅分页 深分页 区别是什么,怎么用
浅分页指对数据量不大的数据进行分页
from+size的浅分页
scroll深分页
search_after深分页
问题8:bulkbulk processor是什么,什么时候用(刁钻问题 bulk和bulk processor的区别)
bulk是Elasticsearch的restful语法中的一个方法。
bulk processor是java中ES中的一个类。
批量 *** 作时使用
问题9:官方JAVA CLIENT(highLevel client/lowLevel client 区别)
低级客户端是用户自己处理参数的封装以及结果的解析。
高级客户端基于低级客户端,内部处理参数封装与结果的解析。
第二天。
首先是添加商品:
@Override public int addGoods(GoodsDTO goods){ HashMapgoodsmap=new HashMap<>(); goodsmap.put("id",goods.getId().toString()); goodsmap.put("goodsName",goods.getGoodsName()); goodsmap.put("goodsType",goods.getGoodsType()); goodsmap.put("manufacturer",goods.getManufacturer()); goodsmap.put("addDate",goods.getAddDate()); goodsmap.put("Status",goods.getStatus().getStateString()); BulkRequest bulkRequest=new BulkRequest(); bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(goodsmap.remove("id"))).source(goodsmap)); try { restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); return 1; } catch (IOException e){ log.error("插入时错误",e); return 0; } }
这里需要注意下是由于source始终是要保存偶数个值,所以如果是奇数还会报错,所以尽量我都使用map来保存值。
难点是售出商品,由于ES最大查询数量是1w条,所以10w肯定不行,需要用的深分页,这里我选择的深分页方式是scroll。
@Override public int sellGoods(){ //首先是得到1w个数据 SearchRequest searchRequest=new SearchRequest(INDEX); SearchSourceBuilder sourceBuilder=new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.matchAllQuery()); sourceBuilder.from(0); sourceBuilder.size(10000); searchRequest.source(sourceBuilder); searchRequest.scroll(Timevalue.timevalueMinutes(3L)); SearchResponse searchResponse=null; try { searchResponse= restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT); } catch (IOException e){ log.error("得到数据时错误",e); return 0; } //然后是售出 String scrollid=searchResponse.getScrollId(); SearchHit[] searchHits=searchResponse.getHits().getHits(); //循环5次,即5w个 for (int i = 0; i < 5; i++) { for (SearchHit row:searchHits ) { update(row); } SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollid); scrollRequest.scroll(SCROLL); try { searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT); } catch (IOException e){ log.error("scroll得到数据错误",e); return 0; } scrollid=searchResponse.getScrollId(); searchHits=searchResponse.getHits().getHits(); } return 1; }
private void update(SearchHit hit){ UpdateRequest updateRequest=new UpdateRequest(INDEX,hit.getId()); updateRequest.doc("Status",Status.SOLD.getStateString()); try { restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT); }catch (IOException e){ log.error("售出时错误",e); } }
that’s all,all finished!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)