View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.TableNotFoundException;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.classification.InterfaceStability;
44  import org.apache.hadoop.hbase.client.Admin;
45  import org.apache.hadoop.hbase.client.Connection;
46  import org.apache.hadoop.hbase.client.ConnectionFactory;
47  import org.apache.hadoop.hbase.client.RegionLocator;
48  import org.apache.hadoop.hbase.replication.ReplicationException;
49  import org.apache.hadoop.hbase.replication.ReplicationFactory;
50  import org.apache.hadoop.hbase.replication.ReplicationPeer;
51  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
52  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
53  import org.apache.hadoop.hbase.replication.ReplicationPeers;
54  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
55  import org.apache.hadoop.hbase.util.Pair;
56  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
57  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
58  import org.apache.zookeeper.KeeperException;
59  import org.apache.zookeeper.data.Stat;
60  
61  import com.google.common.annotations.VisibleForTesting;
62  import com.google.common.collect.Lists;
63  
64  /**
65   * <p>
66   * This class provides the administrative interface to HBase cluster
67   * replication. In order to use it, the cluster and the client using
68   * ReplicationAdmin must be configured with <code>hbase.replication</code>
69   * set to true.
70   * </p>
71   * <p>
72   * Adding a new peer results in creating new outbound connections from every
73   * region server to a subset of region servers on the slave cluster. Each
74   * new stream of replication will start replicating from the beginning of the
 * current WAL, meaning that edits from the past will be replicated.
76   * </p>
77   * <p>
78   * Removing a peer is a destructive and irreversible operation that stops
79   * all the replication streams for the given cluster and deletes the metadata
80   * used to keep track of the replication state.
81   * </p>
82   * <p>
83   * To see which commands are available in the shell, type
84   * <code>replication</code>.
85   * </p>
86   */
87  @InterfaceAudience.Public
88  @InterfaceStability.Evolving
89  public class ReplicationAdmin implements Closeable {
  // Class-wide logger.
  private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);

  // Keys of the per-column-family maps returned by listReplicated().
  public static final String TNAME = "tableName";
  public static final String CFNAME = "columnFamilyName";

  // only Global for now, can add other type
  // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
  public static final String REPLICATIONTYPE = "replicationType";
  // Value stored under REPLICATIONTYPE: the global replication scope rendered as a string.
  public static final String REPLICATIONGLOBAL = Integer
      .toString(HConstants.REPLICATION_SCOPE_GLOBAL);

  // Cluster connection created in the constructor and released in close().
  private final Connection connection;
  // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
  // be moved to hbase-server. Resolve it in HBASE-11392.
  private final ReplicationQueuesClient replicationQueuesClient;
  // Peer bookkeeping that every peer-related method of this class delegates to.
  private final ReplicationPeers replicationPeers;
  /**
   * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
   * on {@link #close()}.
   */
  private final ZooKeeperWatcher zkw;
111 
112   /**
113    * Constructor that creates a connection to the local ZooKeeper ensemble.
114    * @param conf Configuration to use
115    * @throws IOException if an internal replication error occurs
116    * @throws RuntimeException if replication isn't enabled.
117    */
118   public ReplicationAdmin(Configuration conf) throws IOException {
119     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
120         HConstants.REPLICATION_ENABLE_DEFAULT)) {
121       throw new RuntimeException("hbase.replication isn't true, please " +
122           "enable it in order to use replication");
123     }
124     this.connection = ConnectionFactory.createConnection(conf);
125     try {
126       zkw = createZooKeeperWatcher();
127       try {
128         this.replicationQueuesClient =
129             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
130         this.replicationQueuesClient.init();
131         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
132           this.replicationQueuesClient, this.connection);
133         this.replicationPeers.init();
134       } catch (Exception exception) {
135         if (zkw != null) {
136           zkw.close();
137         }
138         throw exception;
139       }
140     } catch (Exception exception) {
141       if (connection != null) {
142         connection.close();
143       }
144       if (exception instanceof IOException) {
145         throw (IOException) exception;
146       } else if (exception instanceof RuntimeException) {
147         throw (RuntimeException) exception;
148       } else {
149         throw new IOException("Error initializing the replication admin client.", exception);
150       }
151     }
152   }
153 
154   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
155     // This Abortable doesn't 'abort'... it just logs.
156     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
157       @Override
158       public void abort(String why, Throwable e) {
159         LOG.error(why, e);
160         // We used to call system.exit here but this script can be embedded by other programs that
161         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
162       }
163 
164       @Override
165       public boolean isAborted() {
166         return false;
167       }
168     });
169   }
170 
171   /**
172    * Add a new peer cluster to replicate to.
173    * @param id a short name that identifies the cluster
174    * @param clusterKey the concatenation of the slave cluster's
175    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
176    * @throws IllegalStateException if there's already one slave since
177    * multi-slave isn't supported yet.
178    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
179    */
180   @Deprecated
181   public void addPeer(String id, String clusterKey) throws ReplicationException {
182     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
183   }
184 
185   @Deprecated
186   public void addPeer(String id, String clusterKey, String tableCFs)
187     throws ReplicationException {
188     this.replicationPeers.addPeer(id,
189       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
190   }
191   
192   /**
193    * Add a new remote slave cluster for replication.
194    * @param id a short name that identifies the cluster
195    * @param peerConfig configuration for the replication slave cluster
196    * @param tableCfs the table and column-family list which will be replicated for this peer.
197    * A map from tableName to column family names. An empty collection can be passed
198    * to indicate replicating all column families. Pass null for replicating all table and column
199    * families
200    */
201   public void addPeer(String id, ReplicationPeerConfig peerConfig,
202       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
203     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
204   }
205 
206   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
207     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
208       return null;
209     }
210 
211     Map<TableName, List<String>> tableCFsMap = null;
212     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
213     // parse out (table, cf-list) pairs from tableCFsConfig
214     // format: "table1:cf1,cf2;table2:cfA,cfB"
215     String[] tables = tableCFsConfig.split(";");
216     for (String tab : tables) {
217       // 1 ignore empty table config
218       tab = tab.trim();
219       if (tab.length() == 0) {
220         continue;
221       }
222       // 2 split to "table" and "cf1,cf2"
223       //   for each table: "table:cf1,cf2" or "table"
224       String[] pair = tab.split(":");
225       String tabName = pair[0].trim();
226       if (pair.length > 2 || tabName.length() == 0) {
227         LOG.error("ignore invalid tableCFs setting: " + tab);
228         continue;
229       }
230 
231       // 3 parse "cf1,cf2" part to List<cf>
232       List<String> cfs = null;
233       if (pair.length == 2) {
234         String[] cfsList = pair[1].split(",");
235         for (String cf : cfsList) {
236           String cfName = cf.trim();
237           if (cfName.length() > 0) {
238             if (cfs == null) {
239               cfs = new ArrayList<String>();
240             }
241             cfs.add(cfName);
242           }
243         }
244       }
245 
246       // 4 put <table, List<cf>> to map
247       if (tableCFsMap == null) {
248         tableCFsMap = new HashMap<TableName, List<String>>();
249       }
250       tableCFsMap.put(TableName.valueOf(tabName), cfs);
251     }
252     return tableCFsMap;
253   }
254 
255   @VisibleForTesting
256   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
257     String tableCfsStr = null;
258     if (tableCfs != null) {
259       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
260       StringBuilder builder = new StringBuilder();
261       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
262         if (builder.length() > 0) {
263           builder.append(";");
264         }
265         builder.append(entry.getKey());
266         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
267           builder.append(":");
268           builder.append(StringUtils.join(entry.getValue(), ","));
269         }
270       }
271       tableCfsStr = builder.toString();
272     }
273     return tableCfsStr;
274   }
275 
276   /**
277    * Removes a peer cluster and stops the replication to it.
278    * @param id a short name that identifies the cluster
279    */
280   public void removePeer(String id) throws ReplicationException {
281     this.replicationPeers.removePeer(id);
282   }
283 
284   /**
285    * Restart the replication stream to the specified peer.
286    * @param id a short name that identifies the cluster
287    */
288   public void enablePeer(String id) throws ReplicationException {
289     this.replicationPeers.enablePeer(id);
290   }
291 
292   /**
293    * Stop the replication stream to the specified peer.
294    * @param id a short name that identifies the cluster
295    */
296   public void disablePeer(String id) throws ReplicationException {
297     this.replicationPeers.disablePeer(id);
298   }
299 
300   /**
301    * Get the number of slave clusters the local cluster has.
302    * @return number of slave clusters
303    */
304   public int getPeersCount() {
305     return this.replicationPeers.getAllPeerIds().size();
306   }
307 
308   /**
309    * Map of this cluster's peers for display.
310    * @return A map of peer ids to peer cluster keys
311    * @deprecated use {@link #listPeerConfigs()}
312    */
313   @Deprecated
314   public Map<String, String> listPeers() {
315     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
316     Map<String, String> ret = new HashMap<String, String>(peers.size());
317 
318     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
319       ret.put(entry.getKey(), entry.getValue().getClusterKey());
320     }
321     return ret;
322   }
323 
324   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
325     return this.replicationPeers.getAllPeerConfigs();
326   }
327 
328   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
329     return this.replicationPeers.getReplicationPeerConfig(id);
330   }
331 
332   /**
333    * Get the replicable table-cf config of the specified peer.
334    * @param id a short name that identifies the cluster
335    */
336   public String getPeerTableCFs(String id) throws ReplicationException {
337     return this.replicationPeers.getPeerTableCFsConfig(id);
338   }
339 
340   /**
341    * Set the replicable table-cf config of the specified peer
342    * @param id a short name that identifies the cluster
343    * @deprecated use {@link #setPeerTableCFs(String, Map)}
344    */
345   @Deprecated
346   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
347     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
348   }
349 
350   /**
351    * Append the replicable table-cf config of the specified peer
352    * @param id a short that identifies the cluster
353    * @param tableCfs table-cfs config str
354    * @throws ReplicationException
355    */
356   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
357     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
358   }
359 
360   /**
361    * Append the replicable table-cf config of the specified peer
362    * @param id a short that identifies the cluster
363    * @param tableCfs A map from tableName to column family names
364    * @throws ReplicationException
365    */
366   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
367       throws ReplicationException {
368     if (tableCfs == null) {
369       throw new ReplicationException("tableCfs is null");
370     }
371     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
372     if (preTableCfs == null) {
373       setPeerTableCFs(id, tableCfs);
374       return;
375     }
376 
377     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
378       TableName table = entry.getKey();
379       Collection<String> appendCfs = entry.getValue();
380       if (preTableCfs.containsKey(table)) {
381         List<String> cfs = preTableCfs.get(table);
382         if (cfs == null || appendCfs == null) {
383           preTableCfs.put(table, null);
384         } else {
385           Set<String> cfSet = new HashSet<String>(cfs);
386           cfSet.addAll(appendCfs);
387           preTableCfs.put(table, Lists.newArrayList(cfSet));
388         }
389       } else {
390         if (appendCfs == null || appendCfs.isEmpty()) {
391           preTableCfs.put(table, null);
392         } else {
393           preTableCfs.put(table, Lists.newArrayList(appendCfs));
394         }
395       }
396     }
397     setPeerTableCFs(id, preTableCfs);
398   }
399 
400   /**
401    * Remove some table-cfs from table-cfs config of the specified peer
402    * @param id a short name that identifies the cluster
403    * @param tableCf table-cfs config str
404    * @throws ReplicationException
405    */
406   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
407     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
408   }
409 
410   /**
411    * Remove some table-cfs from config of the specified peer
412    * @param id a short name that identifies the cluster
413    * @param tableCfs A map from tableName to column family names
414    * @throws ReplicationException
415    */
416   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
417       throws ReplicationException {
418     if (tableCfs == null) {
419       throw new ReplicationException("tableCfs is null");
420     }
421 
422     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
423     if (preTableCfs == null) {
424       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
425     }
426     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
427       TableName table = entry.getKey();
428       Collection<String> removeCfs = entry.getValue();
429       if (preTableCfs.containsKey(table)) {
430         List<String> cfs = preTableCfs.get(table);
431         if (cfs == null && removeCfs == null) {
432           preTableCfs.remove(table);
433         } else if (cfs != null && removeCfs != null) {
434           Set<String> cfSet = new HashSet<String>(cfs);
435           cfSet.removeAll(removeCfs);
436           if (cfSet.isEmpty()) {
437             preTableCfs.remove(table);
438           } else {
439             preTableCfs.put(table, Lists.newArrayList(cfSet));
440           }
441         } else if (cfs == null && removeCfs != null) {
442           throw new ReplicationException("Cannot remove cf of table: " + table
443               + " which doesn't specify cfs from table-cfs config in peer: " + id);
444         } else if (cfs != null && removeCfs == null) {
445           throw new ReplicationException("Cannot remove table: " + table
446               + " which has specified cfs from table-cfs config in peer: " + id);
447         }
448       } else {
449         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
450       }
451     }
452     setPeerTableCFs(id, preTableCfs);
453   }
454 
455   /**
456    * Set the replicable table-cf config of the specified peer
457    * @param id a short name that identifies the cluster
458    * @param tableCfs the table and column-family list which will be replicated for this peer.
459    * A map from tableName to column family names. An empty collection can be passed
460    * to indicate replicating all column families. Pass null for replicating all table and column
461    * families
462    */
463   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
464       throws ReplicationException {
465     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
466   }
467 
468   /**
469    * Get the state of the specified peer cluster
470    * @param id String format of the Short name that identifies the peer,
471    * an IllegalArgumentException is thrown if it doesn't exist
472    * @return true if replication is enabled to that peer, false if it isn't
473    */
474   public boolean getPeerState(String id) throws ReplicationException {
475     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
476   }
477 
478   @Override
479   public void close() throws IOException {
480     if (this.zkw != null) {
481       this.zkw.close();
482     }
483     if (this.connection != null) {
484       this.connection.close();
485     }
486   }
487 
488 
489   /**
490    * Find all column families that are replicated from this cluster
491    * @return the full list of the replicated column families of this cluster as:
492    *        tableName, family name, replicationType
493    *
494    * Currently replicationType is Global. In the future, more replication
495    * types may be extended here. For example
496    *  1) the replication may only apply to selected peers instead of all peers
497    *  2) the replicationType may indicate the host Cluster servers as Slave
498    *     for the table:columnFam.
499    */
500   public List<HashMap<String, String>> listReplicated() throws IOException {
501     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
502 
503     Admin admin = connection.getAdmin();
504     HTableDescriptor[] tables;
505     try {
506       tables = admin.listTables();
507     } finally {
508       if (admin!= null) admin.close();
509     }
510 
511     for (HTableDescriptor table : tables) {
512       HColumnDescriptor[] columns = table.getColumnFamilies();
513       String tableName = table.getNameAsString();
514       for (HColumnDescriptor column : columns) {
515         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
516           // At this moment, the columfam is replicated to all peers
517           HashMap<String, String> replicationEntry = new HashMap<String, String>();
518           replicationEntry.put(TNAME, tableName);
519           replicationEntry.put(CFNAME, column.getNameAsString());
520           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
521           replicationColFams.add(replicationEntry);
522         }
523       }
524     }
525 
526     return replicationColFams;
527   }
528 
529   /**
530    * Enable a table's replication switch.
531    * @param tableName name of the table
532    * @throws IOException if a remote or network exception occurs
533    */
534   public void enableTableRep(final TableName tableName) throws IOException {
535     if (tableName == null) {
536       throw new IllegalArgumentException("Table name cannot be null");
537     }
538     try (Admin admin = this.connection.getAdmin()) {
539       if (!admin.tableExists(tableName)) {
540         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
541             + "' does not exists.");
542       }
543     }
544     byte[][] splits = getTableSplitRowKeys(tableName);
545     checkAndSyncTableDescToPeers(tableName, splits);
546     setTableRep(tableName, true);
547   }
548 
549   /**
550    * Disable a table's replication switch.
551    * @param tableName name of the table
552    * @throws IOException if a remote or network exception occurs
553    */
554   public void disableTableRep(final TableName tableName) throws IOException {
555     if (tableName == null) {
556       throw new IllegalArgumentException("Table name is null");
557     }
558     try (Admin admin = this.connection.getAdmin()) {
559       if (!admin.tableExists(tableName)) {
560         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
561             + "' does not exists.");
562       }
563     }
564     setTableRep(tableName, false);
565   }
566 
567   /**
568    * Get the split row keys of table
569    * @param tableName table name
570    * @return array of split row keys
571    * @throws IOException
572    */
573   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
574     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
575       byte[][] startKeys = locator.getStartKeys();
576       if (startKeys.length == 1) {
577         return null;
578       }
579       byte[][] splits = new byte[startKeys.length - 1][];
580       for (int i = 1; i < startKeys.length; i++) {
581         splits[i - 1] = startKeys[i];
582       }
583       return splits;
584     }
585   }
586 
587   /**
588    * Connect to peer and check the table descriptor on peer:
589    * <ol>
590    * <li>Create the same table on peer when not exist.</li>
591    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
592    * </ol>
593    * @param tableName name of the table to sync to the peer
594    * @param splits table split keys
595    * @throws IOException
596    */
597   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
598       throws IOException {
599     List<ReplicationPeer> repPeers = listValidReplicationPeers();
600     if (repPeers == null || repPeers.size() <= 0) {
601       throw new IllegalArgumentException("Found no peer cluster for replication.");
602     }
603     for (ReplicationPeer repPeer : repPeers) {
604       Configuration peerConf = repPeer.getConfiguration();
605       HTableDescriptor htd = null;
606       try (Connection conn = ConnectionFactory.createConnection(peerConf);
607           Admin admin = this.connection.getAdmin();
608           Admin repHBaseAdmin = conn.getAdmin()) {
609         htd = admin.getTableDescriptor(tableName);
610         HTableDescriptor peerHtd = null;
611         if (!repHBaseAdmin.tableExists(tableName)) {
612           repHBaseAdmin.createTable(htd, splits);
613         } else {
614           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
615           if (peerHtd == null) {
616             throw new IllegalArgumentException("Failed to get table descriptor for table "
617                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
618           } else if (!peerHtd.equals(htd)) {
619             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
620                 + " exists in peer cluster " + repPeer.getId()
621                 + ", but the table descriptors are not same when comapred with source cluster."
622                 + " Thus can not enable the table's replication switch.");
623           }
624         }
625       }
626     }
627   }
628 
629   private List<ReplicationPeer> listValidReplicationPeers() {
630     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
631     if (peers == null || peers.size() <= 0) {
632       return null;
633     }
634     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
635     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
636       String peerId = peerEntry.getKey();
637       String clusterKey = peerEntry.getValue().getClusterKey();
638       Configuration peerConf = new Configuration(this.connection.getConfiguration());
639       Stat s = null;
640       try {
641         ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
642         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
643         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
644         s =
645             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
646               null);
647         if (null == s) {
648           LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
649           continue;
650         }
651         validPeers.add(peer);
652       } catch (ReplicationException e) {
653         LOG.warn("Failed to get valid replication peers. "
654             + "Error connecting to peer cluster with peerId=" + peerId);
655         LOG.debug("Failure details to get valid replication peers.", e);
656         continue;
657       } catch (KeeperException e) {
658         LOG.warn("Failed to get valid replication peers. KeeperException code="
659             + e.code().intValue());
660         LOG.debug("Failure details to get valid replication peers.", e);
661         continue;
662       } catch (InterruptedException e) {
663         LOG.warn("Failed to get valid replication peers due to InterruptedException.");
664         LOG.debug("Failure details to get valid replication peers.", e);
665         Thread.currentThread().interrupt();
666         continue;
667       } catch (IOException e) {
668         LOG.warn("Failed to get valid replication peers due to IOException.");
669         LOG.debug("Failure details to get valid replication peers.", e);
670         continue;
671       }
672     }
673     return validPeers;
674   }
675 
676   /**
677    * Set the table's replication switch if the table's replication switch is already not set.
678    * @param tableName name of the table
679    * @param isRepEnabled is replication switch enable or disable
680    * @throws IOException if a remote or network exception occurs
681    */
682   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
683     Admin admin = null;
684     try {
685       admin = this.connection.getAdmin();
686       HTableDescriptor htd = admin.getTableDescriptor(tableName);
687       if (isTableRepEnabled(htd) ^ isRepEnabled) {
688         boolean isOnlineSchemaUpdateEnabled =
689             this.connection.getConfiguration()
690                 .getBoolean("hbase.online.schema.update.enable", true);
691         if (!isOnlineSchemaUpdateEnabled) {
692           admin.disableTable(tableName);
693         }
694         for (HColumnDescriptor hcd : htd.getFamilies()) {
695           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
696               : HConstants.REPLICATION_SCOPE_LOCAL);
697         }
698         admin.modifyTable(tableName, htd);
699         if (!isOnlineSchemaUpdateEnabled) {
700           admin.enableTable(tableName);
701         }
702       }
703     } finally {
704       if (admin != null) {
705         try {
706           admin.close();
707         } catch (IOException e) {
708           LOG.warn("Failed to close admin connection.");
709           LOG.debug("Details on failure to close admin connection.", e);
710         }
711       }
712     }
713   }
714 
715   /**
716    * @param htd table descriptor details for the table to check
717    * @return true if table's replication switch is enabled
718    */
719   private boolean isTableRepEnabled(HTableDescriptor htd) {
720     for (HColumnDescriptor hcd : htd.getFamilies()) {
721       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
722         return false;
723       }
724     }
725     return true;
726   }
727 }