如何用python写sql

如何用python写sql,第1张

python可以利用pymysql模块 *** 作数据库

什么是 PyMySQL?

PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2中则使用mysqldb。

PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。

PyMySQL 安装

在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。

PyMySQL 下载地址:https://github.com/PyMySQL/PyMySQL。

如果还未安装,我们可以使用以下命令安装最新版的 PyMySQL:

$ pip3 install PyMySQL

如果你的系统不支持 pip 命令,可以使用以下方式安装:

1、使用 git 命令下载安装包安装(你也可以手动下载):

$ git clone https://github.com/PyMySQL/PyMySQL$ cd PyMySQL/$ python3 setup.py install

2、如果需要制定版本号,可以使用 curl 命令来安装:

$ # X.X 为 PyMySQL 的版本号$ curl -L https://github.com/PyMySQL/PyMySQL/tarball/pymysql-X.X | tar xz$ cd PyMySQL*$ python3 setup.py install

$ # 现在你可以删除 PyMySQL* 目录

注意:请确保您有root权限来安装上述模块。

安装的过程中可能会出现"ImportError: No module named setuptools"的错误提示,意思是你没有安装setuptools,你可以访问https://pypi.python.org/pypi/setuptools 找到各个系统的安装方法。

Linux 系统安装实例:

$ wget https://bootstrap.pypa.io/ez_setup.py$ python3 ez_setup.py

数据库连接

连接数据库前,请先确认以下事项:

您已经创建了数据库 TESTDB.

在TESTDB数据库中您已经创建了表 EMPLOYEE

EMPLOYEE表字段为 FIRST_NAME, LAST_NAME, AGE, SEX 和 INCOME。

连接数据库TESTDB使用的用户名为 "testuser" ,密码为 "test123",你可以可以自己设定或者直接使用root用户名及其密码,Mysql数据库用户授权请使用Grant命令。

在你的机子上已经安装了 Python MySQLdb 模块。

如果您对sql语句不熟悉,可以访问我们的 SQL基础教程

实例:

以下实例链接 Mysql 的 TESTDB 数据库:

实例(Python 3.0+)

#!/usr/bin/python3

import pymysql

# 打开数据库连接db = pymysql.connect("localhost","testuser","test123","TESTDB" )

# 使用 cursor() 方法创建一个游标对象 cursorcursor = db.cursor()

# 使用 execute()  方法执行 SQL 查询 cursor.execute("SELECT VERSION()")

# 使用 fetchone() 方法获取单条数据.data = cursor.fetchone()

print ("Database version : %s " % data)

# 关闭数据库连接db.close()

执行以上脚本输出结果如下:

Database version : 5.5.20-log

创建数据库表

如果数据库连接存在我们可以使用execute()方法来为数据库创建表,如下所示创建表EMPLOYEE:

实例(Python 3.0+)

#!/usr/bin/python3

import pymysql

# 打开数据库连接db = pymysql.connect("localhost","testuser","test123","TESTDB" )

# 使用 cursor() 方法创建一个游标对象 cursorcursor = db.cursor()

# 使用 execute() 方法执行 SQL,如果表存在则删除cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")

# 使用预处理语句创建表sql = """CREATE TABLE EMPLOYEE (

        FIRST_NAME  CHAR(20) NOT NULL,

        LAST_NAME  CHAR(20),

        AGE INT,  

        SEX CHAR(1),

        INCOME FLOAT )"""

cursor.execute(sql)

# 关闭数据库连接db.close()

数据库插入 *** 作

以下实例使用执行 SQL INSERT 语句向表 EMPLOYEE 插入记录:

实例(Python 3.0+)

#!/usr/bin/python3

import pymysql

# 打开数据库连接db = pymysql.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取 *** 作游标 cursor = db.cursor()

# SQL 插入语句sql = """INSERT INTO EMPLOYEE(FIRST_NAME,

        LAST_NAME, AGE, SEX, INCOME)

        VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""try:   # 执行sql语句

  cursor.execute(sql)

  # 提交到数据库执行

  db.commit()except:   # 如果发生错误则回滚

  db.rollback()

# 关闭数据库连接db.close()

以上例子也可以写成如下形式:

实例(Python 3.0+)

#!/usr/bin/python3

import pymysql

# 打开数据库连接db = pymysql.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取 *** 作游标 cursor = db.cursor()

# SQL 插入语句sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \

      LAST_NAME, AGE, SEX, INCOME) \

      VALUES ('%s', '%s',  %s,  '%s',  %s)" % \       ('Mac', 'Mohan', 20, 'M', 2000)try:   # 执行sql语句

  cursor.execute(sql)

  # 执行sql语句

  db.commit()except:   # 发生错误时回滚

  db.rollback()

# 关闭数据库连接db.close()

以下代码使用变量向SQL语句中传递参数:

..................................user_id = "test123"password = "password"con.execute('insert into Login values( %s,  %s)' % \             (user_id, password))..................................

数据库查询 *** 作

Python查询Mysql使用 fetchone() 方法获取单条数据, 使用fetchall() 方法获取多条数据。

fetchone(): 该方法获取下一个查询结果集。结果集是一个对象

fetchall(): 接收全部的返回结果行.

rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。

实例:

查询EMPLOYEE表中salary(工资)字段大于1000的所有数据:

实例(Python 3.0+)

#!/usr/bin/python3

import pymysql

# 打开数据库连接db = pymysql.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取 *** 作游标 cursor = db.cursor()

# SQL 查询语句sql = "SELECT * FROM EMPLOYEE \

      WHERE INCOME >%s" % (1000)try:   # 执行SQL语句

  cursor.execute(sql)

  # 获取所有记录列表

  results = cursor.fetchall()

  for row in results:      fname = row[0]

     lname = row[1]

     age = row[2]

     sex = row[3]

     income = row[4]

      # 打印结果

     print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \             (fname, lname, age, sex, income ))except:   print ("Error: unable to fetch data")

# 关闭数据库连接db.close()

以上脚本执行结果如下:

fname=Mac, lname=Mohan, age=20, sex=M, income=2000

数据库更新 *** 作

更新 *** 作用于更新数据表的的数据,以下实例将 TESTDB 表中 SEX 为 'M' 的 AGE 字段递增 1:

实例(Python 3.0+)

#!/usr/bin/python3

import pymysql

# 打开数据库连接db = pymysql.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取 *** 作游标 cursor = db.cursor()

# SQL 更新语句sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')try:   # 执行SQL语句

  cursor.execute(sql)

  # 提交到数据库执行

  db.commit()except:   # 发生错误时回滚

  db.rollback()

# 关闭数据库连接db.close()

删除 *** 作

删除 *** 作用于删除数据表中的数据,以下实例演示了删除数据表 EMPLOYEE 中 AGE 大于 20 的所有数据:

实例(Python 3.0+)

#!/usr/bin/python3

import pymysql

# 打开数据库连接db = pymysql.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取 *** 作游标 cursor = db.cursor()

# SQL 删除语句sql = "DELETE FROM EMPLOYEE WHERE AGE >%s" % (20)try:   # 执行SQL语句

  cursor.execute(sql)

  # 提交修改

  db.commit()except:   # 发生错误时回滚

  db.rollback()

# 关闭连接db.close()

执行事务

事务机制可以确保数据一致性。

事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。

原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的诸 *** 作要么都做,要么都不做。

一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。

隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的 *** 作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。

持久性(durability)。持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他 *** 作或故障不应该对其有任何影响。

Python DB API 2.0 的事务提供了两个方法 commit 或 rollback。

实例

实例(Python 3.0+)

# SQL删除记录语句sql = "DELETE FROM EMPLOYEE WHERE AGE >%s" % (20)try:   # 执行SQL语句

  cursor.execute(sql)

  # 向数据库提交

  db.commit()except:   # 发生错误时回滚

  db.rollback()

对于支持事务的数据库, 在Python数据库编程中,当游标建立之时,就自动开始了一个隐形的数据库事务。

commit()方法游标的所有更新 *** 作,rollback()方法回滚当前游标的所有 *** 作。每一个方法都开始了一个新的事务。

错误处理

DB API中定义了一些数据库 *** 作的错误及异常,下表列出了这些错误和异常:

异常

描述

Warning    当有严重警告时触发,例如插入数据是被截断等等。必须是 StandardError 的子类。  

Error    警告以外所有其他错误类。必须是 StandardError 的子类。  

InterfaceError    当有数据库接口模块本身的错误(而不是数据库的错误)发生时触发。 必须是Error的子类。  

DatabaseError    和数据库有关的错误发生时触发。 必须是Error的子类。  

DataError    当有数据处理时的错误发生时触发,例如:除零错误,数据超范围等等。 必须是DatabaseError的子类。  

OperationalError    指非用户控制的,而是 *** 作数据库时发生的错误。例如:连接意外断开、 数据库名未找到、事务处理失败、内存分配错误等等 *** 作数据库是发生的错误。 必须是DatabaseError的子类。  

IntegrityError    完整性相关的错误,例如外键检查失败等。必须是DatabaseError子类。  

InternalError    数据库的内部错误,例如游标(cursor)失效了、事务同步失败等等。 必须是DatabaseError子类。  

ProgrammingError    程序错误,例如数据表(table)没找到或已存在、SQL语句语法错误、 参数数量错误等等。必须是DatabaseError的子类。  

NotSupportedError    不支持错误,指使用了数据库不支持的函数或API等。例如在连接对象上 使用.rollback()函数,然而数据库并不支持事务或者事务已关闭。 必须是DatabaseError的子类。  

你可以访问Python数据库接口及API查看详细的支持数据库列表。不同的数据库你需要下载不同的DB API模块,例如你需要访问Oracle数据库和Mysql数据,你需要下载Oracle和MySQL数据库模块。

DB-API 是一个规范. 它定义了一系列必须的对象和数据库存取方式, 以便为各种各样的底层数据库系统和多种多样的数据库接口程序提供一致的访问接口 。

Python的DB-API,为大多数的数据库实现了接口,使用它连接各数据库后,就可以用相同的方式 *** 作各数据库。

Python DB-API使用流程:

引入 API 模块。

获取与数据库的连接。

执行SQL语句和存储过程。

关闭数据库连接。

什么是MySQLdb?

MySQLdb 是用于Python链接Mysql数据库的接口,它实现了 Python 数据库 API 规范 V2.0,基于 MySQL C API 上建立的。

如何安装MySQLdb?

为了用DB-API编写MySQL脚本,必须确保已经安装了MySQL。复制以下代码,并执行:

#!/usr/bin/python

# -*- coding: UTF-8 -*-

import MySQLdb

如果执行后的输出结果如下所示,意味着你没有安装 MySQLdb 模块:

Traceback (most recent call last):

File "test.py", line 3, in <module>

import MySQLdb

ImportError: No module named MySQLdb

本文主要给大家介绍了关于python模拟sql语句对员工表格进行增删改查的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍:

具体需求:

员工信息表程序,实现增删改查 *** 作:

可进行模糊查询,语法支持下面3种:

select name,age from staff_data where age >22 多个查询参数name,age 用','分割

select * from staff_data where dept = 人事

select * from staff_data where enroll_date like 2013

查到的信息,打印后,最后面还要显示查到的条数

可创建新员工纪录,以phone做唯一键,phone存在即提示,staff_id需自增,添加多个记录record1/record2中间用'/'分割

insert into staff_data values record1/record2

可删除指定员工信息纪录,输入员工id,即可删除

delete from staff_data where staff_id>=5andstaff_id<=10

可修改员工信息,语法如下:

update staff_table set dept=Market,phone=13566677787 where dept = 运维 多个set值用','分割

使用re模块,os模块,充分使用函数精简代码,熟练使用 str.split()来解析格式化字符串

由于,sql命令中的几个关键字符串有一定规律,只出现一次,并且有顺序!!!

按照key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit']的元素顺序分割sql.

分割元素作为sql_dic字典的key放进字典中.分割后的列表为b,如果len(b)>1,说明sql字符串中含有分割元素,同时b[0]对应上一个分割元素的值,b[-1]为下一次分割对象!

这样不断迭代直到把sql按出现的所有分割元素分割完毕,但注意这里每次循环都是先分割后赋值!!!当前分割元素比如'select'对应的值,需要等到下一个分割元素

比如'from'执行分割后的列表b,其中b[0]的值才会赋值给sql_dic['select'] ,所以最后一个分割元素的值,不能通过上述循环来完成,必须先处理可能是最后一个分割元素,再正常循环!!

在这sql语句中,有可能成为最后一个分割元素的 'limit' ,'values', 'where', 按优先级别,先处理'limit' ,再处理'values'或 'where'.....

处理完得到sql_dic后,就是你按不同命令执行,对数据文件的增删改查,最后返回处理结果!!

示例代码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308

# _*_coding:utf-8_*_# Author:Jaye Heimport reimport os def sql_parse(sql, key_lis): ''' 解析sql命令字符串,按照key_lis列表里的元素分割sql得到字典形式的命令sql_dic :param sql: :param key_lis: :return: ''' sql_list = [] sql_dic = {} for i in key_lis: b = [j.strip() for j in sql.split(i)] if len(b) >1: if len(sql.split('limit')) >1:sql_dic['limit'] = sql.split('limit')[-1] if i == 'where' or i == 'values':sql_dic[i] = b[-1] if sql_list:sql_dic[sql_list[-1]] = b[0] sql_list.append(i) sql = b[-1] else: sql = b[0] if sql_dic.get('select'): if not sql_dic.get('from') and not sql_dic.get('where'):sql_dic['from'] = b[-1] if sql_dic.get('select'): sql_dic['select'] = sql_dic.get('select').split(',') if sql_dic.get('where'): sql_dic['where'] = where_parse(sql_dic.get('where')) return sql_dic def where_parse(where): ''' 格式化where字符串为列表where_list,用'and', 'or', 'not'分割字符串 :param where: :return: ''' casual_l = [where] logic_key = ['and', 'or', 'not'] for j in logic_key: for i in casual_l: if i not in logic_key:if len(i.split(j)) >1: ele = i.split(j) index = casual_l.index(i) casual_l.pop(index) casual_l.insert(index, ele[0]) casual_l.insert(index+1, j) casual_l.insert(index+2, ele[1]) casual_l = [k for k in casual_l if k] where_list = three_parse(casual_l, logic_key) return where_list def three_parse(casual_l, logic_key): ''' 处理临时列表casual_l中具体的条件,'staff_id>5'-->['staff_id','>','5'] :param casual_l: :param logic_key: :return: ''' where_list = [] for i in casual_l: if i not in logic_key: b = i.split('like') if len(b) >1:b.insert(1, 'like')where_list.append(b) else:key = ['<', '=', '>']new_lis = []opt = ''lis = [j for j in re.split('([=<>])', i) if j]for k in lis: if k in key: opt += k else: new_lis.append(k)new_lis.insert(1, opt)where_list.append(new_lis) else: where_list.append(i) return where_list def sql_action(sql_dic, title): ''' 把解析好的sql_dic分发给相应函数执行处理 :param sql_dic: :param title: :return: ''' key = {'select': select, 'insert': insert, 'delete': delete, 'update': update} res = [] for i in sql_dic: if i in key: res = key[i](sql_dic, title) return res def select(sql_dic, title): ''' 处理select语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as fh: filter_res = where_action(fh, sql_dic.get('where'), title) limit_res = limit_action(filter_res, sql_dic.get('limit')) search_res = search_action(limit_res, sql_dic.get('select'), title) return search_res def insert(sql_dic, title): ''' 处理insert语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r+', encoding='utf-8') as f: data = f.readlines() phone_list = [i.strip().split(',')[4] for i in data] ins_count = 0 if not data: new_id = 1 else: last = data[-1] last_id = int(last.split(',')[0]) new_id = last_id+1 record = sql_dic.get('values').split('/') for i in record: if i.split(',')[3] in phone_list:print('\033[131m%s 手机号已存在\033[0m' % i) else:new_record = '%s,%s\n' % (str(new_id), i)f.write(new_record)new_id += 1ins_count += 1 f.flush() return ['insert successful'], [str(ins_count)] def delete(sql_dic, title): ''' 处理delete语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: del_count = 0 for line in r_file: dic = dict(zip(title.split(','), line.split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if not filter_res:w_file.write(line) else:del_count += 1 w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['delete successful'], [str(del_count)] def update(sql_dic, title): ''' 处理update语句命令 :param sql_dic: :param title: :return: ''' set_l = sql_dic.get('set').strip().split(',') set_list = [i.split('=') for i in set_l] update_count = 0 with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: for line in r_file: dic = dict(zip(title.split(','), line.strip().split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if filter_res:for i in set_list: k = i[0] v = i[-1] dic[k] = vline = [dic[i] for i in title.split(',')]update_count += 1line = ','.join(line)+'\n' w_file.write(line) w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['update successful'], [str(update_count)] def where_action(fh, where_list, title): ''' 具体处理where_list里的所有条件 :param fh: :param where_list: :param title: :return: ''' res = [] if len(where_list) != 0: for line in fh: dic = dict(zip(title.split(','), line.strip().split(','))) if dic['name'] != 'name':logic_res = logic_action(dic, where_list)if logic_res: res.append(line.strip().split(',')) else: res = [i.split(',') for i in fh.readlines()] return res pass def logic_action(dic, where_list): ''' 判断数据文件中每一条是否符合where_list条件 :param dic: :param where_list: :return: ''' logic = [] for exp in where_list: if type(exp) is list: exp_k, opt, exp_v = exp if exp[1] == '=':opt = '==' logical_char = "'%s'%s'%s'" % (dic[exp_k], opt, exp_v) if opt != 'like':exp = str(eval(logical_char)) else:if exp_v in dic[exp_k]: exp = 'True'else: exp = 'False' logic.append(exp) res = eval(' '.join(logic)) return res def limit_action(filter_res, limit_l): ''' 用列表切分处理显示符合条件的数量 :param filter_res: :param limit_l: :return: ''' if limit_l: index = int(limit_l[0]) res = filter_res[:index] else: res = filter_res return res def search_action(limit_res, select_list, title): ''' 处理需要查询并显示的title和相应数据 :param limit_res: :param select_list: :param title: :return: ''' res = [] fields_list = title.split(',') if select_list[0] == '*': res = limit_res else: fields_list = select_list for data in limit_res: dic = dict(zip(title.split(','), data)) r_l = [] for i in fields_list:r_l.append((dic[i].strip())) res.append(r_l) return fields_list, res if __name__ == '__main__': with open('staff_data', 'r', encoding='utf-8') as f: title = f.readline().strip() key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit'] while True: sql = input('请输入sql命令,退出请输入exit:').strip() sql = re.sub(' ', '', sql) if len(sql) == 0:continue if sql == 'exit':break sql_dict = sql_parse(sql, key_lis) fields_list, fields_data = sql_action(sql_dict, title) print('\033[133m结果如下:\033[0m') print('-'.join(fields_list)) for data in fields_data: print('-'.join(data))

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/10001245.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-04
下一篇 2023-05-04

发表评论

登录后才能评论

评论列表(0条)

保存