storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。
clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协议定义如下:
ClusterState协议
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
(defprotocol ClusterState (set-ephemeral-node [this path data]) (delete-node [this path]) (create-sequential [this path data]) ;; if node does not exist, create persistent with this data (set-data [this path data]) (get-data [this path watch?]) (get-version [this path watch?]) (get-data-with-version [this path watch?]) (get-children [this path watch?]) (mkdirs [this path]) (close [this]) (register [this callback]) (unregister [this id])) |
StormClusterState协议封装了一组storm与zookeeper进行交互的函数,可以将StormClusterState协议中的函数看成ClusterState协议中函数的"组合"。StormClusterState协议定义如下:
StormClusterState协议
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
(defprotocol StormClusterState (assignments [this callback]) (assignment-info [this storm-id callback]) (assignment-info-with-version [this storm-id callback]) (assignment-version [this storm-id callback]) (active-storms [this]) (storm-base [this storm-id callback]) (get-worker-heartbeat [this storm-id node port]) (executor-beats [this storm-id executor->node+port]) (supervisors [this callback]) (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist (setup-heartbeats! [this storm-id]) (teardown-heartbeats! [this storm-id]) (teardown-topology-errors! [this storm-id]) (heartbeat-storms [this]) (error-topologies [this]) (worker-heartbeat! [this storm-id node port info]) (remove-worker-heartbeat! [this storm-id node port]) (supervisor-heartbeat! [this supervisor-id info]) (activate-storm! [this storm-id storm-base]) (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) (remove-storm! [this storm-id]) (report-error [this storm-id task-id node port error]) (errors [this storm-id task-id]) (disconnect [this])) |
命名空间backtype.storm.cluster除了定义ClusterState和StormClusterState这两个重要协议外,还定义了两个重要函数:mk-distributed-cluster-state和mk-storm-cluster-state。
mk-distributed-cluster-state函数如下:
该函数返回一个实现了ClusterState协议的对象,通过这个对象就可以与zookeeper进行交互了。
mk-distributed-cluster-state函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
(defn mk-distributed-cluster-state ;; conf绑定了storm.yaml中的配置信息,是一个map对象 [conf] ;; zk绑定一个zk client,Storm使用CuratorFramework与Zookeeper进行交互 (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)] ;; 创建storm集群在zookeeper上的根目录,默认值为/storm (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT)) (.close zk)) ;; callbacks绑定回调函数集合,是一个map对象 (let [callbacks (atom {}) ;; active标示zookeeper集群状态 active (atom true) ;; zk重新绑定新的zk client,该zk client设置了watcher,这样当zookeeper集群的状态发生变化时,zk server会给zk client发送相应的event,zk client设置的watcher会调用callbacks中相应回调函数来处理event ;; 启动nimbus时,callbacks是一个空集合,所以nimbus端收到event后不会调用任何回调函数;但是启动supervisor时,callbacks中注册了回调函数,所以当supervisor收到zk server发送的event后,会调用相应的回调函数 ;; mk-client函数定义在zookeeper.clj文件中,请参见其定义部分 zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf :root (conf STORM-ZOOKEEPER-ROOT) ;; :watcher绑定一个函数,指定zk client的默认watcher函数,state标示当前zk client的状态;type标示事件类型;path标示zookeeper上产生该事件的znode ;; 该watcher函数主要功能就是执行callbacks集合中的函数,callbacks集合中的函数是在mk-storm-cluster-state函数中通过调用ClusterState的register函数添加的 :watcher (fn [state type path] (when @active (when-not (= :connected state) (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper.")) (when-not (= :none type) (doseq [callback (vals @callbacks)] (callback type path))))))] ;; reify相当于java中的implements,这里表示实现一个协议 (reify ClusterState ;; register函数用于将回调函数加入callbacks中,key是一个32位的标识 (register [this callback] (let [id (uuid)] (swap! callbacks assoc id callback) id)) ;; unregister函数用于将指定key的回调函数从callbacks中删除 (unregister [this id] (swap! callbacks dissoc id)) ;; 在zookeeper上添加一个临时节点 (set-ephemeral-node [this path data] (zk/mkdirs zk (parent-path path)) (if (zk/exists zk path false) (try-cause (zk/set-data zk path data) ; should verify that it's ephemeral (catch KeeperException$NoNodeException e (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data") (zk/create-node zk path data :ephemeral) )) (zk/create-node zk path data :ephemeral))) ;; 在zookeeper上添加一个顺序节点 (create-sequential [this path data] (zk/create-node zk path data :sequential)) ;; 修改某个节点数据 (set-data [this path data] ;; note: this does not turn off any existing watches (if (zk/exists zk path false) (zk/set-data zk path data) (do (zk/mkdirs zk (parent-path path)) (zk/create-node zk path data :persistent)))) ;; 删除指定节点 (delete-node [this path] (zk/delete-recursive zk path)) ;; 获取指定节点数据。path标示节点路径;watch?是一个布尔类型值,表示是否需要对该节点进行"观察",如果watch?=true,当调用set-data函数修改该节点数据后, ;; 会给zk client发送一个事件,zk client接收事件后,会调用创建zk client时指定的默认watcher函数(即:watcher绑定的函数) (get-data [this path watch?] (zk/get-data zk path watch?)) ;; 与get-data函数的区别就是获取指定节点数据的同时,获取节点数据的version,version表示节点数据修改的次数 (get-data-with-version [this path watch?] (zk/get-data-with-version zk path watch?)) ;; 获取指定节点的version,watch?的含义与get-data函数中的watch?相同 (get-version [this path watch?] (zk/get-version zk path watch?)) ;; 获取指定节点的子节点列表,watch?的含义与get-data函数中的watch?相同 (get-children [this path watch?] (zk/get-children zk path watch?)) ;; 在zookeeper上创建一个节点 (mkdirs [this path] (zk/mkdirs zk path)) ;; 关闭zk client (close [this] (reset! active false) (.close zk))))) |
mk-storm-cluster-state函数定义如下:
mk-storm-cluster-state函数非常重要,该函数返回一个实现了StormClusterState协议的实例,通过该实例storm就可以更加方便与zookeeper进行交互。
在启动nimbus和supervisor的函数中均调用了mk-storm-cluster-state函数。关于nimbus和supervisor的启动将在之后的文章中介绍。
mk-storm-cluster-state函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
|
(defn mk-storm-cluster-state [cluster-state-spec] ;; satisfies?谓词相当于java中的instanceof,判断cluster-state-spec是不是ClusterState实例 (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec) [false cluster-state-spec] [true (mk-distributed-cluster-state cluster-state-spec)]) ;; 绑定topology id->回调函数的map,当/assignments/{topology id}数据发生变化时,zk client执行assignment-info-callback中topology id所对应的回调函数 assignment-info-callback (atom {}) ;; assignment-info-with-version-callback与assignment-info-callback类似 assignment-info-with-version-callback (atom {}) ;; assignment-version-callback与assignments-callback类似 assignment-version-callback (atom {}) ;; 当/supervisors标示的znode的子节点发生变化时,zk client执行supervisors-callback指向的函数 supervisors-callback (atom nil) ;; 当/assignments标示的znode的子节点发生变化时,zk client执行assignments-callback指向的函数 assignments-callback (atom nil) ;; 当/storms/{topology id}标示的znode的数据发生变化时,zk client执行storm-base-callback中topology id所对应的回调函数 storm-base-callback (atom {}) ;; register函数将"回调函数(fn ...)"添加到cluster-state的callbacks集合中,并返回标示该回调函数的uuid state-id (register cluster-state ;; 定义"回调函数",type标示事件类型,path标示znode (fn [type path] ;; subtree绑定路径前缀如"assignments"、"storms"、"supervisors"等,args存放topology id (let [[subtree & args] (tokenize-path path)] ;; condp相当于java中的switch (condp = subtree ;; 当subtree="assignments"时,如果args为空,说明是/assignments的子节点发生变化,执行assignments-callback指向的回调函数,否则 ;; 说明/assignments/{topology id}标示的节点数据发生变化,执行assignment-info-callback指向的回调函数 ASSIGNMENTS-ROOT (if (empty? args) (issue-callback! assignments-callback) (issue-map-callback! assignment-info-callback (first args))) ;; 当subtree="supervisors"时,说明是/supervisors的子节点发生变化,执行supervisors-callback指向的回调函数 SUPERVISORS-ROOT (issue-callback! supervisors-callback) ;; 当subtree="storms"时,说明是/storms/{topology id}标示的节点数据发生变化,执行storm-base-callback指向的回调函数 STORMS-ROOT (issue-map-callback! storm-base-callback (first args)) ;; this should never happen (exit-process! 30 "Unknown callback for subtree " subtree args)))))] ;; 在zookeeper上创建storm运行topology所必需的znode (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]] (mkdirs cluster-state p)) ;; 返回一个实现StormClusterState协议的实例 (reify StormClusterState ;; 获取/assignments的子节点列表,如果callback不为空,将其赋值给assignments-callback,并对/assignments添加"节点观察" (assignments [this callback] (when callback (reset! assignments-callback callback)) (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback))) ;; 获取/assignments/{storm-id}节点数据,即storm-id的分配信息,如果callback不为空,将其添加到assignment-info-callback中,并对/assignments/{storm-id}添加"数据观察" (assignment-info [this storm-id callback] (when callback (swap! assignment-info-callback assoc storm-id callback)) (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))) ;; 获取/assignments/{storm-id}节点数据包括version信息,如果callback不为空,将其添加到assignment-info-with-version-callback中,并对/assignments/{storm-id}添加"数据观察" (assignment-info-with-version [this storm-id callback] (when callback (swap! assignment-info-with-version-callback assoc storm-id callback)) (let [{data :data version :version} (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))] {:data (maybe-deserialize data) :version version})) ;; 获取/assignments/{storm-id}节点数据的version信息,如果callback不为空,将其添加到assignment-version-callback中,并对/assignments/{storm-id}添加"数据观察" (assignment-version [this storm-id callback] (when callback (swap! assignment-version-callback assoc storm-id callback)) (get-version cluster-state (assignment-path storm-id) (not-nil? callback))) ;; 获取storm集群中正在运行的topology id即/storms的子节点列表 (active-storms [this] (get-children cluster-state STORMS-SUBTREE false)) ;; 获取storm集群中所有有心跳的topology id即/workerbeats的子节点列表 (heartbeat-storms [this] (get-children cluster-state WORKERBEATS-SUBTREE false)) ;; 获取所有有错误的topology id即/errors的子节点列表 (error-topologies [this] (get-children cluster-state ERRORS-SUBTREE false)) ;; 获取指定storm-id进程的心跳信息,即/workerbeats/{storm-id}/{node-port}节点数据 (get-worker-heartbeat [this storm-id node port] (-> cluster-state (get-data (workerbeat-path storm-id node port) false) maybe-deserialize)) ;; 获取指定进程中所有线程的心跳信息 (executor-beats [this storm-id executor->node+port] ;; need to take executor->node+port in explicitly so that we don't run into a situation where a ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, ;; we avoid situations like that (let [node+port->executors (reverse-map executor->node+port) all-heartbeats (for [[[node port] executors] node+port->executors] (->> (get-worker-heartbeat this storm-id node port) (convert-executor-beats executors) ))] (apply merge all-heartbeats))) ;; 获取/supervisors的子节点列表,如果callback不为空,将其赋值给supervisors-callback,并对/supervisors添加"节点观察" (supervisors [this callback] (when callback (reset! supervisors-callback callback)) (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback))) ;; 获取/supervisors/{supervisor-id}节点数据,即supervisor的心跳信息 (supervisor-info [this supervisor-id] (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false))) ;; 设置进程心跳信息 (worker-heartbeat! [this storm-id node port info] (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info))) ;; 删除进程心跳信息 (remove-worker-heartbeat! [this storm-id node port] (delete-node cluster-state (workerbeat-path storm-id node port))) ;; 创建指定storm-id的topology的用于存放心跳信息的节点 (setup-heartbeats! [this storm-id] (mkdirs cluster-state (workerbeat-storm-root storm-id))) ;; 删除指定storm-id的topology的心跳信息节点 (teardown-heartbeats! [this storm-id] (try-cause (delete-node cluster-state (workerbeat-storm-root storm-id)) (catch KeeperException e (log-warn-error e "Could not teardown heartbeats for " storm-id)))) ;; 删除指定storm-id的topology的错误信息节点 (teardown-topology-errors! [this storm-id] (try-cause (delete-node cluster-state (error-storm-root storm-id)) (catch KeeperException e (log-warn-error e "Could not teardown errors for " storm-id)))) ;; 创建临时节点存放supervisor的心跳信息 (supervisor-heartbeat! [this supervisor-id info] (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info))) ;; 创建/storms/{storm-id}节点 (activate-storm! [this storm-id storm-base] (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))) ;; 更新topology对应的StormBase对象,即更新/storm/{storm-id}节点 (update-storm! [this storm-id new-elems] ;; base绑定storm-id在zookeeper上的StormBase对象 (let [base (storm-base this storm-id nil) ;; executors绑定component名称->组件并行度的map executors (:component->executors base) ;; new-elems绑定合并后的组件并行度map,update函数将组件新并行度map合并到旧map中 new-elems (update new-elems :component->executors (partial merge executors))] ;; 更新StormBase对象中的组件并行度map,并写入zookeeper的/storms/{storm-id}节点 (set-data cluster-state (storm-path storm-id) (-> base (merge new-elems) Utils/serialize)))) ;; 获取storm-id的StormBase对象,即读取/storms/{storm-id}节点数据,如果callback不为空,将其赋值给storm-base-callback,并为/storms/{storm-id}节点添加"数据观察" (storm-base [this storm-id callback] (when callback (swap! storm-base-callback assoc storm-id callback)) (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)))) ;; 删除storm-id的StormBase对象,即删除/storms/{storm-id}节点 (remove-storm-base! [this storm-id] (delete-node cluster-state (storm-path storm-id))) ;; 更新storm-id的分配信息,即更新/assignments/{storm-id}节点数据 (set-assignment! [this storm-id info] (set-data cluster-state (assignment-path storm-id) (Utils/serialize info))) ;; 删除storm-id的分配信息,同时删除其StormBase信息,即删除/assignments/{storm-id}节点和/storms/{storm-id}节点 (remove-storm! [this storm-id] (delete-node cluster-state (assignment-path storm-id)) (remove-storm-base! this storm-id)) ;; 将组件异常信息写入zookeeper (report-error [this storm-id component-id node port error] ;; path绑定"/errors/{storm-id}/{component-id}" (let [path (error-path storm-id component-id) ;; data绑定异常信息,包括异常时间、异常堆栈信息、主机和端口 data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port} ;; 创建/errors/{storm-id}/{component-id}节点 _ (mkdirs cluster-state path) ;; 创建/errors/{storm-id}/{component-id}的子顺序节点,并写入异常信息 _ (create-sequential cluster-state (str path "/e") (Utils/serialize data)) ;; to-kill绑定除去顺序节点编号最大的前10个节点的剩余节点的集合 to-kill (->> (get-children cluster-state path false) (sort-by parse-error-path) reverse (drop 10))] ;; 删除to-kill中包含的节点 (doseq [k to-kill] (delete-node cluster-state (str path "/" k))))) ;; 得到给定的storm-id component-id下的异常信息 (errors [this storm-id component-id] (let [path (error-path storm-id component-id) _ (mkdirs cluster-state path) children (get-children cluster-state path false) errors (dofor [c children] (let [data (-> (get-data cluster-state (str path "/" c) false) maybe-deserialize)] (when data (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)) ))) ] (->> (filter not-nil? errors) (sort-by (comp - :time-secs))))) ;; 关闭连接,在关闭连接前,将回调函数从cluster-state的callbacks中删除 (disconnect [this] (unregister cluster-state state-id) (when solo? (close cluster-state)))))) |
zookeeper.clj中mk-client函数
mk-client函数创建一个CuratorFramework实例,为该实例注册了CuratorListener,当一个后台操作完成或者指定的watch被触发时将会执行CuratorListener中的eventReceived()。eventReceived中调用的wacher函数就是mk-distributed-cluster-state中:watcher绑定的函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
(defnk mk-client [conf servers port :root "" :watcher default-watcher :auth-conf nil] (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))] (.. fk (getCuratorListenable) (addListener (reify CuratorListener (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e] (when (= (.getType e) CuratorEventType/WATCHED) (let [^WatchedEvent event (.getWatchedEvent e)] (watcher (zk-keeper-states (.getState event)) (zk-event-types (.getType event)) (.getPath event)))))))) (.start fk) fk)) |
以上就是storm与zookeeper进行交互的源码分析,我觉得最重要的部分就是如何给zk client添加"wacher",storm的很多功能都是通过zookeeper的wacher机制实现的,如"分配信息领取"。添加"wacher"大概分为以下几个步骤:
mk-distributed-cluster-state函数创建了一个zk client,并通过:watcher给该zk client指定了"wacher"函数,这个"wacher"函数只是简单调用ClusterState的callbacks集合中的函数,这样这个"wacher"函数执行 哪些函数将由ClusterState实例决定
ClusterState实例提供register函数来更新callbacks集合,ClusterState实例被传递给了mk-storm-cluster-state函数,在mk-storm-cluster-state中调用register添加了一个函数(fn [type path] ... ),这个函数实现了"watcher"函数的全部逻辑
mk-storm-cluster-state中注册的函数执行的具体内容由StormClusterState实例决定,对zookeeper节点添加"观察"也是通过StormClusterState实例实现的,这样我们就可以通过StormClusterState实例对我们感兴趣的节点添加"观察"和"回调函数",当节点或节点数据发生变化后,zk server就会给zk client发送"通知",zk client中的"wather"函数将被调用,进而我们注册的"回到函数"将被执行。
总结
这部分源码与zookeeper联系十分紧密,涉及了很多zookeeper中的概念和特性,如"数据观察"和"节点观察"等,有关zookeeper的wacher机制请参考:http://www.zzvips.com/article/130109.html,storm并没有直接使用zookeeper的api,而是使用Curator框架,Curator框架简化了访问zookeeper的操作。关于Curator框架请参考:http://www.zzvips.com/article/133166.html。
以上就是本文关于源码阅读之storm操作zookeeper-cluster.clj的全部内容了,希望对大家有所帮助。感谢各位的阅读!
原文链接:http://blog.csdn.net/mangocream/article/details/42456901