View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.impl;
18  
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.Comparator;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  import java.util.SortedMap;
29  import java.util.TreeMap;
30  import java.util.TreeSet;
31  import java.util.concurrent.locks.Lock;
32  import java.util.concurrent.locks.ReentrantReadWriteLock;
33  
34  import org.apache.accumulo.core.client.AccumuloException;
35  import org.apache.accumulo.core.client.AccumuloSecurityException;
36  import org.apache.accumulo.core.client.TableNotFoundException;
37  import org.apache.accumulo.core.data.Key;
38  import org.apache.accumulo.core.data.KeyExtent;
39  import org.apache.accumulo.core.data.Mutation;
40  import org.apache.accumulo.core.data.PartialKey;
41  import org.apache.accumulo.core.data.Range;
42  import org.apache.accumulo.core.util.OpTimer;
43  import org.apache.accumulo.core.util.TextUtil;
44  import org.apache.accumulo.core.util.UtilWaitThread;
45  import org.apache.hadoop.io.Text;
46  import org.apache.hadoop.io.WritableComparator;
47  import org.apache.log4j.Level;
48  import org.apache.log4j.Logger;
49  
50  public class TabletLocatorImpl extends TabletLocator {
51    
  // Logger shared by all instances; used for the trace-level timing and invalidation messages.
  private static final Logger log = Logger.getLogger(TabletLocatorImpl.class);

  // There seems to be a bug in TreeMap.tailMap related to putting null in the
  // TreeMap. Therefore a tablet whose end row is null is stored in the cache
  // under this sentinel instead of under a null key. The sentinel is
  // recognised by reference (==), not by content.
  static final Text MAX_TEXT = new Text();
58    
59    private static class EndRowComparator implements Comparator<Text> {
60      
61      public int compare(Text o1, Text o2) {
62        
63        int ret;
64        
65        if (o1 == MAX_TEXT)
66          if (o2 == MAX_TEXT)
67            ret = 0;
68          else
69            ret = 1;
70        else if (o2 == MAX_TEXT)
71          ret = -1;
72        else
73          ret = o1.compareTo(o2);
74        
75        return ret;
76      }
77      
78    }
79    
80    static final EndRowComparator endRowComparator = new EndRowComparator();
81    
82    protected Text tableId;
83    protected TabletLocator parent;
84    protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<Text,TabletLocation>(endRowComparator);
85    protected TabletLocationObtainer locationObtainer;
86    protected Text lastTabletRow;
87    
88    private TreeSet<KeyExtent> badExtents = new TreeSet<KeyExtent>();
89    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
90    private final Lock rLock = rwLock.readLock();
91    private final Lock wLock = rwLock.writeLock();
92    
93    public static interface TabletLocationObtainer {
94      /**
95       * @return null when unable to read information successfully
96       */
97      TabletLocations lookupTablet(TabletLocation src, Text row, Text stopRow, TabletLocator parent) throws AccumuloSecurityException, AccumuloException;
98      
99      List<TabletLocation> lookupTablets(String tserver, Map<KeyExtent,List<Range>> map, TabletLocator parent) throws AccumuloSecurityException,
100         AccumuloException;
101   }
102   
103   public TabletLocatorImpl(Text table, TabletLocator parent, TabletLocationObtainer tlo) {
104     this.tableId = table;
105     this.parent = parent;
106     this.locationObtainer = tlo;
107     
108     this.lastTabletRow = new Text(tableId);
109     lastTabletRow.append(new byte[] {'<'}, 0, 1);
110   }
111   
112   @Override
113   public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures) throws AccumuloException,
114       AccumuloSecurityException, TableNotFoundException {
115     
116     OpTimer opTimer = null;
117     if (log.isTraceEnabled())
118       opTimer = new OpTimer(log, Level.TRACE).start("Binning " + mutations.size() + " mutations for table " + tableId);
119     
120     ArrayList<Mutation> notInCache = new ArrayList<Mutation>();
121     Text row = new Text();
122     
123     rLock.lock();
124     try {
125       processInvalidated();
126       
127       // for this to be efficient rows need to be in sorted order, but always sorting is slow... therefore only sort the
128       // stuff not in the cache.... it is most efficient to pass _locateTablet rows in sorted order
129       
130       // For this to be efficient, need to avoid fine grained synchronization and fine grained logging.
131       // Therefore methods called by this are not synchronized and should not log.
132       
133       for (Mutation mutation : mutations) {
134         row.set(mutation.getRow());
135         TabletLocation tl = locateTabletInCache(row);
136         if (tl == null)
137           notInCache.add(mutation);
138         else
139           addMutation(binnedMutations, mutation, tl);
140         
141       }
142     } finally {
143       rLock.unlock();
144     }
145     
146     if (notInCache.size() > 0) {
147       Collections.sort(notInCache, new Comparator<Mutation>() {
148         public int compare(Mutation o1, Mutation o2) {
149           return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
150         }
151       });
152       
153       wLock.lock();
154       try {
155         boolean failed = false;
156         for (Mutation mutation : notInCache) {
157           if (failed) {
158             // when one table does not return a location, something is probably
159             // screwy, go ahead and fail everything.
160             failures.add(mutation);
161             continue;
162           }
163           
164           row.set(mutation.getRow());
165           
166           TabletLocation tl = _locateTablet(row, false, false, false);
167           
168           if (tl == null) {
169             failures.add(mutation);
170             failed = true;
171           } else {
172             addMutation(binnedMutations, mutation, tl);
173           }
174         }
175       } finally {
176         wLock.unlock();
177       }
178     }
179     
180     if (opTimer != null)
181       opTimer.stop("Binned " + mutations.size() + " mutations for table " + tableId + " to " + binnedMutations.size() + " tservers in %DURATION%");
182   }
183   
184   private void addMutation(Map<String,TabletServerMutations> binnedMutations, Mutation mutation, TabletLocation tl) {
185     TabletServerMutations tsm = binnedMutations.get(tl.tablet_location);
186     
187     if (tsm == null) {
188       tsm = new TabletServerMutations();
189       binnedMutations.put(tl.tablet_location, tsm);
190     }
191     
192     tsm.addMutation(tl.tablet_extent, mutation);
193   }
194   
195   private List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache) throws AccumuloException,
196       AccumuloSecurityException, TableNotFoundException {
197     List<Range> failures = new ArrayList<Range>();
198     List<TabletLocation> tabletLocations = new ArrayList<TabletLocation>();
199     
200     boolean lookupFailed = false;
201     
202     l1: for (Range range : ranges) {
203       tabletLocations.clear();
204       
205       Text startRow;
206       
207       if (range.getStartKey() != null) {
208         startRow = range.getStartKey().getRow();
209       } else
210         startRow = new Text();
211       
212       TabletLocation tl = null;
213       
214       if (useCache)
215         tl = locateTabletInCache(startRow);
216       else if (!lookupFailed)
217         tl = _locateTablet(startRow, false, false, false);
218       
219       if (tl == null) {
220         failures.add(range);
221         if (!useCache)
222           lookupFailed = true;
223         continue;
224       }
225       
226       tabletLocations.add(tl);
227       
228       while (tl.tablet_extent.getEndRow() != null && !range.afterEndKey(new Key(tl.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) {
229         if (useCache) {
230           Text row = new Text(tl.tablet_extent.getEndRow());
231           row.append(new byte[] {0}, 0, 1);
232           tl = locateTabletInCache(row);
233         } else {
234           tl = _locateTablet(tl.tablet_extent.getEndRow(), true, false, false);
235         }
236         
237         if (tl == null) {
238           failures.add(range);
239           if (!useCache)
240             lookupFailed = true;
241           continue l1;
242         }
243         tabletLocations.add(tl);
244       }
245       
246       for (TabletLocation tl2 : tabletLocations) {
247         TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, range);
248       }
249       
250     }
251     
252     return failures;
253   }
254   
255   @Override
256   public List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException,
257       TableNotFoundException {
258     
259     /*
260      * For this to be efficient, need to avoid fine grained synchronization and fine grained logging. Therefore methods called by this are not synchronized and
261      * should not log.
262      */
263     
264     OpTimer opTimer = null;
265     if (log.isTraceEnabled())
266       opTimer = new OpTimer(log, Level.TRACE).start("Binning " + ranges.size() + " ranges for table " + tableId);
267     
268     List<Range> failures;
269     rLock.lock();
270     try {
271       processInvalidated();
272       
273       // for this to be optimal, need to look ranges up in sorted order when
274       // ranges are not present in cache... however do not want to always
275       // sort ranges... therefore try binning ranges using only the cache
276       // and sort whatever fails and retry
277       
278       failures = binRanges(ranges, binnedRanges, true);
279     } finally {
280       rLock.unlock();
281     }
282     
283     if (failures.size() > 0) {
284       // sort failures by range start key
285       Collections.sort(failures);
286       
287       // try lookups again
288       wLock.lock();
289       try {
290         failures = binRanges(failures, binnedRanges, false);
291       } finally {
292         wLock.unlock();
293       }
294     }
295     
296     if (opTimer != null)
297       opTimer.stop("Binned " + ranges.size() + " ranges for table " + tableId + " to " + binnedRanges.size() + " tservers in %DURATION%");
298     
299     return failures;
300   }
301   
302   @Override
303   public void invalidateCache(KeyExtent failedExtent) {
304     wLock.lock();
305     try {
306       badExtents.add(failedExtent);
307     } finally {
308       wLock.unlock();
309     }
310     if (log.isTraceEnabled())
311       log.trace("Invalidated extent=" + failedExtent);
312   }
313   
314   @Override
315   public void invalidateCache(Collection<KeyExtent> keySet) {
316     wLock.lock();
317     try {
318       badExtents.addAll(keySet);
319     } finally {
320       wLock.unlock();
321     }
322     if (log.isTraceEnabled())
323       log.trace("Invalidated " + keySet.size() + " cache entries for table " + tableId);
324   }
325   
326   @Override
327   public void invalidateCache(String server) {
328     int invalidatedCount = 0;
329     
330     wLock.lock();
331     try {
332       for (TabletLocation cacheEntry : metaCache.values())
333         if (cacheEntry.tablet_location.equals(server)) {
334           badExtents.add(cacheEntry.tablet_extent);
335           invalidatedCount++;
336         }
337     } finally {
338       wLock.unlock();
339     }
340     
341     if (log.isTraceEnabled())
342       log.trace("invalidated " + invalidatedCount + " cache entries  table=" + tableId + " server=" + server);
343     
344   }
345   
346   @Override
347   public void invalidateCache() {
348     int invalidatedCount;
349     wLock.lock();
350     try {
351       invalidatedCount = metaCache.size();
352       metaCache.clear();
353     } finally {
354       wLock.unlock();
355     }
356     if (log.isTraceEnabled())
357       log.trace("invalidated all " + invalidatedCount + " cache entries for table=" + tableId);
358   }
359   
360   @Override
361   public TabletLocation locateTablet(Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
362     
363     OpTimer opTimer = null;
364     if (log.isTraceEnabled())
365       opTimer = new OpTimer(log, Level.TRACE).start("Locating tablet  table=" + tableId + " row=" + TextUtil.truncate(row) + "  skipRow=" + skipRow + " retry="
366           + retry);
367     
368     while (true) {
369       
370       TabletLocation tl;
371       
372       tl = _locateTablet(row, skipRow, retry, true);
373       
374       if (retry && tl == null) {
375         UtilWaitThread.sleep(100);
376         if (log.isTraceEnabled())
377           log.trace("Failed to locate tablet containing row " + TextUtil.truncate(row) + " in table " + tableId + ", will retry...");
378         continue;
379       }
380       
381       if (opTimer != null)
382         opTimer.stop("Located tablet " + (tl == null ? null : tl.tablet_extent) + " at " + (tl == null ? null : tl.tablet_location) + " in %DURATION%");
383       
384       return tl;
385     }
386   }
387   
388   private void lookupTabletLocation(Text row, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
389     Text metadataRow = new Text(tableId);
390     metadataRow.append(new byte[] {';'}, 0, 1);
391     metadataRow.append(row.getBytes(), 0, row.getLength());
392     TabletLocation ptl = parent.locateTablet(metadataRow, false, retry);
393     
394     if (ptl != null) {
395       TabletLocations locations = locationObtainer.lookupTablet(ptl, metadataRow, lastTabletRow, parent);
396       while (locations != null && locations.getLocations().isEmpty() && locations.getLocationless().isEmpty()
397  && !ptl.tablet_extent.isRootTablet()) {
398         // try the next tablet, the current tablet does not have any tablets that overlap the row
399         Text er = ptl.tablet_extent.getEndRow();
400         if (er != null && er.compareTo(lastTabletRow) < 0) {
401           // System.out.println("er "+er+"  ltr "+lastTabletRow);
402           ptl = parent.locateTablet(er, true, retry);
403           if (ptl != null)
404             locations = locationObtainer.lookupTablet(ptl, metadataRow, lastTabletRow, parent);
405           else
406             break;
407         } else {
408           break;
409         }
410       }
411       
412       if (locations == null)
413         return;
414 
415       // cannot assume the list contains contiguous key extents... so it is probably
416       // best to deal with each extent individually
417       
418       Text lastEndRow = null;
419       for (TabletLocation tabletLocation : locations.getLocations()) {
420         
421         KeyExtent ke = tabletLocation.tablet_extent;
422         TabletLocation locToCache;
423         
424         // create new location if current prevEndRow == endRow
425         if ((lastEndRow != null) && (ke.getPrevEndRow() != null) && ke.getPrevEndRow().equals(lastEndRow)) {
426           locToCache = new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow), tabletLocation.tablet_location);
427         } else {
428           locToCache = tabletLocation;
429         }
430         
431         // save endRow for next iteration
432         lastEndRow = locToCache.tablet_extent.getEndRow();
433         
434         updateCache(locToCache);
435       }
436     }
437     
438   }
439   
440   private void updateCache(TabletLocation tabletLocation) {
441     if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
442       // sanity check
443       throw new IllegalStateException("Unexpected extent returned " + tableId + "  " + tabletLocation.tablet_extent);
444     }
445     
446     if (tabletLocation.tablet_location == null) {
447       // sanity check
448       throw new IllegalStateException("Cannot add null locations to cache " + tableId + "  " + tabletLocation.tablet_extent);
449     }
450     
451     if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
452       // sanity check
453       throw new IllegalStateException("Cannot add other table ids to locations cache " + tableId + "  " + tabletLocation.tablet_extent);
454     }
455     
456     // clear out any overlapping extents in cache
457     removeOverlapping(metaCache, tabletLocation.tablet_extent);
458     
459     // add it to cache
460     Text er = tabletLocation.tablet_extent.getEndRow();
461     if (er == null)
462       er = MAX_TEXT;
463     metaCache.put(er, tabletLocation);
464     
465     if (badExtents.size() > 0)
466       removeOverlapping(badExtents, tabletLocation.tablet_extent);
467   }
468   
469   static void removeOverlapping(TreeMap<Text,TabletLocation> metaCache, KeyExtent nke) {
470     Iterator<Entry<Text,TabletLocation>> iter = null;
471     
472     if (nke.getPrevEndRow() == null) {
473       iter = metaCache.entrySet().iterator();
474     } else {
475       Text row = rowAfterPrevRow(nke);
476       SortedMap<Text,TabletLocation> tailMap = metaCache.tailMap(row);
477       iter = tailMap.entrySet().iterator();
478     }
479     
480     while (iter.hasNext()) {
481       Entry<Text,TabletLocation> entry = iter.next();
482       
483       KeyExtent ke = entry.getValue().tablet_extent;
484       
485       if (stopRemoving(nke, ke)) {
486         break;
487       }
488       
489       iter.remove();
490     }
491   }
492   
493   private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) {
494     return ke.getPrevEndRow() != null && nke.getEndRow() != null && ke.getPrevEndRow().compareTo(nke.getEndRow()) >= 0;
495   }
496   
497   private static Text rowAfterPrevRow(KeyExtent nke) {
498     Text row = new Text(nke.getPrevEndRow());
499     row.append(new byte[] {0}, 0, 1);
500     return row;
501   }
502   
503   static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) {
504     for (KeyExtent overlapping : KeyExtent.findOverlapping(nke, extents)) {
505       extents.remove(overlapping);
506     }
507   }
508   
509   private TabletLocation locateTabletInCache(Text row) {
510     
511     Entry<Text,TabletLocation> entry = metaCache.ceilingEntry(row);
512     
513     if (entry != null) {
514       KeyExtent ke = entry.getValue().tablet_extent;
515       if (ke.getPrevEndRow() == null || ke.getPrevEndRow().compareTo(row) < 0)
516         return entry.getValue();
517     }
518     return null;
519   }
520   
521   protected TabletLocation _locateTablet(Text row, boolean skipRow, boolean retry, boolean lock) throws AccumuloException, AccumuloSecurityException,
522       TableNotFoundException {
523     
524     if (skipRow) {
525       row = new Text(row);
526       row.append(new byte[] {0}, 0, 1);
527     }
528     
529     TabletLocation tl;
530     
531     if (lock)
532       rLock.lock();
533     try {
534       processInvalidated();
535       tl = locateTabletInCache(row);
536     } finally {
537       if (lock)
538         rLock.unlock();
539     }
540     
541     if (tl == null) {
542       if (lock)
543         wLock.lock();
544       try {
545         // not in cache, so obtain info
546         lookupTabletLocation(row, retry);
547         
548         tl = locateTabletInCache(row);
549       } finally {
550         if (lock)
551           wLock.unlock();
552       }
553     }
554     
555     return tl;
556   }
557   
558   private void processInvalidated() throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
559     
560     if (badExtents.size() == 0)
561       return;
562     
563     boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread();
564     try {
565       if (!writeLockHeld) {
566         rLock.unlock();
567         wLock.lock();
568         if (badExtents.size() == 0)
569           return;
570       }
571       
572       List<Range> lookups = new ArrayList<Range>(badExtents.size());
573       
574       for (KeyExtent be : badExtents) {
575         lookups.add(be.toMetadataRange());
576         removeOverlapping(metaCache, be);
577       }
578       
579       lookups = Range.mergeOverlapping(lookups);
580       
581       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
582       
583       parent.binRanges(lookups, binnedRanges);
584       
585       // randomize server order
586       ArrayList<String> tabletServers = new ArrayList<String>(binnedRanges.keySet());
587       Collections.shuffle(tabletServers);
588       
589       for (String tserver : tabletServers) {
590         List<TabletLocation> locations = locationObtainer.lookupTablets(tserver, binnedRanges.get(tserver), parent);
591         
592         for (TabletLocation tabletLocation : locations) {
593           updateCache(tabletLocation);
594         }
595       }
596     } finally {
597       if (!writeLockHeld) {
598         rLock.lock();
599         wLock.unlock();
600       }
601     }
602   }
603   
604   protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, String location, KeyExtent ke, Range range) {
605     Map<KeyExtent,List<Range>> tablets = binnedRanges.get(location);
606     if (tablets == null) {
607       tablets = new HashMap<KeyExtent,List<Range>>();
608       binnedRanges.put(location, tablets);
609     }
610     
611     List<Range> tabletsRanges = tablets.get(ke);
612     if (tabletsRanges == null) {
613       tabletsRanges = new ArrayList<Range>();
614       tablets.put(ke, tabletsRanges);
615     }
616     
617     tabletsRanges.add(range);
618   }
619   
620 }