1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.replication;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.Set;
31
32 import org.apache.commons.lang.StringUtils;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Abortable;
37 import org.apache.hadoop.hbase.HColumnDescriptor;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HTableDescriptor;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.TableNotFoundException;
42 import org.apache.hadoop.hbase.classification.InterfaceAudience;
43 import org.apache.hadoop.hbase.classification.InterfaceStability;
44 import org.apache.hadoop.hbase.client.Admin;
45 import org.apache.hadoop.hbase.client.Connection;
46 import org.apache.hadoop.hbase.client.ConnectionFactory;
47 import org.apache.hadoop.hbase.client.RegionLocator;
48 import org.apache.hadoop.hbase.replication.ReplicationException;
49 import org.apache.hadoop.hbase.replication.ReplicationFactory;
50 import org.apache.hadoop.hbase.replication.ReplicationPeer;
51 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
52 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
53 import org.apache.hadoop.hbase.replication.ReplicationPeers;
54 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
55 import org.apache.hadoop.hbase.util.Pair;
56 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
57 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
58 import org.apache.zookeeper.KeeperException;
59 import org.apache.zookeeper.data.Stat;
60
61 import com.google.common.annotations.VisibleForTesting;
62 import com.google.common.collect.Lists;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 @InterfaceAudience.Public
88 @InterfaceStability.Evolving
89 public class ReplicationAdmin implements Closeable {
90 private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
91
92 public static final String TNAME = "tableName";
93 public static final String CFNAME = "columnFamilyName";
94
95
96
97 public static final String REPLICATIONTYPE = "replicationType";
98 public static final String REPLICATIONGLOBAL = Integer
99 .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
100
101 private final Connection connection;
102
103
104 private final ReplicationQueuesClient replicationQueuesClient;
105 private final ReplicationPeers replicationPeers;
106
107
108
109
110 private final ZooKeeperWatcher zkw;
111
112
113
114
115
116
117
/**
 * Creates a replication admin client: opens a cluster connection, a ZooKeeper watcher and
 * initializes the replication queues client and the replication peers tracker.
 * <p>
 * If any initialization step fails, the resources opened so far are closed before the failure
 * is propagated. A failure raised while closing is attached to the original exception as a
 * suppressed exception so that it cannot hide the root cause.
 *
 * @param conf configuration used to create the connection and the replication helpers
 * @throws IOException if initialization fails; non-IO checked failures are wrapped
 * @throws RuntimeException if replication is not enabled in {@code conf}, or if an unchecked
 *           failure occurs during initialization
 */
public ReplicationAdmin(Configuration conf) throws IOException {
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
      HConstants.REPLICATION_ENABLE_DEFAULT)) {
    throw new RuntimeException("hbase.replication isn't true, please " +
        "enable it in order to use replication");
  }
  this.connection = ConnectionFactory.createConnection(conf);
  try {
    zkw = createZooKeeperWatcher();
    try {
      this.replicationQueuesClient =
          ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
      this.replicationQueuesClient.init();
      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
          this.replicationQueuesClient, this.connection);
      this.replicationPeers.init();
    } catch (Exception exception) {
      // Release the watcher, but never let a close failure replace the real cause.
      try {
        zkw.close();
      } catch (Exception closeFailure) {
        exception.addSuppressed(closeFailure);
      }
      throw exception;
    }
  } catch (Exception exception) {
    // Release the connection, keeping the original failure as the primary exception.
    try {
      connection.close();
    } catch (Exception closeFailure) {
      exception.addSuppressed(closeFailure);
    }
    if (exception instanceof IOException) {
      throw (IOException) exception;
    } else if (exception instanceof RuntimeException) {
      throw (RuntimeException) exception;
    } else {
      throw new IOException("Error initializing the replication admin client.", exception);
    }
  }
}
153
154 private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
155
156 return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
157 @Override
158 public void abort(String why, Throwable e) {
159 LOG.error(why, e);
160
161
162 }
163
164 @Override
165 public boolean isAborted() {
166 return false;
167 }
168 });
169 }
170
171
172
173
174
175
176
177
178
179
180 @Deprecated
181 public void addPeer(String id, String clusterKey) throws ReplicationException {
182 this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
183 }
184
185 @Deprecated
186 public void addPeer(String id, String clusterKey, String tableCFs)
187 throws ReplicationException {
188 this.replicationPeers.addPeer(id,
189 new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
190 }
191
192
193
194
195
196
197
198
199
200
201 public void addPeer(String id, ReplicationPeerConfig peerConfig,
202 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
203 this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
204 }
205
206 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
207 if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
208 return null;
209 }
210
211 Map<TableName, List<String>> tableCFsMap = null;
212
213
214
215 String[] tables = tableCFsConfig.split(";");
216 for (String tab : tables) {
217
218 tab = tab.trim();
219 if (tab.length() == 0) {
220 continue;
221 }
222
223
224 String[] pair = tab.split(":");
225 String tabName = pair[0].trim();
226 if (pair.length > 2 || tabName.length() == 0) {
227 LOG.error("ignore invalid tableCFs setting: " + tab);
228 continue;
229 }
230
231
232 List<String> cfs = null;
233 if (pair.length == 2) {
234 String[] cfsList = pair[1].split(",");
235 for (String cf : cfsList) {
236 String cfName = cf.trim();
237 if (cfName.length() > 0) {
238 if (cfs == null) {
239 cfs = new ArrayList<String>();
240 }
241 cfs.add(cfName);
242 }
243 }
244 }
245
246
247 if (tableCFsMap == null) {
248 tableCFsMap = new HashMap<TableName, List<String>>();
249 }
250 tableCFsMap.put(TableName.valueOf(tabName), cfs);
251 }
252 return tableCFsMap;
253 }
254
255 @VisibleForTesting
256 static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
257 String tableCfsStr = null;
258 if (tableCfs != null) {
259
260 StringBuilder builder = new StringBuilder();
261 for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
262 if (builder.length() > 0) {
263 builder.append(";");
264 }
265 builder.append(entry.getKey());
266 if (entry.getValue() != null && !entry.getValue().isEmpty()) {
267 builder.append(":");
268 builder.append(StringUtils.join(entry.getValue(), ","));
269 }
270 }
271 tableCfsStr = builder.toString();
272 }
273 return tableCfsStr;
274 }
275
276
277
278
279
280 public void removePeer(String id) throws ReplicationException {
281 this.replicationPeers.removePeer(id);
282 }
283
284
285
286
287
288 public void enablePeer(String id) throws ReplicationException {
289 this.replicationPeers.enablePeer(id);
290 }
291
292
293
294
295
296 public void disablePeer(String id) throws ReplicationException {
297 this.replicationPeers.disablePeer(id);
298 }
299
300
301
302
303
304 public int getPeersCount() {
305 return this.replicationPeers.getAllPeerIds().size();
306 }
307
308
309
310
311
312
313 @Deprecated
314 public Map<String, String> listPeers() {
315 Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
316 Map<String, String> ret = new HashMap<String, String>(peers.size());
317
318 for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
319 ret.put(entry.getKey(), entry.getValue().getClusterKey());
320 }
321 return ret;
322 }
323
324 public Map<String, ReplicationPeerConfig> listPeerConfigs() {
325 return this.replicationPeers.getAllPeerConfigs();
326 }
327
328 public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
329 return this.replicationPeers.getReplicationPeerConfig(id);
330 }
331
332
333
334
335
336 public String getPeerTableCFs(String id) throws ReplicationException {
337 return this.replicationPeers.getPeerTableCFsConfig(id);
338 }
339
340
341
342
343
344
345 @Deprecated
346 public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
347 this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
348 }
349
350
351
352
353
354
355
356 public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
357 appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
358 }
359
360
361
362
363
364
365
366 public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
367 throws ReplicationException {
368 if (tableCfs == null) {
369 throw new ReplicationException("tableCfs is null");
370 }
371 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
372 if (preTableCfs == null) {
373 setPeerTableCFs(id, tableCfs);
374 return;
375 }
376
377 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
378 TableName table = entry.getKey();
379 Collection<String> appendCfs = entry.getValue();
380 if (preTableCfs.containsKey(table)) {
381 List<String> cfs = preTableCfs.get(table);
382 if (cfs == null || appendCfs == null) {
383 preTableCfs.put(table, null);
384 } else {
385 Set<String> cfSet = new HashSet<String>(cfs);
386 cfSet.addAll(appendCfs);
387 preTableCfs.put(table, Lists.newArrayList(cfSet));
388 }
389 } else {
390 if (appendCfs == null || appendCfs.isEmpty()) {
391 preTableCfs.put(table, null);
392 } else {
393 preTableCfs.put(table, Lists.newArrayList(appendCfs));
394 }
395 }
396 }
397 setPeerTableCFs(id, preTableCfs);
398 }
399
400
401
402
403
404
405
406 public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
407 removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
408 }
409
410
411
412
413
414
415
416 public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
417 throws ReplicationException {
418 if (tableCfs == null) {
419 throw new ReplicationException("tableCfs is null");
420 }
421
422 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
423 if (preTableCfs == null) {
424 throw new ReplicationException("Table-Cfs for peer" + id + " is null");
425 }
426 for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
427 TableName table = entry.getKey();
428 Collection<String> removeCfs = entry.getValue();
429 if (preTableCfs.containsKey(table)) {
430 List<String> cfs = preTableCfs.get(table);
431 if (cfs == null && removeCfs == null) {
432 preTableCfs.remove(table);
433 } else if (cfs != null && removeCfs != null) {
434 Set<String> cfSet = new HashSet<String>(cfs);
435 cfSet.removeAll(removeCfs);
436 if (cfSet.isEmpty()) {
437 preTableCfs.remove(table);
438 } else {
439 preTableCfs.put(table, Lists.newArrayList(cfSet));
440 }
441 } else if (cfs == null && removeCfs != null) {
442 throw new ReplicationException("Cannot remove cf of table: " + table
443 + " which doesn't specify cfs from table-cfs config in peer: " + id);
444 } else if (cfs != null && removeCfs == null) {
445 throw new ReplicationException("Cannot remove table: " + table
446 + " which has specified cfs from table-cfs config in peer: " + id);
447 }
448 } else {
449 throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
450 }
451 }
452 setPeerTableCFs(id, preTableCfs);
453 }
454
455
456
457
458
459
460
461
462
463 public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
464 throws ReplicationException {
465 this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
466 }
467
468
469
470
471
472
473
474 public boolean getPeerState(String id) throws ReplicationException {
475 return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
476 }
477
478 @Override
479 public void close() throws IOException {
480 if (this.zkw != null) {
481 this.zkw.close();
482 }
483 if (this.connection != null) {
484 this.connection.close();
485 }
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500 public List<HashMap<String, String>> listReplicated() throws IOException {
501 List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
502
503 Admin admin = connection.getAdmin();
504 HTableDescriptor[] tables;
505 try {
506 tables = admin.listTables();
507 } finally {
508 if (admin!= null) admin.close();
509 }
510
511 for (HTableDescriptor table : tables) {
512 HColumnDescriptor[] columns = table.getColumnFamilies();
513 String tableName = table.getNameAsString();
514 for (HColumnDescriptor column : columns) {
515 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
516
517 HashMap<String, String> replicationEntry = new HashMap<String, String>();
518 replicationEntry.put(TNAME, tableName);
519 replicationEntry.put(CFNAME, column.getNameAsString());
520 replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
521 replicationColFams.add(replicationEntry);
522 }
523 }
524 }
525
526 return replicationColFams;
527 }
528
529
530
531
532
533
534 public void enableTableRep(final TableName tableName) throws IOException {
535 if (tableName == null) {
536 throw new IllegalArgumentException("Table name cannot be null");
537 }
538 try (Admin admin = this.connection.getAdmin()) {
539 if (!admin.tableExists(tableName)) {
540 throw new TableNotFoundException("Table '" + tableName.getNameAsString()
541 + "' does not exists.");
542 }
543 }
544 byte[][] splits = getTableSplitRowKeys(tableName);
545 checkAndSyncTableDescToPeers(tableName, splits);
546 setTableRep(tableName, true);
547 }
548
549
550
551
552
553
554 public void disableTableRep(final TableName tableName) throws IOException {
555 if (tableName == null) {
556 throw new IllegalArgumentException("Table name is null");
557 }
558 try (Admin admin = this.connection.getAdmin()) {
559 if (!admin.tableExists(tableName)) {
560 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
561 + "' does not exists.");
562 }
563 }
564 setTableRep(tableName, false);
565 }
566
567
568
569
570
571
572
573 private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
574 try (RegionLocator locator = connection.getRegionLocator(tableName);) {
575 byte[][] startKeys = locator.getStartKeys();
576 if (startKeys.length == 1) {
577 return null;
578 }
579 byte[][] splits = new byte[startKeys.length - 1][];
580 for (int i = 1; i < startKeys.length; i++) {
581 splits[i - 1] = startKeys[i];
582 }
583 return splits;
584 }
585 }
586
587
588
589
590
591
592
593
594
595
596
597 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
598 throws IOException {
599 List<ReplicationPeer> repPeers = listValidReplicationPeers();
600 if (repPeers == null || repPeers.size() <= 0) {
601 throw new IllegalArgumentException("Found no peer cluster for replication.");
602 }
603 for (ReplicationPeer repPeer : repPeers) {
604 Configuration peerConf = repPeer.getConfiguration();
605 HTableDescriptor htd = null;
606 try (Connection conn = ConnectionFactory.createConnection(peerConf);
607 Admin admin = this.connection.getAdmin();
608 Admin repHBaseAdmin = conn.getAdmin()) {
609 htd = admin.getTableDescriptor(tableName);
610 HTableDescriptor peerHtd = null;
611 if (!repHBaseAdmin.tableExists(tableName)) {
612 repHBaseAdmin.createTable(htd, splits);
613 } else {
614 peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
615 if (peerHtd == null) {
616 throw new IllegalArgumentException("Failed to get table descriptor for table "
617 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
618 } else if (!peerHtd.equals(htd)) {
619 throw new IllegalArgumentException("Table " + tableName.getNameAsString()
620 + " exists in peer cluster " + repPeer.getId()
621 + ", but the table descriptors are not same when comapred with source cluster."
622 + " Thus can not enable the table's replication switch.");
623 }
624 }
625 }
626 }
627 }
628
629 private List<ReplicationPeer> listValidReplicationPeers() {
630 Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
631 if (peers == null || peers.size() <= 0) {
632 return null;
633 }
634 List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
635 for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
636 String peerId = peerEntry.getKey();
637 String clusterKey = peerEntry.getValue().getClusterKey();
638 Configuration peerConf = new Configuration(this.connection.getConfiguration());
639 Stat s = null;
640 try {
641 ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
642 Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
643 ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
644 s =
645 zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
646 null);
647 if (null == s) {
648 LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
649 continue;
650 }
651 validPeers.add(peer);
652 } catch (ReplicationException e) {
653 LOG.warn("Failed to get valid replication peers. "
654 + "Error connecting to peer cluster with peerId=" + peerId);
655 LOG.debug("Failure details to get valid replication peers.", e);
656 continue;
657 } catch (KeeperException e) {
658 LOG.warn("Failed to get valid replication peers. KeeperException code="
659 + e.code().intValue());
660 LOG.debug("Failure details to get valid replication peers.", e);
661 continue;
662 } catch (InterruptedException e) {
663 LOG.warn("Failed to get valid replication peers due to InterruptedException.");
664 LOG.debug("Failure details to get valid replication peers.", e);
665 Thread.currentThread().interrupt();
666 continue;
667 } catch (IOException e) {
668 LOG.warn("Failed to get valid replication peers due to IOException.");
669 LOG.debug("Failure details to get valid replication peers.", e);
670 continue;
671 }
672 }
673 return validPeers;
674 }
675
676
677
678
679
680
681
682 private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
683 Admin admin = null;
684 try {
685 admin = this.connection.getAdmin();
686 HTableDescriptor htd = admin.getTableDescriptor(tableName);
687 if (isTableRepEnabled(htd) ^ isRepEnabled) {
688 boolean isOnlineSchemaUpdateEnabled =
689 this.connection.getConfiguration()
690 .getBoolean("hbase.online.schema.update.enable", true);
691 if (!isOnlineSchemaUpdateEnabled) {
692 admin.disableTable(tableName);
693 }
694 for (HColumnDescriptor hcd : htd.getFamilies()) {
695 hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
696 : HConstants.REPLICATION_SCOPE_LOCAL);
697 }
698 admin.modifyTable(tableName, htd);
699 if (!isOnlineSchemaUpdateEnabled) {
700 admin.enableTable(tableName);
701 }
702 }
703 } finally {
704 if (admin != null) {
705 try {
706 admin.close();
707 } catch (IOException e) {
708 LOG.warn("Failed to close admin connection.");
709 LOG.debug("Details on failure to close admin connection.", e);
710 }
711 }
712 }
713 }
714
715
716
717
718
719 private boolean isTableRepEnabled(HTableDescriptor htd) {
720 for (HColumnDescriptor hcd : htd.getFamilies()) {
721 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
722 return false;
723 }
724 }
725 return true;
726 }
727 }