理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。
这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
class Message: # command MSG_ACCEPTOR_AGREE = 0 # 追随者约定 MSG_ACCEPTOR_ACCEPT = 1 # 追随者接受 MSG_ACCEPTOR_REJECT = 2 # 追随者拒绝-网络不通 MSG_ACCEPTOR_UNACCEPT = 3 # 追随者网络通-不同意 MSG_ACCEPT = 4 # 接受 MSG_PROPOSE = 5 # 提议 MSG_EXT_PROPOSE = 6 # 额外提议 MSG_HEARTBEAT = 7 # 心跳,每隔一段时间同步消息 def __init__( self , command = None ): self .command = command # 把收到的消息原原路返回,作为应答消息 def copyAsReply( self , message): # 提议ID #当前的ID #发给谁 #谁发的 self .proposalID, self .instanceID, self .to, self .source = message.proposalID, message.instanceID, message.source, message.to self .value = message.value # 发的信息 |
然后是利用socket,线程和队列实现的消息处理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# 基于socket传递消息,封装网络传递消息 import threading import pickle import socket import queue class MessagePump(threading.Thread): # 收取消息线程 class MPHelper(threading.Thread): # def __init__( self , owner): self .owner = owner threading.Thread.__init__( self ) def run( self ): while not self .owner.abort: # 只要所有者线程没有结束,一直接受消息 try : (bytes, addr) = self .owner.socket.recvfrom( 2048 ) # 收取消息 msg = pickle.loads(bytes) # 读取二进制数据转化为消息 msg.source = addr[ 1 ] self .owner.queue.put(msg) # 队列存入消息 except Exception as e: pass def __init__( self , owner, port, timeout = 2 ): threading.Thread.__init__( self ) self .owner = owner self .abort = False self .timeout = 2 self .port = port self .socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP通信 self .socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000 ) # 通信参数 self .socket.bind(( "localhost" , port)) # 通信地址,ip,端口 self .socket.settimeout(timeout) # 超时设置 self .queue = queue.Queue() # 队列 self .helper = MessagePump.MPHelper( self ) # 接收消息 # 运行主线程 def run( self ): self .helper.start() # 开启收消息的线程 while not self .abort: message = self .waitForMessage() # 阻塞等待 self .owner.recvMessage(message) # 收取消息 # 等待消息 def waitForMessage( self ): try : msg = self .queue.get( True , 3 ) # 抓取数据,最多等待3s return msg except : return None # 发送消息 def sendMessage( self , message): bytes = pickle.dumps(message) # 转化为二进制 address = ( "localhost" , message.to) # 地址ip,端口(ip,port) self .socket.sendto(bytes, address) return True #是否停止收取消息 def doAbort( self ): self .abort = True |
再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from MessagePump import MessagePump import random class AdversarialMessagePump(MessagePump): # 类的继承 # 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序 def __init__( self , owner, port, timeout = 2 ): MessagePump.__init__( self , owner, port, timeout) # 初始化父类 self .messages = set () # 集合避免重复 def waitForMessage( self ): try : msg = self .queue.get( True , 0.1 ) # 从队列抓取数据 self .messages.add(msg) # 添加消息 except Exception as e: # 处理异常 pass # print(e) if len ( self .messages) > 0 and random.random() < 0.95 : # Arbitrary! msg = random.choice( list ( self .messages)) # 随机抓取消息发送 self .messages.remove(msg) # 删除消息 else : msg = None return msg |
再来一个是记录类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议 from PaxosLeaderProtocol import PaxosLeaderProtocol class InstanceRecord: def __init__( self ): self .protocols = {} self .highestID = ( - 1 , - 1 ) # (port,count) self .value = None def addProtocol( self , protocol): self .protocols[protocol.proposalID] = protocol # if protocol.proposalID[ 1 ] > self .highestID[ 1 ] or ( protocol.proposalID[ 1 ] = = self .highestID[ 1 ] and protocol.proposalID[ 0 ] > self .highestID[ 0 ]): self .highestID = protocol.proposalID # 取得编号最大的协议 def getProtocol( self , protocolID): return self .protocols[protocolID] def cleanProtocols( self ): keys = self .protocols.keys() for k in keys: protocol = self .protocols[k] if protocol.state = = PaxosLeaderProtocol.STATE_ACCEPTED: print ( "删除协议" ) del self .protocols[k] |
下面就是Acceptor的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# 追随者 from MessagePump import MessagePump from Message import Message from InstanceRecord import InstanceRecord from PaxosAcceptorProtocol import PaxosAcceptorProtocol class PaxosAcceptor: def __init__( self , port, leaders): self .port = port self .leaders = leaders self .instances = {} # 接口列表 self .msgPump = MessagePump( self , self .port) # 消息传递器 self .failed = False # 开始消息传送 def start( self ): self .msgPump.start() # 停止 def stop( self ): self .msgPump.doAbort() # 失败 def fail( self ): self .failed = True def recover( self ): self .failed = False # 发送消息 def sendMessage( self , message): self .msgPump.sendMessage(message) # 收消息,只收取为提议的消息 def recvMessage( self , message): if message = = None : return if self .failed: # 失败状态不收取消息 return if message.command = = Message.MSG_PROPOSE: # 判断消息是否为提议 if message.instanceID not in self .instances: record = InstanceRecord() # 记录器 self .instances[message.instanceID] = record protocol = PaxosAcceptorProtocol( self ) # 创建协议 protocol.recvProposal(message) # 收取消息 self .instances[message.instanceID].addProtocol(protocol) else : self .instances[message.instanceID].getProtocol(message.proposalID).doTransition(message) # 通知客户端, def notifyClient( self , protocol, message): if protocol.state = = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: # 提议被接受,通知 self .instances[protocol.instanceID].value = message.value # 储存信息 print (u "协议被客户端接受 %s" % message.value) # 获取最高同意的建议 def getHighestAgreedProposal( self , instance): return self .instances[instance].highestID # (port,count) # 获取接口数据 def getInstanceValue( self , instance): return self .instances[instance].value |
那再看下AcceptorProtocol的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
from Message import Message class PaxosAcceptorProtocol( object ): # State variables STATE_UNDEFINED = - 1 # 协议没有定义的情况0 STATE_PROPOSAL_RECEIVED = 0 # 收到消息 STATE_PROPOSAL_REJECTED = 1 # 拒绝链接 STATE_PROPOSAL_AGREED = 2 # 同意链接 STATE_PROPOSAL_ACCEPTED = 3 # 同意请求 STATE_PROPOSAL_UNACCEPTED = 4 # 拒绝请求 def __init__( self , client): self .client = client self .state = PaxosAcceptorProtocol.STATE_UNDEFINED # 收取,只处理协议类型的消息 def recvProposal( self , message): if message.command = = Message.MSG_PROPOSE: # 协议 self .proposalID = message.proposalID self .instanceID = message.instanceID (port, count) = self .client.getHighestAgreedProposal(message.instanceID) # 端口,协议内容的最高编号 # 检测编号处理消息协议 # 判断协议是否最高 if count < self .proposalID[ 1 ] or (count = = self .proposalID[ 1 ] and port < self .proposalID[ 0 ]): self .state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # 协议同意 print ( "同意协议:%s, %s " % (message.instanceID, message.value)) value = self .client.getInstanceValue(message.instanceID) msg = Message(Message.MSG_ACCEPTOR_AGREE) # 同意协议 msg.copyAsReply(message) msg.value = value msg.sequence = (port, count) self .client.sendMessage(msg) # 发送消息 else : # 不再接受比最高协议小的提议 self .state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED return self .proposalID else : # 错误重试 pass # 过度 def doTransition( self , message): # 如果当前协议状态是接受连接,消息类型是接受 if self .state = = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command = = Message.MSG_ACCEPT: self .state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # 接收协议 msg = Message(Message.MSG_ACCEPTOR_ACCEPT) # 创造消息 msg.copyAsReply(message) # 拷贝并回复 for l in self .client.leaders: msg.to = l self .client.sendMessage(msg) # 给领导发送消息 self .notifyClient(message) # 通知自己 return True raise Exception( "并非预期的状态和命令" ) # 通知 自己客户端 def notifyClient( self , message): self .client.notifyClient( self , message) |
接着看下Leader和LeaderProtocol实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
# 领导者 import threading import Queue import time from Message import Message from MessagePump import MessagePump from InstanceRecord import InstanceRecord from PaxosLeaderProtocol import PaxosLeaderProtocol class PaxosLeader: # 定时监听 class HeartbeatListener(threading.Thread): def __init__( self , leader): self .leader = leader self .queue = Queue.Queue() # 消息队列 self .abort = False threading.Thread.__init__( self ) def newHB( self , message): self .queue.put(message) def doAbort( self ): self .abort = True def run( self ): # 读取消息 elapsed = 0 while not self .abort: s = time.time() try : hb = self .queue.get( True , 2 ) # 设定规则,谁的端口号比较高,谁就是领导 if hb.source > self .leader.port: self .leader.setPrimary( False ) except : self .leader.setPrimary( True ) # 定时发送 class HeartbeatSender(threading.Thread): def __init__( self , leader): threading.Thread.__init__( self ) self .leader = leader self .abort = False def doAbort( self ): self .abort = True def run( self ): while not self .abort: time.sleep( 1 ) if self .leader.isPrimary: msg = Message(Message.MSG_HEARTBEAT) msg.source = self .leader.port for leader in self .leader.leaders: msg.to = leader self .leader.sendMessage(msg) def __init__( self , port, leaders = None , acceptors = None ): self .port = port if leaders = = None : self .leaders = [] else : self .leaders = leaders if acceptors = = None : self .acceptors = [] else : self .acceptors = acceptors self .group = self .leaders + self .acceptors # 集合合并 self .isPrimary = False # 自身是不是领导 self .proposalCount = 0 self .msgPump = MessagePump( self , port) # 消息传送器 self .instances = {} self .hbListener = PaxosLeader.HeartbeatListener( self ) # 监听 self .hbSender = PaxosLeader.HeartbeatSender( self ) # 发送心跳 self .highestInstance = - 1 # 协议状态 self .stoped = True # 是否正在运行 self .lasttime = time.time() # 最后一次时间 def sendMessage( self , message): self .msgPump.sendMessage(message) def start( self ): self .hbSender.start() self .hbListener.start() self .msgPump.start() self .stoped = False def stop( self ): self .hbSender.doAbort() self .hbListener.doAbort() self .msgPump.doAbort() self .stoped = True def setPrimary( self , primary): # 设置领导者 if self .isPrimary ! = primary: # Only print if something's changed if primary: print (u "我是leader%s" % self .port) else : print (u "我不是leader%s" % self .port) self .isPrimary = primary # 获取所有的领导下面的追随者 def getGroup( self ): return self .group def getLeaders( self ): return self .leaders def getAcceptors( self ): return self .acceptors # 必须获得1/2以上的人支持 def getQuorumSize( self ): return ( len ( self .getAcceptors()) / 2 ) + 1 def getInstanceValue( self , instanceID): if instanceID in self .instances: return self .instances[instanceID].value return None def getHistory( self ): # 历史记录 return [ self .getInstanceValue(i) for i in range ( 1 , self .highestInstance + 1 )] # 抓取同意的数量 def getNumAccpted( self ): return len ([v for v in self .getHistory() if v ! = None ]) # 抓取空白时间处理下事务 def findAndFillGaps( self ): for i in range ( 1 , self .highestInstance): if self .getInstanceValue(i) = = None : print ( "填充空白" , i) self .newProposal( 0 , i) self .lasttime = time.time() # 采集无用信息 def garbageCollect( self ): for i in self .instances: self .instances[i].cleanProtocols() # 通知领导 def recvMessage( self , message): if self .stoped: return if message = = None : if self .isPrimary and time.time() - self .lasttime > 15.0 : self .findAndFillGaps() self .garbageCollect() return #处理心跳信息 if message.command = = Message.MSG_HEARTBEAT: self .hbListener.newHB(message) return True #处理额外的提议 if message.command = = Message.MSG_EXT_PROPOSE: print ( "额外的协议" , self .port, self .highestInstance) if self .isPrimary: self .newProposal(message.value) return True if self .isPrimary and message.command ! = Message.MSG_ACCEPTOR_ACCEPT: self .instances[message.instanceID].getProtocol(message.proposalID).doTransition(message) if message.command = = Message.MSG_ACCEPTOR_ACCEPT: if message.instanceID not in self .instances: self .instances[message.instanceID] = InstanceRecord() record = self .instances[message.instanceID] if message.proposalID not in record.protocols: #创建协议 protocol = PaxosLeaderProtocol( self ) protocol.state = PaxosLeaderProtocol.STATE_AGREED protocol.proposalID = message.proposalID protocol.instanceID = message.instanceID protocol.value = message.value record.addProtocol(protocol) else : protocol = record.getProtocol(message.proposalID) protocol.doTransition(message) return True # 新建提议 def newProposal( self , value, instance = None ): protocol = PaxosLeaderProtocol( self ) if instance = = None : # 创建协议标号 self .highestInstance + = 1 instanceID = self .highestInstance else : instanceID = instance self .proposalCount + = 1 id = ( self .port, self .proposalCount) if instanceID in self .instances: record = self .instances[instanceID] else : record = InstanceRecord() self .instances[instanceID] = record protocol.propose(value, id , instanceID) record.addProtocol(protocol) def notifyLeader( self , protocol, message): if protocol.state = = PaxosLeaderProtocol.STATE_ACCEPTED: print ( "协议接口%s被%s接受" % (message.instanceID, message.value)) self .instances[message.instanceID].accepted = True self .instances[message.instanceID].value = message.value self .highestInstance = max (message.instanceID, self .highestInstance) return if protocol.state = = PaxosLeaderProtocol.STATE_REJECTED: # 重新尝试 self .proposalCount = max ( self .proposalCount, message.highestPID[ 1 ]) self .newProposal(message.value) return True if protocol.state = = PaxosLeaderProtocol.STATE_UNACCEPTED: pass |
LeaderProtocol实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
from Message import Message class PaxosLeaderProtocol( object ): STATE_UNDEFINED = - 1 # 协议没有定义的情况0 STATE_PROPOSED = 0 # 协议消息 STATE_REJECTED = 1 # 拒绝链接 STATE_AGREED = 2 # 同意链接 STATE_ACCEPTED = 3 # 同意请求 STATE_UNACCEPTED = 4 # 拒绝请求 def __init__( self , leader): self .leader = leader self .state = PaxosLeaderProtocol.STATE_UNDEFINED self .proposalID = ( - 1 , - 1 ) self .agreecount, self .acceptcount = ( 0 , 0 ) self .rejectcount, self .unacceptcount = ( 0 , 0 ) self .instanceID = - 1 self .highestseen = ( 0 , 0 ) # 提议 def propose( self , value, pID, instanceID): self .proposalID = pID self .value = value self .instanceID = instanceID message = Message(Message.MSG_PROPOSE) message.proposalID = pID message.instanceID = instanceID message.value = value for server in self .leader.getAcceptors(): message.to = server self .leader.sendMessage(message) self .state = PaxosLeaderProtocol.STATE_PROPOSED return self .proposalID # 過度 def doTransition( self , message): # 根據狀態運行協議 if self .state = = PaxosLeaderProtocol.STATE_PROPOSED: if message.command = = Message.MSG_ACCEPTOR_AGREE: self .agreecount + = 1 if self .agreecount > = self .leader.getQuorumSize(): # 选举 print (u "达成协议的法定人数,最后的价值回答是:%s" % message.value) if message.value ! = None : if message.sequence[ 0 ] > self .highestseen[ 0 ] or ( message.sequence[ 0 ] = = self .highestseen[ 0 ] and message.sequence[ 1 ] > self .highestseen[ 1 ]): self .value = message.value self .highestseen = message.sequence self .state = PaxosLeaderProtocol.STATE_AGREED # 同意更新 # 发送同意消息 msg = Message(Message.MSG_ACCEPT) msg.copyAsReply(message) msg.value = self .value msg.leaderID = msg.to for server in self .leader.getAcceptors(): msg.to = server self .leader.sendMessage(msg) self .leader.notifyLeader( self , message) return True if message.command = = Message.MSG_ACCEPTOR_REJECT: self .rejectcount + = 1 if self .rejectcount > = self .leader.getQuorumSize(): self .state = PaxosLeaderProtocol.STATE_REJECTED self .leader.notifyLeader( self , message) return True if self .state = = PaxosLeaderProtocol.STATE_AGREED: if message.command = = Message.MSG_ACCEPTOR_ACCEPT: # 同意协议 self .acceptcount + = 1 if self .acceptcount > = self .leader.getQuorumSize(): self .state = PaxosLeaderProtocol.STATE_ACCEPTED # 接受 self .leader.notifyLeader( self , message) if message.command = = Message.MSG_ACCEPTOR_UNACCEPT: self .unacceptcount + = 1 if self .unacceptcount > = self .leader.getQuorumSize(): self .state = PaxosLeaderProtocol.STATE_UNACCEPTED self .leader.notifyLeader( self , message) |
测试模块:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
import socket, pickle, time from Message import Message from PaxosAcceptor import PaxosAcceptor from PaxosLeader import PaxosLeader if __name__ = = "__main__" : # 设定5个客户端 numclients = 5 clients = [PaxosAcceptor(port, [ 54321 , 54322 ]) for port in range ( 64320 , 64320 + numclients)] # 两个领导者 leader1 = PaxosLeader( 54321 , [ 54322 ], [c.port for c in clients]) leader2 = PaxosLeader( 54322 , [ 54321 ], [c.port for c in clients]) # 开启领导者与追随者 leader1.start() leader1.setPrimary( True ) leader2.setPrimary( True ) leader2.start() for c in clients: c.start() # 破坏,客户端不链接 clients[ 0 ].fail() clients[ 1 ].fail() # 通信 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp协议 start = time.time() for i in range ( 1000 ): m = Message(Message.MSG_EXT_PROPOSE) # 消息 m.value = 0 + i # 消息参数 m.to = 54322 # 设置传递的端口 bytes = pickle.dumps(m) # 提取的二进制数据 s.sendto(bytes, ( "localhost" , m.to)) # 发送消息 while leader2.getNumAccpted() < 999 : print ( "休眠的这一秒 %d " % leader2.getNumAccpted()) time.sleep( 1 ) print (u "休眠10秒" ) time.sleep( 10 ) print (u "停止leaders" ) leader1.stop() leader2.stop() print (u "停止客户端" ) for c in clients: c.stop() print (u "leader1历史纪录" ) print (leader1.getHistory()) print (u "leader2历史纪录" ) print (leader2.getHistory()) end = time.time() print (u "一共用了%f秒" % (end - start)) |
代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/baidu_17508977/article/details/80741916