/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, and of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the start code (timestamp from when the
 * server was started).  This is used to differentiate a restarted instance of
 * a given server from the original instance.
 * <p>
 * A server that is no longer running is called dead.  Dead servers are handled by submitting a
 * {@link ServerCrashProcedure}.  If crash processing is not enabled yet (the master is still
 * initializing), or the AssignmentManager has not finished its failover cleanup, such servers
 * are queued and processed later through {@link #processQueuedDeadServers()}.
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Log LOG = LogFactory.getLog(ServerManager.class);

  // Set when the cluster is going down; checked when servers expire.
  private volatile boolean clusterShutdown = false;

  /**
   * The last flushed sequence id for a region.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
      new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /**
   * The last flushed sequence id for a store in a region.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
      storeFlushedSequenceIdsByRegion =
      new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load. */
  private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
      new ConcurrentHashMap<ServerName, ServerLoad>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces are used to
   * control regionservers (e.g. open or close regions).
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
      new HashMap<ServerName, AdminService.BlockingInterface>();

  /**
   * List of region servers that should not get any more new regions.
   */
  private final ArrayList<ServerName> drainingServers =
      new ArrayList<ServerName>();

  private final Server master;
  private final MasterServices services;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RetryCounterFactory pingRetryCounterFactory;

  /**
   * Servers that expired before ServerCrashProcedure handling was enabled on the master.
   * They are queued here and processed later via {@link #processQueuedDeadServers()}.
   * Accesses are synchronized on the ServerManager instance.
   */
  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();

  /**
   * Dead servers that could not be fully processed because the AssignmentManager had not yet
   * finished its failover cleanup.  The Boolean value indicates whether the server's WALs still
   * need to be split when the server is eventually processed; see
   * {@link #processDeadServer(ServerName, boolean)}.
   */
  private Map<ServerName, Boolean> requeuedDeadServers
    = new ConcurrentHashMap<ServerName, Boolean>();

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();

  /**
   * Constructor.
   * @param master the master using this ServerManager
   * @param services master services
   * @throws ZooKeeperConnectionException
   */
  public ServerManager(final Server master, final MasterServices services)
      throws IOException {
    this(master, services, true);
  }

  ServerManager(final Server master, final MasterServices services,
      final boolean connect) throws IOException {
    this.master = master;
    this.services = services;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = connect ? master.getConnection() : null;
    int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.maximum.ping.server.attempts", 10));
    int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.ping.server.retry.sleep.interval", 100));
    this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
  }
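
  /*
   * Illustrative sketch (not part of the original source): the clock-skew and ping-retry
   * behaviour configured in the constructor above can be tuned through hbase-site.xml or a
   * Configuration object before the master starts, e.g.:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setLong("hbase.master.maxclockskew", 30000);              // reject servers > 30s skew
   *   conf.setLong("hbase.master.warningclockskew", 10000);          // warn when > 10s skew
   *   conf.setInt("hbase.master.maximum.ping.server.attempts", 10);  // isServerReachable retries
   *   conf.setInt("hbase.master.ping.server.retry.sleep.interval", 100);  // ms between retries
   */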

  /**
   * Add the listener to the notification list.
   * @param listener the ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener the ServerListener to unregister
   * @return true if the listener was registered and has been removed
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online.
   * @param request the startup request sent by the regionserver
   * @param ia the InetAddress from which the request was received
   * @return The ServerName we know this server as.
   * @throws IOException
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia)
      throws IOException {
    // A regionserver that was quickly restarted, or one whose failure is still being
    // processed, may check in here.  Verify its clock, make sure it is not on the dead
    // servers list, and then record it; checkAndRecordNewServer() will expire any stale
    // instance registered with the same hostname and port.
    final String hostname = request.hasUseThisHostnameInstead() ?
        request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(),
      request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
        + " could not record the server: " + sn);
    }
    return sn;
  }

  private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
      byte[] regionName) {
    ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(regionName);
    if (storeFlushedSequenceId != null) {
      return storeFlushedSequenceId;
    }
    storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    ConcurrentNavigableMap<byte[], Long> alreadyPut =
        storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
    return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
  }

  /**
   * Updates the last flushed sequence ids of the regions (and their stores) hosted on
   * server {@code sn}, based on the loads reported in {@code hsl}.
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
    Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
    for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompleteSequenceId();
      // Don't let smaller sequence ids override greater sequence ids.
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          getOrCreateStoreFlushedSequenceId(encodedRegionName);
      for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
        byte[] family = storeSeqId.getFamilyName().toByteArray();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getSequenceId();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Don't let smaller sequence ids override greater sequence ids.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

  /**
   * Record a periodic load report from a regionserver, registering the server if it is not
   * already known and refusing reports from servers we consider dead.
   * @throws YouAreDeadException if the server is on the dead servers list
   */
  void regionServerReport(ServerName sn,
      ServerLoad sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // The server is not in the online map yet (for instance, the master restarted and is
      // joining a running cluster, or the startup RPC was missed).  Try to register it now;
      // checkAndRecordNewServer() rejects reports from instances older than one we already know.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return;
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Check whether a regionserver instance can be recorded as online, and record it if so.
   * If a server with the same hostname and port but an older start code is already registered,
   * that stale instance is expired and the new one is accepted; if the registered instance has
   * a newer start code, the candidate is rejected.
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server was recorded, otherwise false
   */
  boolean checkAndRecordNewServer(
      final ServerName serverName, final ServerLoad sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume the same start code means the same server instance,
    // so we only expire the existing server when its start code is strictly older.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }
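
  /*
   * Illustrative sketch (not part of the original source) of the start-code rule applied above:
   * the start code is the timestamp taken when the regionserver process started, so a larger
   * start code identifies a newer instance of the same host:port.
   *
   *   ServerName old = ServerName.valueOf("rs1.example.com", 16020, 1000L);    // hypothetical host
   *   ServerName fresh = ServerName.valueOf("rs1.example.com", 16020, 2000L);
   *   // With `old` registered, checkAndRecordNewServer(fresh, load) accepts `fresh`
   *   // and expires `old`; with `fresh` registered, a report from `old` is rejected.
   */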

  /**
   * Checks if the clock skew between the server and the master is within the configured limit.
   * If the skew exceeds the configured maximum, an exception is thrown; if it exceeds the
   * warning threshold, a warning is logged but the server is allowed in.
   * @param serverName the incoming server's name
   * @param serverCurrentTime the current time of the region server in ms
   * @throws ClockOutOfSyncException if the skew exceeds the configured maximum
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master. " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

  /**
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param serverName the server to check
   * @param what STARTUP or REPORT
   * @throws YouAreDeadException if the server is currently being processed as dead
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Hostname, port and start code all match an existing entry on the dead
      // servers list, so this server instance must be dead.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Only clean a previous instance from the dead list once the master has finished
    // initializing (or when no MasterServices are available, e.g. in tests).
    if ((this.services == null || ((HMaster) this.services).isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now come back up after we marked a previous instance as dead;
      // its previous entry was removed from the dead list to reflect that.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }

  /**
   * Assumes the onlineServers lock is held.
   * @return ServerName with matching hostname and port, or null if none found
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    for (ServerName sn: this.onlineServers.keySet()) {
      if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
    }
    return null;
  }

  /**
   * Adds the server to the online servers map and drops any cached admin stub for it.
   * Callers must hold the onlineServers lock.
   * @param serverName the remote server's name
   * @param sl the load reported by the server
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
    LOG.info("Registering server=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

  /**
   * @return the last flushed sequence id for the region, together with the per-store
   *         flushed sequence ids, as reported by the hosting regionserver
   */
  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(ByteString.copyFrom(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }
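
  /*
   * Illustrative sketch (not part of the original source): a caller such as a WAL-splitting or
   * replay code path can inspect the result like this (the protobuf getters mirror the builder
   * calls used above):
   *
   *   RegionStoreSequenceIds ids = serverManager.getLastFlushedSequenceId(encodedRegionName);
   *   long lastFlushed = ids.getLastFlushedSequenceId();   // HConstants.NO_SEQNUM if unknown
   *   for (StoreSequenceId store : ids.getStoreSequenceIdList()) {
   *     byte[] family = store.getFamilyName().toByteArray();
   *     long storeSeqId = store.getSequenceId();
   *   }
   */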

  /**
   * @param serverName the server to look up
   * @return ServerLoad if serverName is known, else null
   */
  public ServerLoad getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently this uses a very naive computation - it just uses the number of
   * regions being served, ignoring stats about the number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerLoad sl: this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getNumberOfRegions();
    }
    return numServers == 0 ? 0 :
        (double)totalLoad / (double)numServers;
  }

  /** @return the count of active regionservers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map.
    return this.onlineServers.size();
  }

  /**
   * @return read-only map of servers to their load
   */
  public Map<ServerName, ServerLoad> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  /** @return the dead servers tracker */
  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZooKeeperWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      // Log progress at most once per second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Only the master itself is left; it will remove its own entry later.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's OK to not synchronize on onlineServers here - this is merely logging.
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
        if (servers == null || servers.size() == 0 || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // The master could have missed some ZK events; no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here.
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  /**
   * Expire the passed server.  Add it to the list of dead servers and queue its crash
   * processing; if the master is still initializing, the server is queued to be expired later.
   */
  public synchronized void expireServer(final ServerName serverName) {
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return;
    }
    if (!services.isServerCrashProcessingEnabled()) {
      LOG.info("Master has not enabled server crash processing yet (still initializing); "
          + "delaying expiration of server " + serverName);
      this.queuedDeadServers.add(serverName);
      return;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      // The server is already being processed as dead; nothing more to do.
      LOG.warn("Expiration of " + serverName +
          " but server shutdown already in progress");
      return;
    }
    moveFromOnelineToDeadServers(serverName);

    // If the cluster is going down, servers are expected to be expiring;
    // don't process them as dead servers.
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }

    boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
    this.services.getMasterProcedureExecutor().
      submitProcedure(new ServerCrashProcedure(serverName, true, carryingMeta));
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);

    // Tell our listeners that a server was removed.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
  }

  @VisibleForTesting
  public void moveFromOnelineToDeadServers(final ServerName sn) {
    synchronized (onlineServers) {
      if (!this.onlineServers.containsKey(sn)) {
        LOG.warn("Expiration of " + sn + " but server not online");
      }
      // Add to the dead servers list first, then remove from the online list;
      // this way the server shows up as dead even if it was never online.
      this.deadservers.add(sn);
      this.onlineServers.remove(sn);
      onlineServers.notifyAll();
    }
    this.rsAdmins.remove(sn);
  }

  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
    // If the AssignmentManager has not finished its failover cleanup yet, defer processing:
    // remember the server (and whether its WALs need splitting) and let
    // processQueuedDeadServers() submit it once cleanup is done.
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      requeuedDeadServers.put(serverName, shouldSplitWal);
      return;
    }

    this.deadservers.add(serverName);
    this.services.getMasterProcedureExecutor().
      submitProcedure(new ServerCrashProcedure(serverName, shouldSplitWal, false));
  }

  /**
   * Process the servers that died while the master was initializing: first the servers queued
   * before server crash processing was enabled, then the ones requeued while the
   * AssignmentManager's failover cleanup was still running.
   */
  synchronized void processQueuedDeadServers() {
    if (!services.isServerCrashProcessingEnabled()) {
      LOG.info("Master has not enabled server crash processing yet");
    }
    Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
    while (serverIterator.hasNext()) {
      ServerName tmpServerName = serverIterator.next();
      expireServer(tmpServerName);
      serverIterator.remove();
      requeuedDeadServers.remove(tmpServerName);
    }

    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
    }

    for (Map.Entry<ServerName, Boolean> entry : requeuedDeadServers.entrySet()) {
      processDeadServer(entry.getKey(), entry.getValue());
    }
    requeuedDeadServers.clear();
  }

  /**
   * Remove the server from the drain list.
   * @return true if the server was in the drain list and has been removed
   */
  public boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers list.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list; such servers are excluded from new region assignments.
   * @return true if the server was added; false if it is not online or already draining
   */
  public boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers list, if it's not already in it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return false;
    }
    return this.drainingServers.add(sn);
  }
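
  /*
   * Illustrative sketch (not part of the original source): operator tooling that decommissions
   * a regionserver would typically mark it as draining before moving its regions, so that
   * createDestinationServersList() stops offering it as an assignment target:
   *
   *   if (serverManager.addServerToDrainList(sn)) {
   *     // move regions off `sn` (e.g. via the balancer or explicit moves), then stop the server
   *   }
   *   ...
   *   serverManager.removeServerFromDrainList(sn);  // if the server should take regions again
   */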

  /**
   * Sends an OPEN RPC to the specified server to open the specified region.
   * <p>
   * Open should not fail but can if the server just crashed.
   * @param server server to open a region
   * @param region region to open
   * @param favoredNodes the favored nodes for this region
   * @return the region opening state reported by the regionserver
   */
  public RegionOpeningState sendRegionOpen(final ServerName server,
      HRegionInfo region, List<ServerName> favoredNodes)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
    }
    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
      region, favoredNodes,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningState(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Sends an OPEN RPC to the specified server to open the specified regions.
   * <p>
   * Open should not fail but can if the server just crashed.
   * @param server server to open regions on
   * @param regionOpenInfos regions to open, each paired with its favored nodes
   * @return a list of region opening states, one per region
   */
  public List<RegionOpeningState> sendRegionOpen(ServerName server,
      List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
    }

    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningStateList(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Sends a CLOSE RPC to the specified server to close the specified region.
   * <p>
   * A region server could reject the close request because it either does not
   * have the specified region or the region is being split.
   * @param server server to close a region on
   * @param region region to close
   * @param dest if the region is moved to another server, the destination server; null otherwise
   * @return true if the server acknowledged the close, false if not
   * @throws IOException
   */
  public boolean sendRegionClose(ServerName server, HRegionInfo region,
      ServerName dest) throws IOException {
    if (server == null) throw new NullPointerException("Passed server is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send CLOSE RPC to server " +
        server.toString() + " for region " +
        region.getRegionNameAsString() +
        " failed because no RPC connection found to this server");
    }
    return ProtobufUtil.closeRegion(admin, server, region.getRegionName(),
      dest);
  }

  public boolean sendRegionClose(ServerName server,
      HRegionInfo region) throws IOException {
    return sendRegionClose(server, region, null);
  }

  /**
   * Sends a WARMUP RPC to the specified server to warm up the specified region.
   * <p>
   * A region server could reject the request because it either does not have
   * the specified region or the region is being split.
   * @param server server to warm up a region on
   * @param region region to warm up
   */
  public void sendRegionWarmup(ServerName server,
      HRegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      ProtobufUtil.warmupRegion(admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " +
        server + " region: " + region +
        " exception: " + e);
    }
  }

  /**
   * Contacts a region server directly and waits up to timeout ms for it to close the region.
   * This bypasses the active master's assignment machinery.
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
      ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    try {
      ProtobufUtil.closeRegion(rs, server, region.getRegionName());
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      try {
        HRegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException)
          return;
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }
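
  /*
   * Illustrative sketch (not part of the original source): because this helper is static and
   * bypasses the master's assignment machinery, recovery or fixup tooling can call it directly
   * with its own connection:
   *
   *   ClusterConnection conn = ...;   // an existing cluster connection
   *   HRegionInfo hri = ...;          // region believed to be stuck open on `server`
   *   ServerManager.closeRegionSilentlyAndWait(conn, server, hri, 30000);  // wait up to 30s
   */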

  /**
   * Sends a MERGE REGIONS RPC to the specified server to merge the specified regions.
   * <p>
   * A region server can reject the request, for example if it does not host the
   * specified regions.
   * @param server server to merge regions on
   * @param region_a region to merge
   * @param region_b other region to merge
   * @param forcible true for a compulsory merge; otherwise only two adjacent regions are merged
   * @throws IOException
   */
  public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
      HRegionInfo region_b, boolean forcible) throws IOException {
    if (server == null)
      throw new NullPointerException("Passed server is null");
    if (region_a == null || region_b == null)
      throw new NullPointerException("Passed region is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send MERGE REGIONS RPC to server "
          + server.toString() + " for region "
          + region_a.getRegionNameAsString() + ","
          + region_b.getRegionNameAsString()
          + " failed because no RPC connection found to this server");
    }
    ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible);
  }

  /**
   * Check if a region server is reachable and has the expected start code.
   */
  public boolean isServerReachable(ServerName server) {
    if (server == null) throw new NullPointerException("Passed server is null");

    RetryCounter retryCounter = pingRetryCounterFactory.create();
    while (retryCounter.shouldRetry()) {
      try {
        AdminService.BlockingInterface admin = getRsAdmin(server);
        if (admin != null) {
          ServerInfo info = ProtobufUtil.getServerInfo(admin);
          return info != null && info.hasServerName()
              && server.getStartcode() == info.getServerName().getStartCode();
        }
      } catch (IOException ioe) {
        LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes()
            + " of " + retryCounter.getMaxAttempts(), ioe);
        try {
          retryCounter.sleepUntilNextRetry();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }
    return false;
  }
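
  /*
   * Illustrative sketch (not part of the original source): code that is about to hand a region
   * to a server can use this check to avoid targeting an instance that has silently gone away
   * or been restarted with a new start code:
   *
   *   if (!serverManager.isServerReachable(candidate)) {
   *     // pick a different destination, e.g. from createDestinationServersList(candidate)
   *   }
   */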

  /**
   * @param sn the regionserver to talk to
   * @return Admin interface for the remote regionserver named {@code sn}
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if connecting failed
   */
  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
      throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // The master can itself host regions; use the local RS RPC services
        // directly instead of opening a remote connection.
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there has been no new region server checking in for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
   * @throws InterruptedException
   */
  public void waitForRegionServers(MonitoredTask status)
      throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    int defaultMinToStart = 1;
    if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
      // If regions are assigned to the active master, we would like at least one other
      // regionserver to check in, so that we don't assign all regions to the active master.
      defaultMinToStart = 2;
    }
    int minToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
    if (minToStart < 1) {
      LOG.warn(String.format(
          "The value of '%s' (%d) cannot be less than 1, ignoring.",
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      minToStart = 1;
    }
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format(
          "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
          WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
          WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    while (!this.master.isStopped() && count < maxToStart
        && (lastCountChange + interval > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change.
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
            "Waiting for region servers count to settle; currently" +
            " checked in " + count + ", slept for " + slept + " ms," +
            " expecting minimum of " + minToStart + ", maximum of " + maxToStart +
            ", timeout of " + timeout + " ms, interval of " + interval + " ms.";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // Sleep for some time between checks.
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }

    LOG.info("Finished waiting for region servers count to settle;" +
        " checked in " + count + ", slept for " + slept + " ms," +
        " expecting minimum of " + minToStart + ", maximum of " + maxToStart + "," +
        " master is " + (this.master.isStopped() ? "stopped." : "running")
    );
  }
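
  /*
   * Illustrative sketch (not part of the original source): the wait-for-regionservers policy
   * above is driven entirely by configuration, e.g. to make the master wait for at least three
   * regionservers, re-checking every second and allowing up to ten seconds for the count to settle:
   *
   *   Configuration conf = master.getConfiguration();
   *   conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
   *   conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_INTERVAL, 1000);
   *   conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_TIMEOUT, 10000);
   *   // WAIT_ON_REGIONSERVERS_MAXTOSTART caps the wait once that many servers have checked in
   */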

  /**
   * @return a copy of the internal list of online servers
   */
  public List<ServerName> getOnlineServersList() {
    // Return a copy so callers can iterate freely without holding any lock.
    return new ArrayList<ServerName>(this.onlineServers.keySet());
  }

  /**
   * @return a copy of the internal list of draining servers
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<ServerName>(this.drainingServers);
  }

  /**
   * @return a copy of the internal set of dead-but-not-yet-expired servers
   */
  Set<ServerName> getDeadNotExpiredServers() {
    return new HashSet<ServerName>(this.queuedDeadServers);
  }

  /**
   * Clears the list of dead servers that were requeued for later processing,
   * without processing them.
   */
  void removeRequeuedDeadServers() {
    requeuedDeadServers.clear();
  }

  /**
   * @return an unmodifiable view of the requeued dead servers and whether their WALs
   *         still need splitting
   */
  Map<ServerName, Boolean> getRequeuedDeadServers() {
    return Collections.unmodifiableMap(this.requeuedDeadServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  /**
   * Check whether a server is known to be dead.  This covers servers on the dead list as well
   * as those queued or requeued for dead-server processing.  A server can be online, known to
   * be dead, or simply unknown to this manager (for example, a very old previous instance that
   * is no longer tracked).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName)
        || queuedDeadServers.contains(serverName)
        || requeuedDeadServers.containsKey(serverName);
  }

  public void shutdownCluster() {
    this.clusterShutdown = true;
    this.master.stop("Cluster shutdown requested");
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown;
  }

  /**
   * Stop the ServerManager.  Closes the cluster connection, if one was opened.
   */
  public void stop() {
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException e) {
        LOG.error("Attempt to close connection to master failed", e);
      }
    }
  }

  /**
   * Creates a list of possible destinations for a region.  It contains the online servers,
   * excluding the draining and the dead-but-not-yet-expired servers.
   * @param serverToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final ServerName serverToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serverToExclude != null) {
      destServers.remove(serverToExclude);
    }

    // Loop through the draining server list and remove them from the candidate list.
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    if (!drainingServersCopy.isEmpty()) {
      for (final ServerName server: drainingServersCopy) {
        destServers.remove(server);
      }
    }

    // Remove the dead-but-not-yet-expired servers from the candidate list.
    removeDeadNotExpiredServers(destServers);
    return destServers;
  }
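
  /*
   * Illustrative sketch (not part of the original source): assignment and balancing code uses
   * this to pick candidate targets, excluding the server a region is being moved away from:
   *
   *   List<ServerName> candidates = serverManager.createDestinationServersList(sourceServer);
   *   if (candidates.isEmpty()) {
   *     // nowhere to assign; the caller has to wait for more regionservers to come online
   *   }
   */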

  /**
   * Calls {@link #createDestinationServersList(ServerName)} without a server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Removes the dead-but-not-yet-expired servers from the passed list of candidate servers.
   * Use a higher-level method such as {@link #createDestinationServersList()} instead of
   * managing your own list where possible.
   */
  void removeDeadNotExpiredServers(List<ServerName> servers) {
    Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
    if (!deadNotExpiredServersCopy.isEmpty()) {
      for (ServerName server : deadNotExpiredServersCopy) {
        LOG.debug("Removing dead but not expired server: " + server
            + " from eligible server pool.");
        servers.remove(server);
      }
    }
  }

  /**
   * Clears any dead server entry that shares a host name and port with an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }
}