# Environment setup for the MySQL connection-pool helper shown below.
# System package (Debian/Ubuntu): MySQL client development files.
# NOTE(review): presumably only needed to compile a C-based MySQL driver — confirm.
apt-get install default-libmysqlclient-dev
# NOTE(review): the Python code below imports only pymysql, DBUtils and logging;
# these two "mysql" installs do not appear to be used by it — verify before keeping.
pip install mysql
conda install mysql
# Pinned release: the code below imports `DBUtils.PooledDB`.
# NOTE(review): presumably pinned because later DBUtils releases changed the
# module path — confirm before upgrading.
pip install DBUtils==1.3
# Pure-Python MySQL driver passed to the pool as the connection creator.
pip install pymysql
import logging

import pymysql

try:
    from DBUtils.PooledDB import PooledDB  # DBUtils 1.x module layout
except ImportError:
    from dbutils.pooled_db import PooledDB  # DBUtils >= 2.0 module layout


# Open the database connection
class MySQL:
    """Small helper around a pooled PyMySQL connection.

    Call ``init_DB()`` once to build the pool, then use the ``select*`` /
    ``update*`` methods. Every query method borrows a connection from the
    pool, runs its statement(s), and always hands the cursor and connection
    back, whether or not the statement succeeded.

    Errors are logged through the root logger and reported to the caller via
    the method's sentinel return value (``None`` or ``0``) rather than raised.
    """

    # NOTE(review): connection credentials are hard-coded in source. Move them
    # to configuration/environment before using this outside a demo.
    host = '47.75.158.180'
    user = 'vae'
    port = 3306
    password = 'vae123456789'
    db = 'movie'
    charset = 'utf8'

    # create database db_test default character set utf8 collate utf8_general_ci;
    pool = None
    # Passed as the second positional argument of PooledDB (``mincached`` in
    # the DBUtils documentation), i.e. the number of idle connections opened
    # when the pool is created -- it is not an upper bound on connections.
    limit_count = 3

    def __init__(self):
        """Create the helper; the pool itself is built by ``init_DB()``."""

    def init_DB(self):
        """Build the connection pool from the class-level settings.

        Failures are logged and swallowed; ``self.pool`` then stays ``None``
        and later queries report errors through their return values.
        """
        try:
            self.pool = PooledDB(
                pymysql,
                self.limit_count,
                host=self.host,
                user=self.user,
                passwd=self.password,
                db=self.db,
                port=self.port,
                charset=self.charset,
                use_unicode=True)
        except Exception as e:
            logging.error("mysql init_DB error %s", e)

    @staticmethod
    def _release(cus, conn):
        """Close the cursor and return the connection to the pool, if acquired."""
        for resource in (cus, conn):
            if resource is None:
                # Acquisition failed before this resource existed.
                continue
            try:
                resource.close()
            except Exception as e:
                logging.warning("mysql release resource error %s", e)

    @staticmethod
    def _rollback(conn):
        """Roll back the open transaction without masking the original error."""
        if conn is None:
            return
        try:
            conn.rollback()
        except Exception as e:
            logging.warning("mysql rollback error %s", e)

    def select(self, sql):
        """Run a query without parameters.

        :param sql: complete SQL text to execute.
        :return: all fetched rows, or ``None`` if anything failed.
        """
        conn = None
        cus = None
        try:
            logging.debug("mysql exec sql: %s", sql)
            conn = self.pool.connection()
            cus = conn.cursor()
            cus.execute(sql)
            return cus.fetchall()
        except Exception as e:
            logging.error("mysql select table error %s", e)
            return None
        finally:
            self._release(cus, conn)

    def selectByParam(self, sql, param):
        """Run a parameterized query.

        :param sql: SQL text with driver placeholders.
        :param param: values bound to the placeholders.
        :return: all fetched rows, or ``None`` if anything failed.
        """
        conn = None
        cus = None
        try:
            logging.debug("mysql exec sql: %s", sql)
            conn = self.pool.connection()
            cus = conn.cursor()
            cus.execute(sql, param)
            return cus.fetchall()
        except Exception as e:
            logging.error("mysql select table error %s", e)
            return None
        finally:
            self._release(cus, conn)

    def update2ByParam(self, sql, params, sql1, params1):
        """Run two parameterized statements in one transaction.

        Both statements are committed together; if either fails the
        transaction is rolled back.

        :return: ``(count, count1)`` as returned by the two ``execute`` calls,
            or ``None`` if anything failed.
        """
        conn = None
        cus = None
        try:
            logging.debug("update2ByParam sql: %s, %s", sql, params)
            logging.debug("update2ByParam sql: %s, %s", sql1, params1)
            conn = self.pool.connection()
            cus = conn.cursor()
            count = cus.execute(sql, params)
            count1 = cus.execute(sql1, params1)
            conn.commit()
            return count, count1
        except Exception as e:
            self._rollback(conn)
            logging.error("mysql update by param table error %s", e)
            return None
        finally:
            self._release(cus, conn)

    def updateByParam(self, sql, params):
        """Run one parameterized write statement and commit it.

        :return: the value returned by ``execute``, or ``0`` if anything failed
            (the transaction is rolled back in that case).
        """
        conn = None
        cus = None
        try:
            logging.debug("updateByParam sql: %s, %s", sql, params)
            conn = self.pool.connection()
            cus = conn.cursor()
            count = cus.execute(sql, params)
            conn.commit()
            return count
        except Exception as e:
            self._rollback(conn)
            logging.error("mysql update by param table error %s", e)
            return 0
        finally:
            self._release(cus, conn)

    def update(self, sql):
        """Run one write statement without parameters and commit it.

        :return: the value returned by ``execute``, or ``0`` if anything failed
            (the transaction is rolled back in that case).
        """
        conn = None
        cus = None
        try:
            logging.debug("mysql update sql: %s", sql)
            conn = self.pool.connection()
            cus = conn.cursor()
            count = cus.execute(sql)
            conn.commit()
            return count
        except Exception as e:
            self._rollback(conn)
            logging.error("mysql update table error %s", e)
            return 0
        finally:
            self._release(cus, conn)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)