1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.client.impl;
18
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Comparator;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.SortedMap;
29 import java.util.TreeMap;
30 import java.util.TreeSet;
31 import java.util.concurrent.locks.Lock;
32 import java.util.concurrent.locks.ReentrantReadWriteLock;
33
34 import org.apache.accumulo.core.client.AccumuloException;
35 import org.apache.accumulo.core.client.AccumuloSecurityException;
36 import org.apache.accumulo.core.client.TableNotFoundException;
37 import org.apache.accumulo.core.data.Key;
38 import org.apache.accumulo.core.data.KeyExtent;
39 import org.apache.accumulo.core.data.Mutation;
40 import org.apache.accumulo.core.data.PartialKey;
41 import org.apache.accumulo.core.data.Range;
42 import org.apache.accumulo.core.util.OpTimer;
43 import org.apache.accumulo.core.util.TextUtil;
44 import org.apache.accumulo.core.util.UtilWaitThread;
45 import org.apache.hadoop.io.Text;
46 import org.apache.hadoop.io.WritableComparator;
47 import org.apache.log4j.Level;
48 import org.apache.log4j.Logger;
49
50 public class TabletLocatorImpl extends TabletLocator {
51
52 private static final Logger log = Logger.getLogger(TabletLocatorImpl.class);
53
54
55
56
57 static final Text MAX_TEXT = new Text();
58
59 private static class EndRowComparator implements Comparator<Text> {
60
61 public int compare(Text o1, Text o2) {
62
63 int ret;
64
65 if (o1 == MAX_TEXT)
66 if (o2 == MAX_TEXT)
67 ret = 0;
68 else
69 ret = 1;
70 else if (o2 == MAX_TEXT)
71 ret = -1;
72 else
73 ret = o1.compareTo(o2);
74
75 return ret;
76 }
77
78 }
79
80 static final EndRowComparator endRowComparator = new EndRowComparator();
81
82 protected Text tableId;
83 protected TabletLocator parent;
84 protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<Text,TabletLocation>(endRowComparator);
85 protected TabletLocationObtainer locationObtainer;
86 protected Text lastTabletRow;
87
88 private TreeSet<KeyExtent> badExtents = new TreeSet<KeyExtent>();
89 private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
90 private final Lock rLock = rwLock.readLock();
91 private final Lock wLock = rwLock.writeLock();
92
93 public static interface TabletLocationObtainer {
94 /**
95 * @return null when unable to read information successfully
96 */
97 TabletLocations lookupTablet(TabletLocation src, Text row, Text stopRow, TabletLocator parent) throws AccumuloSecurityException, AccumuloException;
98
99 List<TabletLocation> lookupTablets(String tserver, Map<KeyExtent,List<Range>> map, TabletLocator parent) throws AccumuloSecurityException,
100 AccumuloException;
101 }
102
103 public TabletLocatorImpl(Text table, TabletLocator parent, TabletLocationObtainer tlo) {
104 this.tableId = table;
105 this.parent = parent;
106 this.locationObtainer = tlo;
107
108 this.lastTabletRow = new Text(tableId);
109 lastTabletRow.append(new byte[] {'<'}, 0, 1);
110 }
111
112 @Override
113 public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures) throws AccumuloException,
114 AccumuloSecurityException, TableNotFoundException {
115
116 OpTimer opTimer = null;
117 if (log.isTraceEnabled())
118 opTimer = new OpTimer(log, Level.TRACE).start("Binning " + mutations.size() + " mutations for table " + tableId);
119
120 ArrayList<Mutation> notInCache = new ArrayList<Mutation>();
121 Text row = new Text();
122
123 rLock.lock();
124 try {
125 processInvalidated();
126
127
128
129
130
131
132
133 for (Mutation mutation : mutations) {
134 row.set(mutation.getRow());
135 TabletLocation tl = locateTabletInCache(row);
136 if (tl == null)
137 notInCache.add(mutation);
138 else
139 addMutation(binnedMutations, mutation, tl);
140
141 }
142 } finally {
143 rLock.unlock();
144 }
145
146 if (notInCache.size() > 0) {
147 Collections.sort(notInCache, new Comparator<Mutation>() {
148 public int compare(Mutation o1, Mutation o2) {
149 return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
150 }
151 });
152
153 wLock.lock();
154 try {
155 boolean failed = false;
156 for (Mutation mutation : notInCache) {
157 if (failed) {
158
159
160 failures.add(mutation);
161 continue;
162 }
163
164 row.set(mutation.getRow());
165
166 TabletLocation tl = _locateTablet(row, false, false, false);
167
168 if (tl == null) {
169 failures.add(mutation);
170 failed = true;
171 } else {
172 addMutation(binnedMutations, mutation, tl);
173 }
174 }
175 } finally {
176 wLock.unlock();
177 }
178 }
179
180 if (opTimer != null)
181 opTimer.stop("Binned " + mutations.size() + " mutations for table " + tableId + " to " + binnedMutations.size() + " tservers in %DURATION%");
182 }
183
184 private void addMutation(Map<String,TabletServerMutations> binnedMutations, Mutation mutation, TabletLocation tl) {
185 TabletServerMutations tsm = binnedMutations.get(tl.tablet_location);
186
187 if (tsm == null) {
188 tsm = new TabletServerMutations();
189 binnedMutations.put(tl.tablet_location, tsm);
190 }
191
192 tsm.addMutation(tl.tablet_extent, mutation);
193 }
194
195 private List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache) throws AccumuloException,
196 AccumuloSecurityException, TableNotFoundException {
197 List<Range> failures = new ArrayList<Range>();
198 List<TabletLocation> tabletLocations = new ArrayList<TabletLocation>();
199
200 boolean lookupFailed = false;
201
202 l1: for (Range range : ranges) {
203 tabletLocations.clear();
204
205 Text startRow;
206
207 if (range.getStartKey() != null) {
208 startRow = range.getStartKey().getRow();
209 } else
210 startRow = new Text();
211
212 TabletLocation tl = null;
213
214 if (useCache)
215 tl = locateTabletInCache(startRow);
216 else if (!lookupFailed)
217 tl = _locateTablet(startRow, false, false, false);
218
219 if (tl == null) {
220 failures.add(range);
221 if (!useCache)
222 lookupFailed = true;
223 continue;
224 }
225
226 tabletLocations.add(tl);
227
228 while (tl.tablet_extent.getEndRow() != null && !range.afterEndKey(new Key(tl.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) {
229 if (useCache) {
230 Text row = new Text(tl.tablet_extent.getEndRow());
231 row.append(new byte[] {0}, 0, 1);
232 tl = locateTabletInCache(row);
233 } else {
234 tl = _locateTablet(tl.tablet_extent.getEndRow(), true, false, false);
235 }
236
237 if (tl == null) {
238 failures.add(range);
239 if (!useCache)
240 lookupFailed = true;
241 continue l1;
242 }
243 tabletLocations.add(tl);
244 }
245
246 for (TabletLocation tl2 : tabletLocations) {
247 TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, range);
248 }
249
250 }
251
252 return failures;
253 }
254
255 @Override
256 public List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException,
257 TableNotFoundException {
258
259
260
261
262
263
264 OpTimer opTimer = null;
265 if (log.isTraceEnabled())
266 opTimer = new OpTimer(log, Level.TRACE).start("Binning " + ranges.size() + " ranges for table " + tableId);
267
268 List<Range> failures;
269 rLock.lock();
270 try {
271 processInvalidated();
272
273
274
275
276
277
278 failures = binRanges(ranges, binnedRanges, true);
279 } finally {
280 rLock.unlock();
281 }
282
283 if (failures.size() > 0) {
284
285 Collections.sort(failures);
286
287
288 wLock.lock();
289 try {
290 failures = binRanges(failures, binnedRanges, false);
291 } finally {
292 wLock.unlock();
293 }
294 }
295
296 if (opTimer != null)
297 opTimer.stop("Binned " + ranges.size() + " ranges for table " + tableId + " to " + binnedRanges.size() + " tservers in %DURATION%");
298
299 return failures;
300 }
301
302 @Override
303 public void invalidateCache(KeyExtent failedExtent) {
304 wLock.lock();
305 try {
306 badExtents.add(failedExtent);
307 } finally {
308 wLock.unlock();
309 }
310 if (log.isTraceEnabled())
311 log.trace("Invalidated extent=" + failedExtent);
312 }
313
314 @Override
315 public void invalidateCache(Collection<KeyExtent> keySet) {
316 wLock.lock();
317 try {
318 badExtents.addAll(keySet);
319 } finally {
320 wLock.unlock();
321 }
322 if (log.isTraceEnabled())
323 log.trace("Invalidated " + keySet.size() + " cache entries for table " + tableId);
324 }
325
326 @Override
327 public void invalidateCache(String server) {
328 int invalidatedCount = 0;
329
330 wLock.lock();
331 try {
332 for (TabletLocation cacheEntry : metaCache.values())
333 if (cacheEntry.tablet_location.equals(server)) {
334 badExtents.add(cacheEntry.tablet_extent);
335 invalidatedCount++;
336 }
337 } finally {
338 wLock.unlock();
339 }
340
341 if (log.isTraceEnabled())
342 log.trace("invalidated " + invalidatedCount + " cache entries table=" + tableId + " server=" + server);
343
344 }
345
346 @Override
347 public void invalidateCache() {
348 int invalidatedCount;
349 wLock.lock();
350 try {
351 invalidatedCount = metaCache.size();
352 metaCache.clear();
353 } finally {
354 wLock.unlock();
355 }
356 if (log.isTraceEnabled())
357 log.trace("invalidated all " + invalidatedCount + " cache entries for table=" + tableId);
358 }
359
360 @Override
361 public TabletLocation locateTablet(Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
362
363 OpTimer opTimer = null;
364 if (log.isTraceEnabled())
365 opTimer = new OpTimer(log, Level.TRACE).start("Locating tablet table=" + tableId + " row=" + TextUtil.truncate(row) + " skipRow=" + skipRow + " retry="
366 + retry);
367
368 while (true) {
369
370 TabletLocation tl;
371
372 tl = _locateTablet(row, skipRow, retry, true);
373
374 if (retry && tl == null) {
375 UtilWaitThread.sleep(100);
376 if (log.isTraceEnabled())
377 log.trace("Failed to locate tablet containing row " + TextUtil.truncate(row) + " in table " + tableId + ", will retry...");
378 continue;
379 }
380
381 if (opTimer != null)
382 opTimer.stop("Located tablet " + (tl == null ? null : tl.tablet_extent) + " at " + (tl == null ? null : tl.tablet_location) + " in %DURATION%");
383
384 return tl;
385 }
386 }
387
388 private void lookupTabletLocation(Text row, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
389 Text metadataRow = new Text(tableId);
390 metadataRow.append(new byte[] {';'}, 0, 1);
391 metadataRow.append(row.getBytes(), 0, row.getLength());
392 TabletLocation ptl = parent.locateTablet(metadataRow, false, retry);
393
394 if (ptl != null) {
395 TabletLocations locations = locationObtainer.lookupTablet(ptl, metadataRow, lastTabletRow, parent);
396 while (locations != null && locations.getLocations().isEmpty() && locations.getLocationless().isEmpty()
397 && !ptl.tablet_extent.isRootTablet()) {
398
399 Text er = ptl.tablet_extent.getEndRow();
400 if (er != null && er.compareTo(lastTabletRow) < 0) {
401
402 ptl = parent.locateTablet(er, true, retry);
403 if (ptl != null)
404 locations = locationObtainer.lookupTablet(ptl, metadataRow, lastTabletRow, parent);
405 else
406 break;
407 } else {
408 break;
409 }
410 }
411
412 if (locations == null)
413 return;
414
415
416
417
418 Text lastEndRow = null;
419 for (TabletLocation tabletLocation : locations.getLocations()) {
420
421 KeyExtent ke = tabletLocation.tablet_extent;
422 TabletLocation locToCache;
423
424
425 if ((lastEndRow != null) && (ke.getPrevEndRow() != null) && ke.getPrevEndRow().equals(lastEndRow)) {
426 locToCache = new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow), tabletLocation.tablet_location);
427 } else {
428 locToCache = tabletLocation;
429 }
430
431
432 lastEndRow = locToCache.tablet_extent.getEndRow();
433
434 updateCache(locToCache);
435 }
436 }
437
438 }
439
440 private void updateCache(TabletLocation tabletLocation) {
441 if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
442
443 throw new IllegalStateException("Unexpected extent returned " + tableId + " " + tabletLocation.tablet_extent);
444 }
445
446 if (tabletLocation.tablet_location == null) {
447
448 throw new IllegalStateException("Cannot add null locations to cache " + tableId + " " + tabletLocation.tablet_extent);
449 }
450
451 if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
452
453 throw new IllegalStateException("Cannot add other table ids to locations cache " + tableId + " " + tabletLocation.tablet_extent);
454 }
455
456
457 removeOverlapping(metaCache, tabletLocation.tablet_extent);
458
459
460 Text er = tabletLocation.tablet_extent.getEndRow();
461 if (er == null)
462 er = MAX_TEXT;
463 metaCache.put(er, tabletLocation);
464
465 if (badExtents.size() > 0)
466 removeOverlapping(badExtents, tabletLocation.tablet_extent);
467 }
468
469 static void removeOverlapping(TreeMap<Text,TabletLocation> metaCache, KeyExtent nke) {
470 Iterator<Entry<Text,TabletLocation>> iter = null;
471
472 if (nke.getPrevEndRow() == null) {
473 iter = metaCache.entrySet().iterator();
474 } else {
475 Text row = rowAfterPrevRow(nke);
476 SortedMap<Text,TabletLocation> tailMap = metaCache.tailMap(row);
477 iter = tailMap.entrySet().iterator();
478 }
479
480 while (iter.hasNext()) {
481 Entry<Text,TabletLocation> entry = iter.next();
482
483 KeyExtent ke = entry.getValue().tablet_extent;
484
485 if (stopRemoving(nke, ke)) {
486 break;
487 }
488
489 iter.remove();
490 }
491 }
492
493 private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) {
494 return ke.getPrevEndRow() != null && nke.getEndRow() != null && ke.getPrevEndRow().compareTo(nke.getEndRow()) >= 0;
495 }
496
497 private static Text rowAfterPrevRow(KeyExtent nke) {
498 Text row = new Text(nke.getPrevEndRow());
499 row.append(new byte[] {0}, 0, 1);
500 return row;
501 }
502
503 static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) {
504 for (KeyExtent overlapping : KeyExtent.findOverlapping(nke, extents)) {
505 extents.remove(overlapping);
506 }
507 }
508
509 private TabletLocation locateTabletInCache(Text row) {
510
511 Entry<Text,TabletLocation> entry = metaCache.ceilingEntry(row);
512
513 if (entry != null) {
514 KeyExtent ke = entry.getValue().tablet_extent;
515 if (ke.getPrevEndRow() == null || ke.getPrevEndRow().compareTo(row) < 0)
516 return entry.getValue();
517 }
518 return null;
519 }
520
521 protected TabletLocation _locateTablet(Text row, boolean skipRow, boolean retry, boolean lock) throws AccumuloException, AccumuloSecurityException,
522 TableNotFoundException {
523
524 if (skipRow) {
525 row = new Text(row);
526 row.append(new byte[] {0}, 0, 1);
527 }
528
529 TabletLocation tl;
530
531 if (lock)
532 rLock.lock();
533 try {
534 processInvalidated();
535 tl = locateTabletInCache(row);
536 } finally {
537 if (lock)
538 rLock.unlock();
539 }
540
541 if (tl == null) {
542 if (lock)
543 wLock.lock();
544 try {
545
546 lookupTabletLocation(row, retry);
547
548 tl = locateTabletInCache(row);
549 } finally {
550 if (lock)
551 wLock.unlock();
552 }
553 }
554
555 return tl;
556 }
557
558 private void processInvalidated() throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
559
560 if (badExtents.size() == 0)
561 return;
562
563 boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread();
564 try {
565 if (!writeLockHeld) {
566 rLock.unlock();
567 wLock.lock();
568 if (badExtents.size() == 0)
569 return;
570 }
571
572 List<Range> lookups = new ArrayList<Range>(badExtents.size());
573
574 for (KeyExtent be : badExtents) {
575 lookups.add(be.toMetadataRange());
576 removeOverlapping(metaCache, be);
577 }
578
579 lookups = Range.mergeOverlapping(lookups);
580
581 Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
582
583 parent.binRanges(lookups, binnedRanges);
584
585
586 ArrayList<String> tabletServers = new ArrayList<String>(binnedRanges.keySet());
587 Collections.shuffle(tabletServers);
588
589 for (String tserver : tabletServers) {
590 List<TabletLocation> locations = locationObtainer.lookupTablets(tserver, binnedRanges.get(tserver), parent);
591
592 for (TabletLocation tabletLocation : locations) {
593 updateCache(tabletLocation);
594 }
595 }
596 } finally {
597 if (!writeLockHeld) {
598 rLock.lock();
599 wLock.unlock();
600 }
601 }
602 }
603
604 protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, String location, KeyExtent ke, Range range) {
605 Map<KeyExtent,List<Range>> tablets = binnedRanges.get(location);
606 if (tablets == null) {
607 tablets = new HashMap<KeyExtent,List<Range>>();
608 binnedRanges.put(location, tablets);
609 }
610
611 List<Range> tabletsRanges = tablets.get(ke);
612 if (tabletsRanges == null) {
613 tabletsRanges = new ArrayList<Range>();
614 tablets.put(ke, tabletsRanges);
615 }
616
617 tabletsRanges.add(range);
618 }
619
620 }