阿里云安装redis服务器+入门学习笔记【零基础】【狂神】

阿里云安装redis服务器+入门学习笔记【零基础】【狂神】,第1张

狂神视频地址

文章目录
  • 1.阿里云安装redis
    • redis-benchmark
  • 测试: 100个并发连接 100000次请求
    • 一些常用命令
  • 2.redis的数据类型
    • 1.五大数据类型
      • 1.String(字符串)(基础代码 *** 作)
      • 2.List(列表)
      • 3.Set(集合)
      • 4.Hash(哈希)
      • 5.Zset(有序集合)
    • 2.三大特殊数据类型
      • 1.geospatial(地理位置)
      • 2.hyperloglog
      • 3.bitmaps
  • 3.redis的事务
  • 4.redis的监控
    • 1.悲观锁
    • 2.乐观锁
  • 5.Jedis
    • 1.在IDEA中连接阿里云服务器端redis
    • 2.常用API
      • 1.key
      • 2.String
      • 3.List
      • 4.Set
      • 5.Hash
    • 3.事务
  • 6.springboot整合redis
      • 源码分析
      • 整合测试
      • 自定义redis配置类
      • RedisUtils工具类
  • 7.redis.conf文件配置
  • 8.Redis的持久化
    • 1.RDB(Redis DataBase)
    • 2.AOF(Append Only File)
    • 3.总结
  • 9.redis的发布订阅
    • 简单介绍
    • 命令
    • 测试
  • 10.redis的主从复制
    • 概念
    • 环境配置(伪集群)
    • 一主二从
    • 细节
    • 主从复制的复制原理
    • 宕机后手动配置主机
    • 哨兵模式
      • 概念
      • 测试
      • 哨兵模式的优缺点
      • 哨兵模式配置说明
  • 11.缓存穿透和雪崩(面试高频、工作常用)
    • 缓存穿透(秒杀)
    • 缓存击穿(热搜)
    • 缓存雪崩

1.阿里云安装redis
  • sudo apt-get install -y redis-server

  • 进入 /etc/redis目录

  • 打开redis.conf文件

  • SECURITY下的requirepass注释打开,更改为自定义的密码

  • 重启redis,测试是否安装成功


  • redis-server的停止、启动和重启

上述方法只是提供了一种redis-server停止的方式,实际上我们通过apt安装之后redis-server是默认帮我们开启的,而且你在redis-cli中shutdown之后只关闭redis-cli不会同时关掉你的redis-server,另外上述方法中的kill -9 进程号 是没有用的,因为redisdaemonize守护线程模式通过apt安装时是默认开启的,你杀死一个进程,系统会默认再帮你开一个进程,永远也杀不死。

  • 查看redis相关进程

注意这里如果没有进一步 *** 作,那么只能通过这种/etc/init.d/redis-server stop的方法来启动redis服务器,虽然这样的一个好处是shutdown之后不用重新启动redis,但是如果以后要搭建集群或者自定义一些 conf 配置的话, *** 作起来还是有难度的,所以这里还提供了第二种启动 *** 作

  1. /usr/bin 目录下新建一个bqconfig 文件夹,用来存放自己的一些配置,名字自定义即可

  2. bqconfig文件夹下新建 redis 文件夹,用来存放关于 redis 的配置文件

  3. /etc/redis文件夹下的 redis.conf文件拷贝一份放在 usr/bin/bqconfig/redis文件夹下

  4. 修改redis.conf配置,确保daemonize守护线程模式是开启状态(yes)。

  5. 经过这几步 *** 作之后,就可以使用 redis-server /bqconfig/redis/redis.conf (注意这是在 /usr/bin 目录的前提下,如果在其他目录下需要将路径补充完整 )来启动redis-server了,这种方法可以开启不同配置的redis服务器,只需启动不同的redis.conf文件即可。

  6. 与此同时,在redis-cli中执行shutdown *** 作时,redis-server也会同时被关闭,下次再使用时需要重新启动。

redis-benchmark

是一个压力测试工具,是官方自带的性能测试工具

简单测试

测试: 100个并发连接 100000次请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000

一些常用命令
127.0.0.1:6379> select 3   # 切换数据库(redis默认有16个数据库,从0开始编号)
OK
127.0.0.1:6379[3]> dbsize  # 查看当前数据库大小(指占用空间大小)
(integer) 0
127.0.0.1:6379[3]> 

127.0.0.1:6379> keys *  # 查看当前数据库所有的key
1) "test"
127.0.0.1:6379> 

127.0.0.1:6379> flushall  # 清空所有数据库内容

127.0.0.1:6379> flushdb   # 清空当前数据库内容

exists xx   # 判断key xx是否存在,存在返回1,不存在返回0
move xx 1   # 将当前数据库中的 xxkey移动到 1号数据库中
expire xx 10  # 设置xxkey的过期时间为 10s,10s之后,xxkey的value自动清空
type xx     # 查看xxkey的数据类型
2.redis的数据类型
  1. redis 是一个开源的,基于内存的数据结构存储系统,它可以用作数据库、缓存和消息中间件MQ。

  2. 支持多种类型的数据结构,如 字符串(string),散列(hash),列表(list),集合(set),有序集合(sorted set)与范围查询,bitmaps,hyperloglogs 和地理空间(geospatial)索引半径查询。

  3. 内置了 复制(replication),LUA脚本(Lua scripting),LRU驱动事件(LRU eviction),事务(transaction)和不同级别的磁盘持久化(persistence)。

  4. 通过redis哨兵(Sentinel)和自动分区(Cluster)提供高可用性(high availability)。

1.五大数据类型 1.String(字符串)(基础代码 *** 作)
127.0.0.1:6379> flushdb 
OK
127.0.0.1:6379> set key1 v1
OK
127.0.0.1:6379> get key1
"v1"
127.0.0.1:6379> exists key1
(integer) 1
127.0.0.1:6379> append key1 "hello"  # 在key1的value后方追加字符串,如果key1在数据库中不存在,那么创建这个key1,并将其value置为原本要追加的内容
(integer) 7
127.0.0.1:6379> get key1
"v1hello"
127.0.0.1:6379> strlen key1          # 获取key1的value的长度
(integer) 7
127.0.0.1:6379> append key1 ",world"
(integer) 13
127.0.0.1:6379> strlen key1
(integer) 13
127.0.0.1:6379> get key1
"v1hello,world"
127.0.0.1:6379> 

####################################################################################

127.0.0.1:6379> set views 0
OK
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> incr views   # 设置自增
(integer) 1
127.0.0.1:6379> incr views
(integer) 2
127.0.0.1:6379> incr views
(integer) 3
127.0.0.1:6379> get views
"3"
127.0.0.1:6379> decr views   # 设置自减
(integer) 2
127.0.0.1:6379> decr views
(integer) 1
127.0.0.1:6379> decr views
(integer) 0
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> 

127.0.0.1:6379> incrby views 10		 # 指定步长自增
(integer) 10
127.0.0.1:6379> incrby views 10
(integer) 20
127.0.0.1:6379> get views
"20"
127.0.0.1:6379> decrby views 10 	 # 指定步长自减
(integer) 10
127.0.0.1:6379> decrby views 10
(integer) 0
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> 

####################################################################################
127.0.0.1:6379> set key2 "hello,world"
OK
127.0.0.1:6379> get key2
"hello,world"

# 截取
127.0.0.1:6379> getrange key2 0 4  # 截取key2的value字符串 可以指定截取范围,与substring功能类似,但是getrange为双闭区间
"hello"
127.0.0.1:6379> getrange key2 6 -1 # -1代表最后一个位置,即倒数第1个位置,-n为倒数第n个位置
"world"
127.0.0.1:6379> getrange key2 0 -1
"hello,world"
127.0.0.1:6379> 

# 替换
127.0.0.1:6379> setrange key2 6 bingbing  # 将key2的value字符串,从下标6位置开始,将后边的字符串替换为新的字符串 bingbing
(integer) 14
127.0.0.1:6379> get key2
"hello,bingbing"
127.0.0.1:6379> 

####################################################################################
# setex (set with expire)   设置过期时间

127.0.0.1:6379> setex key3 30 hello
OK
127.0.0.1:6379> get key3
"hello"
127.0.0.1:6379> ttl key3
(integer) 8
127.0.0.1:6379> get key3
"hello"
127.0.0.1:6379> ttl key3
(integer) 1
127.0.0.1:6379> ttl key3
(integer) -2
127.0.0.1:6379> get key3
(nil)

# setnx (set if not exist)  如果不存在则设置,存在则不处理(在分布式锁中会常常使用)

127.0.0.1:6379> setnx mykey bingbing
(integer) 1
127.0.0.1:6379> get mykey
"bingbing"
127.0.0.1:6379> get key2
"hello,bingbing"
127.0.0.1:6379> setnx key2 bingbing
(integer) 0
127.0.0.1:6379> get key2
"hello,bingbing"
127.0.0.1:6379> 

####################################################################################
# 批量设置
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3  # 批量设置k1 k2 k3 的值 为 v1 v2 v3
OK
127.0.0.1:6379> keys *
1) "k2"
2) "k1"
3) "k3"
127.0.0.1:6379> keys *
1) "k2"
2) "k1"
3) "k3"
127.0.0.1:6379> mget k1 k2 k3          # 批量取出 k1 k2 k3 的值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4     # 原子性 *** 作 只要有一个已经存在 则这行命令全部都不会被执行
(integer) 0
127.0.0.1:6379> get k4
(nil)
127.0.0.1:6379> 

####################################################################################
# 设置对象 
# set user:1 {name:zhangsan,age:3}  --> 设置一个user1对象,其值使用json字符串来保存一个对象
# 这里的 key 是一个很巧妙的设计: user:{id}:{field} ,如此设计在redis中是完全ok的

127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2  # 可以理解为,多级key,本例中第一级为user,第二级为1,第三级为 name 和 age
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "2"
127.0.0.1:6379> 

####################################################################################
# getset 先get 后 set 如果不存在,则返回null并设置新值,如果存在则返回值并更新新值

127.0.0.1:6379> getset db redis
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mongodb
"redis"
127.0.0.1:6379> get db
"mongodb"
127.0.0.1:6379> 


2.List(列表)

在redis里面,我们可以把list完成 栈 \textcolor{red}{栈} 队 列 \textcolor{red}{队列} 阻 塞 队 列 \textcolor{red}{阻塞队列}

# 所有的list命令都是以 l 开头的

####################################################################################
# 插入元素  lpush  rpush
127.0.0.1:6379> lpush test_list 1   # 将数据插入列表左端(把列表想象成一根管子)
(integer) 1
127.0.0.1:6379> lpush test_list 2
(integer) 2
127.0.0.1:6379> lpush test_list 3
(integer) 3
127.0.0.1:6379> lrange test_list 0 -1  # 将数据取出 从左到右读取
1) "3"
2) "2"
3) "1"
127.0.0.1:6379> rpush test_list 4   # 将数据插入列表右端
(integer) 4
127.0.0.1:6379> lrange test_list 0 -1
1) "3"
2) "2"
3) "1"
4) "4"
127.0.0.1:6379> 

####################################################################################
# 移除元素 lpop rpop  (此时也获取到了要删除的元素,准确的说 pop属于 移除并获取)

127.0.0.1:6379> lrange test_list 0 -1 
1) "3"
2) "2"
3) "1"
4) "4"
127.0.0.1:6379> lpop test_list  # 移除左侧元素
"3"
127.0.0.1:6379> lrange test_list 0 -1
1) "2"
2) "1"
3) "4"
127.0.0.1:6379> rpop test_list  # 移除右侧元素
"4"
127.0.0.1:6379> lrange test_list 0 -1
1) "2"
2) "1"
127.0.0.1:6379> 

####################################################################################
# 获取list中对应下标的元素(这个只能以左起始,下标从0开始)(想从右起始的话,得+1取反,比如说,从右往左,第0位,为 -1,第1位,为 -2)

127.0.0.1:6379> lrange test_list 0 -1
1) "2"
2) "1"
127.0.0.1:6379> lindex test_list 0
"2"
127.0.0.1:6379> lindex test_list 1
"1"
127.0.0.1:6379> lindex test_list -1
"1"
127.0.0.1:6379> lindex test_list -2
"2"
127.0.0.1:6379> 

####################################################################################
# 获取list的长度 llen

127.0.0.1:6379> lrange test_list 0 -1
1) "2"
2) "1"
127.0.0.1:6379> llen test_list
(integer) 2
127.0.0.1:6379> 

####################################################################################
# 移除指定的元素 lrem 应用场景:取消关注 

127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "2"
4) "1"
127.0.0.1:6379> lrem list 1 1  # 移除list集合中指定个数的value,精确匹配
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "2"
127.0.0.1:6379> lrem list 2 3
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "2"
127.0.0.1:6379> 

####################################################################################
# 截断  ltrim  只保留list中指定区间内的元素(双闭区间)

127.0.0.1:6379> lrange list 0 -1
1) "hello1"
2) "hello2"
3) "hello3"
4) "hello4"
127.0.0.1:6379> ltrim list 1 2
OK
127.0.0.1:6379> lrange list 0 -1
1) "hello2"
2) "hello3"
127.0.0.1:6379> 

####################################################################################
# rpoplpush  剪切列表的最右侧元素,将其从左侧粘贴到另外一个列表

127.0.0.1:6379> lrange list1 0 -1
1) "hello1"
2) "hello2"
3) "hello3"
127.0.0.1:6379> rpoplpush list1 list2
"hello3"
127.0.0.1:6379> lrange list1 0 -1
1) "hello1"
2) "hello2"
127.0.0.1:6379> lrange list2 0 -1
1) "hello3"
127.0.0.1:6379> 

####################################################################################
# lset  将列表中指定下标的值替换为另外一个值,进行更新,如果列表不存在 或者 列表没有此下标(即指定下标越界),则更新失败

127.0.0.1:6379> exists list
(integer) 0
127.0.0.1:6379> lset list 0 item
(error) ERR no such key
127.0.0.1:6379> lpush list value1
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset list 0 item
OK
127.0.0.1:6379> lrange list 0 0
1) "item"
127.0.0.1:6379> lset list 1 item1
(error) ERR index out of range
127.0.0.1:6379> 

####################################################################################
# linsert  将某个具体的value插入到列表中某个元素的前面或者后面

127.0.0.1:6379> lrange list 0 -1
1) "hello1"
2) "hello2"
3) "hello3"
127.0.0.1:6379> linsert list before hello2 hello12
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "hello1"
2) "hello12"
3) "hello2"
4) "hello3"
127.0.0.1:6379> linsert list after hello2 hello23
(integer) 5
127.0.0.1:6379> lrange list 0 -1
1) "hello1"
2) "hello12"
3) "hello2"
4) "hello23"
5) "hello3"
127.0.0.1:6379> 

小结

  • redis 中的 list 实际上是一个链表,before after left right 都可以插入值
  • 如果key不存在,则创建新的链表
  • 如果key存在,则新增内容
  • 如果移除了所有的值,则链表成了空链表,也代表不存在
  • 在两遍插入或者改动时,效率最高! 改动中间的元素,相对来说效率会低一点~

通过list可以实现消息排队!消息队列! lpush rpop 左进右出

通过list可以实现栈! lpush lpop 左进左出

3.Set(集合)

set的值不能重复

# 所有的set *** 作命令 基本上都是以s开头  set是无序的

####################################################################################
# 添加元素				  sadd 
# 查看集合元素			 smembers
# 判断集合中是否存在当前元素  sismember  1说明存在,0说明不存在

127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> sadd set hello
(integer) 1
127.0.0.1:6379> sadd set world
(integer) 1
127.0.0.1:6379> smembers set
1) "hello"
2) "world"
127.0.0.1:6379> sismember set hello
(integer) 1
127.0.0.1:6379> sismember set bingbing
(integer) 0
127.0.0.1:6379> 

####################################################################################
# 获取集合中元素的个数  scard

127.0.0.1:6379> smembers set
1) "hello"
2) "world"
127.0.0.1:6379> scard set
(integer) 2
127.0.0.1:6379> 

####################################################################################
# 移除集合中某个元素  srem

127.0.0.1:6379> smembers set
1) "hello"
2) "world"
127.0.0.1:6379> srem set hello
(integer) 1
127.0.0.1:6379> smembers set
1) "world"
127.0.0.1:6379> 

####################################################################################
# 随机获取集合中某个元素 srandmember 

127.0.0.1:6379> smembers set
1) "hello3"
2) "hello4"
3) "hello2"
4) "hello1"
5) "hello5"
127.0.0.1:6379> srandmember set
"hello2"
127.0.0.1:6379> srandmember set
"hello2"
127.0.0.1:6379> srandmember set
"hello3"
127.0.0.1:6379> srandmember set
"hello4"
127.0.0.1:6379> srandmember set
"hello3"
127.0.0.1:6379> srandmember set
"hello5"
127.0.0.1:6379> srandmember set
"hello4"
127.0.0.1:6379> srandmember set
"hello4"
127.0.0.1:6379> srandmember set
"hello1"
127.0.0.1:6379> srandmember set
"hello5"
127.0.0.1:6379> 

# 可以指定随机抽取的元素个数

127.0.0.1:6379> srandmember set 2
1) "hello2"
2) "hello1"
127.0.0.1:6379> srandmember set 2
1) "hello2"
2) "hello4"
127.0.0.1:6379> srandmember set 2
1) "hello5"
2) "hello4"
127.0.0.1:6379> srandmember set 2
1) "hello3"
2) "hello4"
127.0.0.1:6379> 

####################################################################################
# 随机删除集合中某个元素 spop

127.0.0.1:6379> spop set
"hello1"
127.0.0.1:6379> spop set
"hello3"
127.0.0.1:6379> spop set
"hello2"
127.0.0.1:6379> smembers set
1) "hello5"
2) "hello4"
127.0.0.1:6379> 

####################################################################################
# 将集合中某个指定元素移动到另一个集合中 smove

127.0.0.1:6379> smembers set
1) "hello5"
2) "hello4"
127.0.0.1:6379> sadd set2 world1
(integer) 1
127.0.0.1:6379> sadd set2 world2
(integer) 1
127.0.0.1:6379> smove set set2 hello4
(integer) 1
127.0.0.1:6379> smembers set
1) "hello5"
127.0.0.1:6379> smembers set2
1) "world2"
2) "hello4"
3) "world1"
127.0.0.1:6379> 

####################################################################################
# 关于集合
#     - 差集  sdiff set1 set2  返回set1与set2的差集 即set1中与set2不重合的元素  
#     - 交集	sinter set1 set2 返回set1和set2的交集 						应用场景:共同好友、共同爱好、推荐好友
#     - 并集  sunion set1 set2 返回set1和set2的并集

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> sadd set1 aa
(integer) 1
127.0.0.1:6379> sadd set1 bb
(integer) 1
127.0.0.1:6379> sadd set1 cc
(integer) 1
127.0.0.1:6379> sadd set2 cc
(integer) 1
127.0.0.1:6379> sadd set2 dd
(integer) 1
127.0.0.1:6379> sadd set2 ee
(integer) 1
127.0.0.1:6379> sdiff set1 set2
1) "aa"
2) "bb"
127.0.0.1:6379> sinter set1 set2
1) "cc"
127.0.0.1:6379> sunion set1 set2
1) "cc"
2) "ee"
3) "aa"
4) "bb"
5) "dd"
127.0.0.1:6379> 
4.Hash(哈希)

这个可以理解为 key-map ,而map本身是一个 key-value形式。

所有的hash *** 作均以 h 开头

####################################################################################
# 存值 hset | hmset
# 取值 hget | hmget

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> hset hash name bingbing
(integer) 1
127.0.0.1:6379> hget hash name
"bingbing"
127.0.0.1:6379> 

127.0.0.1:6379> hmset hash name1 bingbing name2 fangfang
OK
127.0.0.1:6379> hmget hash name1 name2
1) "bingbing"
2) "fangfang"
127.0.0.1:6379> 

####################################################################################
# 显示指定key中的所有key-value键值对

127.0.0.1:6379> hgetall hash
1) "name"
2) "bingbing"
3) "name1"
4) "bingbing"
5) "name2"
6) "fangfang"
127.0.0.1:6379> 

####################################################################################
# 删除指定key中某个属性key

127.0.0.1:6379> hgetall hash
1) "name"
2) "bingbing"
3) "name1"
4) "bing"
5) "name2"
6) "fang"
127.0.0.1:6379> hdel hash name
(integer) 1
127.0.0.1:6379> hgetall hash
1) "name1"
2) "bing"
3) "name2"
4) "fang"
127.0.0.1:6379> 

####################################################################################
# 返回当前key中有多少组K-V键值对

127.0.0.1:6379> hgetall hash
1) "name1"
2) "bingbing"
3) "name2"
4) "fangfang"
127.0.0.1:6379> hlen hash
(integer) 2
127.0.0.1:6379> 

####################################################################################
# 只获得所有的key    hkeys
# 只获得所有的value  hvals  

127.0.0.1:6379> hkeys hash
1) "name1"
2) "name2"
127.0.0.1:6379> hvals hash
1) "bingbing"
2) "fangfang"
127.0.0.1:6379> 

####################################################################################
# 指定增量 incr 
# 判断是否存在 ,存在则 *** 作失败返回0,不存在则将新值添加到map中, *** 作成功返回1

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> hset hash age1 5
(integer) 1
127.0.0.1:6379> hincrby  hash age1 5
(integer) 10
127.0.0.1:6379> hsetnx hash age2 20
(integer) 1
127.0.0.1:6379> hsetnx hash age3 20
(integer) 1
127.0.0.1:6379> hsetnx hash age2 200
(integer) 0
127.0.0.1:6379> hgetall hash
1) "age1"
2) "10"
3) "age2"
4) "20"
5) "age3"
6) "20"
127.0.0.1:6379> 


哈希更适合于对象的存储,String更适合字符串存储

5.Zset(有序集合)

在set的基础上,增加了一个值,如 set k1 v1 -> zset k1 score1 v1

####################################################################################
# 添加元素 zadd
# 查看元素 zrange

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> zadd zset 1 one            # 添加一个值
(integer) 1
127.0.0.1:6379> zadd zset 2 two 3 three	   # 添加多个值
(integer) 2
127.0.0.1:6379> zrange zset 0 -1
1) "one"
2) "two"
3) "three"
127.0.0.1:6379> 

####################################################################################
# 实现排序

127.0.0.1:6379> zadd salary 2500 xiaohong 5000 zhangsan 500 lisi
(integer) 3
127.0.0.1:6379> zrangebyscore salary -inf +inf   # 显示全部用户,根据score值 递增排序
1) "lisi"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores   # 显示全部用户,根据score值 递增排序,同时显示score值
1) "lisi"
2) "500"
3) "xiaohong"
4) "2500"
5) "zhangsan"
6) "5000"
127.0.0.1:6379> zrevrangebyscore salary +inf -inf withscores  # 显示全部用户,根据score值 递减排序,同时显示score值
1) "zhangsan"
2) "5000"
3) "xiaohong"
4) "2500"
5) "lisi"
6) "500"

####################################################################################
# 删除元素

127.0.0.1:6379> zrange salary 0 -1  # 这里可以注意到当输出全部元素时,默认是根据score升序输出的
1) "lisi"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrem salary lisi
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "xiaohong"
2) "zhangsan"
127.0.0.1:6379> 

####################################################################################
# 获取集合大小
127.0.0.1:6379> zcard salary
(integer) 2
127.0.0.1:6379> 

####################################################################################
# 获取集合中指定score区间的元素个数

127.0.0.1:6379> zadd animals 1 tiger 2 monkey 3 donkey 4 mouse 5 cat 6 dog
(integer) 6
127.0.0.1:6379> zrange animals 0 -1 withscores
 1) "tiger"
 2) "1"
 3) "monkey"
 4) "2"
 5) "donkey"
 6) "3"
 7) "mouse"
 8) "4"
 9) "cat"
10) "5"
11) "dog"
12) "6"
127.0.0.1:6379> zcount animals 2 5
(integer) 4
127.0.0.1:6379> 

应用场景:

  • 存储班级成绩表(使用唯一id与score绑定)
  • 对工资表进行排序(使用唯一id与score绑定)
  • 消息权重
    • 普通消息
    • 重要消息
  • 排行榜(取TOP N)
2.三大特殊数据类型 1.geospatial(地理位置)

定位 附近的人 打车距离 …

Redis的Geo支持这些功能的实现

可以推算地理位置的信息,两地之间的距离,方圆几公里内的人…

# 只有6个命令
####################################################################################
# geoadd 添加地理位置
# 规则:两极无法直接添加,我们一般会下载城市数据,直接通过java程序一次性导入!
#      有效的经度从-180° 到 180°
#      有效的纬度从-85.05112878° 到 85.05112878°
# geoadd key 值(经度 纬度 名称) 
127.0.0.1:6379> geoadd china:city 120.2155 30.2530 hangzhou
(integer) 1
127.0.0.1:6379> geoadd china:city 116.3329 39.8926 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 106.5404 29.4026 chongqing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.4894 31.4052 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 113.8803 22.5532 shenzhen
(integer) 1
127.0.0.1:6379> geoadd china:city 108.9342 34.2305 xian 121.5255 38.9522 dalian
(integer) 2
127.0.0.1:6379> 

####################################################################################
# geopos 获取地理位置具体的经度和纬度 (获得当前定位,得到一个坐标值)
127.0.0.1:6379> geopos china:city beijing
1) 1) "116.33290082216262817"
   2) "39.89260124060681534"
127.0.0.1:6379> geopos china:city hangzhou
1) 1) "120.21550029516220093"
   2) "30.25300090777527373"
127.0.0.1:6379> 

####################################################################################
# geodist 获取两个位置之间的距离,可以指定单位
#         - m 表示单位为米     
#         - km 表示单位为千米 
#         - mi 表示单位为英里
#         - ft 表示单位为英尺 
127.0.0.1:6379> geodist china:city beijing shanghai km  # 查看上海到北京的直线距离
"1052.2966"
127.0.0.1:6379> geodist china:city beijing chongqing km # 查看重庆到北京的直线距离
"1469.0513"
127.0.0.1:6379> 

####################################################################################
# georadius 以给定位置的经度纬度为中心,找出给定半径内的所有位置
127.0.0.1:6379> georadius china:city 110 30 1000 km
1) "chongqing"
2) "xian"
3) "shenzhen"
4) "hangzhou"
127.0.0.1:6379> georadius china:city 110 30 500 km
1) "chongqing"
2) "xian"
127.0.0.1:6379> georadius china:city 110 30 1000 km withcoord withdist count 2 # count限制查询出的个数
1) 1) "chongqing"
   2) "340.7691"                  # 到中心位置的直线距离     withdist
   3) 1) "106.54040247201919556"  # 显示出经纬度           withcoord
      2) "29.40259942409589655"
2) 1) "xian"
   2) "481.1256"
   3) 1) "108.93419891595840454"
      2) "34.23050055933691027"
127.0.0.1:6379> 

####################################################################################
# georadiusbymember 以给定位置为中心,找出给定半径内的所有位置(包括自己)
127.0.0.1:6379> georadiusbymember china:city shanghai 400 km
1) "hangzhou"
2) "shanghai"
127.0.0.1:6379> 

####################################################################################
# geohash 返回指定一个或多个位置的 Geohash表示
# 本命令将返回11个字符的Geohash字符串(将二维的经纬度转换为一维的字符串,如果两个字符串越接近,则距离越接近)
127.0.0.1:6379> geohash china:city beijing shanghai
1) "wx4dznpp1p0"
2) "wtw6st1us10"
127.0.0.1:6379> 

GEO 底层的实现原理其实就是Zset,因此我们可以直接使用Zset命令来 *** 作GEO

127.0.0.1:6379> zrange china:city 0 -1
1) "chongqing"
2) "xian"
3) "shenzhen"
4) "hangzhou"
5) "shanghai"
6) "beijing"
7) "dalian"
127.0.0.1:6379> zrem china:city beijing
(integer) 1
127.0.0.1:6379> zrange china:city 0 -1
1) "chongqing"
2) "xian"
3) "shenzhen"
4) "hangzhou"
5) "shanghai"
6) "dalian"
127.0.0.1:6379> 

2.hyperloglog
  • 什么是基数?

    A{1,3,5,7,8, 9,7}

    基数就是指集合中重复的元素个数,比如这个例子中,基数为5。基数可以接受误差!

    Redis Hyperloglog 是进行基数统计的算法,提供了一种不精确的去重计数方案,错误率 0.81%。

  • 优点:在输入元素的数量或者体积非常大时,计算计数所需的空间总是固定的,并且是很小的。在redis里面,每个Hyperloglog键只需要花费12kb内存,就可以计算接近2 ^ 64 个不同元素的基数。

  • 实际案例

    网页的UV(一个人可以访问一个网站多次,但是依然算作一个人)

    传统的方式,使用set保存用户的id,然后就可以统计set中的元素数量作为标准判断!

    这个方式如果保存大量的用户id,就会比较麻烦!然而我们的目的是为了计数,而不是真正保存用户id

  • 如果允许容错,那么推荐使用Hyperloglog

  • 如果不允许容错,那么使用set或者自己的数据类型

####################################################################################
# 添加元素          			pfadd
# 统计元素基数       			   pfcount
# 合并两个键建立新键			     pfmerge
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> pfadd key1 a b c d e f g h i j 
(integer) 1
127.0.0.1:6379> pfadd key2 f g h i j k l m n o 
(integer) 1
127.0.0.1:6379> pfcount key1
(integer) 10
127.0.0.1:6379> pfcount key2
(integer) 10
127.0.0.1:6379> pfmerge key key1 key2
OK
127.0.0.1:6379> pfcount key
(integer) 15
127.0.0.1:6379> 

3.bitmaps

使用位存储 000000111111 只要是只有两个状态的,都可以使用bitmaps

可以用来:

  • 统计用户 活跃 不活跃

  • 登录 未登录

  • 打卡 未打卡

bitmaps是一种数据结构, *** 作二进制位进行记录,只有 0 和 1 两个状态!

####################################################################################
# 记录周一到周日是否打卡 

127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> setbit sign 2 1
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> setbit sign 6 1
(integer) 0
127.0.0.1:6379> 

####################################################################################
# 查看某一天是否打卡

127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> getbit sign 4
(integer) 0
127.0.0.1:6379> 

####################################################################################
# 查看总有有多少天打卡 可自定义区间,如果不写就是默认全部区间
# 需要注意的是这里的自定义区间是以字节(byte)为单位的,而我们bitmap中存储的数据是以位(bit)为单位的,1 byte = 8 bit

127.0.0.1:6379> bitcount sign
(integer) 5

3.redis的事务

原子性:要么同时成功,要么同时失败

redis中单条命令保证原子性,但是redis的事务是不保证原子性的。

redis事务的本质是一组命令的集合。一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行。

特性:

  • 一次性
  • 顺序性
  • 排他性

redis事务没有隔离级别的概念:所有的命令在事务中并没有直接被执行,只有在发起执行命令的时候才会执行。

redis事务的执行流程

  • 开启事务 multi
  • 命令入队 …
  • 执行事务 exec
# 正常执行事务
####################################################################################
# 开启事务  multi
# 命令入队  ......
# 执行事务  exec
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) "v2"
4) OK

# 中途放弃事务
####################################################################################
# 放弃事务 discard  一旦放弃事务,当前事务队列中的所有命令均不会被执行
127.0.0.1:6379> keys *
1) "k3"
2) "k1"
3) "k2"
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> set k5 v5
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k1"
3) "k2"
127.0.0.1:6379> 

遇到异常

  • 编译型异常(代码有问题,命令有错) – 事务中所有的命令都不会被执行
  • 运行时异常(比如说遇到 1 / 0 ) – 如果事务队列中存在语法异常,那么执行命令的时候,错误命令抛出异常 其他命令正常执行
4.redis的监控 1.悲观锁
  • 很悲观,认为什么时候都会出问题,无论做什么都会加锁
2.乐观锁
  • 很乐观,认为什么时候都不会出问题,所以不会加锁!更新数据的时候会判断一下version,在此期间是否有人修改过这个数据。

  • 获取version

  • 更新的时候会比较version

上述是乐观锁和悲观锁的原理简介,下面测试redis如何实现监控

使用watch实现redis的乐观锁 *** 作

正常执行成功

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money  # 监视money对象
OK
127.0.0.1:6379> multi        # 事务正常结束,期间数据没有发生变动,这个时候就正常执行成功
OK
127.0.0.1:6379> decrby money 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20
127.0.0.1:6379> 

多线程下其他线程修改了队列中的值,则事务执行失败。

# 线程1 
127.0.0.1:6379> watch money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby money 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec   # 执行之前另外一个线程修改了事务队列中的值,这个时候事务执行失败
(nil)
127.0.0.1:6379> 

# 线程2
127.0.0.1:6379> set money 1000
OK
127.0.0.1:6379> 

5.Jedis

使用java来 *** 作redis

Jedis 是 Redis官方推荐的java连接开发工具 使用java *** 作Redis中间件

如果使用java *** 作redis,那么一定要对jedis十分熟悉。

1.在IDEA中连接阿里云服务器端redis

新建maven项目,在pom中导入

		
        
        <dependency>
            <groupId>redis.clientsgroupId>
            <artifactId>jedisartifactId>
            <version>4.2.2version>
        dependency>

		
        <dependency>
            <groupId>com.alibabagroupId>
            <artifactId>fastjsonartifactId>
            <version>1.2.76version>
        dependency>

新建java文件,测试是否redis是否能ping成功

public class TestPing {
    public static void main(String[] args) {
        //1. new Jedis对象
        Jedis jedis = new Jedis("自己服务器公网ip",7521); //这里7521是自己redis.conf配置文件中的自定义redis端口
        jedis.auth("自己服务器端redis的密码");

        //2.测试
        System.out.println(jedis.ping());
    }
}

连接成功

解决slf4j错误提示,在pom中导入如下日志依赖即可解决。


        
        <dependency>
            <groupId>log4jgroupId>
            <artifactId>log4jartifactId>
            <version>1.2.17version>
        dependency>

        
        <dependency>
            <groupId>org.slf4jgroupId>
            <artifactId>slf4j-apiartifactId>
            <version>2.0.0-alpha7version>
        dependency>

        
        <dependency>
            <groupId>org.slf4jgroupId>
            <artifactId>slf4j-log4j12artifactId>
            <version>2.0.0-alpha7version>
        dependency>

2.常用API 1.key
System.out.println("清空数据:"+jedis.flushDB());
System.out.println("判断某个键是否存在:"+jedis.exists("username"));
System.out.println("新增<'username','kuangshen'>的键值对:"+jedis.set("username", "kuangshen"));
System.out.println("新增<'password','password'>的键值对:"+jedis.set("password", "password"));
System.out.print("系统中所有的键如下:");
Set<String> keys = jedis.keys("*");
System.out.println(keys);
System.out.println("删除键password:"+jedis.del("password"));
System.out.println("判断键password是否存在:"+jedis.exists("password"));
System.out.println("查看键username所存储的值的类型:"+jedis.type("username"));
System.out.println("随机返回key空间的一个:"+jedis.randomKey());
System.out.println("重命名key:"+jedis.rename("username","name"));
System.out.println("取出改后的name:"+jedis.get("name"));
System.out.println("按索引查询:"+jedis.select(0));
System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB());
System.out.println("返回当前数据库中key的数目:"+jedis.dbSize());
System.out.println("删除所有数据库中的所有key:"+jedis.flushAll());
2.String
jedis.flushDB();
System.out.println("===========增加数据===========");
System.out.println(jedis.set("key1","value1"));
System.out.println(jedis.set("key2","value2"));
System.out.println(jedis.set("key3", "value3"));
System.out.println("删除键key2:"+jedis.del("key2"));
System.out.println("获取键key2:"+jedis.get("key2"));
System.out.println("修改key1:"+jedis.set("key1", "value1Changed"));
System.out.println("获取key1的值:"+jedis.get("key1"));
System.out.println("在key3后面加入值:"+jedis.append("key3", "End"));
System.out.println("key3的值:"+jedis.get("key3"));
System.out.println("增加多个键值对:"+jedis.mset("key01","value01","key02","value02","key03","value03"));
System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03"));
System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03","key04"));
System.out.println("删除多个键值对:"+jedis.del("key01","key02"));
System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03"));
jedis.flushDB();
System.out.println("===========新增键值对防止覆盖原先值==============");
System.out.println(jedis.setnx("key1", "value1"));
System.out.println(jedis.setnx("key2", "value2"));
System.out.println(jedis.setnx("key2", "value2-new"));
System.out.println(jedis.get("key1"));
System.out.println(jedis.get("key2"));
System.out.println("===========新增键值对并设置有效时间=============");
System.out.println(jedis.setex("key3", 2, "value3"));
System.out.println(jedis.get("key3"));
try {
    TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(jedis.get("key3"));
System.out.println("===========获取原值,更新为新值==========");
System.out.println(jedis.getSet("key2", "key2GetSet"));
System.out.println(jedis.get("key2"));
System.out.println("获得key2的值的字串:"+jedis.getrange("key2", 2, 4));
3.List
jedis.flushDB();
System.out.println("===========添加一个list===========");
jedis.lpush("collections", "ArrayList", "Vector", "Stack","HashMap", "WeakHashMap", "LinkedHashMap");
jedis.lpush("collections", "HashSet");
jedis.lpush("collections", "TreeSet");
jedis.lpush("collections", "TreeMap");
jedis.lpush("collections", "HashMap");
//-1代表倒数第一个元素,-2代表倒数第二个元素,end为-1表示查询全部
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections区间0-3的元素:"+jedis.lrange("collections",0,3));
System.out.println("===============================");
// 删除列表指定的值 ,第二个参数为删除的个数(有重复时),后add进去的值先被删,类似于出栈
System.out.println("删除指定元素个数:"+jedis.lrem("collections", 2, "HashMap"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("删除下表0-3区间之外的元素:"+jedis.ltrim("collections", 0, 3));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections列表出栈(左端):"+jedis.lpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections添加元素,从列表右端,与lpush相对应:"+jedis.rpush("collections", "EnumMap"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections列表出栈(右端):"+jedis.rpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("修改collections指定下标1的内容:"+jedis.lset("collections", 1, "LinkedArrayList"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("===============================");
System.out.println("collections的长度:"+jedis.llen("collections"));
System.out.println("获取collections下标为2的元素:"+jedis.lindex("collections", 2));
System.out.println("===============================");
jedis.lpush("sortedList", "3","6","2","0","7","4");
System.out.println("sortedList排序前:"+jedis.lrange("sortedList", 0, -1));
System.out.println(jedis.sort("sortedList"));
System.out.println("sortedList排序后:"+jedis.lrange("sortedList", 0, -1));
4.Set
System.out.println("============向集合中添加元素(不重复)============");
System.out.println(jedis.sadd("eleSet", "e1","e2","e4","e3","e0","e8","e7","e5"));
System.out.println(jedis.sadd("eleSet", "e6"));
System.out.println(jedis.sadd("eleSet", "e6"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("删除一个元素e0:"+jedis.srem("eleSet", "e0"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("删除两个元素e7和e6:"+jedis.srem("eleSet", "e7","e6"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));
System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("eleSet中包含元素的个数:"+jedis.scard("eleSet"));
System.out.println("e3是否在eleSet中:"+jedis.sismember("eleSet", "e3"));
System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e1"));
System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e5"));
System.out.println("=================================");
System.out.println(jedis.sadd("eleSet1", "e1","e2","e4","e3","e0","e8","e7","e5"));
System.out.println(jedis.sadd("eleSet2", "e1","e2","e4","e3","e0","e8"));
System.out.println("将eleSet1中删除e1并存入eleSet3中:"+jedis.smove("eleSet1", "eleSet3", "e1"));//移到集合元素
System.out.println("将eleSet1中删除e2并存入eleSet3中:"+jedis.smove("eleSet1", "eleSet3", "e2"));
System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleSet3中的元素:"+jedis.smembers("eleSet3"));
System.out.println("============集合运算=================");
System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleSet2中的元素:"+jedis.smembers("eleSet2"));
System.out.println("eleSet1和eleSet2的交集:"+jedis.sinter("eleSet1","eleSet2"));
System.out.println("eleSet1和eleSet2的并集:"+jedis.sunion("eleSet1","eleSet2"));
System.out.println("eleSet1和eleSet2的差集:"+jedis.sdiff("eleSet1","eleSet2"));//eleSet1中有,eleSet2中没有
jedis.sinterstore("eleSet4","eleSet1","eleSet2");//求交集并将交集保存到dstkey的集合
System.out.println("eleSet4中的元素:"+jedis.smembers("eleSet4"));
5.Hash
jedis.flushDB();
Map<String,String> map = new HashMap<String, String>();
map.put("key1","value1");
map.put("key2","value2");
map.put("key3","value3");
map.put("key4","value4");
//添加名称为hash(key)的hash元素
jedis.hmset("hash",map);
//向名称为hash的hash中添加key为key5,value为value5元素
jedis.hset("hash", "key5", "value5");
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));//return Map
System.out.println("散列hash的所有键为:"+jedis.hkeys("hash"));//return Set
System.out.println("散列hash的所有值为:"+jedis.hvals("hash"));//return List
System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:"+jedis.hincrBy("hash", "key6", 6));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:"+jedis.hincrBy("hash", "key6", 3));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("删除一个或者多个键值对:"+jedis.hdel("hash", "key2"));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("散列hash中键值对的个数:"+jedis.hlen("hash"));
System.out.println("判断hash中是否存在key2:"+jedis.hexists("hash","key2"));
System.out.println("判断hash中是否存在key3:"+jedis.hexists("hash","key3"));
System.out.println("获取hash中的值:"+jedis.hmget("hash","key3"));
System.out.println("获取hash中的值:"+jedis.hmget("hash","key3","key4"));
3.事务
		jedis.flushDB();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("hello","world");
        jsonObject.put("name","bingbing");
        String result = jsonObject.toJSONString();

        //开启事务
        Transaction multi = jedis.multi();
        //乐观锁
//        multi.watch(result);

        try {
            multi.set("user1",result);
            multi.set("user2",result);
            //成功则执行事务
            multi.exec();
        }catch (Exception e){
            //失败则放弃事务
            multi.discard();
            e.printStackTrace();
        }finally {
            System.out.println(jedis.get("user1"));
            System.out.println(jedis.get("user2"));
            jedis.close();//关闭连接
        }
6.springboot整合redis

springboot通过springdata *** 作数据 如 jpa jdbc mongodb redis

springdata也是和springboot齐名的项目!

在springboot 2.x版本之后,原来使用的 jedis 被替换为了 lettuce

  • jedis 采用的直连,多个线程 *** 作的话,是不安全的,如果想要避免不安全,需要使用jedis pool连接池 --BIO模式
  • lettuce 采用的netty。实例可以在多个线程中进行共享,不存在线程不安全的情况!可以减少线程数量 --NIO模式
源码分析
@Bean
@ConditionalOnMissingBean( // 这里表明我们可以自己定义一个 redisTemplate来替换这个默认的
    name = {"redisTemplate"}
)
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    // 默认的 RedisTemplate 没有过多的设置,redis对象都是需要序列化的!
    // 两个泛型都是 Object, Object , 我们后续使用需要进行强制转换 String,Object
    RedisTemplate<Object, Object> template = new RedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

@Bean
@ConditionalOnMissingBean  // 由于String是redis中最常使用的类型,所以单独提出来了一个bean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
    return new StringRedisTemplate(redisConnectionFactory);
}
整合测试

配置连接

# springboot所有的配置类 都有一个自动配置类  RedisAutoConfiguration
# 自动配置类都会绑定一个properties 配置文件  RedisProperties
spring.redis.host=阿里云公网ip
spring.redis.port=自定义端口
spring.redis.password=redis密码

测试

 	// 测试类
	@Autowired
    private RedisTemplate redisTemplate;
    @Test
    void contextLoads() {
        // 在企业开发中,80%的情况下,都不会使用原生的方式去编写代码,因此需要自己编写一个`RedisUtil`工具类
        
        // redisTemplate   *** 作不同的数据类型,API和我们的指令是一样的
        // opsForValue  *** 作字符串  类似 String
        // opsForList   *** 作列表   类似 List
        // opsForSet    *** 作集合   类似 Set
        // opsForGeo    *** 作Geo   类似 Geo
        // opsForHash  opsForZSet  opsForHyperLogLog
//        redisTemplate.opsForValue();
//        redisTemplate.opsForList();
//        redisTemplate.opsForSet();
//        redisTemplate.opsForGeo();
//        redisTemplate.opsForHash();
//        redisTemplate.opsForZSet();
//        redisTemplate.opsForHyperLogLog();

        // 除了基本的 *** 作,我们常用的方法都可以直接通过redisTemplate *** 作,比如事务和基本的CRUD
//        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
//        connection.flushDb();

        redisTemplate.opsForValue().set("mykey","bingbing");
        System.out.println(redisTemplate.opsForValue().get("mykey"));


    }

如果value中包含有中文,则在控制台输出可能会出现乱码

分析出现乱码的原因:

源码中

redisTemplate实现序列化配置,默认的序列化方式为jdk序列化,而我们可能会用到json来进行序列化,此时需要自己定义redis的配置类。

直接传入对象会报错,提示对象没有序列化

解决方案,将对象实现序列化接口

自定义redis配置类
@Configuration
public class RedisConfig {
    //编写我们自己的 redisTemplate,这是一个固定的模板
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        //为了开发方便,一般直接使用 
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        //Json序列化
        Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        //序列化完之后需要进行转义使用
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        objectJackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //value采用json序列化方式
        template.setValueSerializer(objectJackson2JsonRedisSerializer);

        //hash的key采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //hash的value采用json序列化方式
        template.setHashValueSerializer(objectJackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;
    }
}

同样是执行上图代码,使用原生jdk序列化和使用自定义redis配置类之后的序列化效果对比

RedisUtils工具类
package com.bq.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @Name: RedisUtil
 * @Author: 铁锅炖大鹅
 * @Time: 2022/5/7 21:06
 * @Version: 1.0
 */
@Component
public final class RedisUtil {
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    // ==============================================common=====================================================
    /**
     * 指定缓存失效时间
     * @param key 键
     * @param time 时间
     * @return
     */
    public boolean expire(String key,long time){
        try{
            if (time > 0){
                redisTemplate.expire(key,time, TimeUnit.SECONDS);
            }
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    /**
     *  根据 key 获取过期时间
     * @param key 键 不能为 null
     * @return 返回时间 (以 秒 为单位) 返回0代表永久有效
     */
    public long getExpire(String key){
        return redisTemplate.getExpire(key,TimeUnit.SECONDS);
    }

    /**
     *  判断 key 是否存在
     * @param key 键
     * @return 存在返回 true 不存在返回 false
     */
    public boolean hasKey(String key){
        try{
            return redisTemplate.hasKey(key);
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     * @param key 可以传 一个键 或 多个键
     */
    public void del(String... key){
        if (key != null && key.length > 0){
            if (key.length == 1){
                redisTemplate.delete(key[0]);
            }else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }


    // ==============================================String=====================================================

    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key){
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key 键
     * @param value 值
     * @return 成功返回true 失败返回false
     */
    public boolean set(String key,Object value){
        try {
            redisTemplate.opsForValue().set(key,value);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 普通缓存放入并设置时间
     * @param key 键
     * @param value 值
     * @param time 时间(以秒为单位) time > 0 当time <= 0时将设置无限期
     * @return 成功返回 true 失败返回 false
     */
    public boolean set(String key,Object value,long time){
        try {
            if (time > 0){
                redisTemplate.opsForValue().set(key,value,time,TimeUnit.SECONDS);
            }else {
                set(key,value);
            }
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 递增
     * @param key 键
     * @param delta 递增步长(大于0)
     * @return
     */
    public long incr(String key,long delta){
        if (delta < 0){
            throw new RuntimeException("递增因子必须大于0!");
        }
        return redisTemplate.opsForValue().increment(key,delta);
    }

    /**
     * 递减
     * @param key 键
     * @param delta 递减步长(大于0)
     * @return
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    // ================================================Map===========================================
    /**
     * HashGet
     * @param key 键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
    * HashSet 并设置时间
    * @param key 键
    * @param map 对应多个键值
    * @param time 时间(秒)
    * @return true成功 false失败
    */
    public boolean hmset(String key,Map<String,Object> map,long time){
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key 键
     * @param item 项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key 键
     * @param item 项
     * @param value 值
     * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     * @param key 键 不能为 null
     * @param item 项 可以传入多个 不能为 null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * 判断hash表中是否有该项的值
     *
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }
    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key 键
     * @param item 项
     * @param by 要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }
    /**
     * hash递减
     *
     * @param key 键
     * @param item 项
     * @param by 要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }
// ======================================Set=======================================

    /**
     * 根据key获取Set中的所有值
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key 键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 将数据放入set缓存
     *
     * @param key 键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    /**
     * 将set数据放入缓存
     *
     * @param key 键
     * @param time 时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取 set 缓存的长度
     * @param key 键
     * @return 长度
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值 value
     *
     * @param key 键
     * @param values 值 可以是多个
     * @return 移除的个数
     */
    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
// ===============================list=================================
    /**
     * 获取list缓存的内容
     *
     * @param key 键
     * @param start 开始
     * @param end 结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     * @param key 键
     * @return 长度
     */
    public long lGetListSize(String key){
        try {
            return redisTemplate.opsForList().size(key);
        }catch (Exception e){
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     * @param key 键
     * @param index 索引 index >= 0 时, 0 表头,1 第二个元素,依次类推;index < 0 时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 将list放入缓存
     *
     * @param key 键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @param time 生效时间
     * @return
     */
    public boolean lSet(String key,Object value,long time){
        try {
            redisTemplate.opsForList().rightPush(key,value);
            if (time > 0){
                expire(key,time);
            }
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key 键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 将list放入缓存
     *
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     * @param key 键
     * @param index 索引
     * @param value 值
     * @return 成功返回true 失败返回false
     */
    public boolean lUpdateIndex(String key,long index,Object value){
        try {
            redisTemplate.opsForList().set(key,index,value);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值value
     *
     * @param key 键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */
    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

}

测试示例

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQPGFy0u-1652186726338)(Redis.assets/image-20220508174304583.png)]

7.redis.conf文件配置

启动的时候,就是通过这个配置文件来启动!

单位

配置文件 单位 对大小写不敏感

包含

可以导入其他配置文件

网络

绑定ip

保护模式

端口设置

通用

以守护进程的方式运行,默认是 no ,我们需要自己开启为 yes

如果以后台的方式运行,我们就需要指定一个 pid 文件


设置日志级别,以及日志的存储位置

设置数据库的数量,默认是 16 个

设置是否总是显示 LOGO

快照

用于数据持久化,即 在规定的时间内,执行了多少次 *** 作之后,会将数据持久化到文件(.rdb .aof)

redis 是内存数据库,如果没有进行持久化,那么数据断电即失!

上图默认设置的意思是

如果 900 s 内 至少有 1 个 key 进行了修改,我们就进行数据持久化 *** 作

如果 300 s 内 至少有 10 个 key 进行了修改,我们就进行数据持久化 *** 作

如果 60 s 内 至少有 10000 个 key 进行了修改,我们就进行数据持久化 *** 作

我们之后学习持久化时,会在这里定义自己的数据持久化规则!

数据持久化过程中出错,是否还需要停止工作 默认为停止工作 即 yes

是否压缩.rdb文件

保存 .rdb文件时,进行错误的检查校验

.rdb文件的保存目录

安全

可以在这里设置 redis 的密码,默认是没有密码!

当然密码也可以用过命令来设置

config get requirepass 获取密码

config ser requirepass 设置密码

auth 密码 密码设置过之后在连接redis时需要先加一行这个命令进行登录

限制

限制最大能连接的客户端数量

限制最大的内存容量

内存到达上限之后的处理策略,共有五种策略

从上到下依次为:

  • 只对设置了过期时间的 key 进行 LRU 删除
  • 对任意 key 进行 LRU 删除
  • 只对删除了过期时间的 key 进行 LFU 删除
  • 对任意 key 进行 LFU 删除
  • 只对设置了过期时间的 key 进行随机删除
  • 对任意 key 进行随机删除
  • 永不删除,只返回一个写 *** 作错误

AOF 模式

默认是不开启 aof 模式的,默认是使用 rdb 模式进行持久化的,因为在大部分情况下,rdb 已经完全够用了。

aof 持久化的文件名字是 appendonly.aof

  • 每次修改都会进行同步(消耗性能)

  • 每秒执行一次同步 ,可能会丢失这1 s 的数据!(默认)

  • 每次修改不执行同步(这个时候 *** 作系统自己同步数据,速度最快)

8.Redis的持久化

Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能!

1.RDB(Redis DataBase)
  • 什么是 RDB

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的 Snapshot 快照,它恢复时是将快照文件直接读到内存里。

Redis会单独创建 (fork) 一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中好,主进程是不进行任何IO *** 作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是那么敏感,那么RDB模式要比AOF模式更加的高效。

RDB的缺点是最后一次持久化后的数据可能丢失。

  • RDB保存的是 dump.rdb文件

  • RDB的触发机制

    • 1.在save的规则满足的情况下,会自动触发rdb规则
    • 2.执行flushall命令,也会触发rdb规则
    • 3.退出redis,也会触发rdb规则
  • 如何回复rdb文件

    1. 只需要将rdb文件放在我们redis的启动目录就可以,redis启动的时候会自动检查dump.rdb并恢复其中的数据

    2. 查看我们的启动目录

      118.31.169.182:7521> config get dir
      1) "dir"
      2) "/var/lib/redis"
      118.31.169.182:7521> 
      
  • 优点

    1. 适合大规模的数据恢复
    2. 对数据的完整性不高
  • 缺点

    1. 需要一定的时间间隔进行 *** 作!如果redis意外宕机了,那么最后一次修改的数据是不能恢复的!
    2. fork进程的时候,会占用一定的内存空间!
2.AOF(Append Only File)

将我们的所有命令都记录下来,history,恢复的时候就把这个文件全部再执行一遍!

  • 什么是AOF

    以日志的形式来记录每个写 *** 作,将redis执行过的所有指令都记录下来(不记录读 *** 作),只许追加文件但不可以改写文件。

    redis在启动之初会读取该文件重新构建数据,换句话说,redis重启的话就是根据日志文件的内容将写指令从前到后再执行一遍以完成数据的回复工作

  • AOF保存的是 appendonly.aof文件

  • 我们只需将 appendonly no no 改为 yes就开启了AOF模式

    重启redis即可触发AOF机制。

    如果aof文件有错误,那么这个时候redis是启动不起来的,我们需要修复这个aof文件

  • 如何修复aof文件?

    redis 给我们提供了一个工具 redis-check-aof

    使用命令 redis-check-aof --fix appendonly.aof 对aof文件进行修复

  • 优点

    1. 每一次修改,都会进行进行同步,文件的完整性会更好!
    2. 每秒同步一次,可能会丢失一秒的数据
    3. 从不同步,效率最高
  • 缺点

    1. 相对于数据文件来说,aof远远大于rdb,修复的速度也比rdb慢!
    2. aof运行效率也要比rdb慢,所以redis默认的配置是使用rdb进行持久化!
3.总结
  1. RDB持久化方式能够在指定的时间间隔内对数据进行快照存储
  2. AOF持久化方式记录每次对服务器的写 *** 作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以redis协议追加保存每次的写 *** 作到文件莫问,redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
  3. 只做缓存,如果只希望自己的数据在服务器运行的时候存在,那么可以不使用任何持久化
  4. 可以同时开始两种持久化方式
    • 这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集更加完整
    • RDB的数据不是实时的,同时使用两者时,服务器重启也只会找AOF文件,但不能只使用AOF,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启。而且不会有AOF可能潜在的bug。
  5. 性能建议
    • 因为RDB文件只用作备份用途,因此建议只在slave上持久化RDB文件,而且只需15分钟备份一次就够用了,只保留 sava 900 1这条规则即可。
    • 如果Enable AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单,只load自己的AOF文件就可以了,代价 1 带来了持续的IO 代价2 AOF rewrite 的最后将 rewrite过程中产生的新数据写到新文件时造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少 AOF rewrite 的频率,AOF重写的基础大小 默认值 64 M 台下了,可以设置到 5G 以上,默认超过原大小 100% 大小重写可以改到适当的数值。
    • 如果不 Enable AOF ,仅靠 Master-Slave Replication实现高可用性也可以,能省掉一大笔IO,也减少了 rewrite时带来的系统波动。代价是如果 Master/Slave 同时挂掉(比如说断电),会丢失十几分钟的数据,启动脚本也要比较两个 Master / Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。
9.redis的发布订阅 简单介绍

什么是发布订阅

redis 发布订阅(pub/sub) 是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。如 微信 微博 关注系统。

redis 客户端可以订阅任意数量的频道。

  1. 消息发送者 2. 频道 3.消息订阅者

命令

这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。

命令描述
psubscribe pattern [pattern ...]订阅一个或多个符合给定模式的频道
pubsub subcommand [argument[argument ...]]查看订阅与发布系统状态
publish channel message将信息发送到指定的频道
punsubscribe [pattern [pattern ...]]退订所有给定模式的频道
subscribe channel [channel ...]订阅给定的一个或多个频道的信息
unsubscribe [channel [channel ...]]退订给定的频道
测试

订阅端

127.0.0.1:6379> subscribe aiyabingbing       # 订阅一个频道    aiyabingbing
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "aiyabingbing"
3) (integer) 1
# 等待读取推送的消息
1) "message"   			# 消息标志
2) "aiyabingbing"		# 频道名称
3) "hello,bingbing"		# 消息内容
1) "message"
2) "aiyabingbing"
3) "hello,redis"

发送端

127.0.0.1:6379> publish aiyabingbing hello,bingbing    # 发布者发布消息 hello,bingbing 到 aiyabingbing 频道!
(integer) 1
127.0.0.1:6379> publish aiyabingbing hello,redis
(integer) 1
127.0.0.1:6379> 

原理

Redis 是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,借此加深对Redis的理解。

Redis通过 publish subscribepsubscribe 等命令实现发布和订阅功能。

通过subscribe命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个个channel,而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。subscribe命令的关键就是,将客户端添加到给定channel的订阅链表中。

通过publish命令向订阅者发送消息,redis-server会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。

pub / sub 从字面上理解就是发布(publish) 与订阅(subscribe),在redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。

使用场景:

  1. 实时消息系统
  2. 实时聊天!(频道当做聊天室,将消息发送给所有人即可)
  3. 订阅、关注系统

稍微复杂的场景就会使用 消息中间件 MQ

10.redis的主从复制 概念

主从复制,是指将一台redis服务器的数据,复制到其他的redis服务器。前者称为主节点(master / leader),后者称为从节点 (slave / follower);数据的复制是单向的,只能由主节点到从节点。master 以写为主,slave 以读为主。

默认情况下,每台redis服务器都是主节点;且一个主节点可以由多个从节点(或没有从节点),但一个从节点只能有一个主节点。

主从复制的左右主要包括:

  1. 数据冗余:主从赋值实现了数据的热备份,是持久化之外的一种数据冗余方式。
  2. 故障恢复:当主节点出现问题是,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
  3. 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写redis数据时 应用连接主节点,读数据时 应用连接诶从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高redis服务器的并发量。
  4. 高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是redis高可用的基础。

一般来说,要将redis运用于工程项目中,只使用一台redis是万万不能的(一般至少要有三台,一主二从),原因如下:

  1. 从结构上,单个redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;
  2. 从容量上,单个redis服务器内存容量有限,就算一台redis服务器内存容量为256 G,也不能将所有内存用作redis存储内存,一般来说,单台redis最大使用内存不应该超过20 G。

电商网站上的商品,一般都是一次上传,无数次浏览,就是 写少读多

对于这种场景,我们可以使用如下架构:

主从复制,读写分离! 80 % 的情况下都是在进行读 *** 作! 可以用来减缓服务器的压力,在公司架构中经常使用!

环境配置(伪集群)

只配置从库,不用配置主库(因为默认每一台主机都是主库)

127.0.0.1:6379> info replication                       #  查看当前库的配置信息
# Replication
role:master                     # 角色
connected_slaves:0              # 从机
master_replid:f1f31b79ee95f97c457da8301dec428e92c55e87
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6379> 

配置从机,复制3个配置文件,修改对应的信息

  1. 端口
  2. pid 名字
  3. log 文件名字
  4. dump.rdb 文件名字

修改完毕之后,启动3个redis服务器,可以通过进程命令来查看是否启动成功

ps -ef | grep redis

一主二从

比如说我现在开了3台主机,端口分别是 6379 6380 6381 我现在想设置 6379 为主机,6380 和 6381 为 6379 的从机,我需要进行如下 *** 作

slaveof 127.0.0.1 6379 # 认主机  主机ip  主机端口号
# 此时 在 6379 主机 执行info replication显示多了一台从机,并可以看到从机的相关信息
#     在 6380 主机 执行info replication显示当前主机的角色已经更新为从机(slave),并可以查看主机的相关信息

真实的主从配置应该在配置文件中配置,这样是永久配置,使用命令进行配置只是暂时的。

细节
  1. 主机可以写,从机不能写只能读!主机中的所有信息和数据,都会自动被从机保存!

  2. 主机断开连接,从机依旧保持与主机的连接状态,但是现在从机处于阻塞状态,不能进行写 *** 作,如果主机重新连接,那么从机依旧可以恢复之前与主机保持连接的状态,可以直接获取到主机写的信息!

  3. 如果是通过命令行配置的主从复制,那么当从机断开连接后,其与主机失去连接,当从机再次恢复连接时,也不能恢复之前与主机保持连接的状态,从机重新连接之后角色自动变为主机,但是,只要重新将这台从机的角色设回从机,立马就会获得之前作为从机时存储的数据!

主从复制的复制原理

slave 启动成功连接到 master之后会发送一个sync命令

master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集的命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成依次完全同步。

  • 全量复制:salve在接收到数据库文件数据后,将其存盘并加载到内存中。

  • 增量复制:master继续将新的所有收集到的修改命令依次传给slave,完成同步

    但是只要是重新连接master,依次完全同步(全量复制)将被自动执行,我们的数据一定可以在从机中看到

宕机后手动配置主机

如果主机断开了连接,我们可以使用命令重新手动选择一台主机

slaveof no one

然后其他的从节点可以手动连接到新的主节点上

如果原先的主机修复了,重新回来也不会自动重新成为这些从机的主节点,需要重新手动配置这些从机

哨兵模式

自动选出新的主机

概念

主动切换技术的方法是:当主服务器宕机后,需要手动把一台服务器切换为主服务器,这就需要人工干预,费时费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候优先考虑哨兵模式。

redis 从 2.8 开始正式提供了 Sentinel (哨兵) 架构来解决这个问题。

哨兵模式能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

哨兵模式是一种特殊的模式,首先redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行,其原理是 哨兵通过发送命令,等待redis服务器响应,从而监控运行的多个redis实例。

这里的哨兵有两个作用:

  • 通过发布命令,让redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让他们切换主机。

然而一个哨兵进程对redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间也会相互监控,这样就形成了多哨兵模式。

假设主服务器宕机,哨兵1先监测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象称为主观下线。当后面的哨兵也监测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移] *** 作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。

测试
# 1.首先需要在config目录下新建哨兵文件
vim sentinel.conf

# 2.在哨兵文件中配置信息
# sentinel monitor 主机名称 主机IP 主机端口 1  
sentinel monitor myredis 127.0.0.1 6379 1
# 后面的这个数字1 代表主机挂了,哨兵投票看让谁接替成为主机,票数最多的,就会成为主机!

# 3.启动哨兵
redis-sentinel [配置文件目录(即sentinel.conf所在目录)]/sentinel.conf 


哨兵模式启动之后,如果主机宕机,哨兵就会在从机中通过一定的算法随机选择一台作为新的主机,并将剩下的从机连接到这台新主机上

如果原主机之后又修复回来了,也只能归并到新的主机下,当做新主机的从机,这就是哨兵模式的规则!

哨兵模式的优缺点
  • 优点
    1. 哨兵集群,基于主从复制模式,所有的主动配置优点,它全都有
    2. 主从可以切换,故障可以转移,系统的可用性更好
    3. 哨兵模式就是主从模式的升级,手动到自动,更加健壮!
  • 缺点
    1. redis不好在线扩容,集群容量一旦到达上限,在线扩容就十分麻烦!
    2. 实现哨兵模式的配置其实是很麻烦的,里面有很多选择!
哨兵模式配置说明
# Example sentinel.conf
 
# 哨兵sentinel实例运行的端口 默认26379     如果有哨兵集群,还需要配置每个哨兵端口
port 26379

# 哨兵sentinel的工作目录
dir /tmp

# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了
# sentinel monitor    
sentinel monitor mymaster 127.0.0.1 6379 2

# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass  
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd

# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds  
sentinel down-after-milliseconds mymaster 30000

# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行同步,这个数字越小,完成failover所需的时间就越长,但是如果这个数字越大,就意味着越多的slave因为replication而不可用。可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs  
sentinel parallel-syncs mymaster 1

# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#	1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#	2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#	3.当想要取消一个正在进行的failover所需要的时间。
#	4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout  
sentinel failover-timeout mymaster 180000

# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。

#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本
# sentinel notification-script  
sentinel notification-script mymaster /var/redis/notify.sh

# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
#       
# 目前总是“failover”,
# 是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script  
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh    # 一般都是由运维来配置
11.缓存穿透和雪崩(面试高频、工作常用)

初步介绍

redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面,但同时,它也带来了一些问题。其中最要害的问题就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求较高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前业界也都有比较流行的解决方案。

缓存穿透(秒杀)
  • 概念

    缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

  • 解决方案

    布隆过滤器

    布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。

缓存空对象

​ 当存储层没有命中之后 ,将返回的空对象存储起来,同时设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源。

但是这种方法存在两个问题:

	1. 如果空值能够被缓存起来,这就意味着缓存需要更多的空间来存储更多的键,因为可能会有很多空值键
	2. 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响
缓存击穿(热搜)
  • 概念

    这里需要注意击穿和穿透的区别,缓存击穿是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

    当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导致数据库瞬间压力过大。

  • 解决方案

    设置热点数据永不过期

    从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题

    加互斥锁

    分布式锁: 使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对于分布式锁的考验很大

缓存雪崩
  • 概念

    缓存雪崩是指在某一个时间段,缓存集中过期失效。

    产生雪崩的原因之一,举例说明:零点抢购,当一批商品时间比较集中的放入了缓存,假设缓存过期时间为一个小时。那么到了凌晨1点钟的时候,这批商品的缓存全部过期,此时对这批商品的访问查询就全部落到了数据库上。对数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉。

​ 其实集中过期导致的雪崩也不是非常致命,比较致命的雪崩是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓 存,这个时候,数据库也是可以顶住压力的,无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的, 很有可能瞬间就把数据库压垮。

  • 解决方案

    redis高可用

    这个思想的含义是,既然redis有可能挂掉,那我就多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建集群

限流降级

​ 这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等 待

数据预热

​ 数据预热的含义就是在正式部署之前,我先把所有可能的数据先预先访问一遍,这样部分可能大量访问的数据就会被加载到缓存中。在即将发生大并发访问前 手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/905574.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-15
下一篇 2022-05-15

发表评论

登录后才能评论

评论列表(0条)

保存