本文主要介绍如何使用 Python 的 MySQLdb 对数据库进行批量插入(insert)和批量更新(update),内容包括:
1. Python MySQLdb 的基本使用:编写一个基类,让其他的 sqldb 类继承,这样比较方便;数据库的 ip、port 等连接信息保存在 JSON 配置文件中
2.常见的查找,批量插入更新
下面贴出基类代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# _*_ coding:utf-8 _*_
import codecs
import json

import MySQLdb

# Adjust this import to match your own project layout.
# NOTE(review): get_json_from_file is defined again further down in this
# module, so that local definition shadows the name imported here.
from utils.JsonUtil import get_json_from_file


def byteify(input):
    """Recursively convert unicode strings in decoded JSON to UTF-8 ``str``.

    This is Python 2 code: it relies on ``unicode`` and ``dict.iteritems``.
    The function is adapted from a Stack Overflow answer.

    :param input: decoded JSON value, e.g.
        ``{u'first_name': u'Guido', u'last_name': u'jack'}``
    :return: the same structure with every unicode string encoded, e.g.
        ``{'first_name': 'Guido', 'last_name': 'jack'}``; values that are
        not dict, list or unicode are returned unchanged
    """
    if isinstance(input, dict):
        return {byteify(key): byteify(value)
                for key, value in input.iteritems()}
    elif isinstance(input, list):
        return [byteify(element) for element in input]
    elif isinstance(input, unicode):
        return input.encode('utf-8')
    else:
        return input


def get_json_from_file(filename):
    """Load the JSON document stored in *filename* and byteify it.

    :param filename: path of the JSON file to read
    :return: the decoded JSON data, passed through :func:`byteify`
    """
    with open(filename) as jf:
        jsondata = json.load(jf)
    return byteify(jsondata)


class DbBase(object):
    """Base class that opens a MySQL connection described by a JSON file.

    Subclasses get ``self.conn`` and ``self.cursor`` ready to use after
    construction.
    """

    def __init__(self, **kwargs):
        """Remember the config file path and connect.

        :param kwargs: must contain ``db_config_file``, the path of the JSON
            configuration file
        :raises KeyError: if ``db_config_file`` is not supplied
        """
        self.db_config_file = kwargs['db_config_file']
        self.config_db(self.db_config_file)

    def config_db(self, db_config_file):
        """(Re)connect using the settings stored in *db_config_file*.

        The JSON document must provide the keys ``host``, ``user``, ``pwd``,
        ``db``, ``port`` and ``tb_audit_mobile``.

        :param db_config_file: path of the JSON configuration file
        """
        data = get_json_from_file(db_config_file)
        host = data['host']
        user = data['user']
        pwd = data['pwd']
        db = data['db']
        # Coerce to int so a port written as a string in the JSON file
        # ("3306") is accepted as well as a JSON number.
        port = int(data['port'])
        self.tb_audit_mobile = data['tb_audit_mobile']
        self.conn = MySQLdb.connect(host=host, port=port, user=user,
                                    passwd=pwd, db=db, charset="utf8",
                                    use_unicode=True)
        self.cursor = self.conn.cursor()

    def close(self):
        """Close the cursor and the connection opened by :meth:`config_db`.

        Errors raised while closing are ignored so that this can be called
        on a connection that is already closed or lost.
        """
        for resource in (self.cursor, self.conn):
            try:
                resource.close()
            except MySQLdb.Error:
                # Nothing useful can be done if the handle is already gone.
                pass
子类的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
class DbAuditTestService(DbBase):
    """Example ``DbBase`` subclass with a range query and batch write helpers.

    NOTE(review): this snippet relies on names it does not define itself:
    ``DbBase``, ``MySQLdb`` and a module-level ``logger``.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``DbBase.__init__``."""
        super(DbAuditTestService, self).__init__(**kwargs)

    def getAdTestURl(self, beg, end):
        """Return the ``(url, source)`` rows created between *beg* and *end*.

        :param beg: lower bound for ``create_date``
        :param end: upper bound for ``create_date``
        :return: list of result rows
        """
        # Let the driver bind and quote the values instead of formatting
        # them into the SQL text; this matches the parameterized style used
        # by the other methods and is safe for values containing quotes.
        sql = """select url, source from tb_name
        where create_date BETWEEN %s and %s
        """
        self.cursor.execute(sql, (beg, end))
        return list(self.cursor)

    def _reconnect(self):
        """Discard the current cursor/connection and open fresh ones."""
        for resource in (self.cursor, self.conn):
            try:
                resource.close()
            except MySQLdb.Error:
                # A failure to close must not prevent the reconnect below.
                pass
        self.config_db(self.db_config_file)

    def insert(self, lst_row):
        """Batch insert rows; INSERT IGNORE avoids unique-index conflicts.

        On ``MySQLdb.OperationalError`` the connection is re-established and
        the batch is retried once, so the rows are not silently lost. If the
        retry fails as well, the error is logged and the batch is given up;
        as before, ``OperationalError`` is not propagated to the caller.

        :param lst_row: sequence of ``(appid, source)`` rows
        """
        insert_sql = 'INSERT ignore INTO tb_ms_mobile_report_test (appid, source) VALUES (%s, %s)'
        try:
            self.cursor.executemany(insert_sql, lst_row)
            self.conn.commit()
            return
        except MySQLdb.OperationalError as e:
            logger.warning('insert failed, reconnecting and retrying: %s', e)
            self._reconnect()
        try:
            self.cursor.executemany(insert_sql, lst_row)
            self.conn.commit()
        except MySQLdb.OperationalError:
            logger.exception('insert retry failed, batch dropped')

    def update_ip_info(self, ip_info):
        """Batch update ``ip_right_rate`` keyed by ``submit_ip``.

        :param ip_info: sequence of ``[ip_right_rate, submit_ip]`` pairs, in
            the order of the placeholders in the statement
        :return: None
        """
        query = """
        update tb_ms_audit_ip_info
        set ip_right_rate=%s
        where submit_ip=%s
        """
        self.cursor.executemany(query, ip_info)
        self.conn.commit()


def insert_all(start=100000, batch_size=2000):
    """Example of a batch operation: copy rows between databases in chunks.

    NOTE(review): ``DbAuditService``, ``convert_rows`` and ``logger`` are
    defined elsewhere and are not visible in this snippet.

    :param start: offset of the first row to copy; the default keeps the
        value that was previously hard-coded
    :param batch_size: number of rows fetched and inserted per iteration
    """
    db_audit = DbAuditService(db_config_file='../config/mysql_police_audit.json')
    size = db_audit.count()
    db_audit_test = DbAuditTestService(db_config_file='../config/mysql_local_audit.json')
    for k in xrange(start, size, batch_size):
        logger.info('query limit %s ~ %s', k, batch_size)
        lst_row = db_audit.query_limit(k, batch_size)
        logger.info('convert_rows ')
        lst_row = convert_rows(lst_row)
        db_audit_test.insert(lst_row)
总结
以上所述是小编给大家介绍的python MySQLdb使用教程详解,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:http://blog.csdn.net/haluoluo211/article/details/77721138