import pymysql
from sqlalchemy import create_engine
import pandas as pd
class DataBaseHandle(object):
''' MySQL *** 作类'''
def __init__(self, host, username, password, database, port):
'''初始化数据库信息并创建数据库连接'''
# pymysql
self.db_conn = pymysql.connect(
host=host,
user=username,
password=password,
db=database,
port=port,
charset="utf8"
)
self.cursor = self.db_conn.cursor()
# sqlalchemy
self.engine = create_engine(
'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}'.format(
user=username,
passwd=password,
host=host,
port=port,
db=database
))
def select_for_df(self, sql):
''' 查询并返回数据帧'''
try:
# 将数据转化成数据帧
return pd.read_sql(sql, con=self.db_conn)
except Exception as e:
# print('fetch the data fail')
print('problem:',e)
return None
def append_for_df(self,df,table_name,if_exists='append'):
''' 刷新数据-df插入 '''
# 默认是替换 replace替换、append追加,fail则当表存在时提示ValueError
df.to_sql(name=table_name, con=self.engine, if_exists=if_exists, index=False)
def not_select_ifo(self, sql):
''' 非查询 '''
try:
result = self.cursor.execute(sql)
self.db_conn.commit()
return result
except Exception as e:
self.db_conn.rollback()
raise e
def close(self):
''' 关闭连接 '''
self.cursor.close()
self.db_conn.close()
2.异步 *** 作mysql
import pandas as pd
import asyncio
import aiomysql
nest_asyncio.apply()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class AioMysql:
def __init__(self):
self.coon = None
self.pool = None
async def initpool(self,db,minsize=5,maxsize=10):
try:
__pool = await aiomysql.create_pool(
minsize=minsize, # 连接池最小值
maxsize=maxsize, # 连接池最大值
host='localhost',
port=3306,
user='db',
password='***',
db=db,
autocommit=True, # 自动提交模式
)
return __pool
except:
print('connect error')
async def getCurosr(self):
conn = await self.pool.acquire()
# 返回字典格式
cur = await conn.cursor(aiomysql.DictCursor)
return conn, cur
async def select_for_df(self, query, param=None):
"""
查询 *** 作
:param query: sql语句
:param param: 参数
:return:
"""
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
# 接收全部的返回结果行,将数据转化成数据帧
data = await cur.fetchall()
return pd.DataFrame(data)
except Exception as e:
print('---------------->query error')
raise e
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
async def not_select_ifo(self, query, param=None):
"""
增删改 *** 作
:param query: sql语句
:param param: 参数
:return:
"""
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
# 这是一个只读属性,并返回执行execute()方法后影响的行数。
if cur.rowcount == 0:
return False
else:
return True
except:
await conn.rollback()
finally:
if cur:
await cur.close()
# 释放掉conn,将连接放回到连接池中
await self.pool.release(conn)
class PoolOjb(object):
''' 获取连接'''
@staticmethod
def connect():
async def getAmysqlobj(db):
mysqlobj = AioMysql()
pool = await mysqlobj.initpool(db)
mysqlobj.pool = pool
return mysqlobj
return getAmysqlobj
#################################### 在其他类中调用
class ClassCountTable(object):
sql = "select * from table1 where to_days(now())>=to_days(start_time) and to_days(now())<= to_days(end_time);"
def __init__(self):
# 创建数据库连接
self.db = 'db_name'
# 获取连接对象
self.async_conn = PoolOjb.connect()
self.df = self.get_data()
def get_data(self):
async def get_df():
# 根据连接对象获取连接池
mysqlobj = await self.async_conn(self.db)
df = await mysqlobj.select_for_df(self.sql)
return df
df = loop.run_until_complete(get_df())
return df
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)