Python采用并发查询mysql以及调用API灌数据 (二) - PyMysql操作数据库基本类封装...

网友投稿 942 2022-11-21

python采用并发查询mysql以及调用API灌数据 (二) - PyMysql操作数据库基本类封装...

Python采用并发查询mysql以及调用API灌数据 (二) - PyMysql操作数据库基本类封装...

前情回顾

​​上一篇文章​​已经写好了查询数据库以及post请求API的实例,那么本章节我们来继续。

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下

那么根据流程所需要的功能,需要以下的实例进行支撑: 1.并发实例 2.查询数据实例 3.执行post请求实例

目标:构建实际数据场景 --> 抽象编写查询以及post的类方法 --> 编写整合处理方法

构建实际数据场景

可以看出,整个流程中对于mysql的操作是很重要的,为了方便行事。下面我对PyMysql操作数据库的基本类进行了封装处理。

编写数据库查询的工具类方法

实现代码如下:

# -*- coding: utf-8 -*-import pymysqlimport reclass MysqldbHelper(object): # 继承object类所有方法 ''' 构造方法: config = { 'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': 'root', 'charset':'utf8', 'cursorclass':pymysql.cursors.DictCursor } conn = pymysql.connect(**config) conn.autocommit(1) cursor = conn.cursor() ''' def __init__(self , config): self.host = config['host'] self.username = config['user'] self.password = config['passwd'] self.port = config['port'] self.con = None self.cur = None try: self.con = pymysql.connect(**config) self.con.autocommit(1) # 所有的查询,都在连接 con 的一个模块 cursor 上面运行的 self.cur = self.con.cursor() except: print "DataBase connect error,please check the db config." # 关闭数据库连接 def close(self): if not self.con: self.con.close() else: print "DataBase doesn't connect,close connectiong error;please check the db config." # 创建数据库 def createDataBase(self,DB_NAME): # 创建数据库 self.cur.execute('CREATE DATABASE IF NOT EXISTS %s DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci' % DB_NAME) self.con.select_db(DB_NAME) print 'creatDatabase:' + DB_NAME # 选择数据库 def selectDataBase(self,DB_NAME): self.con.select_db(DB_NAME) # 获取数据库版本号 def getVersion(self): self.cur.execute("SELECT VERSION()") return self.getOneData() # 获取上个查询的结果 def getOneData(self): # 取得上个查询的结果,是单个结果 data = self.cur.fetchone() return data # 创建数据库表 def creatTable(self, tablename, attrdict, constraint): """创建数据库表 args: tablename :表名字 attrdict :属性键值对,{'book_name':'varchar(200) NOT NULL'...} constraint :主外键约束,PRIMARY KEY(`id`) """ if self.isExistTable(tablename): print "%s is exit" % tablename return sql = '' sql_mid = '`id` bigint(11) NOT NULL AUTO_INCREMENT,' for attr,value in attrdict.items(): sql_mid = sql_mid + '`'+attr + '`'+' '+ value+',' sql = sql + 'CREATE TABLE IF NOT EXISTS %s ('%tablename sql = sql + sql_mid sql = sql + constraint sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8' print 'creatTable:'+sql self.executeCommit(sql) def executeSql(self,sql=''): """执行sql语句,针对读操作返回结果集 args: sql :sql语句 """ try: self.cur.execute(sql) records = self.cur.fetchall() return records except pymysql.Error,e: error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1]) print error def executeCommit(self,sql=''): """执行数据库sql语句,针对更新,删除,事务等操作失败时回滚 """ try: self.cur.execute(sql) self.con.commit() except pymysql.Error, e: self.con.rollback() error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1]) print "error:", error return error def insert(self, tablename, params): """创建数据库表 args: tablename :表名字 key :属性键 value :属性值 """ key = [] value = [] for tmpkey, tmpvalue in params.items(): key.append(tmpkey) if isinstance(tmpvalue, str): value.append("\'" + tmpvalue + "\'") else: value.append(tmpvalue) attrs_sql = '('+','.join(key)+')' values_sql = ' values('+','.join(value)+')' sql = 'insert into %s'%tablename sql = sql + attrs_sql + values_sql print '_insert:'+sql self.executeCommit(sql) def select(self, tablename, cond_dict='', order='', fields='*'): """查询数据 args: tablename :表名字 cond_dict :查询条件 order :排序条件 example: print mydb.select(table) print mydb.select(table, fields=["name"]) print mydb.select(table, fields=["name", "age"]) print mydb.select(table, fields=["age", "name"]) """ consql = ' ' if cond_dict!='': for k, v in cond_dict.items(): consql = consql+'`'+k +'`'+ '=' + '"'+v + '"' + ' and' consql = consql + ' 1=1 ' if fields == "*": sql = 'select * from %s where ' % tablename else: if isinstance(fields, list): fields = ",".join(fields) sql = 'select %s from %s where ' % (fields, tablename) else: print "fields input error, please input list fields." sql = sql + consql + order print 'select:' + sql return self.executeSql(sql) def insertMany(self,table, attrs, values): """插入多条数据 args: tablename :表名字 attrs :属性键 values :属性值 example: table='test_mysqldb' key = ["id" ,"name", "age"] value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]] mydb.insertMany(table, key, value) """ values_sql = ['%s' for v in attrs] attrs_sql = '('+','.join(attrs)+')' values_sql = ' values('+','.join(values_sql)+')' sql = 'insert into %s'% table sql = sql + attrs_sql + values_sql print 'insertMany:'+sql try: print sql for i in range(0,len(values),20000): self.cur.executemany(sql,values[i:i+20000]) self.con.commit() except pymysql.Error,e: self.con.rollback() error = 'insertMany executemany failed! ERROR (%s): %s' %(e.args[0],e.args[1]) print error def delete(self, tablename, cond_dict): """删除数据 args: tablename :表名字 cond_dict :删除条件字典 example: params = {"name" : "caixinglong", "age" : "38"} mydb.delete(table, params) """ consql = ' ' if cond_dict!='': for k, v in cond_dict.items(): if isinstance(v, str): v = "\'" + v + "\'" consql = consql + tablename + "." + k + '=' + v + ' and ' consql = consql + ' 1=1 ' sql = "DELETE FROM %s where%s" % (tablename, consql) print sql return self.executeCommit(sql) def update(self, tablename, attrs_dict, cond_dict): """更新数据 args: tablename :表名字 attrs_dict :更新属性键值对字典 cond_dict :更新条件字典 example: params = {"name" : "caixinglong", "age" : "38"} cond_dict = {"name" : "liuqiao", "age" : "18"} mydb.update(table, params, cond_dict) """ attrs_list = [] consql = ' ' for tmpkey, tmpvalue in attrs_dict.items(): attrs_list.append("`" + tmpkey + "`" + "=" +"\'" + tmpvalue + "\'") attrs_sql = ",".join(attrs_list) print "attrs_sql:", attrs_sql if cond_dict!='': for k, v in cond_dict.items(): if isinstance(v, str): v = "\'" + v + "\'" consql = consql + "`" + tablename +"`." + "`" + k + "`" + '=' + v + ' and ' consql = consql + ' 1=1 ' sql = "UPDATE %s SET %s where%s" % (tablename, attrs_sql, consql) print sql return self.executeCommit(sql) def dropTable(self, tablename): """删除数据库表 args: tablename :表名字 """ sql = "DROP TABLE %s" % tablename self.executeCommit(sql) def deleteTable(self, tablename): """清空数据库表 args: tablename :表名字 """ sql = "DELETE FROM %s" % tablename print "sql=",sql self.executeCommit(sql) def isExistTable(self, tablename): """判断数据表是否存在 args: tablename :表名字 Return: 存在返回True,不存在返回False """ sql = "select * from %s" % tablename result = self.executeCommit(sql) if result is None: return True else: if re.search("doesn't exist", result): return False else: return Trueif __name__ == "__main__": # 定义数据库访问参数 config = { 'host': '你的mysql服务器IP地址', 'port': 3361, 'user': 'root', 'passwd': '你的mysql服务器root密码', 'charset': 'utf8', 'cursorclass': pymysql.cursors.DictCursor } # 初始化打开数据库连接 mydb = MysqldbHelper(config) # 打印数据库版本 print mydb.getVersion() # 创建数据库 DB_NAME = 'test_db' # mydb.createDataBase(DB_NAME) # 选择数据库 print "========= 选择数据库%s ===========" % DB_NAME mydb.selectDataBase(DB_NAME) #创建表 TABLE_NAME = 'test_user' print "========= 选择数据表%s ===========" % TABLE_NAME # CREATE TABLE %s(id int(11) primary key,name varchar(30))' %TABLE_NAME attrdict = {'name':'varchar(30) NOT NULL'} constraint = "PRIMARY KEY(`id`)" mydb.creatTable(TABLE_NAME,attrdict,constraint) # 插入纪录 print "========= 单条数据插入 ===========" params = {} for i in range(5): params.update({"name":"testuser"+str(i)}) # 生成字典数据,循环插入 print params mydb.insert(TABLE_NAME, params) print # 批量插入数据 print "========= 多条数据同时插入 ===========" insert_values = [] for i in range(5): # values.append((i,"testuser"+str(i))) insert_values.append([u"测试用户"+str(i)]) # 插入中文数据 print insert_values insert_attrs = ["name"] mydb.insertMany(TABLE_NAME,insert_attrs, insert_values) # 数据查询 print "========= 数据查询 ===========" print mydb.select(TABLE_NAME, fields=["id", "name"]) print mydb.select(TABLE_NAME, cond_dict = {'name':'测试用户2'},fields=["id", "name"]) print mydb.select(TABLE_NAME, cond_dict = {'name':'测试用户2'},fields=["id", "name"],order="order by id desc") # 删除数据 print "========= 删除数据 ===========" delete_params = {"name": "测试用户2"} mydb.delete(TABLE_NAME, delete_params) # 更新数据 print "========= 更新数据 ===========" update_params = {"name": "测试用户99"} # 需要更新为什么值 update_cond_dict = {"name": "测试用户3"} # 更新执行的查询条件 mydb.update(TABLE_NAME, update_params, update_cond_dict) # 删除表数据 print "========= 删除表数据 ===========" mydb.deleteTable(TABLE_NAME) # 删除表 print "========= 删除表 ===========" mydb.dropTable(TABLE_NAME)

测试执行结果如下:

D:\Python27\python.exe E:/PycharmProjects/DataProject/tools/MysqlTools.py{u'VERSION()': u'5.7.9-log'}========= 选择数据库test_db ==================== 选择数据表test_user ===========test_user is exit========= 单条数据插入 ==========={'name': 'testuser0'}_insert:insert into test_user(name) values('testuser0'){'name': 'testuser1'}_insert:insert into test_user(name) values('testuser1'){'name': 'testuser2'}_insert:insert into test_user(name) values('testuser2'){'name': 'testuser3'}_insert:insert into test_user(name) values('testuser3'){'name': 'testuser4'}_insert:insert into test_user(name) values('testuser4')========= 多条数据同时插入 ===========[[u'\u6d4b\u8bd5\u7528\u62370'], [u'\u6d4b\u8bd5\u7528\u62371'], [u'\u6d4b\u8bd5\u7528\u62372'], [u'\u6d4b\u8bd5\u7528\u62373'], [u'\u6d4b\u8bd5\u7528\u62374']]insertMany:insert into test_user(name) values(%s)insert into test_user(name) values(%s)========= 数据查询 ===========select:select id,name from test_user where 1=1 [{u'id': 361, u'name': u'testuser0'}, {u'id': 362, u'name': u'testuser1'}, {u'id': 363, u'name': u'testuser2'}, {u'id': 364, u'name': u'testuser3'}, {u'id': 365, u'name': u'testuser4'}, {u'id': 366, u'name': u'\u6d4b\u8bd5\u7528\u62370'}, {u'id': 367, u'name': u'\u6d4b\u8bd5\u7528\u62371'}, {u'id': 368, u'name': u'\u6d4b\u8bd5\u7528\u62372'}, {u'id': 369, u'name': u'\u6d4b\u8bd5\u7528\u62373'}, {u'id': 370, u'name': u'\u6d4b\u8bd5\u7528\u62374'}]select:select id,name from test_user where `name`="测试用户2" and 1=1 [{u'id': 368, u'name': u'\u6d4b\u8bd5\u7528\u62372'}]select:select id,name from test_user where `name`="测试用户2" and 1=1 order by id desc[{u'id': 368, u'name': u'\u6d4b\u8bd5\u7528\u62372'}]========= 删除数据 ===========DELETE FROM test_user where test_user.name='测试用户2' and 1=1 ========= 更新数据 ===========attrs_sql: `name`='测试用户99'UPDATE test_user SET `name`='测试用户99' where `test_user`.`name`='测试用户3' and 1=1 ========= 删除表数据 ===========sql= DELETE FROM test_user========= 删除表 ===========Process finished with exit code 0

好了,写完了基本操作类之后该怎么引用呢? 我们下一章节再见。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:当外卖哥摔倒在了眼前,他默默爬起,捡起一个个散落的外卖
下一篇:golang操作kafka
相关文章

 发表评论

暂时没有评论,来抢沙发吧~