Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
ZooKeeperExt |
|
| 3.6;3.6 | ||||
ZooKeeperExt$1 |
|
| 3.6;3.6 | ||||
ZooKeeperExt$PathStat |
|
| 3.6;3.6 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.zk; | |
20 | ||
21 | import java.io.IOException; | |
22 | ||
23 | import org.apache.log4j.Logger; | |
24 | import org.apache.zookeeper.KeeperException; | |
25 | import org.apache.zookeeper.CreateMode; | |
26 | import org.apache.zookeeper.data.ACL; | |
27 | import org.apache.zookeeper.data.Stat; | |
28 | ||
29 | import java.util.ArrayList; | |
30 | import java.util.Collections; | |
31 | import java.util.Comparator; | |
32 | import java.util.List; | |
33 | ||
34 | import org.apache.zookeeper.Watcher; | |
35 | import org.apache.zookeeper.ZooKeeper; | |
36 | ||
37 | /** | |
38 | * ZooKeeper provides only atomic operations. ZooKeeperExt provides additional | |
39 | * non-atomic operations that are useful. | |
40 | */ | |
41 | public class ZooKeeperExt extends ZooKeeper { | |
42 | /** Internal logger */ | |
43 | 1 | private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class); |
44 | /** Length of the ZK sequence number */ | |
45 | private static final int SEQUENCE_NUMBER_LENGTH = 10; | |
46 | ||
47 | /** | |
48 | * Constructor to connect to ZooKeeper | |
49 | * | |
50 | * @param connectString Comma separated host:port pairs, each corresponding | |
51 | * to a zk server. e.g. | |
52 | * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional | |
53 | * chroot suffix is used the example would look | |
54 | * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" | |
55 | * where the client would be rooted at "/app/a" and all paths | |
56 | * would be relative to this root - ie getting/setting/etc... | |
57 | * "/foo/bar" would result in operations being run on | |
58 | * "/app/a/foo/bar" (from the server perspective). | |
59 | * @param sessionTimeout Session timeout in milliseconds | |
60 | * @param watcher A watcher object which will be notified of state changes, | |
61 | * may also be notified for node events | |
62 | * @throws IOException | |
63 | */ | |
64 | public ZooKeeperExt(String connectString, | |
65 | int sessionTimeout, | |
66 | Watcher watcher) throws IOException { | |
67 | 48 | super(connectString, sessionTimeout, watcher); |
68 | 48 | } |
69 | ||
70 | /** | |
71 | * Provides a possibility of a creating a path consisting of more than one | |
72 | * znode (not atomic). If recursive is false, operates exactly the | |
73 | * same as create(). | |
74 | * | |
75 | * @param path path to create | |
76 | * @param data data to set on the final znode | |
77 | * @param acl acls on each znode created | |
78 | * @param createMode only affects the final znode | |
79 | * @param recursive if true, creates all ancestors | |
80 | * @return Actual created path | |
81 | * @throws KeeperException | |
82 | * @throws InterruptedException | |
83 | */ | |
84 | public String createExt( | |
85 | final String path, | |
86 | byte[] data, | |
87 | List<ACL> acl, | |
88 | CreateMode createMode, | |
89 | boolean recursive) throws KeeperException, InterruptedException { | |
90 | 2627 | if (LOG.isDebugEnabled()) { |
91 | 0 | LOG.debug("createExt: Creating path " + path); |
92 | } | |
93 | ||
94 | 2627 | if (!recursive) { |
95 | 24 | return create(path, data, acl, createMode); |
96 | } | |
97 | ||
98 | try { | |
99 | 2603 | return create(path, data, acl, createMode); |
100 | 410 | } catch (KeeperException.NoNodeException e) { |
101 | 410 | if (LOG.isDebugEnabled()) { |
102 | 0 | LOG.debug("createExt: Cannot directly create node " + path); |
103 | } | |
104 | } | |
105 | ||
106 | 410 | int pos = path.indexOf("/", 1); |
107 | 5112 | for (; pos != -1; pos = path.indexOf("/", pos + 1)) { |
108 | try { | |
109 | 2351 | create( |
110 | path.substring(0, pos), null, acl, CreateMode.PERSISTENT); | |
111 | 1825 | } catch (KeeperException.NodeExistsException e) { |
112 | 1825 | if (LOG.isDebugEnabled()) { |
113 | 0 | LOG.debug("createExt: Znode " + path.substring(0, pos) + |
114 | " already exists"); | |
115 | } | |
116 | 526 | } |
117 | } | |
118 | 410 | return create(path, data, acl, createMode); |
119 | } | |
120 | ||
121 | /** | |
122 | * Data structure for handling the output of createOrSet() | |
123 | */ | |
124 | public static class PathStat { | |
125 | /** Path to created znode (if any) */ | |
126 | private String path; | |
127 | /** Stat from set znode (if any) */ | |
128 | private Stat stat; | |
129 | ||
130 | /** | |
131 | * Put in results from createOrSet() | |
132 | * | |
133 | * @param path Path to created znode (or null) | |
134 | * @param stat Stat from set znode (if set) | |
135 | */ | |
136 | 1479 | public PathStat(String path, Stat stat) { |
137 | 1479 | this.path = path; |
138 | 1479 | this.stat = stat; |
139 | 1479 | } |
140 | ||
141 | /** | |
142 | * Get the path of the created znode if it was created. | |
143 | * | |
144 | * @return Path of created znode or null if not created | |
145 | */ | |
146 | public String getPath() { | |
147 | 0 | return path; |
148 | } | |
149 | ||
150 | /** | |
151 | * Get the stat of the set znode if set | |
152 | * | |
153 | * @return Stat of set znode or null if not set | |
154 | */ | |
155 | public Stat getStat() { | |
156 | 0 | return stat; |
157 | } | |
158 | } | |
159 | ||
160 | /** | |
161 | * Create a znode. Set the znode if the created znode already exists. | |
162 | * | |
163 | * @param path path to create | |
164 | * @param data data to set on the final znode | |
165 | * @param acl acls on each znode created | |
166 | * @param createMode only affects the final znode | |
167 | * @param recursive if true, creates all ancestors | |
168 | * @param version Version to set if setting | |
169 | * @return Path of created znode or Stat of set znode | |
170 | * @throws InterruptedException | |
171 | * @throws KeeperException | |
172 | */ | |
173 | public PathStat createOrSetExt(final String path, | |
174 | byte[] data, | |
175 | List<ACL> acl, | |
176 | CreateMode createMode, | |
177 | boolean recursive, | |
178 | int version) throws KeeperException, InterruptedException { | |
179 | 437 | String createdPath = null; |
180 | 437 | Stat setStat = null; |
181 | try { | |
182 | 437 | createdPath = createExt(path, data, acl, createMode, recursive); |
183 | 0 | } catch (KeeperException.NodeExistsException e) { |
184 | 0 | if (LOG.isDebugEnabled()) { |
185 | 0 | LOG.debug("createOrSet: Node exists on path " + path); |
186 | } | |
187 | 0 | setStat = setData(path, data, version); |
188 | 437 | } |
189 | 437 | return new PathStat(createdPath, setStat); |
190 | } | |
191 | ||
192 | /** | |
193 | * Create a znode if there is no other znode there | |
194 | * | |
195 | * @param path path to create | |
196 | * @param data data to set on the final znode | |
197 | * @param acl acls on each znode created | |
198 | * @param createMode only affects the final znode | |
199 | * @param recursive if true, creates all ancestors | |
200 | * @return Path of created znode or Stat of set znode | |
201 | * @throws InterruptedException | |
202 | * @throws KeeperException | |
203 | */ | |
204 | public PathStat createOnceExt(final String path, | |
205 | byte[] data, | |
206 | List<ACL> acl, | |
207 | CreateMode createMode, | |
208 | boolean recursive) throws KeeperException, InterruptedException { | |
209 | 1042 | String createdPath = null; |
210 | 1042 | Stat setStat = null; |
211 | try { | |
212 | 1042 | createdPath = createExt(path, data, acl, createMode, recursive); |
213 | 306 | } catch (KeeperException.NodeExistsException e) { |
214 | 306 | if (LOG.isDebugEnabled()) { |
215 | 0 | LOG.debug("createOnceExt: Node already exists on path " + path); |
216 | } | |
217 | 736 | } |
218 | 1042 | return new PathStat(createdPath, setStat); |
219 | } | |
220 | ||
221 | /** | |
222 | * Delete a path recursively. When the deletion is recursive, it is a | |
223 | * non-atomic operation, hence, not part of ZooKeeper. | |
224 | * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2) | |
225 | * @param version expected version (-1 for all) | |
226 | * @param recursive if true, remove all children, otherwise behave like | |
227 | * remove() | |
228 | * @throws InterruptedException | |
229 | * @throws KeeperException | |
230 | */ | |
231 | public void deleteExt(final String path, int version, boolean recursive) | |
232 | throws InterruptedException, KeeperException { | |
233 | 2689 | if (!recursive) { |
234 | 0 | delete(path, version); |
235 | 0 | return; |
236 | } | |
237 | ||
238 | try { | |
239 | 2689 | delete(path, version); |
240 | 1655 | return; |
241 | 1034 | } catch (KeeperException.NotEmptyException e) { |
242 | 1034 | if (LOG.isDebugEnabled()) { |
243 | 0 | LOG.debug("deleteExt: Cannot directly remove node " + path); |
244 | } | |
245 | } | |
246 | ||
247 | 1034 | List<String> childList = getChildren(path, false); |
248 | 1034 | for (String child : childList) { |
249 | 2494 | deleteExt(path + "/" + child, -1, true); |
250 | } | |
251 | ||
252 | 1034 | delete(path, version); |
253 | 1034 | } |
254 | ||
255 | /** | |
256 | * Get the children of the path with extensions. | |
257 | * Extension 1: Sort the children based on sequence number | |
258 | * Extension 2: Get the full path instead of relative path | |
259 | * | |
260 | * @param path path to znode | |
261 | * @param watch set the watch? | |
262 | * @param sequenceSorted sort by the sequence number | |
263 | * @param fullPath if true, get the fully znode path back | |
264 | * @return list of children | |
265 | * @throws InterruptedException | |
266 | * @throws KeeperException | |
267 | */ | |
268 | public List<String> getChildrenExt( | |
269 | final String path, | |
270 | boolean watch, | |
271 | boolean sequenceSorted, | |
272 | boolean fullPath) throws KeeperException, InterruptedException { | |
273 | 2412 | List<String> childList = getChildren(path, watch); |
274 | /* Sort children according to the sequence number, if desired */ | |
275 | 2412 | if (sequenceSorted) { |
276 | 75 | Collections.sort(childList, new Comparator<String>() { |
277 | public int compare(String s1, String s2) { | |
278 | 0 | if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) || |
279 | (s2.length() <= SEQUENCE_NUMBER_LENGTH)) { | |
280 | 0 | throw new RuntimeException( |
281 | "getChildrenExt: Invalid length for sequence " + | |
282 | " sorting > " + | |
283 | SEQUENCE_NUMBER_LENGTH + | |
284 | " for s1 (" + | |
285 | s1.length() + ") or s2 (" + s2.length() + ")"); | |
286 | } | |
287 | 0 | int s1sequenceNumber = Integer.parseInt( |
288 | s1.substring(s1.length() - | |
289 | SEQUENCE_NUMBER_LENGTH)); | |
290 | 0 | int s2sequenceNumber = Integer.parseInt( |
291 | s2.substring(s2.length() - | |
292 | SEQUENCE_NUMBER_LENGTH)); | |
293 | 0 | return s1sequenceNumber - s2sequenceNumber; |
294 | } | |
295 | }); | |
296 | } | |
297 | 2412 | if (fullPath) { |
298 | 1484 | List<String> fullChildList = new ArrayList<String>(); |
299 | 1484 | for (String child : childList) { |
300 | 1124 | fullChildList.add(path + "/" + child); |
301 | } | |
302 | 1484 | return fullChildList; |
303 | } | |
304 | 928 | return childList; |
305 | } | |
306 | } |