其实网上已经有许多python语言书写的串口,但大部分都是python2写的,没有找到一个合适的python编写的串口助手,只能自己来写一个串口助手,由于我只需要串口能够接收读取数据就可以了,故而这个串口助手只实现了数据的接收读取。
创建串口助手首先需要创建一个类,重构类的实现过程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
#coding=gb18030 import threading import time import serial class comthread: def __init__( self , port = 'com3' ): #构造串口的属性 self .l_serial = none self .alive = false self .waitend = none self .port = port self . id = none self .data = none #定义串口等待的函数 def waiting( self ): if not self .waitend is none: self .waitend.wait() def setstopevent( self ): if not self .waitend is none: self .waitend. set () self .alive = false self .stop() #启动串口的函数 def start( self ): self .l_serial = serial.serial() self .l_serial.port = self .port self .l_serial.baudrate = 115200 #设置等待时间,若超出这停止等待 self .l_serial.timeout = 2 self .l_serial. open () #判断串口是否已经打开 if self .l_serial.isopen(): self .waitend = threading.event() self .alive = true self .thread_read = none self .thread_read = threading.thread(target = self .firstreader) self .thread_read.setdaemon( 1 ) self .thread_read.start() return true else : return false |
创建好类后,就要实现串口读取的功能,其读取数据的函数如下:
首先要创建一个字符串来存放接收到的数据:
1
2
|
data = '' data = data.encode( 'utf-8' ) #由于串口使用的是字节,故而要进行转码,否则串口会不识别 |
在创建好变量来接收数据后就要开始接收数据:
1
2
3
4
5
6
7
8
|
n = self .l_serial.inwaiting() #获取接收到的数据长度 if n: #读取数据并将数据存入data data = data + self .l_serial.read(n) #输出接收到的数据 print ( 'get data from serial port:' , data) #显示data的类型,便于如果出错时检查错误 print ( type (data)) |
将数据接收完后,就要对接收到的数据进行处理,提取出有用信息,由于下位机使用的协议不一样,因此处理的方法也不一样,我使用的协议是**+id+*data+*,因此我的提取方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
#获取还没接收到的数据长度 n = self .l_serial.inwaiting() #判断是否已经将下位机传输过来的数据全部提取完毕,防止之前没有获取全部数据 if len (data)> 0 and n = = 0 : try : #将数据转换为字符型 temp = data.decode( 'gb18030' ) #输出temp类型,看转码是否成功 print ( type (temp)) #输出接收到的数据 print (temp) 将数据按换行分割并输出 car,temp = str (temp).split( "\n" , 1 ) print (car,temp) #将temp按':'分割,并获取第二个数据 string = str (temp).strip().split( ":" )[ 1 ] #由于前面分割后的数据类型是列表,因此需要转换成字符串,而后按照'*'分割,得到的也就是我们需要的id和data str_id,str_data = str (string).split( "*" , 1 ) print (str_id) print (str_data) print ( type (str_id), type (str_data)) #判断data最后一位是否是'*',若是则退出,若不是则输出异常 if str_data[ - 1 ] = = '*' : break else : print (str_data[ - 1 ]) print ( 'str_data[-1]!=*' ) except : print ( "读卡错误,请重试!\n" ) |
其输出结果为:
1
2
3
4
5
6
7
8
9
|
get data from serial port:b 'id:4a622e29\n\xbf\xa8\xd6\xd0\xbf\xe924\xca\xfd\xbe\xdd\xce\xaa:1*80*\r\n' < class 'bytes' > < class 'str' > id : 4a622e29 卡中块 24 数据为: 1 * 80 * id : 4a622e29 卡中块 24 数据为: 1 * 80 * 80 * < class 'str' > < class 'str' > |
串口助手完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#coding=gb18030 import threading import time import serial class comthread: def __init__( self , port = 'com3' ): self .l_serial = none self .alive = false self .waitend = none self .port = port self . id = none self .data = none def waiting( self ): if not self .waitend is none: self .waitend.wait() def setstopevent( self ): if not self .waitend is none: self .waitend. set () self .alive = false self .stop() def start( self ): self .l_serial = serial.serial() self .l_serial.port = self .port self .l_serial.baudrate = 115200 self .l_serial.timeout = 2 self .l_serial. open () if self .l_serial.isopen(): self .waitend = threading.event() self .alive = true self .thread_read = none self .thread_read = threading.thread(target = self .firstreader) self .thread_read.setdaemon( 1 ) self .thread_read.start() return true else : return false def senddate( self ,i_msg,send): lmsg = '' isok = false if isinstance (i_msg): lmsg = i_msg.encode( 'gb18030' ) else : lmsg = i_msg try : # 发送数据到相应的处理组件 self .l_serial.write(send) except exception as ex: pass ; return isok def firstreader( self ): while self .alive: time.sleep( 0.1 ) data = '' data = data.encode( 'utf-8' ) n = self .l_serial.inwaiting() if n: data = data + self .l_serial.read(n) print ( 'get data from serial port:' , data) print ( type (data)) n = self .l_serial.inwaiting() if len (data)> 0 and n = = 0 : try : temp = data.decode( 'gb18030' ) print ( type (temp)) print (temp) car,temp = str (temp).split( "\n" , 1 ) print (car,temp) string = str (temp).strip().split( ":" )[ 1 ] str_id,str_data = str (string).split( "*" , 1 ) print (str_id) print (str_data) print ( type (str_id), type (str_data)) if str_data[ - 1 ] = = '*' : break else : print (str_data[ - 1 ]) print ( 'str_data[-1]!=*' ) except : print ( "读卡错误,请重试!\n" ) self . id = str_id self .data = str_data[ 0 : - 1 ] self .waitend. set () self .alive = false def stop( self ): self .alive = false self .thread_read.join() if self .l_serial.isopen(): self .l_serial.close() #调用串口,测试串口 def main(): rt = comthread() rt.sendport = '**1*80*' try : if rt.start(): print (rt.l_serial.name) rt.waiting() print ( "the data is:%s,the id is:%s" % (rt.data,rt. id )) rt.stop() else : pass except exception as se: print ( str (se)) if rt.alive: rt.stop() print ('') print ( 'end ok .' ) temp_id = rt. id temp_data = rt.data del rt return temp_id,temp_data if __name__ = = '__main__' : #设置一个主函数,用来运行窗口,便于若其他地方下需要调用串口是可以直接调用main函数 id ,data = main() print ( "******" ) print ( id ,data) |
以上这篇对python3 serial 串口助手的接收读取数据方法详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/attentle/p/7098408.html