Coverage Report - org.apache.giraph.zk.ZooKeeperExt
 
Classes in this File Line Coverage Branch Coverage Complexity
ZooKeeperExt
81%
47/58
69%
18/26
3.6
ZooKeeperExt$1
16%
1/6
0%
0/4
3.6
ZooKeeperExt$PathStat
66%
4/6
N/A
3.6
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.zk;
 20  
 
 21  
 import java.io.IOException;
 22  
 
 23  
 import org.apache.log4j.Logger;
 24  
 import org.apache.zookeeper.KeeperException;
 25  
 import org.apache.zookeeper.CreateMode;
 26  
 import org.apache.zookeeper.data.ACL;
 27  
 import org.apache.zookeeper.data.Stat;
 28  
 
 29  
 import java.util.ArrayList;
 30  
 import java.util.Collections;
 31  
 import java.util.Comparator;
 32  
 import java.util.List;
 33  
 
 34  
 import org.apache.zookeeper.Watcher;
 35  
 import org.apache.zookeeper.ZooKeeper;
 36  
 
 37  
 /**
 38  
  * ZooKeeper provides only atomic operations.  ZooKeeperExt provides additional
 39  
  * non-atomic operations that are useful.
 40  
  */
 41  
 public class ZooKeeperExt extends ZooKeeper {
 42  
   /** Internal logger */
 43  1
   private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
 44  
   /** Length of the ZK sequence number */
 45  
   private static final int SEQUENCE_NUMBER_LENGTH = 10;
 46  
 
 47  
   /**
 48  
    * Constructor to connect to ZooKeeper
 49  
    *
 50  
    * @param connectString Comma separated host:port pairs, each corresponding
 51  
    *        to a zk server. e.g.
 52  
    *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
 53  
    *        chroot suffix is used the example would look
 54  
    *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
 55  
    *        where the client would be rooted at "/app/a" and all paths
 56  
    *        would be relative to this root - ie getting/setting/etc...
 57  
    *        "/foo/bar" would result in operations being run on
 58  
    *        "/app/a/foo/bar" (from the server perspective).
 59  
    * @param sessionTimeout Session timeout in milliseconds
 60  
    * @param watcher A watcher object which will be notified of state changes,
 61  
    *        may also be notified for node events
 62  
    * @throws IOException
 63  
    */
 64  
   public ZooKeeperExt(String connectString,
 65  
       int sessionTimeout,
 66  
       Watcher watcher) throws IOException {
 67  48
     super(connectString, sessionTimeout, watcher);
 68  48
   }
 69  
 
 70  
   /**
 71  
    * Provides a possibility of a creating a path consisting of more than one
 72  
    * znode (not atomic).  If recursive is false, operates exactly the
 73  
    * same as create().
 74  
    *
 75  
    * @param path path to create
 76  
    * @param data data to set on the final znode
 77  
    * @param acl acls on each znode created
 78  
    * @param createMode only affects the final znode
 79  
    * @param recursive if true, creates all ancestors
 80  
    * @return Actual created path
 81  
    * @throws KeeperException
 82  
    * @throws InterruptedException
 83  
    */
 84  
   public String createExt(
 85  
       final String path,
 86  
       byte[] data,
 87  
       List<ACL> acl,
 88  
       CreateMode createMode,
 89  
       boolean recursive) throws KeeperException, InterruptedException {
 90  2627
     if (LOG.isDebugEnabled()) {
 91  0
       LOG.debug("createExt: Creating path " + path);
 92  
     }
 93  
 
 94  2627
     if (!recursive) {
 95  24
       return create(path, data, acl, createMode);
 96  
     }
 97  
 
 98  
     try {
 99  2603
       return create(path, data, acl, createMode);
 100  410
     } catch (KeeperException.NoNodeException e) {
 101  410
       if (LOG.isDebugEnabled()) {
 102  0
         LOG.debug("createExt: Cannot directly create node " + path);
 103  
       }
 104  
     }
 105  
 
 106  410
     int pos = path.indexOf("/", 1);
 107  5112
     for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
 108  
       try {
 109  2351
         create(
 110  
             path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
 111  1825
       } catch (KeeperException.NodeExistsException e) {
 112  1825
         if (LOG.isDebugEnabled()) {
 113  0
           LOG.debug("createExt: Znode " + path.substring(0, pos) +
 114  
               " already exists");
 115  
         }
 116  526
       }
 117  
     }
 118  410
     return create(path, data, acl, createMode);
 119  
   }
 120  
 
 121  
   /**
 122  
    * Data structure for handling the output of createOrSet()
 123  
    */
 124  
   public static class PathStat {
 125  
     /** Path to created znode (if any) */
 126  
     private String path;
 127  
     /** Stat from set znode (if any) */
 128  
     private Stat stat;
 129  
 
 130  
     /**
 131  
      * Put in results from createOrSet()
 132  
      *
 133  
      * @param path Path to created znode (or null)
 134  
      * @param stat Stat from set znode (if set)
 135  
      */
 136  1479
     public PathStat(String path, Stat stat) {
 137  1479
       this.path = path;
 138  1479
       this.stat = stat;
 139  1479
     }
 140  
 
 141  
     /**
 142  
      * Get the path of the created znode if it was created.
 143  
      *
 144  
      * @return Path of created znode or null if not created
 145  
      */
 146  
     public String getPath() {
 147  0
       return path;
 148  
     }
 149  
 
 150  
     /**
 151  
      * Get the stat of the set znode if set
 152  
      *
 153  
      * @return Stat of set znode or null if not set
 154  
      */
 155  
     public Stat getStat() {
 156  0
       return stat;
 157  
     }
 158  
   }
 159  
 
 160  
   /**
 161  
    * Create a znode.  Set the znode if the created znode already exists.
 162  
    *
 163  
    * @param path path to create
 164  
    * @param data data to set on the final znode
 165  
    * @param acl acls on each znode created
 166  
    * @param createMode only affects the final znode
 167  
    * @param recursive if true, creates all ancestors
 168  
    * @param version Version to set if setting
 169  
    * @return Path of created znode or Stat of set znode
 170  
    * @throws InterruptedException
 171  
    * @throws KeeperException
 172  
    */
 173  
   public PathStat createOrSetExt(final String path,
 174  
       byte[] data,
 175  
       List<ACL> acl,
 176  
       CreateMode createMode,
 177  
       boolean recursive,
 178  
       int version) throws KeeperException, InterruptedException {
 179  437
     String createdPath = null;
 180  437
     Stat setStat = null;
 181  
     try {
 182  437
       createdPath = createExt(path, data, acl, createMode, recursive);
 183  0
     } catch (KeeperException.NodeExistsException e) {
 184  0
       if (LOG.isDebugEnabled()) {
 185  0
         LOG.debug("createOrSet: Node exists on path " + path);
 186  
       }
 187  0
       setStat = setData(path, data, version);
 188  437
     }
 189  437
     return new PathStat(createdPath, setStat);
 190  
   }
 191  
 
 192  
   /**
 193  
    * Create a znode if there is no other znode there
 194  
    *
 195  
    * @param path path to create
 196  
    * @param data data to set on the final znode
 197  
    * @param acl acls on each znode created
 198  
    * @param createMode only affects the final znode
 199  
    * @param recursive if true, creates all ancestors
 200  
    * @return Path of created znode or Stat of set znode
 201  
    * @throws InterruptedException
 202  
    * @throws KeeperException
 203  
    */
 204  
   public PathStat createOnceExt(final String path,
 205  
       byte[] data,
 206  
       List<ACL> acl,
 207  
       CreateMode createMode,
 208  
       boolean recursive) throws KeeperException, InterruptedException {
 209  1042
     String createdPath = null;
 210  1042
     Stat setStat = null;
 211  
     try {
 212  1042
       createdPath = createExt(path, data, acl, createMode, recursive);
 213  306
     } catch (KeeperException.NodeExistsException e) {
 214  306
       if (LOG.isDebugEnabled()) {
 215  0
         LOG.debug("createOnceExt: Node already exists on path " + path);
 216  
       }
 217  736
     }
 218  1042
     return new PathStat(createdPath, setStat);
 219  
   }
 220  
 
 221  
   /**
 222  
    * Delete a path recursively.  When the deletion is recursive, it is a
 223  
    * non-atomic operation, hence, not part of ZooKeeper.
 224  
    * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
 225  
    * @param version expected version (-1 for all)
 226  
    * @param recursive if true, remove all children, otherwise behave like
 227  
    *        remove()
 228  
    * @throws InterruptedException
 229  
    * @throws KeeperException
 230  
    */
 231  
   public void deleteExt(final String path, int version, boolean recursive)
 232  
     throws InterruptedException, KeeperException {
 233  2689
     if (!recursive) {
 234  0
       delete(path, version);
 235  0
       return;
 236  
     }
 237  
 
 238  
     try {
 239  2689
       delete(path, version);
 240  1655
       return;
 241  1034
     } catch (KeeperException.NotEmptyException e) {
 242  1034
       if (LOG.isDebugEnabled()) {
 243  0
         LOG.debug("deleteExt: Cannot directly remove node " + path);
 244  
       }
 245  
     }
 246  
 
 247  1034
     List<String> childList = getChildren(path, false);
 248  1034
     for (String child : childList) {
 249  2494
       deleteExt(path + "/" + child, -1, true);
 250  
     }
 251  
 
 252  1034
     delete(path, version);
 253  1034
   }
 254  
 
 255  
   /**
 256  
    * Get the children of the path with extensions.
 257  
    * Extension 1: Sort the children based on sequence number
 258  
    * Extension 2: Get the full path instead of relative path
 259  
    *
 260  
    * @param path path to znode
 261  
    * @param watch set the watch?
 262  
    * @param sequenceSorted sort by the sequence number
 263  
    * @param fullPath if true, get the fully znode path back
 264  
    * @return list of children
 265  
    * @throws InterruptedException
 266  
    * @throws KeeperException
 267  
    */
 268  
   public List<String> getChildrenExt(
 269  
       final String path,
 270  
       boolean watch,
 271  
       boolean sequenceSorted,
 272  
       boolean fullPath) throws KeeperException, InterruptedException {
 273  2412
     List<String> childList = getChildren(path, watch);
 274  
     /* Sort children according to the sequence number, if desired */
 275  2412
     if (sequenceSorted) {
 276  75
       Collections.sort(childList, new Comparator<String>() {
 277  
         public int compare(String s1, String s2) {
 278  0
           if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
 279  
               (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
 280  0
             throw new RuntimeException(
 281  
                 "getChildrenExt: Invalid length for sequence " +
 282  
                     " sorting > " +
 283  
                     SEQUENCE_NUMBER_LENGTH +
 284  
                     " for s1 (" +
 285  
                     s1.length() + ") or s2 (" + s2.length() + ")");
 286  
           }
 287  0
           int s1sequenceNumber = Integer.parseInt(
 288  
               s1.substring(s1.length() -
 289  
                   SEQUENCE_NUMBER_LENGTH));
 290  0
           int s2sequenceNumber = Integer.parseInt(
 291  
               s2.substring(s2.length() -
 292  
                   SEQUENCE_NUMBER_LENGTH));
 293  0
           return s1sequenceNumber - s2sequenceNumber;
 294  
         }
 295  
       });
 296  
     }
 297  2412
     if (fullPath) {
 298  1484
       List<String> fullChildList = new ArrayList<String>();
 299  1484
       for (String child : childList) {
 300  1124
         fullChildList.add(path + "/" + child);
 301  
       }
 302  1484
       return fullChildList;
 303  
     }
 304  928
     return childList;
 305  
   }
 306  
 }