引子
之前clubot使用的pyxmpp2的默认mainloop也就是一个poll的主循环,但是clubot上线后资源占用非常厉害,使用strace跟踪发现clubot在不停的poll,查看pyxmpp2代码发现pyxmpp2的poll在使用超时阻塞时使用最小超时时间,而最小超时时间一直是0,所以会变成一个没有超时的非阻塞poll很浪费资源,不打算更改库代码,所以自己仿照poll的mainloop写了一个更加高效的epoll的mainloop
实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#!/usr/bin/env python # -*- coding:utf-8 -*- # # Author : cold # E-mail : wh_linux@126.com # Date : 13/01/06 10:41:31 # Desc : Clubot epoll mainloop # from __future__ import absolute_import, division import select from pyxmpp2.mainloop.interfaces import HandlerReady, PrepareAgain from pyxmpp2.mainloop.base import MainLoopBase from plugin.util import get_logger class EpollMainLoop(MainLoopBase): """ Main event loop based on the epoll() syscall on Linux system """ READ_ONLY = (select.EPOLLIN | select.EPOLLPRI | select.EPOLLHUP | select.EPOLLERR |select.EPOLLET) READ_WRITE = READ_ONLY | select.EPOLLOUT def __init__( self , settings = None , handlers = None ): self .epoll = select.epoll() self ._handlers = {} self ._unprepared_handlers = {} self ._timeout = None self ._exists_fd = {} self .logger = get_logger() MainLoopBase.__init__( self , settings, handlers) return def _add_io_handler( self , handler): self ._unprepared_handlers[handler] = None self ._configure_io_handler(handler) def _configure_io_handler( self , handler): if self .check_events(): return if handler in self ._unprepared_handlers: old_fileno = self ._unprepared_handlers[handler] prepared = self ._prepare_io_handler(handler) else : old_fileno = None prepared = True fileno = handler.fileno() if old_fileno is not None and fileno ! = old_fileno: del self ._handlers[old_fileno] self ._exists.pop(old_fileno, None ) self .epoll.unregister(old_fileno) if not prepared: self ._unprepared_handlers[handler] = fileno if not fileno: return self ._handlers[fileno] = handler events = 0 if handler.is_readable(): events | = self .READ_ONLY if handler.is_writable(): events | = self .READ_WRITE if events: if fileno in self ._exists_fd: self .epoll.modify(fileno, events) else : self ._exists_fd.update({fileno: 1 }) self .epoll.register(fileno, events) def _prepare_io_handler( self , handler): ret = handler.prepare() if isinstance (ret, HandlerReady): del self ._unprepared_handlers[handler] prepared = True elif isinstance (ret, PrepareAgain): if ret.timeout is not None : if self ._timeout is not None : self ._timeout = min ( self ._timeout, ret.timeout) else : self ._timeout = ret.timeout prepared = False else : raise TypeError( "Unexpected result from prepare()" ) return prepared def _remove_io_handler( self , handler): if handler in self ._unprepared_handlers: old_fileno = self ._unprepared_handlers[handler] del self ._unprepared_handlers[handler] else : old_fileno = handler.fileno() if old_fileno is not None : try : del self ._handlers[old_fileno] self ._exists.pop(old_fileno, None ) self .epoll.unregister(old_fileno) except KeyError: pass def loop_iteration( self , timeout = 60 ): next_timeout, sources_handled = self ._call_timeout_handlers() if self .check_events(): return if self ._quit: return sources_handled for handler in list ( self ._unprepared_handlers): self ._configure_io_handler(handler) if self ._timeout is not None : timeout = min (timeout, self ._timeout) if next_timeout is not None : timeout = min (next_timeout, timeout) if timeout = = 0 : timeout + = 1 # 带有超时的非阻塞,解约资源 events = self .epoll.poll(timeout) for fd, flag in events: if flag & (select.EPOLLIN | select.EPOLLPRI | select.EPOLLET): self ._handlers[fd].handle_read() if flag & (select.EPOLLOUT|select.EPOLLET): self ._handlers[fd].handle_write() if flag & (select.EPOLLERR | select.EPOLLET): self ._handlers[fd].handle_err() if flag & (select.EPOLLHUP | select.EPOLLET): self ._handlers[fd].handle_hup() #if flag & select.EPOLLNVAL: #self._handlers[fd].handle_nval() sources_handled + = 1 self ._configure_io_handler( self ._handlers[fd]) return sources_handled |
使用
如何使用新的mainloop?只需在实例化Client时传入
1
2
|
mainloop = EpollMainLoop(settings) client = Client(my_jid, [ self , version_provider], settings, mainloop) |
这样就会使用epoll作为mainloop
注意
epoll仅仅在Linux下支持