1.数据库通用连接函数
2.数据库批处理:execute/executemany
3.数据库插入百万级数据优化
4.DBUtils管理数据库连接池

1.数据库通用连接函数

连接模块视实际数据库决定,大同小异。将其封装成一个新的python文件,以便可以重复调用,命名为SqlConnectManage.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from DBUtils import PooledDB
import pymssql

class sqlserverManager(object):
# 构造函数,初始化连接
def __init__(self, server, user, password, database, table):
self.server = server
self.user = user
self.password = password
self.database = database
self.table = table
self.conn = None
self.cursor = None
self.maxconnections = 15 # 设置最大连接数

# 保存数据到SQL server
def connect_database(self):
try:
self.conn = PooledDB(creator=pymssql,
maxconnections=self.maxconnections,
server=self.server,
user=self.user,
password=self.password,
database=self.database,
charset='utf8').connection()
# 创建游标
self.cursor = self.conn.cursor()
print("sql server had connected")
except Exception as e:
print("the connect failed:", e)
return None

def dbclose(self):
self.cursor.close()
self.conn.close()

def dbcommit(self):
self.conn.commit()

def execute(self, sql):
print("-----插入数据-----")
try:
self.cursor.execute(sql)

except Exception as e:
print('\033[1;31;0m\t4---插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

def executemany(self, sql,*args):
print("-----插入数据-----")
try:
self.cursor.executemany(sql,*args)

except Exception as e:
print('\033[1;31;0m\t4---插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

1.1 MySQL通用封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# -*- coding:utf-8 -*-
import pymysql

'''
===============================================
@Project -> File :OAPlug -> MySQLHelper.py
@IDE :PyCharm
@Author :Miss.BadWoman
@Date :2020/7/4 13:30
@Desc :MySQL通用连接函数
===================================================
'''


class MySQLHelper(object):
def __init__(self, username, password, host, port, database):
self.username = username
self.port = port
self.password = password
self.host = host
self.database = database
self.cursor = None
self.conn = None

def mysql_conn(self):
"""
连接数据库并创建游标
:return: None
"""
self.conn = pymysql.connect(user=self.username, password=self.password, port=self.port,
host=self.host, charset="utf8", database=self.database)
# 创建游标,以字典形式返回
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

def mysql_select(self, sql, *args):
"""
sql语言执行查询操作,返回单行数据
:param sql:sql语句
:return: rows 字典型列表
"""
try:
# 开始执行sql
self.cursor.execute(sql, *args)
rows = self.cursor.fetchone()
return rows

except Exception as e:
print(e)

def mysql_select_all(self, sql, *args):
"""
sql语言执行查询操作,返回数据集
:param sql:sql语句
:return: rows 字典型列表
"""
try:
# 开始执行sql
self.cursor.execute(sql, *args)
rows = self.cursor.fetchall()
return rows

except Exception as e:
print(e)

def execute(self, sql, *args):
"""
sql语言执行插入更新操作
:param sql: sql语句
:param args: 不定数据
:return: None
"""
print("-----插入更新数据-----")
try:
self.cursor.execute(sql, *args)

except Exception as e:
print('\033[1;31;0m\t4---插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

def executemany(self, sql, *args):
"""
sql语言执行批次插入操作
:param sql: 不管字段为什么类型,占位符统一使用%s,且不能加上引号
:param args: 元组型列表或元组型元组[(1, '张三', '男'),(2, '李四', '女'),]
:return:None
"""
print("-----插入数据-----")
try:
self.cursor.executemany(sql, *args)

except Exception as e:
print('\033[1;31;0m\t4---插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

def commit(self):
self.conn.commit()

def close(self):
self.cursor.close()
self.conn.close()

1.2 Oracle通用封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# -*- coding:utf-8 -*-
import cx_Oracle

"""
===================================================
@IDE :PyCharm
@Author :Miss.BadWoman
@Date :2020/7/4 13:30
@Desc :ORACLE通用连接函数
===================================================
"""


class OracleHelper(object):
def __init__(self, ora_username, ora_password, ora_host, ora_port, ora_sid):
"""
初始化Oracle连接
:param ora_username: 用户名
:param ora_password: 密码
:param ora_host: 域名
:param ora_port: 端口号
:param ora_sid: 实例名
"""
self.username = ora_username
self.port = ora_port
self.password = ora_password
self.host = ora_host
self.sid = ora_sid
self.conn = None
self.cursor = None

def oracle_conn(self):
"""
连接数据库并创建游标
:return: None
"""
self.conn = cx_Oracle.connect(self.username, self.password,
self.host + ':' + self.port + '/' + self.sid)
self.cursor = self.conn.cursor()

def oracle_select(self, sql, mode="all"):
"""
sql语言执行查询操作,返回单行数据
:param sql: sql语句
:param mode: (all/one) 返回单条或多条数据,查询类型
:return: 对象型数组
"""
try:
self.cursor.execute(sql)
if mode == "all":
rows = self.cursor.fetchall()
elif mode == "one":
rows = self.cursor.fetchone()
else:
rows = self.cursor.fetchall()
cols = [d[0] for d in self.cursor.description]
result = []
for row in rows:
b = dict(zip(cols, row))
result.append(b)
return result
except Exception as e:
print(e)

def execute(self, sql, *args):
"""
sql语言执行插入更新操作
:param sql: sql语句
:param args: 不定数据
:return: None
"""
print("-----插入更新数据-----")
try:
self.cursor.execute(sql, *args)
except Exception as e:
print('\033[1;31;0m\t4---插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

def executemany(self, sql, *args):
"""
sql语言执行批次插入操作
:param sql: 不管字段为什么类型,占位符统一使用%s,且不能加上引号
:param args: 元组型列表或元组型元组[(1, '张三', '男'),(2, '李四', '女'),]
:return:None
"""
print("-----插入数据-----")
try:
self.cursor.executemany(sql, *args)
except Exception as e:
print('\033[1;31;0m\t4---插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

def commit(self):
self.conn.commit()

def close(self):
self.cursor.close()
self.conn.close()

1.3 SQL Server通用封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# -*- coding:utf-8 -*-
import pymssql

'''
==================================================
@IDE :PyCharm
@Author :Miss.BadWoman
@Date :2020/7/4 15:30
@Version:2.1.4(pymssql推荐版本)
@Desc :SQL Server通用连接函数(有空完善,先写select)
===================================================
'''


class SQLServerHelper(object):
def __init__(self, host, user, password, database):
"""
类的构造和初始化连接
:param host:ip地址
:param user:用户名
:param password:密码
:param database:数据库
"""
try:
self.conn = pymssql.connect(host, user, password, database, charset='utf8')
self.cursor = self.conn.cursor()
except Exception as e:
print("连接数据库失败:"+str(e))

def sqlserver_select(self, sql, mode="all"):
"""
sql语言执行查询操作,返回数据集
:param sql: sql语句
:param mode: (all/one) 返回单条或多条数据,查询类型
:return: 对象型数组
"""
try:
self.cursor.execute(sql)
if mode == "all":
rows = self.cursor.fetchall()
elif mode == "one":
rows = self.cursor.fetchone()
else:
rows = self.cursor.fetchall()
"""
获取字段名时要根据根据pymssql版本的决定
2.1.4及以下:参考下述方法
2.1.5及以上(推荐):
conn = pymssql.connect(server, user, password, "tempdb")
cursor = conn.cursor(as_dict=True)
"""
cols = [d[0] for d in self.cursor.description]
result = []
for row in rows:
b = dict(zip(cols, row))
result.append(b)
if len(result) <= 0:
# 为空返回False
return False
return result
except Exception as e:
print('\033[1;31;0m\t4---查询失败,msg:\033[0m', e, sql.replace("\n", ""))
return False

def sqlserver_update(self, sql, *args):
"""
语言执行update更新操作
:param sql
:return: None
"""
try:
# 开始执行sql
self.cursor.execute(sql, *args)
except Exception as e:
print('\033[1;31;0m\t4---更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False

def executemany(self, sql, *args):
"""
sql语言执行批次插入操作
:param sql: 不管字段为什么类型,占位符统一使用%s,且不能加上引号
:param args: 元组型列表或元组型元组[(1, '张三', '男'),(2, '李四', '女'),]
:return:None
"""
print("-----插入数据-----")
try:
self.cursor.executemany(sql, *args)
except Exception as e:
print('\033[1;31;0m\t4---批量插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

def commit(self):
self.conn.commit()

def close(self):
self.cursor.close()
self.conn.close()
1.fetchall\(\):表示返回查询到的所有数据;fetchone\(\):表示返回查询到的第一行数据 2.python原生查询返回的所有数据类型为元组型列表,即\[\(\)\,\(\)\,\...\(\)\] 3.元组的访问方式为:元组名\[index\]

2.数据分批次处理

数据库分批次处理可以使用两种方法:execute()与executemany()方法

2.1 executemany

在使用executemany方法时,需要注意的几个问题:

  1. 在写sql语句时,不管字段为什么类型,占位符统一使用%s,且不能加上引号
1
sql="insert into tablename (id,name) values (%s,%s)"
  1. 添加的数据的格式必须为元组型列表或元组型元组(并不是无限添加):list[tuple(),tuple(),tuple()]或者tuple(tuple(),tuple(),tuple())
1
2
3
values=[(1,"zhangsan"),(2,"lisi")]
#或者
values=((1,"zhangsan"),(2,"lisi"))

不建议一次批量操作太多的数据,如果数据太多数据库响应也会很慢。批量操作需要把握一个度,建议每批数据尽量控制在500以内。如果数据多于500,则分多批次处理

  1. 最后通过executemany插入
1
2
cursor.executemany(sql,values)
connect.commit()

2.2 execute

execute()语法一次只能执行一个sql语句,分批次处理的原理是:循环执行批量语句,然后在commit,如下

1
2
3
4
5
for i in range(10):
sql = "insert into users(id,grand) values(i,i+10)"
cursor.execute(sql)
# 批量执行数据后,然后提交
connect.commit()

3.数据库插入百万级数据优化

最近做一个项目,需要插入和读取很多数据,所以就需要对数据库大量插入操作进行优化,还是以python为基石,不过在开始动工前,需要明确以下几点:

  1. 考虑是一条线程完整地执行数据库的连接、创建游标、然后插入数据、commit数据、断开连接这一系列操作,还是使用多线程执行获取数据的操作,然后单线程来插入整理好的数据?
  2. 考虑是每执行一次insert,update或者delete,就执行一次commit,还是批量处理数据后,才执行一次commit
  3. 数据执行语句execute()与executemany()的选择

对我而言,我的设计是调用数据库的通用数据池连接文件SqlConnectManage.py,将数据库的连接和关闭函数作为全局使用,放在程序的首尾(就相当于不会重复连接数据库,减少IO开销)
然后创建多线程+队列的方式获取数据,以单线程形式来插入数据,批量处理数据后,才commit提交。

案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from SqlConnectManage import sqlserverManager


# 创建全局连接和游标(这里暂时不用DBUtils库)
DB_CONN = pymssql.connect(SERVER , USER , PASSWORD, DATABASE)
DB_CURSOR = DB_CONN.cursor()
# ...
# ...
# ...
if __name__ == "__main__":

# 开始时间戳
s_time = time.time()

# 设定最大队列数和线程数
q = Queue(maxsize=10)
threads = []
starttime, endtime = get_date()
# print(starttime,endtime) # 经测试,获取查询时间段成功

# 获取所有用户组列表
userid_all_list = get_user_list()
# print(userid_all_list) # 经测试,获取所有用户组成功

# 循环每一个用户组列表并开启多线程采集企业微信考勤数据
while userid_all_list:
user_list = userid_all_list.pop()
t = Thread(target=gatherData, args=(starttime, endtime, user_list, ))
q.put(t)
if (q.full() == True or len(userid_all_list) == 0):
while q.empty() == False:
t = get()
threads.append(t)
t.start()
for t in threads:
t.join()

# 单线程插入数据
insert_sql()

4.DBUtils管理数据库连接池

当使用多线程,多进程将海量数据存入数据库时,每次执行一个sql的时候都单独建立一个mysql连接,执行完就close掉,很明显这样的问题在于,频繁连接,断开mysql,这样是相当消耗系统资源的,而且增加了mysql连接失败的几率,所以万一哪个线程没有连接成功 这个线程也over了。

4.1 连接池原理


1.在程序创建连接的时候,可以从一个空闲的连接中获取,不需要重新初始化连接,提升获取连接的速度
2.关闭连接的时候,把连接放回连接池,而不是真正的关闭,所以可以减少频繁地打开和关闭连接

安装DBUtils库

pip install DBUtils

参数解释

参数 详解
creator, # 使用链接数据库的模块(必须:pymssql,pymysql,cx_oralce,…)
mincached=0, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=0, # 链接池中最多闲置的链接,0和None不限制
maxshared=0, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用
maxconnections=0, # 连接池允许的最大连接数,0和None表示不限制连接数
blocking=False, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=None, # 开始会话前执行的命令列表。如:[“set datestyle to …”, “set time zone …”]
reset=True,
failures=None,
ping=1, # ping MySQL服务端,检查是否服务可用

数据库设置(数据库连接模块不一样,其参数名也不同)

如creator=pymysql:
host='localhost', port=3306,db='mydata',user="root",passwd="123456",charset='utf8'

数据库通用连接函数(连接模块视实际数据库决定)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from DBUtils import PooledDB
import pymssql

class sqlserverManager(object):
# 构造函数,初始化连接
def __init__(self, server, user, password, database, table):
self.server = server
self.user = user
self.password = password
self.database = database
self.table = table
self.conn = None
self.cursor = None
self.maxconnections = 15 # 设置最大连接数

# 保存数据到SQL server
def connect_database(self):
try:
self.conn = PooledDB(creator=pymssql,
maxconnections=self.maxconnections,
server=self.server,
user=self.user,
password=self.password,
database=self.database,
charset='utf8').connection()
# 创建游标
self.cursor = self.conn.cursor()
print("sql server had connected")
except Exception as e:
print("the connect failed:", e)
return None

def dbclose(self):
self.cursor.close()
self.conn.close()

def dbcommit(self):
self.conn.commit()

def execute(self, sql):
print("-----插入数据-----")
try:
self.cursor.execute(sql)

except Exception as e:
print('\033[1;31;0m\t4---插入更新失败,msg:\033[0m', e, sql.replace("\n", ""))
self.conn.rollback()
return False
return True

4.2 多线程/连接池操作MySQL插入

每调用一次插入函数就从连接池中取出一个链接操作,完成后关闭链接;
executemany批量操作,减少 commit 次数,提升效率;

参考文章


 评论

联系我 | Contact with me

Copyright © 2019-2020 谁知你知我,我知你知深。此恨经年深,比情度日久

博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议