由于公司的系统用的是java版本,开通了企业号打卡之后又没有预算让供应商做数据对接,所以只能自己捣鼓这个,以下是个人设置的一些内容,仅供大家参考
安装python
python的安装,这里就不详细写了,大家可自行度娘或google。
安装第三方库
python安装好之后别忘记配置环境变量!另外,所以的内容都是安装在服务器上的,且服务器需要能够上外网,否则,只能配置在本地,因为需要外网连接微信企业号的接口。这里需要用到几个第三方库:
python的pip命令,一般python安装好之后都会默认有,如果不确定,可输入命令查询,通过cmd进入命令提示符,输入
pip list
如果提示你需要更新,你可以更新,也可以不更新,更新命令其实给到你了python -m pip install --upgrade pip
安装所需要的库
step.1
pip install pymssql
如果安装pymssql出错,提示什么visual c++ 14,则先安装wheel,如不报错则忽略step2、step3
step.2
pip install wheel
step.3
下载pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl
可去这里下载最新版本的。pymssql下载
下载好之后,进入该文件所在的目录,通过pip install安装即可cd d:\
pip install pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl
step.4
pip install requests
至此,所有第三方库都配置好了。
写主程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# !/usr/bin/python # -*- coding:utf-8 -*- # @time: 2018/7/26 16:05 # @author: hychen.cc import json # 因微信企业号返回的格式为json,所以引入json import requests import pymssql import math # 引入数学方法 import time import datetime server = 'xx.xx.xx.xx' # 数据库服务器地址 user = 'sa' # 数据库登录名,可以用sa password = '******' # 数据库用户对应的密码 dbname = 'dbname' # 数据库名称 corp_id = 'xxxxxx' # 微信企业号提供的corp_id corp_secret = 'xxxxx' # 微信企业号提供的corp_secret """ |
因微信接口所需要unix时间戳,所以需要把时间转为为unix时间戳格式
定义时间转换为unix时间方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
"""def datetime_timestamp(dt): # dt为字符串 # 中间过程,一般都需要将字符串转化为时间数组 time.strptime(dt, '%y-%m-%d %h:%m:%s') ## time.struct_time(tm_year=2018, tm_mon=10, tm_mday=25, tm_hour=10, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=88, tm_isdst=-1) # 将"2018-10-25 10:00:00"转化为时间戳 s = time.mktime(time.strptime(dt, '%y-%m-%d %h:%m:%s')) return int(s) # 定义连接数据库方法 def get_link_server(): connection = pymssql.connect(server, user, password, database=dbname) if connection: return connection else: raise valueerror('connect dbserver failed.') """ |
定义获取用户列表,因为微信企业号一次最大只能获取100个,所以需要转换为列表格式,分批次获取
我这里设置是从db中获取有权限微信打卡的人员(select * from table),换成自己的方式即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
""" def get_userid_list(): """ 获取用户列表 : return : """ conn = get_link_server() cursor = conn.cursor() sql = "select * from table" cursor.execute(sql) row = cursor.fetchone() userlist = [] while row: userlist.append(row[0]) row = cursor.fetchone() if userlist: return userlist else: raise valueerror('get userlist failed.') conn.close() """ |
获取access_token,因为token有时效(2小时),所以需要存在本地,这样不需要频繁调用,所以我定义了存储过程(sp_getwx_access_token)来判断之前存储的token是否有效,有效的话就不需要重复获取了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
""" def get_access_token(refresh=false): """ 获取access token : return : """ if not refresh: api_access_token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % ( corp_id, corp_secret) response = requests.get(api_access_token_url, verify = false) if response.status_code = = 200 : rep_dict = json.loads(response.text) errcode = rep_dict.get( 'errcode' ) if errcode: raise valueerror( 'get wechat access token failed, errcode=%s.' % errcode) else : access_token = rep_dict.get( 'access_token' ) if access_token: conn = get_link_server() cursor = conn.cursor() cursor.execute( 'exec sp_getwx_access_token @access_token=%s' , access_token) conn.commit() conn.close() return access_token else : raise valueerror( 'get wechat access token failed.' ) else : raise valueerror( 'get wechat access token failed.' ) else : conn = get_link_server() cursor = conn.cursor() cursor.execute( "select access_token from wx_accesstoken where id=1" ) access_token = cursor.fetchone() if access_token: return access_token[ 0 ] conn.close() else : api_access_token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % ( corp_id, corp_secret) response = requests.get(api_access_token_url, verify = false) if response.status_code = = 200 : rep_dict = json.loads(response.text) errcode = rep_dict.get( 'errcode' ) if errcode: raise valueerror( 'get wechat access token failed, errcode=%s.' % errcode) else : access_token = rep_dict.get( 'access_token' ) if access_token: conn = get_link_server() cursor = conn.cursor() cursor.execute( 'exec sp_getwx_access_token @access_token=%s' , access_token) conn.commit() conn.close() return access_token else : raise valueerror( 'get wechat access token failed.' ) else : raise valueerror( 'get wechat access token failed.' ) # 获取微信打卡的json格式 def get_punchcard_info(access_token, opencheckindatatype, starttime, endtime, useridlist): api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token json_str = json.dumps( { 'opencheckindatatype' : opencheckindatatype, 'starttime' : starttime, 'endtime' : endtime, 'useridlist' : useridlist}) response = requests.post(api_punch_card_url, data = json_str, verify = false) if response.status_code = = 200 : rep_dic = json.loads(response.text) errcode = rep_dic.get( 'errcode' ) if errcode = = 42001 : access_token = get_access_token(true) api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token json_str = json.dumps( { 'opencheckindatatype' : opencheckindatatype, 'starttime' : starttime, 'endtime' : endtime, 'useridlist' : useridlist}) response = requests.post(api_punch_card_url, data = json_str, verify = false) rep_dic = json.loads(response.text) errcode = rep_dic.get( 'errcode' ) if errcode: raise valueerror( 'get punch data failed1, errcode=%s' % errcode) else : value_str = rep_dic.get( 'checkindata' ) if value_str: return value_str else : raise valueerror( 'get punch data failed2.' ) elif errcode: raise valueerror ( 'get punch data failed3, errcode=%s' % errcode) else : value_str = rep_dic.get( 'checkindata' ) if value_str: return value_str else : raise valueerror( 'i do not find employee punch data.' ) else : raise valueerror ( 'get punch data failed5.' ) # 调用接口,获得数据 if __name__ = = '__main__' : today = datetime.date.today() oneday = datetime.timedelta(days = 3 ) # days,即获取几天内的 yesterday = today - oneday starttime = datetime_timestamp(yesterday.strftime( '%y-%m-%d' ) + ' 00:00:00' ) endtime = datetime_timestamp(today.strftime( '%y-%m-%d' ) + ' 23:59:59' ) opencheckindatatype = 3 access_token = get_access_token() if access_token: useridlist = get_userid_list() if useridlist: step = 100 total = len (useridlist) n = math.ceil(total / step) for i in range (n): # print (useridlist[i*step:(i+1)*step]) punch_card = get_punchcard_info(access_token, opencheckindatatype, starttime, endtime,useridlist[i * step:(i + 1 ) * step]) # print (punch_card) if punch_card: conn = get_link_server() cursor = conn.cursor() for dic_obj in punch_card: cursor.execute( 'exec sp_analysispunchcard @json=%s' , (json.dumps(dic_obj, ensure_ascii = false))) # print((json.dumps(dic_obj, ensure_ascii=false))),sp_analysispunchcard把获取到的数据解析后存入数据库中 conn.commit() conn.close() print ( 'get punch card successed.' ) else : raise valueerror( 'no userlist exists' ) |
设置windows计划任务
通过控制面板-管理工具-任务计划程序,右击选择创建基本任务,这里注意的是路径和程序。
程序或脚本:python.exe
添加参数(可选)(a):你的py文件目录
起始于:python目录,如果不知道python安装到哪去了,按照下列cmd命令,输入python后进入python命令查询
1
2
|
import sys sys.prefix,回车 |
到此,配置完成,可自行右击任务-执行查询效果,或者通过python命令执行py文件
进入到py文件目录
python xxx.py
总结
以上所述是小编给大家介绍的python获取微信企业号打卡数据并生成windows计划任务,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!原文链接:https://blog.csdn.net/henvey1988/article/details/83411665