创建数据库
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
# 创建Base类
Base = declarative_base()
class Test(Base):
__tablename__ = 'user' # 表名
id = Column(Integer, primary_key=True, autoincrement=True) # id: int 主键 自增
username = Column(String(20))
password = Column(String(20))
# 连接mysql
# 参数:echo (调试程序时True,使用时False)
# pool_size(默认5) max_overflow=5
# engine = create_engine('mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8',echo=True)
engine = create_engine('mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8')
if __name__ == "__main__":
# 创建所有表
Base.metadata.create_all(engine)
增删改查
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import sessionmaker
from creat import Test # 此处调用了创建数据 ↑↑↑
engine = create_engine('mysql+pymysql://root:root@localhost:3306/test?charset=utf8',echo=True)
# 把当前的引擎绑定给这个会话
Session = sessionmaker(bind=engine)
# 实例化
session = Session()
# 增
u1 = Test(username='ZZY', password='123456')
u2 = Test(username='XFY', password='123456')
u3 = Test(username='WGY', password='123456')
u4 = Test(username='DGY', password='123456')
# 插入一条数据
session.add(u1)
# 批量插入
session.add_all([u2, u3, u4])
# 查询
# print(session.query(Test))
# all()返回列表
userlist = session.query(Test).all()
print(userlist)
# 条件查询
userlist = session.query(Test).filter(Test.id==2).all()
print(userlist)
# one()返回单个
user = session.query(Test).filter(Test.id==4).one()
print(user)
# 删除
# 1. 根据对象删除
user = session.query(Test).filter(Test.id==4).one()
session.delete(user)
# 2. 根据查询条件删除
session.query(Test).filter(Test.id==3).delete()
# 改
# synchronize_session可以为False, 'fetch', 'evaluate'
# False - 不对session进行同步,直接delete or updata
# 'fetch' - 在delete or updata之前,先发一条sql到数据库获取到符合条件的记录
# 'evaluate' - 在delete or update *** 作之前,用query中的条件直接对session的identity_map
# 中的objects进行eval *** 作,将符合条件的记录下来,字符串 *** 作无法使用'evaluate'
user = session.query(Test).filter(Test.id==2).one()
print(user.username)
session.query(Test).filter(Test.id==2).update({"username":"BXR"}, synchronize_session=False)
print(user.username)
#
session.query(Test).filter(Test.id==2).update({"username":"WDH"},
synchronize_session='fetch')
print(user.username)
# 查询后直接修改
user = session.query(Test).filter(Test.id==2).one()
user.username = 'DDL'
session.commit()
session.close()
从csv导入数据
其他类型文件相似
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8')
data = pd.read_csv('sql/data/user.csv', encoding='gbk').iloc[:10]
# 修改列索引和数据库中对应
data.columns = ['username', 'password']
data.reset_index(inplace=True, drop=True)
data.to_sql(name='user', con=engine, if_exists='append', index=False)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)