Flask *** 作多个数据库

Flask *** 作多个数据库,第1张

config.py:

SQLALCHEMY_BINDS = {
    "db1": "mysql+pymysql://db_user1:db_pwd1@db_host1:db_port1/db_name1?charset=utf8",
    "db2": "mysql+pymysql://db_user2:db_pwd2@db_host2:db_port2/db_name2?charset=utf8",
}



models.py:

def dynamic_model(bind_key, model_n):
    properties = {
        "__tablename__" = model_n.__tablename__,
        "__bind_key__" = bind_key
    }
    for col in dir(model_n):
        if col.startswith('_'):
            continue
        properties[col] = getattr(model_n, col)
    model = type(model_n.__tablename__, (db.Model,), properties)
    return model

自定义序列化:

class Serialize:
    def __init__(self, model):
        self.query = model.query
        self.model = model
        self.columns = []
        self.page_size = request.args.get('page_size', type=int, default=10)
        self.current_page = request.args.get('page', default=1, type=int)
        self.start = (self.current_page - 1) * self.page_size
        self.end = self.start + self.page_size
        self.field_func_map = {}

    def field(self, *columns):
        self.columns = columns
        self.query = db.session.query(*[getattr(self.model, column) for column in columns])
        return self

    def search(self, **kw_map):
        for column in kw_map:
            if kw_map[column]:
                self.query = self.query.filter(
                    getattr(self.model, column).like("%{}%".format('/' + kw_map[column]), escape='/'))
        return self

    def exclude(self, **kw_map):
        for column in kw_map:
            self.query = self.query.filter(
                getattr(self.model, column) != kw_map[column])
        return self

    def custom_handle(self, **kargs):
        self.field_func_map = kargs
        return self

    def to_json(self, **kw_map):
        res = []
        for column in kw_map:
            self.query = self.query.filter(
                getattr(self.model, column) == kw_map[column])
        if self.columns:
            for q in self.query.slice(self.start, self.end):
                obj_dict = {}
                for i, c in enumerate(self.columns):
                    if c in self.field_func_map:
                        obj_dict[c] = self.field_func_map[c.name](q[i])
                    else:
                        obj_dict[c] = q[i]
                res.append(obj_dict)
        else:
            for q in self.query.slice(self.start, self.end):
                obj_dict = {}
                for c in class_mapper(q.__class__).columns:
                    if c.name in self.field_func_map:
                        obj_dict[c.name] = self.field_func_map[c.name](getattr(q, c.name))
                    else:
                        obj_dict[c.name] = getattr(q, c.name)
                res.append(obj_dict)
        return res

    def total(self):
        return self.query.count()

    def data(self, **kw_map):
        return {
            'data': self.to_json(**kw_map),
            'total': self.total(),
            'currentPage': self.current_page,
            'pageSize': self.page_size
        }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/726435.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存