View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.ConcurrentSkipListMap;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.AuthUtil;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.exceptions.DeserializationException;
37  import org.apache.hadoop.hbase.security.Superusers;
38  import org.apache.hadoop.hbase.security.User;
39  import org.apache.hadoop.hbase.security.UserProvider;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.zookeeper.KeeperException;
43  
44  import com.google.common.annotations.VisibleForTesting;
45  import com.google.common.collect.ArrayListMultimap;
46  import com.google.common.collect.ListMultimap;
47  import com.google.common.collect.Lists;
48  
49  /**
50   * Performs authorization checks for a given user's assigned permissions
51   */
52  @InterfaceAudience.Private
53  public class TableAuthManager implements Closeable {
54    private static class PermissionCache<T extends Permission> {
55      /** Cache of user permissions */
56      private ListMultimap<String,T> userCache = ArrayListMultimap.create();
57      /** Cache of group permissions */
58      private ListMultimap<String,T> groupCache = ArrayListMultimap.create();
59  
60      public List<T> getUser(String user) {
61        return userCache.get(user);
62      }
63  
64      public void putUser(String user, T perm) {
65        userCache.put(user, perm);
66      }
67  
68      public List<T> replaceUser(String user, Iterable<? extends T> perms) {
69        return userCache.replaceValues(user, perms);
70      }
71  
72      public List<T> getGroup(String group) {
73        return groupCache.get(group);
74      }
75  
76      public void putGroup(String group, T perm) {
77        groupCache.put(group, perm);
78      }
79  
80      public List<T> replaceGroup(String group, Iterable<? extends T> perms) {
81        return groupCache.replaceValues(group, perms);
82      }
83  
84      /**
85       * Returns a combined map of user and group permissions, with group names prefixed by
86       * {@link AuthUtil#GROUP_PREFIX}.
87       */
88      public ListMultimap<String,T> getAllPermissions() {
89        ListMultimap<String,T> tmp = ArrayListMultimap.create();
90        tmp.putAll(userCache);
91        for (String group : groupCache.keySet()) {
92          tmp.putAll(AuthUtil.toGroupEntry(group), groupCache.get(group));
93        }
94        return tmp;
95      }
96    }
97  
98    private static final Log LOG = LogFactory.getLog(TableAuthManager.class);
99  
100   /** Cache of global permissions */
101   private volatile PermissionCache<Permission> globalCache;
102 
103   private ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>> tableCache =
104       new ConcurrentSkipListMap<TableName, PermissionCache<TablePermission>>();
105 
106   private ConcurrentSkipListMap<String, PermissionCache<TablePermission>> nsCache =
107     new ConcurrentSkipListMap<String, PermissionCache<TablePermission>>();
108 
109   private Configuration conf;
110   private ZKPermissionWatcher zkperms;
111   private final AtomicLong mtime = new AtomicLong(0L);
112 
113   private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
114       throws IOException {
115     this.conf = conf;
116 
117     // initialize global permissions based on configuration
118     globalCache = initGlobal(conf);
119 
120     this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
121     try {
122       this.zkperms.start();
123     } catch (KeeperException ke) {
124       LOG.error("ZooKeeper initialization failed", ke);
125     }
126   }
127 
128   @Override
129   public void close() {
130     this.zkperms.close();
131   }
132 
133   /**
134    * Returns a new {@code PermissionCache} initialized with permission assignments
135    * from the {@code hbase.superuser} configuration key.
136    */
137   private PermissionCache<Permission> initGlobal(Configuration conf) throws IOException {
138     UserProvider userProvider = UserProvider.instantiate(conf);
139     User user = userProvider.getCurrent();
140     if (user == null) {
141       throw new IOException("Unable to obtain the current user, " +
142           "authorization checks for internal operations will not work correctly!");
143     }
144     PermissionCache<Permission> newCache = new PermissionCache<Permission>();
145     String currentUser = user.getShortName();
146 
147     // the system user is always included
148     List<String> superusers = Lists.asList(currentUser, conf.getStrings(
149         Superusers.SUPERUSER_CONF_KEY, new String[0]));
150     if (superusers != null) {
151       for (String name : superusers) {
152         if (AuthUtil.isGroupPrincipal(name)) {
153           newCache.putGroup(AuthUtil.getGroupName(name),
154               new Permission(Permission.Action.values()));
155         } else {
156           newCache.putUser(name, new Permission(Permission.Action.values()));
157         }
158       }
159     }
160     return newCache;
161   }
162 
163   public ZKPermissionWatcher getZKPermissionWatcher() {
164     return this.zkperms;
165   }
166 
167   public void refreshTableCacheFromWritable(TableName table,
168                                        byte[] data) throws IOException {
169     if (data != null && data.length > 0) {
170       ListMultimap<String,TablePermission> perms;
171       try {
172         perms = AccessControlLists.readPermissions(data, conf);
173       } catch (DeserializationException e) {
174         throw new IOException(e);
175       }
176 
177       if (perms != null) {
178         if (Bytes.equals(table.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
179           updateGlobalCache(perms);
180         } else {
181           updateTableCache(table, perms);
182         }
183       }
184     } else {
185       LOG.debug("Skipping permission cache refresh because writable data is empty");
186     }
187   }
188 
189   public void refreshNamespaceCacheFromWritable(String namespace, byte[] data) throws IOException {
190     if (data != null && data.length > 0) {
191       ListMultimap<String,TablePermission> perms;
192       try {
193         perms = AccessControlLists.readPermissions(data, conf);
194       } catch (DeserializationException e) {
195         throw new IOException(e);
196       }
197       if (perms != null) {
198         updateNsCache(namespace, perms);
199       }
200     } else {
201       LOG.debug("Skipping permission cache refresh because writable data is empty");
202     }
203   }
204 
205   /**
206    * Updates the internal global permissions cache
207    *
208    * @param userPerms
209    */
210   private void updateGlobalCache(ListMultimap<String,TablePermission> userPerms) {
211     PermissionCache<Permission> newCache = null;
212     try {
213       newCache = initGlobal(conf);
214       for (Map.Entry<String,TablePermission> entry : userPerms.entries()) {
215         if (AuthUtil.isGroupPrincipal(entry.getKey())) {
216           newCache.putGroup(AuthUtil.getGroupName(entry.getKey()),
217               new Permission(entry.getValue().getActions()));
218         } else {
219           newCache.putUser(entry.getKey(), new Permission(entry.getValue().getActions()));
220         }
221       }
222       globalCache = newCache;
223       mtime.incrementAndGet();
224     } catch (IOException e) {
225       // Never happens
226       LOG.error("Error occured while updating the global cache", e);
227     }
228   }
229 
230   /**
231    * Updates the internal permissions cache for a single table, splitting
232    * the permissions listed into separate caches for users and groups to optimize
233    * group lookups.
234    *
235    * @param table
236    * @param tablePerms
237    */
238   private void updateTableCache(TableName table,
239                                 ListMultimap<String,TablePermission> tablePerms) {
240     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
241 
242     for (Map.Entry<String,TablePermission> entry : tablePerms.entries()) {
243       if (AuthUtil.isGroupPrincipal(entry.getKey())) {
244         newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
245       } else {
246         newTablePerms.putUser(entry.getKey(), entry.getValue());
247       }
248     }
249 
250     tableCache.put(table, newTablePerms);
251     mtime.incrementAndGet();
252   }
253 
254   /**
255    * Updates the internal permissions cache for a single table, splitting
256    * the permissions listed into separate caches for users and groups to optimize
257    * group lookups.
258    *
259    * @param namespace
260    * @param tablePerms
261    */
262   private void updateNsCache(String namespace,
263                              ListMultimap<String, TablePermission> tablePerms) {
264     PermissionCache<TablePermission> newTablePerms = new PermissionCache<TablePermission>();
265 
266     for (Map.Entry<String, TablePermission> entry : tablePerms.entries()) {
267       if (AuthUtil.isGroupPrincipal(entry.getKey())) {
268         newTablePerms.putGroup(AuthUtil.getGroupName(entry.getKey()), entry.getValue());
269       } else {
270         newTablePerms.putUser(entry.getKey(), entry.getValue());
271       }
272     }
273 
274     nsCache.put(namespace, newTablePerms);
275     mtime.incrementAndGet();
276   }
277 
278   private PermissionCache<TablePermission> getTablePermissions(TableName table) {
279     if (!tableCache.containsKey(table)) {
280       tableCache.putIfAbsent(table, new PermissionCache<TablePermission>());
281     }
282     return tableCache.get(table);
283   }
284 
285   private PermissionCache<TablePermission> getNamespacePermissions(String namespace) {
286     if (!nsCache.containsKey(namespace)) {
287       nsCache.putIfAbsent(namespace, new PermissionCache<TablePermission>());
288     }
289     return nsCache.get(namespace);
290   }
291 
292   /**
293    * Authorizes a global permission
294    * @param perms
295    * @param action
296    * @return true if authorized, false otherwise
297    */
298   private boolean authorize(List<Permission> perms, Permission.Action action) {
299     if (perms != null) {
300       for (Permission p : perms) {
301         if (p.implies(action)) {
302           return true;
303         }
304       }
305     } else if (LOG.isDebugEnabled()) {
306       LOG.debug("No permissions found for " + action);
307     }
308 
309     return false;
310   }
311 
312   /**
313    * Authorize a global permission based on ACLs for the given user and the
314    * user's groups.
315    * @param user
316    * @param action
317    * @return true if known and authorized, false otherwise
318    */
319   public boolean authorize(User user, Permission.Action action) {
320     if (user == null) {
321       return false;
322     }
323 
324     if (authorize(globalCache.getUser(user.getShortName()), action)) {
325       return true;
326     }
327 
328     String[] groups = user.getGroupNames();
329     if (groups != null) {
330       for (String group : groups) {
331         if (authorize(globalCache.getGroup(group), action)) {
332           return true;
333         }
334       }
335     }
336     return false;
337   }
338 
339   private boolean authorize(List<TablePermission> perms,
340                             TableName table, byte[] family,
341                             Permission.Action action) {
342     return authorize(perms, table, family, null, action);
343   }
344 
345   private boolean authorize(List<TablePermission> perms,
346                             TableName table, byte[] family,
347                             byte[] qualifier, Permission.Action action) {
348     if (perms != null) {
349       for (TablePermission p : perms) {
350         if (p.implies(table, family, qualifier, action)) {
351           return true;
352         }
353       }
354     } else if (LOG.isDebugEnabled()) {
355       LOG.debug("No permissions found for table="+table);
356     }
357     return false;
358   }
359 
360   private boolean hasAccess(List<TablePermission> perms,
361                             TableName table, Permission.Action action) {
362     if (perms != null) {
363       for (TablePermission p : perms) {
364         if (p.implies(action)) {
365           return true;
366         }
367       }
368     } else if (LOG.isDebugEnabled()) {
369       LOG.debug("No permissions found for table="+table);
370     }
371     return false;
372   }
373 
374   /**
375    * Authorize a user for a given KV. This is called from AccessControlFilter.
376    */
377   public boolean authorize(User user, TableName table, Cell cell, Permission.Action action) {
378     try {
379       List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
380       if (LOG.isTraceEnabled()) {
381         LOG.trace("Perms for user " + user.getShortName() + " in cell " + cell + ": " +
382           (perms != null ? perms : ""));
383       }
384       if (perms != null) {
385         for (Permission p: perms) {
386           if (p.implies(action)) {
387             return true;
388           }
389         }
390       }
391     } catch (IOException e) {
392       // We failed to parse the KV tag
393       LOG.error("Failed parse of ACL tag in cell " + cell);
394       // Fall through to check with the table and CF perms we were able
395       // to collect regardless
396     }
397     return false;
398   }
399 
400   public boolean authorize(User user, String namespace, Permission.Action action) {
401     // Global authorizations supercede namespace level
402     if (authorize(user, action)) {
403       return true;
404     }
405     // Check namespace permissions
406     PermissionCache<TablePermission> tablePerms = nsCache.get(namespace);
407     if (tablePerms != null) {
408       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
409       if (authorize(userPerms, namespace, action)) {
410         return true;
411       }
412       String[] groupNames = user.getGroupNames();
413       if (groupNames != null) {
414         for (String group : groupNames) {
415           List<TablePermission> groupPerms = tablePerms.getGroup(group);
416           if (authorize(groupPerms, namespace, action)) {
417             return true;
418           }
419         }
420       }
421     }
422     return false;
423   }
424 
425   private boolean authorize(List<TablePermission> perms, String namespace,
426                             Permission.Action action) {
427     if (perms != null) {
428       for (TablePermission p : perms) {
429         if (p.implies(namespace, action)) {
430           return true;
431         }
432       }
433     } else if (LOG.isDebugEnabled()) {
434       LOG.debug("No permissions for authorize() check, table=" + namespace);
435     }
436 
437     return false;
438   }
439 
440   /**
441    * Checks authorization to a given table and column family for a user, based on the
442    * stored user permissions.
443    *
444    * @param user
445    * @param table
446    * @param family
447    * @param action
448    * @return true if known and authorized, false otherwise
449    */
450   public boolean authorizeUser(User user, TableName table, byte[] family,
451       Permission.Action action) {
452     return authorizeUser(user, table, family, null, action);
453   }
454 
455   public boolean authorizeUser(User user, TableName table, byte[] family,
456       byte[] qualifier, Permission.Action action) {
457     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
458     // Global and namespace authorizations supercede table level
459     if (authorize(user, table.getNamespaceAsString(), action)) {
460       return true;
461     }
462     // Check table permissions
463     return authorize(getTablePermissions(table).getUser(user.getShortName()), table, family,
464         qualifier, action);
465   }
466 
467   /**
468    * Checks if the user has access to the full table or at least a family/qualifier
469    * for the specified action.
470    *
471    * @param user
472    * @param table
473    * @param action
474    * @return true if the user has access to the table, false otherwise
475    */
476   public boolean userHasAccess(User user, TableName table, Permission.Action action) {
477     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
478     // Global and namespace authorizations supercede table level
479     if (authorize(user, table.getNamespaceAsString(), action)) {
480       return true;
481     }
482     // Check table permissions
483     return hasAccess(getTablePermissions(table).getUser(user.getShortName()), table, action);
484   }
485 
486   /**
487    * Checks global authorization for a given action for a group, based on the stored
488    * permissions.
489    */
490   public boolean authorizeGroup(String groupName, Permission.Action action) {
491     List<Permission> perms = globalCache.getGroup(groupName);
492     if (LOG.isDebugEnabled()) {
493       LOG.debug("authorizing " + (perms != null && !perms.isEmpty() ? perms.get(0) : "") +
494         " for " + action);
495     }
496     return authorize(perms, action);
497   }
498 
499   /**
500    * Checks authorization to a given table, column family and column for a group, based
501    * on the stored permissions.
502    * @param groupName
503    * @param table
504    * @param family
505    * @param qualifier
506    * @param action
507    * @return true if known and authorized, false otherwise
508    */
509   public boolean authorizeGroup(String groupName, TableName table, byte[] family,
510       byte[] qualifier, Permission.Action action) {
511     // Global authorization supercedes table level
512     if (authorizeGroup(groupName, action)) {
513       return true;
514     }
515     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
516     // Namespace authorization supercedes table level
517     String namespace = table.getNamespaceAsString();
518     if (authorize(getNamespacePermissions(namespace).getGroup(groupName), namespace, action)) {
519       return true;
520     }
521     // Check table level
522     List<TablePermission> tblPerms = getTablePermissions(table).getGroup(groupName);
523     if (LOG.isDebugEnabled()) {
524       LOG.debug("authorizing " + (tblPerms != null && !tblPerms.isEmpty() ? tblPerms.get(0) : "") +
525         " for " +groupName + " on " + table + "." + Bytes.toString(family) + "." +
526         Bytes.toString(qualifier) + " with " + action);
527     }
528     return authorize(tblPerms, table, family, qualifier, action);
529   }
530 
531   /**
532    * Checks if the user has access to the full table or at least a family/qualifier
533    * for the specified action.
534    * @param groupName
535    * @param table
536    * @param action
537    * @return true if the group has access to the table, false otherwise
538    */
539   public boolean groupHasAccess(String groupName, TableName table, Permission.Action action) {
540     // Global authorization supercedes table level
541     if (authorizeGroup(groupName, action)) {
542       return true;
543     }
544     if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
545     // Namespace authorization supercedes table level
546     if (hasAccess(getNamespacePermissions(table.getNamespaceAsString()).getGroup(groupName),
547         table, action)) {
548       return true;
549     }
550     // Check table level
551     return hasAccess(getTablePermissions(table).getGroup(groupName), table, action);
552   }
553 
554   public boolean authorize(User user, TableName table, byte[] family,
555       byte[] qualifier, Permission.Action action) {
556     if (authorizeUser(user, table, family, qualifier, action)) {
557       return true;
558     }
559 
560     String[] groups = user.getGroupNames();
561     if (groups != null) {
562       for (String group : groups) {
563         if (authorizeGroup(group, table, family, qualifier, action)) {
564           return true;
565         }
566       }
567     }
568     return false;
569   }
570 
571   public boolean hasAccess(User user, TableName table, Permission.Action action) {
572     if (userHasAccess(user, table, action)) {
573       return true;
574     }
575 
576     String[] groups = user.getGroupNames();
577     if (groups != null) {
578       for (String group : groups) {
579         if (groupHasAccess(group, table, action)) {
580           return true;
581         }
582       }
583     }
584     return false;
585   }
586 
587   public boolean authorize(User user, TableName table, byte[] family,
588       Permission.Action action) {
589     return authorize(user, table, family, null, action);
590   }
591 
592   /**
593    * Returns true if the given user has a {@link TablePermission} matching up
594    * to the column family portion of a permission.  Note that this permission
595    * may be scoped to a given column qualifier and does not guarantee that
596    * authorize() on the same column family would return true.
597    */
598   public boolean matchPermission(User user,
599       TableName table, byte[] family, Permission.Action action) {
600     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
601     if (tablePerms != null) {
602       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
603       if (userPerms != null) {
604         for (TablePermission p : userPerms) {
605           if (p.matchesFamily(table, family, action)) {
606             return true;
607           }
608         }
609       }
610 
611       String[] groups = user.getGroupNames();
612       if (groups != null) {
613         for (String group : groups) {
614           List<TablePermission> groupPerms = tablePerms.getGroup(group);
615           if (groupPerms != null) {
616             for (TablePermission p : groupPerms) {
617               if (p.matchesFamily(table, family, action)) {
618                 return true;
619               }
620             }
621           }
622         }
623       }
624     }
625 
626     return false;
627   }
628 
629   public boolean matchPermission(User user,
630       TableName table, byte[] family, byte[] qualifier,
631       Permission.Action action) {
632     PermissionCache<TablePermission> tablePerms = tableCache.get(table);
633     if (tablePerms != null) {
634       List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
635       if (userPerms != null) {
636         for (TablePermission p : userPerms) {
637           if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
638             return true;
639           }
640         }
641       }
642 
643       String[] groups = user.getGroupNames();
644       if (groups != null) {
645         for (String group : groups) {
646           List<TablePermission> groupPerms = tablePerms.getGroup(group);
647           if (groupPerms != null) {
648             for (TablePermission p : groupPerms) {
649               if (p.matchesFamilyQualifier(table, family, qualifier, action)) {
650                 return true;
651               }
652             }
653           }
654         }
655       }
656     }
657     return false;
658   }
659 
660   public void removeNamespace(byte[] ns) {
661     nsCache.remove(Bytes.toString(ns));
662   }
663 
664   public void removeTable(TableName table) {
665     tableCache.remove(table);
666   }
667 
668   /**
669    * Overwrites the existing permission set for a given user for a table, and
670    * triggers an update for zookeeper synchronization.
671    * @param username
672    * @param table
673    * @param perms
674    */
675   public void setTableUserPermissions(String username, TableName table,
676       List<TablePermission> perms) {
677     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
678     tablePerms.replaceUser(username, perms);
679     writeTableToZooKeeper(table, tablePerms);
680   }
681 
682   /**
683    * Overwrites the existing permission set for a group and triggers an update
684    * for zookeeper synchronization.
685    * @param group
686    * @param table
687    * @param perms
688    */
689   public void setTableGroupPermissions(String group, TableName table,
690       List<TablePermission> perms) {
691     PermissionCache<TablePermission> tablePerms = getTablePermissions(table);
692     tablePerms.replaceGroup(group, perms);
693     writeTableToZooKeeper(table, tablePerms);
694   }
695 
696   /**
697    * Overwrites the existing permission set for a given user for a table, and
698    * triggers an update for zookeeper synchronization.
699    * @param username
700    * @param namespace
701    * @param perms
702    */
703   public void setNamespaceUserPermissions(String username, String namespace,
704       List<TablePermission> perms) {
705     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
706     tablePerms.replaceUser(username, perms);
707     writeNamespaceToZooKeeper(namespace, tablePerms);
708   }
709 
710   /**
711    * Overwrites the existing permission set for a group and triggers an update
712    * for zookeeper synchronization.
713    * @param group
714    * @param namespace
715    * @param perms
716    */
717   public void setNamespaceGroupPermissions(String group, String namespace,
718       List<TablePermission> perms) {
719     PermissionCache<TablePermission> tablePerms = getNamespacePermissions(namespace);
720     tablePerms.replaceGroup(group, perms);
721     writeNamespaceToZooKeeper(namespace, tablePerms);
722   }
723 
724   public void writeTableToZooKeeper(TableName table,
725       PermissionCache<TablePermission> tablePerms) {
726     byte[] serialized = new byte[0];
727     if (tablePerms != null) {
728       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
729     }
730     zkperms.writeToZookeeper(table.getName(), serialized);
731   }
732 
733   public void writeNamespaceToZooKeeper(String namespace,
734       PermissionCache<TablePermission> tablePerms) {
735     byte[] serialized = new byte[0];
736     if (tablePerms != null) {
737       serialized = AccessControlLists.writePermissionsAsBytes(tablePerms.getAllPermissions(), conf);
738     }
739     zkperms.writeToZookeeper(Bytes.toBytes(AccessControlLists.toNamespaceEntry(namespace)),
740         serialized);
741   }
742 
743   public long getMTime() {
744     return mtime.get();
745   }
746 
747   private static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
748     new HashMap<ZooKeeperWatcher,TableAuthManager>();
749 
750   private static Map<TableAuthManager, Integer> refCount = new HashMap<>();
751 
752   /** Returns a TableAuthManager from the cache. If not cached, constructs a new one. Returned
753    * instance should be released back by calling {@link #release(TableAuthManager)}. */
754   public synchronized static TableAuthManager getOrCreate(
755       ZooKeeperWatcher watcher, Configuration conf) throws IOException {
756     TableAuthManager instance = managerMap.get(watcher);
757     if (instance == null) {
758       instance = new TableAuthManager(watcher, conf);
759       managerMap.put(watcher, instance);
760     }
761     int ref = refCount.get(instance) == null ? 0 : refCount.get(instance).intValue();
762     refCount.put(instance, ref + 1);
763     return instance;
764   }
765 
766   @VisibleForTesting
767   static int getTotalRefCount() {
768     int total = 0;
769     for (int count : refCount.values()) {
770       total += count;
771     }
772     return total;
773   }
774 
775   /**
776    * Releases the resources for the given TableAuthManager if the reference count is down to 0.
777    * @param instance TableAuthManager to be released
778    */
779   public synchronized static void release(TableAuthManager instance) {
780     if (refCount.get(instance) == null || refCount.get(instance) < 1) {
781       String msg = "Something wrong with the TableAuthManager reference counting: " + instance
782           + " whose count is " + refCount.get(instance);
783       LOG.fatal(msg);
784       instance.close();
785       managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
786       instance.getZKPermissionWatcher().getWatcher().abort(msg, null);
787     } else {
788       int ref = refCount.get(instance);
789       refCount.put(instance, ref-1);
790       if (ref-1 == 0) {
791         instance.close();
792         managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
793         refCount.remove(instance);
794       }
795     }
796   }
797 }