支付宝支付和微信支付是当今互联网产品常用的功能,我使用django rest framework实现了网页上支付宝支付和微信支付的一个通用服务,提供rpc接口给其他服务,包括获取支付宝支付页面url的rpc接口、支付宝支付成功异步回调http接口、获取微信支付二维码rpc接口、主动查询微信订单是否支付的rpc接口等。
支付宝网站支付需要蚂蚁金服开放平台账号,创建应用、配置秘钥等步骤请参考:蚂蚁金服支付宝电脑网站支付快速接入
微信网站支付需要到微信支付官网注册服务商账号,
目录结构如下:
1、models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
from django.db import models from django.contrib.postgres.fields import arrayfield # create your models here. class basemodel(models.model): """ 基础模型 """ created_time = models.datetimefield(auto_now_add = true, verbose_name = "创建时间" ) updated_time = models.datetimefield(auto_now = true, verbose_name = "修改时间" ) created_by = models.integerfield(verbose_name = "创建人id" ) updated_by = models.integerfield(verbose_name = "修改人id" ) is_active = models.booleanfield(default = true, verbose_name = '是否正常' ) class meta: abstract = true class alipay(basemodel): """ 支付 """ subject = models.charfield(max_length = 256 , verbose_name = "订单标题" ) out_trade_no = models.charfield(max_length = 64 , unique = true, verbose_name = "唯一订单号" ) trade_no = models.charfield(default = " ", max_length=64, verbose_name=" 支付宝系统中的交易流水号") total_amount = models.decimalfield(max_digits = 11 , decimal_places = 2 , verbose_name = "订单的资金总额" ) return_url = models.charfield(max_length = 500 , verbose_name = "支付完成同步跳转地址" ) notify_url = models.charfield(max_length = 500 , verbose_name = "支付完成异步通知rpc地址" ) pay_time = models.datetimefield(null = true, blank = true, verbose_name = "支付时间" ) pay_nos = arrayfield(models.charfield(max_length = 100 ), default = [], verbose_name = '同一订单的支付id数组' ) class meta: verbose_name = "阿里支付" verbose_name_plural = verbose_name ordering = ( '-created_time' ,) class wxorder(basemodel): """ 订单 """ body = models.charfield(max_length = 256 , verbose_name = "商品描述" ) out_trade_no = models.charfield(max_length = 64 , unique = true, verbose_name = "订单号" ) transaction_id = models.charfield(default = " ", max_length=64, verbose_name=" 微信支付订单号") total_fee = models.bigintegerfield(verbose_name = "订单的资金总额,单位为分" ) product_id = models.charfield(max_length = 16 , verbose_name = "商品id" ) notify_url = models.charfield(max_length = 500 , verbose_name = "支付完成通知url" ) pay_time = models.datetimefield(null = true, blank = true, verbose_name = "支付时间" ) class meta: verbose_name = "微信订单" verbose_name_plural = verbose_name ordering = ( '-created_time' ,) class wxpay(basemodel): """ 微信支付 """ out_trade_no = models.charfield(null = true, blank = true, max_length = 64 , verbose_name = "订单号" ) pay_no = models.charfield(null = true, blank = true, max_length = 64 , unique = true, verbose_name = "支付唯一订单号" ) code_url = models.charfield(null = true, blank = true, max_length = 100 , verbose_name = "二维码地址" ) nonce_str = models.charfield(null = true, blank = true, max_length = 32 , verbose_name = "随机字符串" ) class meta: verbose_name = "微信支付" verbose_name_plural = verbose_name ordering = ( '-created_time' ,) |
2、serializers.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
from django.conf import settings from rest_framework import serializers from pay.models import alipay, wxpay class baseserializer(serializers.modelserializer): created_time = serializers.datetimefield( format = settings.datetime_format, read_only = true) updated_time = serializers.datetimefield( format = settings.datetime_format, read_only = true) is_active = serializers.booleanfield(read_only = true) class meta: model = none class alipayserializer(baseserializer): """ 阿里支付序列化类 """ class meta: model = alipay fields = "__all__" class wxpayserializer(baseserializer): """ 阿里支付序列化类 """ class meta: model = wxpay fields = "__all__" |
3、views.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# -*- coding=utf-8 -*- # create your views here. import time import dicttoxml from jsonrpc import jsonrpc_method from rest_framework.decorators import list_route from rest_framework.response import response from rest_framework.viewsets import modelviewset from rest_framework.views import apiview from rest_framework_xml.parsers import xmlparser from rest_framework_xml.renderers import xmlrenderer from tokenauth.decorators import is_login from pay import utils from pay.weixin_pay import weixinpay, unifiedorderpay, orderquery from pay.uuidtools import uuidtools from pay.models import alipay, wxpay, wxorder from pay.serializers import alipayserializer, wxpayserializer from pay.utils import unactivemodelmixin from pay.alipay import alipay from pay_service.settings.base import appid, private_key_path, \ ali_pub_key_path, alipay_callback_url, \ wxappid, wx_pay_key, wx_mch_id, wxpay_callback_url notify_url = alipay_callback_url + 'api/v1.0/pay/alipay/notify/' class alipayviewset(modelviewset): queryset = alipay.objects. filter (is_active = true) serializer_class = alipayserializer @list_route (methods = [ 'post' ]) def notify( self , request): """ 处理支付宝的notify_url :param request: :return: """ processed_dict = {} for k, v in request.data.items(): processed_dict[k] = v app_id = processed_dict.get( 'app_id' ) pay_no = processed_dict.get( 'out_trade_no' ) trade_no = processed_dict.get( 'trade_no' ) total_amount = processed_dict.get( 'total_amount' ) pay_time = time.strftime( '%y-%m-%d %h:%m:%s' ,time.localtime()) alipay = alipay.objects. filter (pay_nos__contains = [pay_no]).values().first() if alipay is none: return response( "failed" ) if str (alipay.get( 'total_amount' )) ! = str (total_amount): return response( "failed" ) if app_id ! = appid: return response( "failed" ) if alipay.get( 'trade_no' ) ! = "": return response( "failed" ) sign = processed_dict.pop( 'sign' , none) ali_pay = alipay( appid = appid, app_notify_url = notify_url, app_private_key_path = private_key_path, alipay_public_key_path = ali_pub_key_path, debug = true, # 默认false, return_url = alipay.get( 'return_url' ) ) is_verify = ali_pay.verify(processed_dict, sign) if is_verify is true: alipay.objects. filter (pk = alipay.get( 'id' )).update(pay_time = pay_time, trade_no = trade_no) ret = utils.request_thrift( 'tradingmanager' , 'notify' , settings.trading_rpc_ip, int (settings.trading_rpc_port), alipay.get( 'out_trade_no' ), str (pay_time)) if ret = = "success" : return response( "success" ) class wxpayviewset(modelviewset): queryset = wxpay.objects. filter (is_active = true) serializer_class = wxpayserializer parser_classes = (xmlparser,) renderer_classes = (xmlrenderer,) @jsonrpc_method ( 'pay.get_alipay_url' ) def get_alipay_url(request, subject, out_trade_no, total_amount, return_url, notify_url, user_id): recode = alipay.objects. filter (out_trade_no = out_trade_no).values().first() if recode is not none: pay_no = uuidtools.datetime_random() alipay = alipay.objects.get(pk = recode.get( 'id' )) alipay.pay_nos.append(pay_no) alipay.save() else : pay_no = out_trade_no alipay.objects.create(subject = subject, out_trade_no = out_trade_no, total_amount = total_amount, return_url = return_url, notify_url = notify_url, pay_nos = [pay_no], created_by = user_id, updated_by = user_id ) ali_pay = alipay( appid = appid, app_notify_url = notify_url, app_private_key_path = private_key_path, alipay_public_key_path = ali_pub_key_path, debug = true, # 默认false, return_url = return_url ) total_amount = "%.2f" % float (total_amount) url = ali_pay.direct_pay( subject = subject, out_trade_no = pay_no, total_amount = total_amount ) # 沙箱环境网关 # alipay_url = "https://openapi.alipaydev.com/gateway.do?{data}".format(data=url) # 正式环境网关 alipay_url = "https://openapi.alipay.com/gateway.do?{data}" . format (data = url) return alipay_url @jsonrpc_method ( 'pay.get_wxpay_url' ) def get_wxpay_url(request, out_trade_no, body, total_fee, notify_url, product_id, user_id): recode = wxorder.objects. filter (out_trade_no = out_trade_no).values().first() if recode is none: wxorder.objects.create( out_trade_no = out_trade_no, body = body, total_fee = total_fee, notify_url = notify_url, product_id = product_id, created_by = user_id, updated_by = user_id ) pay_no = uuidtools.datetime_random() pay = unifiedorderpay(wxappid, wx_mch_id, wx_pay_key) response = pay.post(body, pay_no, total_fee, wxpay_callback_url.split( '://' )[ 1 ].split( ':' )[ 0 ], wx_notify_url) if response and response[ "return_code" ] = = "success" and response[ "result_code" ] = = "success" : wxorder = wxorder.objects. filter (out_trade_no = out_trade_no).values().first() wxpay.objects.create( out_trade_no = out_trade_no, pay_no = pay_no, code_url = response.get( 'code_url' ), nonce_str = response.get( 'nonce_str' ), created_by = user_id, updated_by = user_id ) return response.get( 'code_url' ) @jsonrpc_method ( 'pay.wx_order_query' ) def wx_order_query(request, out_trade_no): wxpays = wxpay.objects. filter (out_trade_no = out_trade_no).values() pay = orderquery(wxappid, wx_mch_id, wx_pay_key) for wxpay in wxpays: response = pay.post(wxpay.get( 'pay_no' )) if response and response[ "return_code" ] = = "success" \ and response[ "result_code" ] = = "success" : trade_state = response[ "trade_state" ] if trade_state = = "success" : # 支付成功 pay_time = response[ "time_end" ] transaction_id = response[ "transaction_id" ] wxorder.objects. filter (out_trade_no = out_trade_no).update( pay_time = time.strftime( "%y-%m-%d %h:%m:%s" , time.strptime(pay_time, "%y%m%d%h%m%s" )), transaction_id = transaction_id ) return { "success" : true, "pay_time" : pay_time} return { "success" : false} |
4、alipay.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
# -*- coding: utf-8 -*- # pip install pycryptodome from datetime import datetime from crypto.publickey import rsa from crypto.signature import pkcs1_v1_5 from crypto. hash import sha256 from base64 import b64encode, b64decode from urllib.parse import quote_plus from urllib.parse import urlparse, parse_qs from urllib.request import urlopen from base64 import decodebytes, encodebytes import json class alipay( object ): """ 支付宝支付接口 """ def __init__( self , appid, app_notify_url, app_private_key_path, alipay_public_key_path, return_url, debug = false): self .appid = appid self .app_notify_url = app_notify_url self .app_private_key_path = app_private_key_path self .app_private_key = none self .return_url = return_url with open ( self .app_private_key_path) as fp: self .app_private_key = rsa.importkey(fp.read()) self .alipay_public_key_path = alipay_public_key_path with open ( self .alipay_public_key_path) as fp: self .alipay_public_key = rsa.import_key(fp.read()) if debug is true: self .__gateway = "https://openapi.alipaydev.com/gateway.do" else : self .__gateway = "https://openapi.alipay.com/gateway.do" def direct_pay( self , subject, out_trade_no, total_amount, return_url = none, * * kwargs): biz_content = { "subject" : subject, "out_trade_no" : out_trade_no, "total_amount" : total_amount, "product_code" : "fast_instant_trade_pay" , # "qr_pay_mode":4 } biz_content.update(kwargs) data = self .build_body( "alipay.trade.page.pay" , biz_content, self .return_url) return self .sign_data(data) def build_body( self , method, biz_content, return_url = none): data = { "app_id" : self .appid, "method" : method, "charset" : "utf-8" , "sign_type" : "rsa2" , "timestamp" : datetime.now().strftime( "%y-%m-%d %h:%m:%s" ), "version" : "1.0" , "biz_content" : biz_content } if return_url is not none: data[ "notify_url" ] = self .app_notify_url data[ "return_url" ] = self .return_url return data def sign_data( self , data): data.pop( "sign" , none) # 排序后的字符串 unsigned_items = self .ordered_data(data) unsigned_string = "&" .join( "{0}={1}" . format (k, v) for k, v in unsigned_items) sign = self .sign(unsigned_string.encode( "utf-8" )) ordered_items = self .ordered_data(data) quoted_string = "&" .join( "{0}={1}" . format (k, quote_plus(v)) for k, v in ordered_items) # 获得最终的订单信息字符串 signed_string = quoted_string + "&sign=" + quote_plus(sign) return signed_string def ordered_data( self , data): complex_keys = [] for key, value in data.items(): if isinstance (value, dict ): complex_keys.append(key) # 将字典类型的数据dump出来 for key in complex_keys: data[key] = json.dumps(data[key], separators = ( ',' , ':' )) return sorted ([(k, v) for k, v in data.items()]) def sign( self , unsigned_string): # 开始计算签名 key = self .app_private_key signer = pkcs1_v1_5.new(key) signature = signer.sign(sha256.new(unsigned_string)) # base64 编码,转换为unicode表示并移除回车 sign = encodebytes(signature).decode( "utf8" ).replace( "\n" , "") return sign def _verify( self , raw_content, signature): # 开始计算签名 key = self .alipay_public_key signer = pkcs1_v1_5.new(key) digest = sha256.new() digest.update(raw_content.encode( "utf8" )) if signer.verify(digest, decodebytes(signature.encode( "utf8" ))): return true return false def verify( self , data, signature): if "sign_type" in data: sign_type = data.pop( "sign_type" ) # 排序后的字符串 unsigned_items = self .ordered_data(data) message = "&" .join(u "{}={}" . format (k, v) for k, v in unsigned_items) return self ._verify(message, signature) if __name__ = = "__main__" : alipay = alipay( appid = "2016081500252338" , app_notify_url = "http://projectsedus.com/" , app_private_key_path = "keys/private_2048.txt" , alipay_public_key_path = "keys/alipay_key_2048.txt" , # 支付宝的公钥,验证支付宝回传消息使用,不是你自己的公钥, debug = true, # 默认false, return_url = "http://192.168.247.129:8000/" ) url = alipay.direct_pay( subject = "测试订单" , out_trade_no = "20170202126666" , total_amount = 1000 ) re_url = "https://openapi.alipaydev.com/gateway.do?{data}" . format (data = url) print (re_url) |
5、wxpay.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
# -*- coding=utf-8 -*- import time import json import hashlib import requests from pay.utils import (smart_str, dict_to_xml, calculate_sign, random_str, post_xml, xml_to_dict, validate_post_xml, format_url) # from local_settings import appid, mch_id, api_key oauth2_authorize_url = "https://open.weixin.qq.com/connect/oauth2/authorize?%s" oauth2_access_token_url = "https://api.weixin.qq.com/sns/oauth2/access_token?%s" class weixinpay( object ): def __init__( self , appid, mch_id, api_key): self .appid = appid # 微信公众号身份的唯一标识。审核通过后,在微信发送的邮件中查看 self .mch_id = mch_id # 受理商id,身份标识 self .api_key = api_key # 商户支付密钥key。审核通过后,在微信发送的邮件中查看 self .common_params = { "appid" : self .appid, "mch_id" : self .mch_id, } self .params = {} self .url = "" self .trade_type = "" def set_params( self , * * kwargs): self .params = {} for (k, v) in kwargs.items(): self .params[k] = smart_str(v) self .params[ "nonce_str" ] = random_str( 32 ) if self .trade_type: self .params[ "trade_type" ] = self .trade_type self .params.update( self .common_params) def post_xml( self ): sign = calculate_sign( self .params, self .api_key) xml = dict_to_xml( self .params, sign) response = post_xml( self .url, xml) return xml_to_dict(response.text) def valiate_xml( self , xml): return validate_post_xml(xml, self .appid, self .mch_id, self .api_key) def get_error_code_desc( self , error_code): error_desc = { "systemerror" : u "接口后台错误" , "invalid_transactionid" : u "无效 transaction_id" , "param_error" : u "提交参数错误" , "orderpaid" : u "订单已支付" , "out_trade_no_used" : u "商户订单号重复" , "noauth" : u "商户无权限" , "notenough" : u "余额丌足" , "notsuportcard" : u "不支持卡类型" , "orderclosed" : u "订单已关闭" , "bankerror" : u "银行系统异常" , "refund_fee_invalid" : u "退款金额大亍支付金额" , "ordernotexist" : u "订单不存在" , } return error_desc.get(error_code.strip().upper(), u "未知错误" ) class unifiedorderpay(weixinpay): """发送预支付单""" def __init__( self , appid, mch_id, api_key): super (unifiedorderpay, self ).__init__(appid, mch_id, api_key) self .url = "https://api.mch.weixin.qq.com/pay/unifiedorder" self .trade_type = "native" def post( self , body, out_trade_no, total_fee, spbill_create_ip, notify_url, * * kwargs): tmp_kwargs = { "body" : body, "out_trade_no" : out_trade_no, "total_fee" : total_fee, "spbill_create_ip" : spbill_create_ip, "notify_url" : notify_url, } tmp_kwargs.update( * * kwargs) self .set_params( * * tmp_kwargs) return self .post_xml()[ 1 ] class orderquery(weixinpay): """订单状态查询""" def __init__( self , appid, mch_id, api_key): super (orderquery, self ).__init__(appid, mch_id, api_key) self .url = "https://api.mch.weixin.qq.com/pay/orderquery" def post( self , out_trade_no): self .set_params(out_trade_no = out_trade_no) return self .post_xml()[ 1 ] class jsapiorderpay(unifiedorderpay): """h5页面的js调用类""" def __init__( self , appid, mch_id, api_key, app_secret): super (jsapiorderpay, self ).__init__(appid, mch_id, api_key) self .app_secret = app_secret self .trade_type = "jsapi" def create_oauth_url_for_code( self , redirect_uri): url_params = { "appid" : self .appid, "redirect_uri" : redirect_uri, # 一般是回调当前页面 "response_type" : "code" , "scope" : "snsapi_base" , "state" : "state#wechat_redirect" } url = format_url(url_params) return oauth2_authorize_url % url def _create_oauth_url_for_openid( self , code): url_params = { "appid" : self .appid, "secret" : self .app_secret, "code" : code, "grant_type" : "authorization_code" , } url = format_url(url_params) return oauth2_access_token_url % url def _get_oauth_info( self , code): """ 获取oauth2的信息:access_token、expires_in、refresh_token、openid、scope 返回结果为字典,可使用["xxx"]或.get("xxx", none)的方式进行读取 """ url = self ._create_oauth_url_for_openid(code) response = requests.get(url) return response.json() if response else none def _get_openid( self , code): oauth_info = self ._get_oauth_info(code) if oauth_info: return oauth_info.get( "openid" , none) return none def _get_json_js_api_params( self , prepay_id): js_params = { "appid" : self .appid, "timestamp" : "%d" % time.time(), "noncestr" : random_str( 32 ), "package" : "prepay_id=%s" % prepay_id, "signtype" : "md5" , } js_params[ "paysign" ] = calculate_sign(js_params, self .api_key) return js_params def post( self , body, out_trade_no, total_fee, spbill_create_ip, notify_url, code): if code: open_id = self ._get_openid(code) if open_id: # 直接调用基类的post方法查询prepay_id,如果成功,返回一个字典 unified_order = super (jsapiorderpay, self ).post(body, out_trade_no, total_fee, spbill_create_ip, notify_url, open_id = open_id) if unified_order: prepay_id = unified_order.get( "prepay_id" , none) if prepay_id: return self ._get_json_js_api_params(prepay_id) return none |
6、utils.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# -*- coding=utf-8 -*- import hashlib import re import types from random import random import requests import thriftpy from django.conf import settings from django.core.exceptions import fielddoesnotexist from django.db import models from django.db.models.fields.reverse_related import foreignobjectrel from rest_framework.pagination import pagenumberpagination from thriftpy.rpc import make_client from pay.exception_handler import foreignobjectreldeleteerror, modeldonthaveisactivefiled, logger def smart_str(s, encoding = 'utf-8' , strings_only = false, errors = 'strict' ): """ returns a bytestring version of 's', encoded as specified in 'encoding'. if strings_only is true, don't convert (some) non-string-like objects. """ if strings_only and isinstance (s, (types.nonetype, int )): return s if not isinstance (s, str ): try : return str (s) except unicodeencodeerror: if isinstance (s, exception): # an exception subclass containing non-ascii data that doesn't # know how to print itself properly. we shouldn't raise a # further exception. return ' ' .join([smart_str(arg, encoding, strings_only, errors) for arg in s]) return unicode (s).encode(encoding, errors) elif s and encoding ! = 'utf-8' : return s.decode( 'utf-8' , errors).encode(encoding, errors) else : return s def format_url(params, api_key = none): url = "&" .join([ '%s=%s' % (key, smart_str(params[key])) for key in sorted (params)]) if api_key: url = '%s&key=%s' % (url, api_key) return url def calculate_sign(params, api_key): # 签名步骤一:按字典序排序参数, 在string后加入key url = format_url(params, api_key) # 签名步骤二:md5加密, 所有字符转为大写 return hashlib.md5(url.encode( 'utf-8' )).hexdigest().upper() def dict_to_xml(params, sign): xml = [ "<xml>" , ] for (k, v) in params.items(): if (v.isdigit()): xml.append( '<%s>%s</%s>' % (k, v, k)) else : xml.append( '<%s><![cdata[%s]]></%s>' % (k, v, k)) xml.append( '<sign><![cdata[%s]]></sign></xml>' % sign) return ''.join(xml) def xml_to_dict(xml): if xml[ 0 : 5 ].upper() ! = "<xml>" and xml[ - 6 ].upper() ! = "</xml>" : return none, none result = {} sign = none content = ' '.join(xml[5:-6].strip().split(' \n')) pattern = re. compile (r "<(?p<key>.+)>(?p<value>.+)</(?p=key)>" ) m = pattern.match(content) while (m): key = m.group( "key" ).strip() value = m.group( "value" ).strip() if value ! = "<![cdata[]]>" : pattern_inner = re. compile (r "<!\[cdata\[(?p<inner_val>.+)\]\]>" ) inner_m = pattern_inner.match(value) if inner_m: value = inner_m.group( "inner_val" ).strip() if key = = "sign" : sign = value else : result[key] = value next_index = m.end( "value" ) + len (key) + 3 if next_index > = len (content): break content = content[next_index:] m = pattern.match(content) return sign, result def validate_post_xml(xml, appid, mch_id, api_key): sign, params = xml_to_dict(xml) if ( not sign) or ( not params): return none remote_sign = calculate_sign(params, api_key) if sign ! = remote_sign: return none if params[ "appid" ] ! = appid or params[ "mch_id" ] ! = mch_id: return none return params def random_str(randomlength = 8 ): chars = 'abcdefghijklmnopqrstuvwxyz0123456789' random = random() return "".join([chars[random.randint( 0 , len (chars) - 1 )] for i in range (randomlength)]) def post_xml(url, xml): return requests.post(url, data = xml.encode( 'utf-8' ), verify = false) class unactivemodelmixin( object ): """ 删除一个对象,并不真删除,级联将对应外键对象的is_active设置为false,需要外键对象都有is_active字段. """ def perform_destroy( self , instance): rel_fileds = [f for f in instance._meta.get_fields() if isinstance (f, foreignobjectrel)] links = [f.get_accessor_name() for f in rel_fileds] for link in links: manager = getattr (instance, link, none) if not manager: continue if isinstance (manager, models.model): if hasattr (manager, 'is_active' ) and manager.is_active: manager.is_active = false manager.save() raise foreignobjectreldeleteerror(u '{} 上有关联数据' . format (link)) else : if not manager.count(): continue try : manager.model._meta.get_field( 'is_active' ) manager. filter (is_active = true).update(is_active = false) except fielddoesnotexist as ex: # 理论上,级联删除的model上面应该也有is_active字段,否则代码逻辑应该有问题 logger.warn(ex) raise modeldonthaveisactivefiled( '{}.{} 没有is_active字段, 请检查程序逻辑' . format ( manager.model.__module__, manager.model.__class__.__name__ )) instance.is_active = false instance.save() def get_queryset( self ): return self .queryset. filter (is_active = true) class standardresultssetpagination(pagenumberpagination): page_size_query_param = 'size' |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/victorwu/p/8505592.html