您可以毫无问题地动态构建查询。例如,您可以执行以下 *** 作:
query = select([table])for key, value in params.iteritems(): query = query.where(table.columns[key] == value)print conn.execute(query).fetchall()
那里
params只是限制的字典,如
{'column': 'value'}。这将产生一个查询,其中所有 where
子句都与AND链接在一起。当然,如果您需要更复杂的 where 子句,则查询的构造可能会更加困难。
一个工作示例:
from sqlalchemy import selectfrom sqlalchemy import create_enginefrom sqlalchemy import Column, String, Integerfrom sqlalchemy.schema import metaData, Table# create engine and connectionDB_URI = "mysql+mysqldb://username:password@127.0.0.1:3306/database?charset=utf8mb4"engine = create_engine(DB_URI, convert_unipre=True, echo=False)conn = engine.connect()# create tablemetadata = metaData()pieces_table = Table('pieces', metadata, Column('id', Integer, primary_key=True), Column('size', String(60)), Column('color', String(60)),)metadata.create_all(engine)# insert dataconn.execute(pieces_table.insert(), [ {'size': 'small', 'color': 'blue'}, {'size': 'large', 'color': 'blue'}, {'size': 'small', 'color': 'red'},])# query datadef build_query(table, params): query = select([table]) for key, value in params.iteritems(): query = query.where(table.columns[key] == value) return queryparams = { 'size': 'large', 'color': 'blue',}query = build_query(pieces_table, params)print conn.execute(query).fetchall()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)