import cx_Oracle
conn = cx_Oracle.connect('test', '123456', '127.0.0.1/test', encoding='utf-8')
if conn:
print('orecle connect success')
使用连接池
cx_Oracle中提供SessionPool()创建连接池,连接池一般是在应用程序初始化时创建。相比通过connect()方法创建单个数据库连接,使用SessionPool()创建连接池时,需要额外指定最少连接数(min)和最大连接数(max),连接池创建时会创建有min个数据库连接,当连接不够用时会继续新增连接,当连接未被使用时连接池将会自动减少连接的数量。在创建好连接池后,通过调用acquire()方法可以获取一个数据库连接,连接使用完毕之后,最好使用SessionPool.release(connection)或Connection.close()将连接放回连接池。
# 创建连接池
pool = cx_Oracle.SessionPool("username", "password",
"127.0.0.1:1521/服务名", min=2, max=5, increment=1, encoding="UTF-8")
# 从连接池中获取一个连接
connection = pool.acquire()
# 使用连接进行查询
cursor = connection.cursor()
for result in cursor.execute("select * from scott.students"):
print(result)
# 将连接放回连接池
pool.release(connection)
# 关闭连接池
pool.close()
2、python连接mysql
import pymysql
conn = pymysql.connect(database='test', host='127.0.0.1', user='root', password='123456', port=3306)
if conn:
print('mysql connect success')
使用连接池,用到了pooledDB
安装DBUtils(pip install DBUtils )
import pymysql
from dbutils.pooled_db import PooledDB
# 5表示最高5个连接
pool = PooledDB(pymysql, 5, host="127.0.0.1", user='root', password='123456',
database='123456', port=3306, charset="utf8")
conn = pool.connection()
cur = conn.cursor()
SQL = "SELECT * FROM scrapy_config"
r = cur.execute(SQL)
r = cur.fetchall()
print(r)
cur.close()
conn.close()
3、python连接postgresql(postgres)
import psycopg2
conn = psycopg2.connect(database='test', host='127.0.0.1', user='postgres', password='123456', port=5432)
if conn:
print('postgresql connect success')
使用连接池,用到了dbutils
import psycopg2
from dbutils.pooled_db import PooledDB
pool = PooledDB(psycopg2, 5, host="127.0.0.1", user='postgres', password='123456',
database='test', port=5432)
conn = pool.connection()
cur = conn.cursor()
SQL = "select name, setting from pg_settings where category='File Locations' ;"
r = cur.execute(SQL)
r = cur.fetchall()
print(r)
cur.close()
conn.close()
4、python连接sqlserver
import pymssql
conn = pymssql.connect(database='test', host='127.0.0.1', user='sa', password='123456', port=1433)
if conn:
print('sqlserver connect success')
使用连接池,用到了dbutils
import pymssql
from dbutils.pooled_db import PooledDB
pool = PooledDB(pymssql, 5, host="127.0.0.1", user='sa', password='123456',
database='test', port=1433, charset="utf8")
conn = pool.connection()
cur = conn.cursor()
SQL = "SELECT * FROM master.dbo.sysprocesses WHERE DB_NAME(dbid) = 'dicp2' order by login_time desc"
r = cur.execute(SQL)
r = cur.fetchall()
print(r)
cur.close()
conn.close()
5、python连接redis
import redis
# 普通连接
conn = redis.Redis(host="127.0.0.1", port=6379,password="123456", decode_responses=True)
conn.set("x1","hello",ex=5) # ex代表seconds,px代表ms
val = conn.get("x1")
print(val)
使用连接池进行连接
使用 connection pool 来管理对一个 redis server 的所有连接,避免每次建立、释放连接的开销。
默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数 Redis,这样就可以实现多个 Redis 实例共享一个连接池。
import redis
# 连接池
pool = redis.ConnectionPool(host="127.0.0.1", port=6379,password="123456",max_connections=1024)
conn = redis.Redis(connection_pool=pool)
print(conn.get("x1"))
6、使用sqlalchemy连接数据库(通常和dataframe连用)
from sqlalchemy import create_engine
conn_str = 'database_type://user:password@ip:port/database'
engine = create_engine(conn_str)
df.to_sql('tablename', engine, index=False, if_exists='append', dtype=type_dict)
例如连接oracle:
conn_str = 'oracle://test:123456@127.0.0.1:1521/jwdd'
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)