作用
- 对数据备份, 实现高可用 HA (主要)
- 通过读写分离, 提高吞吐量, 实现高性能
原理
- Mysql的复制 是一个异步的复制过程
- 过程本质为 Slave 从 Master 端获取 Binary Log, 然后再在自己身上完全顺序的执行日志中所记录的各种 *** 作
- MySQL 复制的基本过程如下:
1)Slave 上面的 IO 线程连接上 Master, 并请求从指定日志文件的指定位置之后的日志内容;
2)Master 接收到来自 Slave 的 IO 线程的请求后, 通过负责复制的IO线程 根据请求信息读取日志信息,返回给 Slave 端的 IO 线程。
3)Slave 的 IO 线程接收到信息后,将接收到的日志内容依次写入到 Slave 端的 Relay Log文件
4)Slave 的 SQL 线程检测到 Relay Log 中新增加了内容后,会马上解析该文件中的内容, 并在自身执行这些 原始SQL语句。
注:
I/O thread从Binary log中读取到的数据并不是读一条执行一条而是先写入到Relay log中,否则效率会很慢
主从架构
- 性能
一主多从, 读写分离, 提高吞吐量 - 可用性
主库单点, 一旦挂了, 无法写入
从库高可用
主备架构
- 性能
单库读写, 性能一般 - 可用性
高可用, 一旦主库挂了, 就启用备库 - 这种方案被阿里云、美团等企业广泛使用
主备架构搭建除了配置双主同步, 还需要搭配第三方故障转移/高可用方案, 属于DBA和运维专业领域
MySQL + Keepalived 双主热备高可用 *** 作记录
我们通常说的双机热备是指两台机器都在运行,但并不是两台机器都同时在提供服务。当提供服务的一台出现故障的时候,另外一台会马上自动接管并且提供服务,而且切换的时间非常短。MySQL双主复制,即互为Master-Slave(只有一个Master提供写 *** 作),可以实现数据库服务器的热备,但是一个Master宕机后不能实现动态切换。使用Keepalived,可以通过虚拟IP,实现双主对外的统一接口以及自动检查、失败切换机制,从而实现MySQL数据库的高可用方案
1)先实施Master->Slave的主主同步。主主是数据双向同步,主从是数据单向同步。一般情况下,主库宕机后,需要手动将连接切换到从库上。(但是用keepalived就可以自动切换)
2)再结合Keepalived的使用,通过VIP实现Mysql双主对外连接的统一接口。即客户端通过Vip连接数据库;当其中一台宕机后,VIP会漂移到另一台上,这个过程对于客户端的数据连接来说几乎无感觉,从而实现高可用。
如果我们基于代码层面而不考虑去安装部署keepalive,只需要在配置访问数据库地址时设置为VIP虚拟IP即可
问题: 既然主备互为备份, 为什么不采用双主方案, 提供两台主进行负载均衡?
- 原因是为了避免数据的冲突,防止造成数据的不一致性。 虽然在两边执行的修改有先后顺序,但由于 Replication 是异步的实现机制,同样可能会导致 晚做的修改被早做的修改所覆盖
- 不仅B库数据错误, 且A&B库数据不一致
高可用复合架构
- 性能
读写分离, 提高吞吐量 - 可用性
高可用, 一旦主库挂了, 就启用备库
- sqlalchemy 并没有像 django-orm 一样内置完善的读写分离方案, 但是提供了可以自定义的接口: 官方参考文档, 我们可以借此对 flask-sqlalchemy 进行二次开发, 实现读写分离
engines = {
'leader':create_engine("sqlite:///leader.db"),
'other':create_engine("sqlite:///other.db"),
'follower1':create_engine("sqlite:///follower1.db"),
'follower2':create_engine("sqlite:///follower2.db"),
}
from sqlalchemy.sql import Update, Delete
from sqlalchemy.orm import Session, sessionmaker
import random
class RoutingSession(Session):
def get_bind(self, mapper=None, clause=None):
#如果是我指定的某个模型类(比如User),就调用这个other这个库
if mapper and issubclass(mapper.class_, MyOtherClass):
return engines['other']
# 如果是增删改就调用leader这个库
elif self._flushing or isinstance(clause, (Update, Delete)):
return engines['leader']
else:
# 都不是的话就随机选择一个从库
return engines[
random.choice(['follower1','follower2'])
]
# 通过class_ 这个属性指定自定义的session类
Session = sessionmaker(class_=RoutingSession)
基本实现思路:
实现自定义的 session类, 继承 SignallingSession类
- 重写 get_bind方法, 根据读写需求选择对应的数据库地址
实现自定义的 SQLAlchemy类, 继承 SQLAlchemy类
- 重写 create_session方法, 在内部使用自定义的 Session类
import random
from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
from sqlalchemy.sql.dml import UpdateBase
app = Flask(__name__)
# 设置单个数据库URI (用于建表)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.140:3306/test31'
# 设置多个数据库的URI (用于数据 *** 作)
app.config['SQLALCHEMY_BINDS'] = {
'master': 'mysql://root:mysql@192.168.105.140:3306/test31',
'slave1': 'mysql://root:mysql@192.168.105.140:8306/test31',
'slave2': 'mysql://root:mysql@192.168.105.140:3306/test31'
}
# 其他配置
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True
# 1. 自定义Session类, 继承SignallingSession, 并重写get_bind方法
class RoutingSession(SignallingSession):
def __init__(self, db, autocommit=False, autoflush=True, **options):
# 先完成父类方法的默认 *** 作
super(RoutingSession, self).__init__(db, autocommit, autoflush, **options)
# 每个Session(请求), 随机一次从库,可以保证每次请求中查询时会使用同一个从库,提高效率并且减少数据不一致的错误现象
self.slave = random.choice(['slave1', 'slave2'])
def get_bind(self, mapper=None, clause=None):
"""每次数据库 *** 作(增删改查及事务 *** 作)都会调用该方法, 来获取对应的数据库引擎(访问的数据库)"""
state = get_state(self.app)
if mapper is not None: # 如果该 *** 作中涉及的模型类和数据表建立了映射
try:
# SA >= 1.3
persist_selectable = mapper.persist_selectable
except AttributeError:
# SA < 1.3
persist_selectable = mapper.mapped_table
info = getattr(persist_selectable, 'info', {})
bind_key = info.get('bind_key') # 查询模型类是否指定了访问的数据库
if bind_key is not None: # 如果该模型类已指定数据库, 使用指定的数据库
return state.db.get_engine(self.app, bind=bind_key)
if self._flushing or isinstance(clause, UpdateBase): # 如果模型类未指定数据库, 判断是否为写 *** 作 该属性为True
print('写 *** 作')
return state.db.get_engine(self.app, bind='master')
else:
print('读 *** 作: ', self.slave)
return state.db.get_engine(self.app, bind=self.slave)
# 2. 自定义SQLALchemy类, 重写create_session方法
class RoutingSQLAlchemy(SQLAlchemy):
def create_session(self, options):
return orm.sessionmaker(class_=RoutingSession, db=self, **options)
# 创建组件对象
db = RoutingSQLAlchemy(app)
# 构建模型类
class User(db.Model):
__tablename__ = 't_user'
# __bind_key__ = 'master' # 指定模型访问的数据库
id = db.Column(db.Integer, primary_key=True)
name = db.Column('username', db.String(20), unique=True)
age = db.Column(db.Integer, default=0, index=True)
@app.route('/')
def index():
"""增加数据"""
print('---读-----------')
users = User.query.all()
for user in users:
print(user.id, user.name, user.age)
# print('---写-----------')
#
# user1 = User(name='zs', age=20)
# db.session.add(user1)
# db.session.commit()
print('---读-----------')
users = User.query.all()
for user in users:
print(user.id, user.name, user.age)
return "index"
if __name__ == '__main__':
# 重置所有继承自db.Model的表
# 如果模型类没有设置__bind_ky__属性(指定对应的数据库), 则DDL *** 作 根据SQLALCHEMY_DATABASE_URI 指定的数据库进行处理
db.drop_all()
db.create_all()
app.run(debug=True, host='0.0.0.0')
项目集成
- 将工具包routing_db 导入 common/models中 , 其中的 routing_sqlalchemy.py文件实现了读写分离
# routing_db/routing_sqlalchemy.py
import random
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
from sqlalchemy.sql.dml import UpdateBase
# 设置多个数据库的URI (用于数据 *** 作)
# app.config['SQLALCHEMY_BINDS'] = {
# 'master': 'mysql://root:mysql@192.168.105.134:3306/test31',
# 'slave1': 'mysql://root:mysql@192.168.105.134:8306/test31',
# 'slave2': 'mysql://root:mysql@192.168.105.134:3306/test31'
# }
class RoutingSession(SignallingSession):
"""自定义Session类, 继承SignallingSession"""
def __init__(self, db, autocommit=False, autoflush=True, **options):
super(RoutingSession, self).__init__(db, autocommit, autoflush, **options)
# 每个Session(请求), 随机一次从库, 避免每个请求访问多个从库影响性能
self.slave = random.choice(['slave1', 'slave2'])
def get_bind(self, mapper=None, clause=None):
"""每次数据库 *** 作(增删改查及事务 *** 作)都会调用该方法, 来获取对应的数据库引擎(访问的数据库)"""
state = get_state(self.app)
if mapper is not None: # 如果该 *** 作中涉及的模型类和数据表建立了映射
try:
# SA >= 1.3
persist_selectable = mapper.persist_selectable
except AttributeError:
# SA < 1.3
persist_selectable = mapper.mapped_table
info = getattr(persist_selectable, 'info', {})
bind_key = info.get('bind_key') # 查询模型类是否指定了访问的数据库
if bind_key is not None: # 如果该模型类已指定数据库, 使用指定的数据库
return state.db.get_engine(self.app, bind=bind_key)
if self._flushing or isinstance(clause, UpdateBase): # 如果模型类未指定数据库, 判断是否为写 *** 作
print('写 *** 作')
return state.db.get_engine(self.app, bind='master')
else:
print('读 *** 作: ', self.slave)
return state.db.get_engine(self.app, bind=self.slave)
class RoutingSQLAlchemy(SQLAlchemy):
"""自定义SQLALchemy类"""
def create_session(self, options):
"""重写create_session方法: 使用自定义Session类"""
return orm.sessionmaker(class_=RoutingSession, db=self, **options)
- 在 app/settings/config.py文件中 设置主从数据库的URI地址
# app/settings/config.py
class DefaultConfig:
"""默认配置"""
...
SQLALCHEMY_BINDS = { # 主从数据库的URI
"master": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',
"slave1": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',
"slave2": 'mysql://root:mysql@192.168.105.140:8306/hm_topnews'
}
...
- 在 app/init.py文件 中使用自定义SQLAlchemy类
# app/__init__.py
...
# from flask_sqlalchemy import SQLAlchemy
# db = SQLAlchemy()
from models.routing_db.routing_sqlalchemy import RoutingSQLAlchemy
# mysql数据库 *** 作对象
db = RoutingSQLAlchemy()
...
优化
修改前
class LoginResource(Resource):
"""注册登录"""
def post(self):
# 获取参数
parser = RequestParser()
parser.add_argument('mobile', required=True, location='json', type=mobile_type)
parser.add_argument('code', required=True, location='json', type=regex(r'^\d{6}$'))
args = parser.parse_args()
mobile = args.mobile
code = args.code
# 校验短信验证码
key = 'app:code:{}'.format(mobile)
real_code = redis_client.get(key)
if not real_code or real_code != code:
return {'message': 'Invalid Code', 'data': None}, 400
# 删除验证码
# redis_client.delete(key)
# 校验成功, 查询数据库
user = User.query.options(load_only(User.id)).filter(User.mobile == mobile).first()
if user: # 如果有, 取出用户id, 更新最后登录时间
user.last_login = datetime.now()
else: # 如果没有, 创建新用户
user = User(mobile=mobile, name=mobile, last_login=datetime.now())
db.session.add(user)
#这里commit之后事务就关闭了,那下面返回的user.id就得重新查(意思是这里会查两边),但是如果提前保存这个userid就可以
db.session.commit()
# 生成jwt (这里会重新查一边)
token = generate_jwt({'userid': user.id},
expiry=datetime.utcnow() + timedelta(days=current_app.config['JWT_EXPIRE_DAYS']))
# 返回结果
return {'token': token}, 201
修改后
class LoginResource(Resource):
"""注册登录"""
def post(self):
# 获取参数
parser = RequestParser()
parser.add_argument('mobile', required=True, location='json', type=mobile_type)
parser.add_argument('code', required=True, location='json', type=regex(r'^\d{6}$'))
args = parser.parse_args()
mobile = args.mobile
code = args.code
# 校验短信验证码
key = 'app:code:{}'.format(mobile)
real_code = redis_client.get(key)
if not real_code or real_code != code:
return {'message': 'Invalid Code', 'data': None}, 400
# 删除验证码
# redis_client.delete(key)
# 校验成功, 查询数据库
user = User.query.options(load_only(User.id)).filter(User.mobile == mobile).first()
if user: # 如果有, 取出用户id, 更新最后登录时间
user.last_login = datetime.now()
else: # 如果没有, 创建新用户
user = User(mobile=mobile, name=mobile, last_login=datetime.now())
db.session.add(user)
db.session.flush()
userid = user.id
db.session.commit()
# 生成jwt
token = generate_jwt({'userid': userid},
expiry=datetime.utcnow() + timedelta(days=current_app.config['JWT_EXPIRE_DAYS']))
# 返回结果
return {'token': token}, 201
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)