View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.impl;
18  
19  import java.io.IOException;
20  import java.lang.management.CompilationMXBean;
21  import java.lang.management.GarbageCollectorMXBean;
22  import java.lang.management.ManagementFactory;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Set;
32  import java.util.Timer;
33  import java.util.TimerTask;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicLong;
37  
38  import org.apache.accumulo.trace.instrument.Span;
39  import org.apache.accumulo.trace.instrument.Trace;
40  import org.apache.accumulo.trace.instrument.Tracer;
41  import org.apache.accumulo.trace.thrift.TInfo;
42  import org.apache.accumulo.core.Constants;
43  import org.apache.accumulo.core.client.AccumuloException;
44  import org.apache.accumulo.core.client.AccumuloSecurityException;
45  import org.apache.accumulo.core.client.BatchWriterConfig;
46  import org.apache.accumulo.core.client.Instance;
47  import org.apache.accumulo.core.client.MutationsRejectedException;
48  import org.apache.accumulo.core.client.TableDeletedException;
49  import org.apache.accumulo.core.client.TableNotFoundException;
50  import org.apache.accumulo.core.client.TableOfflineException;
51  import org.apache.accumulo.core.client.TimedOutException;
52  import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
53  import org.apache.accumulo.core.conf.Property;
54  import org.apache.accumulo.core.constraints.Violations;
55  import org.apache.accumulo.core.data.ConstraintViolationSummary;
56  import org.apache.accumulo.core.data.KeyExtent;
57  import org.apache.accumulo.core.data.Mutation;
58  import org.apache.accumulo.core.data.thrift.TMutation;
59  import org.apache.accumulo.core.data.thrift.UpdateErrors;
60  import org.apache.accumulo.core.master.state.tables.TableState;
61  import org.apache.accumulo.core.security.thrift.Credential;
62  import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
63  import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
64  import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
65  import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
66  import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
67  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
68  import org.apache.accumulo.core.util.SimpleThreadPool;
69  import org.apache.accumulo.core.util.ThriftUtil;
70  import org.apache.hadoop.io.Text;
71  import org.apache.log4j.Logger;
72  import org.apache.thrift.TApplicationException;
73  import org.apache.thrift.TException;
74  import org.apache.thrift.TServiceClient;
75  import org.apache.thrift.transport.TTransport;
76  import org.apache.thrift.transport.TTransportException;
77  
78  /*
79   * Differences from previous TabletServerBatchWriter
80   *   + As background threads finish sending mutations to tablet servers they decrement memory usage
81   *   + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads, 
82   *      even if they are currently processing... new mutations are merged with mutations currently 
83   *      processing in the background
84   *   + Failed mutations are held for 1000ms and then re-added to the unprocessed queue
85   *   + Flush holds adding of new mutations so it does not wait indefinitely
86   * 
87   * Considerations
88   *   + All background threads must catch and note Throwable
89   *   + mutations for a single tablet server are only processed by one thread concurrently (if new mutations 
90   *      come in for a tablet server while one thread is processing mutations for it, no other thread should 
91   *      start processing those mutations)
92   *   
93   * Memory accounting
94   *   + when a mutation enters the system memory is incremented
95   *   + when a mutation successfully leaves the system memory is decremented
96   * 
97   * 
98   * 
99   */
100 
101 public class TabletServerBatchWriter {
102   
  // logger shared by the writer, its timer tasks and its send threads
  private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
  
  // estimated bytes currently held by this writer: raised in addMutation(), lowered in decrementMemUsed()
  private long totalMemUsed = 0;
  // memory budget read from BatchWriterConfig.getMaxMemory(); addMutation() waits while totalMemUsed >= maxMem
  private long maxMem;
  // mutations accepted but not yet handed to the MutationWriter; replaced by a fresh set in startProcessing()
  private MutationSet mutations;
  // true while a flush() call is draining the writer; addMutation() waits while it is set
  private boolean flushing;
  // set by close(); addMutation() and flush() throw IllegalStateException once it is true
  private boolean closed;
  // bins mutations by tablet server and sends them using a thread pool
  private MutationWriter writer;
  // timer task that collects failed mutations and re-queues them via addFailedMutations()
  private FailedMutations failedMutations;
  
  private Instance instance;
  private Credential credentials;
  
  // state describing unrecoverable errors; packaged into a MutationsRejectedException by checkForFailures()
  private Violations violations;
  private Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures;
  private HashSet<String> serverSideErrors;
  private int unknownErrors = 0;
  // set by every error-recording method; ends the wait loops and makes checkForFailures() throw
  private boolean somethingFailed = false;
  
  // daemon timer ("BatchWriterLatencyTimer") that runs the max-latency task and the FailedMutations task
  private Timer jtimer;
  
  // max latency in milliseconds; Long.MAX_VALUE when the configured value is <= 0 (no latency task is scheduled)
  private long maxLatency;
  
  // BatchWriterConfig.getTimeout() in milliseconds; handed to TimeoutTracker and TimeoutTabletLocator
  private long timeout;
  
  // last time startProcessing() handed mutations to the writer (initialised at construction)
  private long lastProcessingStartTime;
  
  // counters and timings reported by logStats()
  private long totalAdded = 0;
  private AtomicLong totalSent = new AtomicLong(0);
  private AtomicLong totalBinned = new AtomicLong(0);
  private AtomicLong totalBinTime = new AtomicLong(0);
  private AtomicLong totalSendTime = new AtomicLong(0);
  // time of the first addMutation() call; 0 until then
  private long startTime = 0;
  // JVM / system baselines captured on the first addMutation() call
  private long initialGCTimes;
  private long initialCompileTimes;
  private double initialSystemLoad;
  
  // per-batch statistics maintained by updateBatchStats()
  private int tabletServersBatchSum = 0;
  private int tabletBatchSum = 0;
  private int numBatches = 0;
  private int maxTabletBatch = Integer.MIN_VALUE;
  private int minTabletBatch = Integer.MAX_VALUE;
  private int minTabletServersBatch = Integer.MAX_VALUE;
  private int maxTabletServersBatch = Integer.MIN_VALUE;
  
  // most recent Throwable recorded by updateUnknownErrors(); used as the cause in checkForFailures()
  private Throwable lastUnknownError = null;
  
  // one TimeoutTracker per server location; wrapped in Collections.synchronizedMap by the constructor
  private Map<String,TimeoutTracker> timeoutTrackers;
151   
152   private static class TimeoutTracker {
153     
154     String server;
155     long timeOut;
156     long activityTime;
157     Long firstErrorTime = null;
158     
159     TimeoutTracker(String server, long timeOut) {
160       this.timeOut = timeOut;
161       this.server = server;
162     }
163     
164     void startingWrite() {
165       activityTime = System.currentTimeMillis();
166     }
167     
168     void madeProgress() {
169       activityTime = System.currentTimeMillis();
170       firstErrorTime = null;
171     }
172     
173     void wroteNothing() {
174       if (firstErrorTime == null) {
175         firstErrorTime = activityTime;
176       } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
177         throw new TimedOutException(Collections.singleton(server));
178       }
179     }
180     
181     void errorOccured(Exception e) {
182       wroteNothing();
183     }
184     
185     public long getTimeOut() {
186       return timeOut;
187     }
188   }
189   
190   public TabletServerBatchWriter(Instance instance, Credential credentials, BatchWriterConfig config) {
191     this.instance = instance;
192     this.maxMem = config.getMaxMemory();
193     this.maxLatency = config.getMaxLatency(TimeUnit.MILLISECONDS) <= 0 ? Long.MAX_VALUE : config.getMaxLatency(TimeUnit.MILLISECONDS);
194     this.credentials = credentials;
195     this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
196     mutations = new MutationSet();
197     
198     violations = new Violations();
199     
200     authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>();
201     serverSideErrors = new HashSet<String>();
202     
203     lastProcessingStartTime = System.currentTimeMillis();
204     
205     jtimer = new Timer("BatchWriterLatencyTimer", true);
206     
207     writer = new MutationWriter(config.getMaxWriteThreads());
208     failedMutations = new FailedMutations();
209     
210     timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
211     
212     if (this.maxLatency != Long.MAX_VALUE) {
213       jtimer.schedule(new TimerTask() {
214         public void run() {
215           try {
216             synchronized (TabletServerBatchWriter.this) {
217               if ((System.currentTimeMillis() - lastProcessingStartTime) > TabletServerBatchWriter.this.maxLatency)
218                 startProcessing();
219             }
220           } catch (Throwable t) {
221             updateUnknownErrors("Max latency task failed " + t.getMessage(), t);
222           }
223         }
224       }, 0, this.maxLatency / 4);
225     }
226   }
227   
228   private synchronized void startProcessing() {
229     if (mutations.getMemoryUsed() == 0)
230       return;
231     lastProcessingStartTime = System.currentTimeMillis();
232     writer.addMutations(mutations);
233     mutations = new MutationSet();
234   }
235   
236   private synchronized void decrementMemUsed(long amount) {
237     totalMemUsed -= amount;
238     this.notifyAll();
239   }
240   
241   public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException {
242     
243     if (closed)
244       throw new IllegalStateException("Closed");
245     if (m.size() == 0)
246       throw new IllegalArgumentException("Can not add empty mutations");
247     
248     checkForFailures();
249     
250     while ((totalMemUsed >= maxMem || flushing) && !somethingFailed) {
251       waitRTE();
252     }
253     
254     // do checks again since things could have changed while waiting and not holding lock
255     if (closed)
256       throw new IllegalStateException("Closed");
257     checkForFailures();
258     
259     if (startTime == 0) {
260       startTime = System.currentTimeMillis();
261       
262       List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
263       for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
264         initialGCTimes += garbageCollectorMXBean.getCollectionTime();
265       }
266       
267       CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
268       if (compMxBean.isCompilationTimeMonitoringSupported()) {
269         initialCompileTimes = compMxBean.getTotalCompilationTime();
270       }
271       
272       initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
273     }
274     
275     // create a copy of mutation so that after this method returns the user
276     // is free to reuse the mutation object, like calling readFields... this
277     // is important for the case where a mutation is passed from map to reduce
278     // to batch writer... the map reduce code will keep passing the same mutation
279     // object into the reduce method
280     m = new Mutation(m);
281     
282     totalMemUsed += m.estimatedMemoryUsed();
283     mutations.addMutation(table, m);
284     totalAdded++;
285     
286     if (mutations.getMemoryUsed() >= maxMem / 2) {
287       startProcessing();
288       checkForFailures();
289     }
290   }
291   
292   public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
293     while (iterator.hasNext()) {
294       addMutation(table, iterator.next());
295     }
296   }
297   
298   public synchronized void flush() throws MutationsRejectedException {
299     
300     if (closed)
301       throw new IllegalStateException("Closed");
302     
303     Span span = Trace.start("flush");
304     
305     try {
306       checkForFailures();
307       
308       if (flushing) {
309         // some other thread is currently flushing, so wait
310         while (flushing && !somethingFailed)
311           waitRTE();
312         
313         checkForFailures();
314         
315         return;
316       }
317       
318       flushing = true;
319       
320       startProcessing();
321       checkForFailures();
322       
323       while (totalMemUsed > 0 && !somethingFailed) {
324         waitRTE();
325       }
326       
327       flushing = false;
328       this.notifyAll();
329       
330       checkForFailures();
331     } finally {
332       span.stop();
333     }
334   }
335   
336   public synchronized void close() throws MutationsRejectedException {
337     
338     if (closed)
339       return;
340     
341     Span span = Trace.start("close");
342     try {
343       closed = true;
344       
345       startProcessing();
346       
347       while (totalMemUsed > 0 && !somethingFailed) {
348         waitRTE();
349       }
350       
351       logStats();
352       
353       checkForFailures();
354     } finally {
355       // make a best effort to release these resources
356       writer.sendThreadPool.shutdownNow();
357       jtimer.cancel();
358       span.stop();
359     }
360   }
361   
362   private void logStats() {
363     long finishTime = System.currentTimeMillis();
364     
365     long finalGCTimes = 0;
366     List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
367     for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
368       finalGCTimes += garbageCollectorMXBean.getCollectionTime();
369     }
370     
371     CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
372     long finalCompileTimes = 0;
373     if (compMxBean.isCompilationTimeMonitoringSupported()) {
374       finalCompileTimes = compMxBean.getTotalCompilationTime();
375     }
376     
377     double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
378     double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
379     
380     double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
381     
382     if (log.isTraceEnabled()) {
383       log.trace("");
384       log.trace("TABLET SERVER BATCH WRITER STATISTICS");
385       log.trace(String.format("Added                : %,10d mutations", totalAdded));
386       log.trace(String.format("Sent                 : %,10d mutations", totalSent.get()));
387       log.trace(String.format("Resent percentage   : %10.2f%s", (totalSent.get() - totalAdded) / (double) totalAdded * 100.0, "%"));
388       log.trace(String.format("Overall time         : %,10.2f secs", (finishTime - startTime) / 1000.0));
389       log.trace(String.format("Overall send rate    : %,10.2f mutations/sec", overallRate));
390       log.trace(String.format("Send efficiency      : %10.2f%s", overallRate / averageRate * 100.0, "%"));
391       log.trace("");
392       log.trace("BACKGROUND WRITER PROCESS STATISTICS");
393       log.trace(String.format("Total send time      : %,10.2f secs %6.2f%s", totalSendTime.get() / 1000.0, 100.0 * totalSendTime.get()
394           / (finishTime - startTime), "%"));
395       log.trace(String.format("Average send rate    : %,10.2f mutations/sec", averageRate));
396       log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", totalBinTime.get() / 1000.0,
397           100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
398       log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", totalBinned.get() / (totalBinTime.get() / 1000.0)));
399       log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", (tabletServersBatchSum / (double) numBatches), minTabletServersBatch,
400           maxTabletServersBatch));
401       log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", (tabletBatchSum / (double) numBatches), minTabletBatch, maxTabletBatch));
402       log.trace("");
403       log.trace("SYSTEM STATISTICS");
404       log.trace(String.format("JVM GC Time          : %,10.2f secs", ((finalGCTimes - initialGCTimes) / 1000.0)));
405       if (compMxBean.isCompilationTimeMonitoringSupported()) {
406         log.trace(String.format("JVM Compile Time     : %,10.2f secs", ((finalCompileTimes - initialCompileTimes) / 1000.0)));
407       }
408       log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad));
409     }
410   }
411   
412   private void updateSendStats(long count, long time) {
413     totalSent.addAndGet(count);
414     totalSendTime.addAndGet(time);
415   }
416   
417   public void updateBinningStats(int count, long time, Map<String,TabletServerMutations> binnedMutations) {
418     totalBinTime.addAndGet(time);
419     totalBinned.addAndGet(count);
420     updateBatchStats(binnedMutations);
421   }
422   
423   private synchronized void updateBatchStats(Map<String,TabletServerMutations> binnedMutations) {
424     tabletServersBatchSum += binnedMutations.size();
425     
426     minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
427     maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size());
428     
429     int numTablets = 0;
430     
431     for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
432       TabletServerMutations tsm = entry.getValue();
433       numTablets += tsm.getMutations().size();
434     }
435     
436     tabletBatchSum += numTablets;
437     
438     minTabletBatch = Math.min(minTabletBatch, numTablets);
439     maxTabletBatch = Math.max(maxTabletBatch, numTablets);
440     
441     numBatches++;
442   }
443   
444   private void waitRTE() {
445     try {
446       wait();
447     } catch (InterruptedException e) {
448       throw new RuntimeException(e);
449     }
450   }
451   
452   // BEGIN code for handling unrecoverable errors
453   
454   private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) {
455     if (cvsList.size() > 0) {
456       synchronized (this) {
457         somethingFailed = true;
458         violations.add(cvsList);
459         this.notifyAll();
460       }
461     }
462   }
463   
464   private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) {
465     HashMap<KeyExtent, SecurityErrorCode> map = new HashMap<KeyExtent, SecurityErrorCode>();
466     for (KeyExtent ke : keySet)
467       map.put(ke, code);
468     
469     updateAuthorizationFailures(map);
470   }
471   
472   private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) {
473     if (authorizationFailures.size() > 0) {
474       
475       // was a table deleted?
476       HashSet<String> tableIds = new HashSet<String>();
477       for (KeyExtent ke : authorizationFailures.keySet())
478         tableIds.add(ke.getTableId().toString());
479       
480       Tables.clearCache(instance);
481       for (String tableId : tableIds)
482         if (!Tables.exists(instance, tableId))
483           throw new TableDeletedException(tableId);
484       
485       synchronized (this) {
486         somethingFailed = true;
487         mergeAuthorizationFailures(this.authorizationFailures, authorizationFailures);
488         this.notifyAll();
489       }
490     }
491   }
492   
493   private void mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, Map<KeyExtent,SecurityErrorCode> addition) {
494     for (Entry<KeyExtent,SecurityErrorCode> entry : addition.entrySet()) {
495       Set<SecurityErrorCode> secs = source.get(entry.getKey());
496       if (secs == null) {
497         secs = new HashSet<SecurityErrorCode>();
498         source.put(entry.getKey(), secs);
499       }
500       secs.add(entry.getValue());
501     }
502   }
503   
504   private synchronized void updateServerErrors(String server, Exception e) {
505     somethingFailed = true;
506     this.serverSideErrors.add(server);
507     this.notifyAll();
508     log.error("Server side error on " + server);
509   }
510   
511   private synchronized void updateUnknownErrors(String msg, Throwable t) {
512     somethingFailed = true;
513     unknownErrors++;
514     this.lastUnknownError = t;
515     this.notifyAll();
516     if (t instanceof TableDeletedException || t instanceof TableOfflineException || t instanceof TimedOutException)
517       log.debug(msg, t); // this is not unknown
518     else
519       log.error(msg, t);
520   }
521   
522   private void checkForFailures() throws MutationsRejectedException {
523     if (somethingFailed) {
524       List<ConstraintViolationSummary> cvsList = violations.asList();
525       throw new MutationsRejectedException(cvsList, new HashMap<KeyExtent,Set<SecurityErrorCode>>(authorizationFailures), serverSideErrors, unknownErrors, lastUnknownError);
526     }
527   }
528   
529   // END code for handling unrecoverable errors
530   
531   // BEGIN code for handling failed mutations
532   
533   /**
534    * Add mutations that previously failed back into the mix
535    * 
536    * @param mutationsprivate
537    *          static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
538    */
539   private synchronized void addFailedMutations(MutationSet failedMutations) throws Exception {
540     mutations.addAll(failedMutations);
541     if (mutations.getMemoryUsed() >= maxMem / 2 || closed || flushing) {
542       startProcessing();
543     }
544   }
545   
546   private class FailedMutations extends TimerTask {
547     
548     private MutationSet recentFailures = null;
549     private long initTime;
550     
551     FailedMutations() {
552       jtimer.schedule(this, 0, 500);
553     }
554     
555     private MutationSet init() {
556       if (recentFailures == null) {
557         recentFailures = new MutationSet();
558         initTime = System.currentTimeMillis();
559       }
560       return recentFailures;
561     }
562     
563     synchronized void add(String table, ArrayList<Mutation> tableFailures) {
564       init().addAll(table, tableFailures);
565     }
566     
567     synchronized void add(MutationSet failures) {
568       init().addAll(failures);
569     }
570     
571     synchronized void add(String location, TabletServerMutations tsm) {
572       init();
573       for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) {
574         recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
575       }
576       
577     }
578     
579     @Override
580     public void run() {
581       try {
582         MutationSet rf = null;
583         
584         synchronized (this) {
585           if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) {
586             rf = recentFailures;
587             recentFailures = null;
588           }
589         }
590         
591         if (rf != null) {
592           if (log.isTraceEnabled())
593             log.trace("requeuing " + rf.size() + " failed mutations");
594           addFailedMutations(rf);
595         }
596       } catch (Throwable t) {
597         updateUnknownErrors("Failed to requeue failed mutations " + t.getMessage(), t);
598         cancel();
599       }
600     }
601   }
602   
603   // END code for handling failed mutations
604   
605   // BEGIN code for sending mutations to tablet servers using background threads
606   
607   private class MutationWriter {
608     
    // NOTE(review): not referenced in the visible part of this class — presumably a batch size limit used further down; confirm
    private static final int MUTATION_BATCH_SIZE = 1 << 17;
    // pool that runs the SendTask instances; shut down by TabletServerBatchWriter.close()
    private ExecutorService sendThreadPool;
    // mutations waiting to be sent, keyed by tablet server; a SendTask removes its server's entry when it picks it up
    private Map<String,TabletServerMutations> serversMutations;
    // servers that currently have a SendTask submitted; prevents a second task for the same server
    private Set<String> queued;
    // cache of timeout-wrapped tablet locators, keyed by table id (filled by getLocator())
    private Map<String,TabletLocator> locators;
614     
615     public MutationWriter(int numSendThreads) {
616       serversMutations = new HashMap<String,TabletServerMutations>();
617       queued = new HashSet<String>();
618       sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
619       locators = new HashMap<String,TabletLocator>();
620     }
621     
622     private TabletLocator getLocator(String tableId) {
623       TabletLocator ret = locators.get(tableId);
624       if (ret == null) {
625         ret = TabletLocator.getInstance(instance, credentials, new Text(tableId));
626         ret = new TimeoutTabletLocator(ret, timeout);
627         locators.put(tableId, ret);
628       }
629       
630       return ret;
631     }
632     
633     private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
634       try {
635         Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet();
636         for (Entry<String,List<Mutation>> entry : es) {
637           TabletLocator locator = getLocator(entry.getKey());
638           
639           String table = entry.getKey();
640           List<Mutation> tableMutations = entry.getValue();
641           
642           if (tableMutations != null) {
643             ArrayList<Mutation> tableFailures = new ArrayList<Mutation>();
644             locator.binMutations(tableMutations, binnedMutations, tableFailures);
645             
646             if (tableFailures.size() > 0) {
647               failedMutations.add(table, tableFailures);
648               
649               if (tableFailures.size() == tableMutations.size())
650                 if (!Tables.exists(instance, entry.getKey()))
651                   throw new TableDeletedException(entry.getKey());
652                 else if (Tables.getTableState(instance, table) == TableState.OFFLINE)
653                   throw new TableOfflineException(instance, entry.getKey());
654             }
655           }
656           
657         }
658         return;
659       } catch (AccumuloServerException ase) {
660         updateServerErrors(ase.getServer(), ase);
661       } catch (AccumuloException ae) {
662         // assume an IOError communicating with !METADATA tablet
663         failedMutations.add(mutationsToProcess);
664       } catch (AccumuloSecurityException e) {
665         updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, null), e.getErrorCode()));
666       } catch (TableDeletedException e) {
667         updateUnknownErrors(e.getMessage(), e);
668       } catch (TableOfflineException e) {
669         updateUnknownErrors(e.getMessage(), e);
670       } catch (TableNotFoundException e) {
671         updateUnknownErrors(e.getMessage(), e);
672       }
673       
674       // an error ocurred
675       binnedMutations.clear();
676       
677     }
678     
679     void addMutations(MutationSet mutationsToSend) {
680       Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>();
681       Span span = Trace.start("binMutations");
682       try {
683         long t1 = System.currentTimeMillis();
684         binMutations(mutationsToSend, binnedMutations);
685         long t2 = System.currentTimeMillis();
686         updateBinningStats(mutationsToSend.size(), (t2 - t1), binnedMutations);
687       } finally {
688         span.stop();
689       }
690       addMutations(binnedMutations);
691     }
692     
693     private synchronized void addMutations(Map<String,TabletServerMutations> binnedMutations) {
694       
695       int count = 0;
696       
697       // merge mutations into existing mutations for a tablet server
698       for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
699         String server = entry.getKey();
700         
701         TabletServerMutations currentMutations = serversMutations.get(server);
702         
703         if (currentMutations == null) {
704           serversMutations.put(server, entry.getValue());
705         } else {
706           for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet()) {
707             for (Mutation m : entry2.getValue()) {
708               currentMutations.addMutation(entry2.getKey(), m);
709             }
710           }
711         }
712         
713         if (log.isTraceEnabled())
714           for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet())
715             count += entry2.getValue().size();
716         
717       }
718       
719       if (count > 0 && log.isTraceEnabled())
720         log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size()));
721       
722       // randomize order of servers
723       ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet());
724       Collections.shuffle(servers);
725       
726       for (String server : servers)
727         if (!queued.contains(server)) {
728           sendThreadPool.submit(Trace.wrap(new SendTask(server)));
729           queued.add(server);
730         }
731     }
732     
733     private synchronized TabletServerMutations getMutationsToSend(String server) {
734       TabletServerMutations tsmuts = serversMutations.remove(server);
735       if (tsmuts == null)
736         queued.remove(server);
737       
738       return tsmuts;
739     }
740     
741     class SendTask implements Runnable {
742       
743       private String location;
744       
745       SendTask(String server) {
746         this.location = server;
747       }
748       
749       @Override
750       public void run() {
751         try {
752           TabletServerMutations tsmuts = getMutationsToSend(location);
753           
754           while (tsmuts != null) {
755             send(tsmuts);
756             tsmuts = getMutationsToSend(location);
757           }
758           
759           return;
760         } catch (Throwable t) {
761           updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
762         }
763       }
764       
765       public void send(TabletServerMutations tsm) throws AccumuloServerException, AccumuloSecurityException {
766         
767         MutationSet failures = null;
768         
769         String oldName = Thread.currentThread().getName();
770         
771         Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations();
772         try {
773           
774           long count = 0;
775           for (List<Mutation> list : mutationBatch.values()) {
776             count += list.size();
777           }
778           String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location;
779           Thread.currentThread().setName(msg);
780           
781           Span span = Trace.start("sendMutations");
782           try {
783             
784             TimeoutTracker timeoutTracker = timeoutTrackers.get(location);
785             if (timeoutTracker == null) {
786               timeoutTracker = new TimeoutTracker(location, timeout);
787               timeoutTrackers.put(location, timeoutTracker);
788             }
789             
790             long st1 = System.currentTimeMillis();
791             failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker);
792             long st2 = System.currentTimeMillis();
793             if (log.isTraceEnabled())
794               log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
795                   + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size()));
796             
797             long successBytes = 0;
798             for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) {
799               for (Mutation mutation : entry.getValue()) {
800                 successBytes += mutation.estimatedMemoryUsed();
801               }
802             }
803             
804             if (failures.size() > 0) {
805               failedMutations.add(failures);
806               successBytes -= failures.getMemoryUsed();
807             }
808             
809             updateSendStats(count, st2 - st1);
810             decrementMemUsed(successBytes);
811             
812           } finally {
813             span.stop();
814           }
815         } catch (IOException e) {
816           if (log.isTraceEnabled())
817             log.trace("failed to send mutations to " + location + " : " + e.getMessage());
818           
819           HashSet<String> tables = new HashSet<String>();
820           for (KeyExtent ke : mutationBatch.keySet())
821             tables.add(ke.getTableId().toString());
822           
823           for (String table : tables)
824             TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(location);
825           
826           failedMutations.add(location, tsm);
827         } finally {
828           Thread.currentThread().setName(oldName);
829         }
830       }
831     }
832     
833     private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException,
834         AccumuloSecurityException, AccumuloServerException {
835       if (tabMuts.size() == 0) {
836         return new MutationSet();
837       }
838       TInfo tinfo = Tracer.traceInfo();
839       TTransport transport = null;
840       
841       timeoutTracker.startingWrite();
842       
843       try {
844         TabletClientService.Iface client;
845         
846         if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
847           client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
848         else
849           client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
850         
851         try {
852           MutationSet allFailures = new MutationSet();
853           
854           if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
855             Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
856             
857             try {
858               client.update(tinfo, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift());
859             } catch (NotServingTabletException e) {
860               allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
861               TabletLocator.getInstance(instance, credentials, new Text(entry.getKey().getTableId())).invalidateCache(entry.getKey());
862             } catch (ConstraintViolationException e) {
863               updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST));
864             }
865             timeoutTracker.madeProgress();
866           } else {
867             
868             long usid = client.startUpdate(tinfo, credentials);
869             
870             List<TMutation> updates = new ArrayList<TMutation>();
871             for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
872               long size = 0;
873               Iterator<Mutation> iter = entry.getValue().iterator();
874               while (iter.hasNext()) {
875                 while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
876                   Mutation mutation = iter.next();
877                   updates.add(mutation.toThrift());
878                   size += mutation.numBytes();
879                 }
880                 
881                 client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
882                 updates.clear();
883                 size = 0;
884               }
885             }
886             
887             UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
888             
889             Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
890             updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
891             updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
892             
893             long totalCommitted = 0;
894             
895             for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
896               KeyExtent failedExtent = entry.getKey();
897               int numCommitted = (int) (long) entry.getValue();
898               totalCommitted += numCommitted;
899               
900               String table = failedExtent.getTableId().toString();
901               
902               TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent);
903               
904               ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
905               allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
906             }
907             
908             if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
909               // nothing was successfully written
910               timeoutTracker.wroteNothing();
911             } else {
912               // successfully wrote something to tablet server
913               timeoutTracker.madeProgress();
914             }
915           }
916           return allFailures;
917         } finally {
918           ThriftUtil.returnClient((TServiceClient) client);
919         }
920       } catch (TTransportException e) {
921         timeoutTracker.errorOccured(e);
922         throw new IOException(e);
923       } catch (TApplicationException tae) {
924         updateServerErrors(location, tae);
925         throw new AccumuloServerException(location, tae);
926       } catch (ThriftSecurityException e) {
927         updateAuthorizationFailures(tabMuts.keySet(), e.code);
928         throw new AccumuloSecurityException(e.user, e.code, e);
929       } catch (NoSuchScanIDException e) {
930         throw new IOException(e);
931       } catch (TException e) {
932         throw new IOException(e);
933       } finally {
934         ThriftTransportPool.getInstance().returnTransport(transport);
935       }
936     }
937   }
938   
939   // END code for sending mutations to tablet servers using background threads
940   
941   private static class MutationSet {
942     
943     private HashMap<String,List<Mutation>> mutations;
944     private int memoryUsed = 0;
945     
946     MutationSet() {
947       mutations = new HashMap<String,List<Mutation>>();
948     }
949     
950     void addMutation(String table, Mutation mutation) {
951       List<Mutation> tabMutList = mutations.get(table);
952       if (tabMutList == null) {
953         tabMutList = new ArrayList<Mutation>();
954         mutations.put(table, tabMutList);
955       }
956       
957       tabMutList.add(mutation);
958       
959       memoryUsed += mutation.estimatedMemoryUsed();
960     }
961     
962     Map<String,List<Mutation>> getMutations() {
963       return mutations;
964     }
965     
966     int size() {
967       int result = 0;
968       for (List<Mutation> perTable : mutations.values()) {
969         result += perTable.size();
970       }
971       return result;
972     }
973     
974     public void addAll(MutationSet failures) {
975       Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet();
976       
977       for (Entry<String,List<Mutation>> entry : es) {
978         String table = entry.getKey();
979         
980         for (Mutation mutation : entry.getValue()) {
981           addMutation(table, mutation);
982         }
983       }
984     }
985     
986     public void addAll(String table, List<Mutation> mutations) {
987       for (Mutation mutation : mutations) {
988         addMutation(table, mutation);
989       }
990     }
991     
992     public int getMemoryUsed() {
993       return memoryUsed;
994     }
995     
996   }
997 }