本文实例讲述了Python模拟简单电梯调度算法。分享给大家供大家参考,具体如下:
经常在公司坐电梯,由于楼层较高,是双联装的电梯,但是经常等电梯很久,经常有人骂写电梯调度算法的。回来闲来无事,自己尝试写了一个简单的。
场景很简单,每一层电梯口只有一个按钮,不区分上下,当有人按下这个键后,电梯会过来停在此层,这个人可以进去,并选择自己想去的层。电梯的调度策略也很简单,在一次向上的过程中,如果有人在下面按了键,电梯并不直接向下,而是运行到此次向上的最顶层,然后再下次向下运行的过程中去服务这个请求。
elevator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import time from myque import myque class elevator: def __init__( self ,layers): self .building_layers = layers self .direction = 'up' self .cur_layer = 1 self .up_queue = myque() self .down_queue = myque( True ) self .switcher = 'open' def stop( self ): self .switcher = 'stop' def push_button( self ,layer,direction = None ): if self .cur_layer>layer: self .down_queue.insert(layer) elif self .cur_layer<layer: self .up_queue.insert(layer) else : if self .direction = = 'up' : self .down_queue.insert(layer) else : self .up_queue.insert(layer) def handle_queue( self ,direction): self .direction = direction if direction = = 'up' : inc = 1 else : inc = - 1 que = getattr ( self , direction + '_queue' ) while que.length(): while self .cur_layer ! = que.front(): print '/nelevator in ' , self .cur_layer time.sleep( 1 ) self .cur_layer + = inc print '/nelevator arrives at ' , self .cur_layer que.pop_front() def run( self ): while self .switcher = = 'open' : if self .up_queue.empty() and self .down_queue.empty(): """elevator now is waiting, stop at a layer""" time.sleep( 1 ) continue """go up""" self .handle_queue( 'up' ) """go down""" self .handle_queue( 'down' ) |
myque.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import threading class myque: def __init__( self ,reverse = False ): self .mode = reverse self .buf = [] self .lock = threading.Lock() def insert( self , object ): self .lock.acquire() self .buf.append( object ) self .buf.sort(reverse = self .mode) self .lock.release() def front( self ): return self .buf[ 0 ] def pop_front( self ): self .lock.acquire() self .buf.pop( 0 ) self .lock.release() def length( self ): self .lock.acquire() size = len ( self .buf) self .lock.release() return size def empty( self ): self .lock.acquire() size = len ( self .buf) self .lock.release() return size = = 0 |
deploy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import threading from elevator import elevator def init_elevator(building_layers): e = elevator(building_layers) t = threading.Thread(target = e.run) t.setDaemon( True ) t.start() return (e,t) def main(): myelevator,ctl_thread = init_elevator( 17 ) while True : str = raw_input ( "Input valid layer :" ) try : layer = int ( str ) except Exception: if str = = 'quit' : myelevator.stop() ctl_thread.join() break else : print 'invalid input' , str continue if layer not in range ( 1 ,myelevator.building_layers + 1 ): continue myelevator.push_button(layer) if __name__ = = '__main__' : main() |
运行结果如下:
如果扩展的话,很容易将各层的按钮扩展为带上下指示的。如果有机会可以扩展为多联装电梯,并将调度算法做的更加智能,可以根据历史数据和时间进行动态调整。
希望本文所述对大家Python程序设计有所帮助。
原文链接:https://blog.csdn.net/zzulp/article/details/5681482