1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.client.impl;
18
19 import java.io.IOException;
20 import java.lang.management.CompilationMXBean;
21 import java.lang.management.GarbageCollectorMXBean;
22 import java.lang.management.ManagementFactory;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.Set;
32 import java.util.Timer;
33 import java.util.TimerTask;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import org.apache.accumulo.trace.instrument.Span;
39 import org.apache.accumulo.trace.instrument.Trace;
40 import org.apache.accumulo.trace.instrument.Tracer;
41 import org.apache.accumulo.trace.thrift.TInfo;
42 import org.apache.accumulo.core.Constants;
43 import org.apache.accumulo.core.client.AccumuloException;
44 import org.apache.accumulo.core.client.AccumuloSecurityException;
45 import org.apache.accumulo.core.client.BatchWriterConfig;
46 import org.apache.accumulo.core.client.Instance;
47 import org.apache.accumulo.core.client.MutationsRejectedException;
48 import org.apache.accumulo.core.client.TableDeletedException;
49 import org.apache.accumulo.core.client.TableNotFoundException;
50 import org.apache.accumulo.core.client.TableOfflineException;
51 import org.apache.accumulo.core.client.TimedOutException;
52 import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
53 import org.apache.accumulo.core.conf.Property;
54 import org.apache.accumulo.core.constraints.Violations;
55 import org.apache.accumulo.core.data.ConstraintViolationSummary;
56 import org.apache.accumulo.core.data.KeyExtent;
57 import org.apache.accumulo.core.data.Mutation;
58 import org.apache.accumulo.core.data.thrift.TMutation;
59 import org.apache.accumulo.core.data.thrift.UpdateErrors;
60 import org.apache.accumulo.core.master.state.tables.TableState;
61 import org.apache.accumulo.core.security.thrift.Credential;
62 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
63 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
64 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
65 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
66 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
67 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
68 import org.apache.accumulo.core.util.SimpleThreadPool;
69 import org.apache.accumulo.core.util.ThriftUtil;
70 import org.apache.hadoop.io.Text;
71 import org.apache.log4j.Logger;
72 import org.apache.thrift.TApplicationException;
73 import org.apache.thrift.TException;
74 import org.apache.thrift.TServiceClient;
75 import org.apache.thrift.transport.TTransport;
76 import org.apache.thrift.transport.TTransportException;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public class TabletServerBatchWriter {
102
103 private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
104
105 private long totalMemUsed = 0;
106 private long maxMem;
107 private MutationSet mutations;
108 private boolean flushing;
109 private boolean closed;
110 private MutationWriter writer;
111 private FailedMutations failedMutations;
112
113 private Instance instance;
114 private Credential credentials;
115
116 private Violations violations;
117 private Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures;
118 private HashSet<String> serverSideErrors;
119 private int unknownErrors = 0;
120 private boolean somethingFailed = false;
121
122 private Timer jtimer;
123
124 private long maxLatency;
125
126 private long timeout;
127
128 private long lastProcessingStartTime;
129
130 private long totalAdded = 0;
131 private AtomicLong totalSent = new AtomicLong(0);
132 private AtomicLong totalBinned = new AtomicLong(0);
133 private AtomicLong totalBinTime = new AtomicLong(0);
134 private AtomicLong totalSendTime = new AtomicLong(0);
135 private long startTime = 0;
136 private long initialGCTimes;
137 private long initialCompileTimes;
138 private double initialSystemLoad;
139
140 private int tabletServersBatchSum = 0;
141 private int tabletBatchSum = 0;
142 private int numBatches = 0;
143 private int maxTabletBatch = Integer.MIN_VALUE;
144 private int minTabletBatch = Integer.MAX_VALUE;
145 private int minTabletServersBatch = Integer.MAX_VALUE;
146 private int maxTabletServersBatch = Integer.MIN_VALUE;
147
148 private Throwable lastUnknownError = null;
149
150 private Map<String,TimeoutTracker> timeoutTrackers;
151
152 private static class TimeoutTracker {
153
154 String server;
155 long timeOut;
156 long activityTime;
157 Long firstErrorTime = null;
158
159 TimeoutTracker(String server, long timeOut) {
160 this.timeOut = timeOut;
161 this.server = server;
162 }
163
164 void startingWrite() {
165 activityTime = System.currentTimeMillis();
166 }
167
168 void madeProgress() {
169 activityTime = System.currentTimeMillis();
170 firstErrorTime = null;
171 }
172
173 void wroteNothing() {
174 if (firstErrorTime == null) {
175 firstErrorTime = activityTime;
176 } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
177 throw new TimedOutException(Collections.singleton(server));
178 }
179 }
180
181 void errorOccured(Exception e) {
182 wroteNothing();
183 }
184
185 public long getTimeOut() {
186 return timeOut;
187 }
188 }
189
/**
 * Creates a batch writer and starts its background timer tasks.
 *
 * @param instance
 *          the instance mutations are written to
 * @param credentials
 *          credentials passed to the tablet locator and tablet servers
 * @param config
 *          supplies max memory, max latency, timeout and the number of write threads; a max latency
 *          of zero or less disables the latency-triggered flush
 */
public TabletServerBatchWriter(Instance instance, Credential credentials, BatchWriterConfig config) {
  this.instance = instance;
  this.maxMem = config.getMaxMemory();
  this.maxLatency = config.getMaxLatency(TimeUnit.MILLISECONDS) <= 0 ? Long.MAX_VALUE : config.getMaxLatency(TimeUnit.MILLISECONDS);
  this.credentials = credentials;
  this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
  mutations = new MutationSet();

  violations = new Violations();

  authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>();
  serverSideErrors = new HashSet<String>();

  lastProcessingStartTime = System.currentTimeMillis();

  // daemon timer shared by the latency check below and the FailedMutations requeue task
  jtimer = new Timer("BatchWriterLatencyTimer", true);

  writer = new MutationWriter(config.getMaxWriteThreads());
  // FailedMutations schedules itself on jtimer, so jtimer must already exist here
  failedMutations = new FailedMutations();

  timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>());

  if (this.maxLatency != Long.MAX_VALUE) {
    // Timer.schedule rejects a non-positive period, and maxLatency / 4 is 0 for latencies below
    // 4 ms, so clamp the check period to at least one millisecond.
    long checkPeriod = Math.max(1L, this.maxLatency / 4);

    jtimer.schedule(new TimerTask() {
      @Override
      public void run() {
        try {
          synchronized (TabletServerBatchWriter.this) {
            if ((System.currentTimeMillis() - lastProcessingStartTime) > TabletServerBatchWriter.this.maxLatency)
              startProcessing();
          }
        } catch (Throwable t) {
          updateUnknownErrors("Max latency task failed " + t.getMessage(), t);
        }
      }
    }, 0, checkPeriod);
  }
}
227
228 private synchronized void startProcessing() {
229 if (mutations.getMemoryUsed() == 0)
230 return;
231 lastProcessingStartTime = System.currentTimeMillis();
232 writer.addMutations(mutations);
233 mutations = new MutationSet();
234 }
235
236 private synchronized void decrementMemUsed(long amount) {
237 totalMemUsed -= amount;
238 this.notifyAll();
239 }
240
241 public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException {
242
243 if (closed)
244 throw new IllegalStateException("Closed");
245 if (m.size() == 0)
246 throw new IllegalArgumentException("Can not add empty mutations");
247
248 checkForFailures();
249
250 while ((totalMemUsed >= maxMem || flushing) && !somethingFailed) {
251 waitRTE();
252 }
253
254
255 if (closed)
256 throw new IllegalStateException("Closed");
257 checkForFailures();
258
259 if (startTime == 0) {
260 startTime = System.currentTimeMillis();
261
262 List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
263 for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
264 initialGCTimes += garbageCollectorMXBean.getCollectionTime();
265 }
266
267 CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
268 if (compMxBean.isCompilationTimeMonitoringSupported()) {
269 initialCompileTimes = compMxBean.getTotalCompilationTime();
270 }
271
272 initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
273 }
274
275
276
277
278
279
280 m = new Mutation(m);
281
282 totalMemUsed += m.estimatedMemoryUsed();
283 mutations.addMutation(table, m);
284 totalAdded++;
285
286 if (mutations.getMemoryUsed() >= maxMem / 2) {
287 startProcessing();
288 checkForFailures();
289 }
290 }
291
292 public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
293 while (iterator.hasNext()) {
294 addMutation(table, iterator.next());
295 }
296 }
297
298 public synchronized void flush() throws MutationsRejectedException {
299
300 if (closed)
301 throw new IllegalStateException("Closed");
302
303 Span span = Trace.start("flush");
304
305 try {
306 checkForFailures();
307
308 if (flushing) {
309
310 while (flushing && !somethingFailed)
311 waitRTE();
312
313 checkForFailures();
314
315 return;
316 }
317
318 flushing = true;
319
320 startProcessing();
321 checkForFailures();
322
323 while (totalMemUsed > 0 && !somethingFailed) {
324 waitRTE();
325 }
326
327 flushing = false;
328 this.notifyAll();
329
330 checkForFailures();
331 } finally {
332 span.stop();
333 }
334 }
335
336 public synchronized void close() throws MutationsRejectedException {
337
338 if (closed)
339 return;
340
341 Span span = Trace.start("close");
342 try {
343 closed = true;
344
345 startProcessing();
346
347 while (totalMemUsed > 0 && !somethingFailed) {
348 waitRTE();
349 }
350
351 logStats();
352
353 checkForFailures();
354 } finally {
355
356 writer.sendThreadPool.shutdownNow();
357 jtimer.cancel();
358 span.stop();
359 }
360 }
361
362 private void logStats() {
363 long finishTime = System.currentTimeMillis();
364
365 long finalGCTimes = 0;
366 List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
367 for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
368 finalGCTimes += garbageCollectorMXBean.getCollectionTime();
369 }
370
371 CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
372 long finalCompileTimes = 0;
373 if (compMxBean.isCompilationTimeMonitoringSupported()) {
374 finalCompileTimes = compMxBean.getTotalCompilationTime();
375 }
376
377 double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
378 double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
379
380 double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
381
382 if (log.isTraceEnabled()) {
383 log.trace("");
384 log.trace("TABLET SERVER BATCH WRITER STATISTICS");
385 log.trace(String.format("Added : %,10d mutations", totalAdded));
386 log.trace(String.format("Sent : %,10d mutations", totalSent.get()));
387 log.trace(String.format("Resent percentage : %10.2f%s", (totalSent.get() - totalAdded) / (double) totalAdded * 100.0, "%"));
388 log.trace(String.format("Overall time : %,10.2f secs", (finishTime - startTime) / 1000.0));
389 log.trace(String.format("Overall send rate : %,10.2f mutations/sec", overallRate));
390 log.trace(String.format("Send efficiency : %10.2f%s", overallRate / averageRate * 100.0, "%"));
391 log.trace("");
392 log.trace("BACKGROUND WRITER PROCESS STATISTICS");
393 log.trace(String.format("Total send time : %,10.2f secs %6.2f%s", totalSendTime.get() / 1000.0, 100.0 * totalSendTime.get()
394 / (finishTime - startTime), "%"));
395 log.trace(String.format("Average send rate : %,10.2f mutations/sec", averageRate));
396 log.trace(String.format("Total bin time : %,10.2f secs %6.2f%s", totalBinTime.get() / 1000.0,
397 100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
398 log.trace(String.format("Average bin rate : %,10.2f mutations/sec", totalBinned.get() / (totalBinTime.get() / 1000.0)));
399 log.trace(String.format("tservers per batch : %,8.2f avg %,6d min %,6d max", (tabletServersBatchSum / (double) numBatches), minTabletServersBatch,
400 maxTabletServersBatch));
401 log.trace(String.format("tablets per batch : %,8.2f avg %,6d min %,6d max", (tabletBatchSum / (double) numBatches), minTabletBatch, maxTabletBatch));
402 log.trace("");
403 log.trace("SYSTEM STATISTICS");
404 log.trace(String.format("JVM GC Time : %,10.2f secs", ((finalGCTimes - initialGCTimes) / 1000.0)));
405 if (compMxBean.isCompilationTimeMonitoringSupported()) {
406 log.trace(String.format("JVM Compile Time : %,10.2f secs", ((finalCompileTimes - initialCompileTimes) / 1000.0)));
407 }
408 log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad));
409 }
410 }
411
412 private void updateSendStats(long count, long time) {
413 totalSent.addAndGet(count);
414 totalSendTime.addAndGet(time);
415 }
416
417 public void updateBinningStats(int count, long time, Map<String,TabletServerMutations> binnedMutations) {
418 totalBinTime.addAndGet(time);
419 totalBinned.addAndGet(count);
420 updateBatchStats(binnedMutations);
421 }
422
423 private synchronized void updateBatchStats(Map<String,TabletServerMutations> binnedMutations) {
424 tabletServersBatchSum += binnedMutations.size();
425
426 minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
427 maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size());
428
429 int numTablets = 0;
430
431 for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
432 TabletServerMutations tsm = entry.getValue();
433 numTablets += tsm.getMutations().size();
434 }
435
436 tabletBatchSum += numTablets;
437
438 minTabletBatch = Math.min(minTabletBatch, numTablets);
439 maxTabletBatch = Math.max(maxTabletBatch, numTablets);
440
441 numBatches++;
442 }
443
444 private void waitRTE() {
445 try {
446 wait();
447 } catch (InterruptedException e) {
448 throw new RuntimeException(e);
449 }
450 }
451
452
453
454 private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) {
455 if (cvsList.size() > 0) {
456 synchronized (this) {
457 somethingFailed = true;
458 violations.add(cvsList);
459 this.notifyAll();
460 }
461 }
462 }
463
464 private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) {
465 HashMap<KeyExtent, SecurityErrorCode> map = new HashMap<KeyExtent, SecurityErrorCode>();
466 for (KeyExtent ke : keySet)
467 map.put(ke, code);
468
469 updateAuthorizationFailures(map);
470 }
471
472 private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) {
473 if (authorizationFailures.size() > 0) {
474
475
476 HashSet<String> tableIds = new HashSet<String>();
477 for (KeyExtent ke : authorizationFailures.keySet())
478 tableIds.add(ke.getTableId().toString());
479
480 Tables.clearCache(instance);
481 for (String tableId : tableIds)
482 if (!Tables.exists(instance, tableId))
483 throw new TableDeletedException(tableId);
484
485 synchronized (this) {
486 somethingFailed = true;
487 mergeAuthorizationFailures(this.authorizationFailures, authorizationFailures);
488 this.notifyAll();
489 }
490 }
491 }
492
493 private void mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, Map<KeyExtent,SecurityErrorCode> addition) {
494 for (Entry<KeyExtent,SecurityErrorCode> entry : addition.entrySet()) {
495 Set<SecurityErrorCode> secs = source.get(entry.getKey());
496 if (secs == null) {
497 secs = new HashSet<SecurityErrorCode>();
498 source.put(entry.getKey(), secs);
499 }
500 secs.add(entry.getValue());
501 }
502 }
503
504 private synchronized void updateServerErrors(String server, Exception e) {
505 somethingFailed = true;
506 this.serverSideErrors.add(server);
507 this.notifyAll();
508 log.error("Server side error on " + server);
509 }
510
511 private synchronized void updateUnknownErrors(String msg, Throwable t) {
512 somethingFailed = true;
513 unknownErrors++;
514 this.lastUnknownError = t;
515 this.notifyAll();
516 if (t instanceof TableDeletedException || t instanceof TableOfflineException || t instanceof TimedOutException)
517 log.debug(msg, t);
518 else
519 log.error(msg, t);
520 }
521
522 private void checkForFailures() throws MutationsRejectedException {
523 if (somethingFailed) {
524 List<ConstraintViolationSummary> cvsList = violations.asList();
525 throw new MutationsRejectedException(cvsList, new HashMap<KeyExtent,Set<SecurityErrorCode>>(authorizationFailures), serverSideErrors, unknownErrors, lastUnknownError);
526 }
527 }
528
529
530
531
532
533 /**
534 * Add mutations that previously failed back into the mix
535 *
536 * @param mutationsprivate
537 * static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
538 */
539 private synchronized void addFailedMutations(MutationSet failedMutations) throws Exception {
540 mutations.addAll(failedMutations);
541 if (mutations.getMemoryUsed() >= maxMem / 2 || closed || flushing) {
542 startProcessing();
543 }
544 }
545
546 private class FailedMutations extends TimerTask {
547
548 private MutationSet recentFailures = null;
549 private long initTime;
550
551 FailedMutations() {
552 jtimer.schedule(this, 0, 500);
553 }
554
555 private MutationSet init() {
556 if (recentFailures == null) {
557 recentFailures = new MutationSet();
558 initTime = System.currentTimeMillis();
559 }
560 return recentFailures;
561 }
562
563 synchronized void add(String table, ArrayList<Mutation> tableFailures) {
564 init().addAll(table, tableFailures);
565 }
566
567 synchronized void add(MutationSet failures) {
568 init().addAll(failures);
569 }
570
571 synchronized void add(String location, TabletServerMutations tsm) {
572 init();
573 for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) {
574 recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
575 }
576
577 }
578
579 @Override
580 public void run() {
581 try {
582 MutationSet rf = null;
583
584 synchronized (this) {
585 if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) {
586 rf = recentFailures;
587 recentFailures = null;
588 }
589 }
590
591 if (rf != null) {
592 if (log.isTraceEnabled())
593 log.trace("requeuing " + rf.size() + " failed mutations");
594 addFailedMutations(rf);
595 }
596 } catch (Throwable t) {
597 updateUnknownErrors("Failed to requeue failed mutations " + t.getMessage(), t);
598 cancel();
599 }
600 }
601 }
602
603
604
605
606
607 private class MutationWriter {
608
609 private static final int MUTATION_BATCH_SIZE = 1 << 17;
610 private ExecutorService sendThreadPool;
611 private Map<String,TabletServerMutations> serversMutations;
612 private Set<String> queued;
613 private Map<String,TabletLocator> locators;
614
615 public MutationWriter(int numSendThreads) {
616 serversMutations = new HashMap<String,TabletServerMutations>();
617 queued = new HashSet<String>();
618 sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
619 locators = new HashMap<String,TabletLocator>();
620 }
621
622 private TabletLocator getLocator(String tableId) {
623 TabletLocator ret = locators.get(tableId);
624 if (ret == null) {
625 ret = TabletLocator.getInstance(instance, credentials, new Text(tableId));
626 ret = new TimeoutTabletLocator(ret, timeout);
627 locators.put(tableId, ret);
628 }
629
630 return ret;
631 }
632
633 private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
634 try {
635 Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet();
636 for (Entry<String,List<Mutation>> entry : es) {
637 TabletLocator locator = getLocator(entry.getKey());
638
639 String table = entry.getKey();
640 List<Mutation> tableMutations = entry.getValue();
641
642 if (tableMutations != null) {
643 ArrayList<Mutation> tableFailures = new ArrayList<Mutation>();
644 locator.binMutations(tableMutations, binnedMutations, tableFailures);
645
646 if (tableFailures.size() > 0) {
647 failedMutations.add(table, tableFailures);
648
649 if (tableFailures.size() == tableMutations.size())
650 if (!Tables.exists(instance, entry.getKey()))
651 throw new TableDeletedException(entry.getKey());
652 else if (Tables.getTableState(instance, table) == TableState.OFFLINE)
653 throw new TableOfflineException(instance, entry.getKey());
654 }
655 }
656
657 }
658 return;
659 } catch (AccumuloServerException ase) {
660 updateServerErrors(ase.getServer(), ase);
661 } catch (AccumuloException ae) {
662
663 failedMutations.add(mutationsToProcess);
664 } catch (AccumuloSecurityException e) {
665 updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, null), e.getErrorCode()));
666 } catch (TableDeletedException e) {
667 updateUnknownErrors(e.getMessage(), e);
668 } catch (TableOfflineException e) {
669 updateUnknownErrors(e.getMessage(), e);
670 } catch (TableNotFoundException e) {
671 updateUnknownErrors(e.getMessage(), e);
672 }
673
674
675 binnedMutations.clear();
676
677 }
678
679 void addMutations(MutationSet mutationsToSend) {
680 Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>();
681 Span span = Trace.start("binMutations");
682 try {
683 long t1 = System.currentTimeMillis();
684 binMutations(mutationsToSend, binnedMutations);
685 long t2 = System.currentTimeMillis();
686 updateBinningStats(mutationsToSend.size(), (t2 - t1), binnedMutations);
687 } finally {
688 span.stop();
689 }
690 addMutations(binnedMutations);
691 }
692
693 private synchronized void addMutations(Map<String,TabletServerMutations> binnedMutations) {
694
695 int count = 0;
696
697
698 for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
699 String server = entry.getKey();
700
701 TabletServerMutations currentMutations = serversMutations.get(server);
702
703 if (currentMutations == null) {
704 serversMutations.put(server, entry.getValue());
705 } else {
706 for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet()) {
707 for (Mutation m : entry2.getValue()) {
708 currentMutations.addMutation(entry2.getKey(), m);
709 }
710 }
711 }
712
713 if (log.isTraceEnabled())
714 for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet())
715 count += entry2.getValue().size();
716
717 }
718
719 if (count > 0 && log.isTraceEnabled())
720 log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size()));
721
722
723 ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet());
724 Collections.shuffle(servers);
725
726 for (String server : servers)
727 if (!queued.contains(server)) {
728 sendThreadPool.submit(Trace.wrap(new SendTask(server)));
729 queued.add(server);
730 }
731 }
732
733 private synchronized TabletServerMutations getMutationsToSend(String server) {
734 TabletServerMutations tsmuts = serversMutations.remove(server);
735 if (tsmuts == null)
736 queued.remove(server);
737
738 return tsmuts;
739 }
740
741 class SendTask implements Runnable {
742
743 private String location;
744
745 SendTask(String server) {
746 this.location = server;
747 }
748
749 @Override
750 public void run() {
751 try {
752 TabletServerMutations tsmuts = getMutationsToSend(location);
753
754 while (tsmuts != null) {
755 send(tsmuts);
756 tsmuts = getMutationsToSend(location);
757 }
758
759 return;
760 } catch (Throwable t) {
761 updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
762 }
763 }
764
765 public void send(TabletServerMutations tsm) throws AccumuloServerException, AccumuloSecurityException {
766
767 MutationSet failures = null;
768
769 String oldName = Thread.currentThread().getName();
770
771 Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations();
772 try {
773
774 long count = 0;
775 for (List<Mutation> list : mutationBatch.values()) {
776 count += list.size();
777 }
778 String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location;
779 Thread.currentThread().setName(msg);
780
781 Span span = Trace.start("sendMutations");
782 try {
783
784 TimeoutTracker timeoutTracker = timeoutTrackers.get(location);
785 if (timeoutTracker == null) {
786 timeoutTracker = new TimeoutTracker(location, timeout);
787 timeoutTrackers.put(location, timeoutTracker);
788 }
789
790 long st1 = System.currentTimeMillis();
791 failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker);
792 long st2 = System.currentTimeMillis();
793 if (log.isTraceEnabled())
794 log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
795 + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size()));
796
797 long successBytes = 0;
798 for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) {
799 for (Mutation mutation : entry.getValue()) {
800 successBytes += mutation.estimatedMemoryUsed();
801 }
802 }
803
804 if (failures.size() > 0) {
805 failedMutations.add(failures);
806 successBytes -= failures.getMemoryUsed();
807 }
808
809 updateSendStats(count, st2 - st1);
810 decrementMemUsed(successBytes);
811
812 } finally {
813 span.stop();
814 }
815 } catch (IOException e) {
816 if (log.isTraceEnabled())
817 log.trace("failed to send mutations to " + location + " : " + e.getMessage());
818
819 HashSet<String> tables = new HashSet<String>();
820 for (KeyExtent ke : mutationBatch.keySet())
821 tables.add(ke.getTableId().toString());
822
823 for (String table : tables)
824 TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(location);
825
826 failedMutations.add(location, tsm);
827 } finally {
828 Thread.currentThread().setName(oldName);
829 }
830 }
831 }
832
833 private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException,
834 AccumuloSecurityException, AccumuloServerException {
835 if (tabMuts.size() == 0) {
836 return new MutationSet();
837 }
838 TInfo tinfo = Tracer.traceInfo();
839 TTransport transport = null;
840
841 timeoutTracker.startingWrite();
842
843 try {
844 TabletClientService.Iface client;
845
846 if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
847 client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
848 else
849 client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
850
851 try {
852 MutationSet allFailures = new MutationSet();
853
854 if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
855 Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
856
857 try {
858 client.update(tinfo, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift());
859 } catch (NotServingTabletException e) {
860 allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
861 TabletLocator.getInstance(instance, credentials, new Text(entry.getKey().getTableId())).invalidateCache(entry.getKey());
862 } catch (ConstraintViolationException e) {
863 updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST));
864 }
865 timeoutTracker.madeProgress();
866 } else {
867
868 long usid = client.startUpdate(tinfo, credentials);
869
870 List<TMutation> updates = new ArrayList<TMutation>();
871 for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
872 long size = 0;
873 Iterator<Mutation> iter = entry.getValue().iterator();
874 while (iter.hasNext()) {
875 while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
876 Mutation mutation = iter.next();
877 updates.add(mutation.toThrift());
878 size += mutation.numBytes();
879 }
880
881 client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
882 updates.clear();
883 size = 0;
884 }
885 }
886
887 UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
888
889 Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
890 updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
891 updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
892
893 long totalCommitted = 0;
894
895 for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
896 KeyExtent failedExtent = entry.getKey();
897 int numCommitted = (int) (long) entry.getValue();
898 totalCommitted += numCommitted;
899
900 String table = failedExtent.getTableId().toString();
901
902 TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent);
903
904 ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
905 allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
906 }
907
908 if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
909
910 timeoutTracker.wroteNothing();
911 } else {
912
913 timeoutTracker.madeProgress();
914 }
915 }
916 return allFailures;
917 } finally {
918 ThriftUtil.returnClient((TServiceClient) client);
919 }
920 } catch (TTransportException e) {
921 timeoutTracker.errorOccured(e);
922 throw new IOException(e);
923 } catch (TApplicationException tae) {
924 updateServerErrors(location, tae);
925 throw new AccumuloServerException(location, tae);
926 } catch (ThriftSecurityException e) {
927 updateAuthorizationFailures(tabMuts.keySet(), e.code);
928 throw new AccumuloSecurityException(e.user, e.code, e);
929 } catch (NoSuchScanIDException e) {
930 throw new IOException(e);
931 } catch (TException e) {
932 throw new IOException(e);
933 } finally {
934 ThriftTransportPool.getInstance().returnTransport(transport);
935 }
936 }
937 }
938
939
940
941 private static class MutationSet {
942
943 private HashMap<String,List<Mutation>> mutations;
944 private int memoryUsed = 0;
945
946 MutationSet() {
947 mutations = new HashMap<String,List<Mutation>>();
948 }
949
950 void addMutation(String table, Mutation mutation) {
951 List<Mutation> tabMutList = mutations.get(table);
952 if (tabMutList == null) {
953 tabMutList = new ArrayList<Mutation>();
954 mutations.put(table, tabMutList);
955 }
956
957 tabMutList.add(mutation);
958
959 memoryUsed += mutation.estimatedMemoryUsed();
960 }
961
962 Map<String,List<Mutation>> getMutations() {
963 return mutations;
964 }
965
966 int size() {
967 int result = 0;
968 for (List<Mutation> perTable : mutations.values()) {
969 result += perTable.size();
970 }
971 return result;
972 }
973
974 public void addAll(MutationSet failures) {
975 Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet();
976
977 for (Entry<String,List<Mutation>> entry : es) {
978 String table = entry.getKey();
979
980 for (Mutation mutation : entry.getValue()) {
981 addMutation(table, mutation);
982 }
983 }
984 }
985
986 public void addAll(String table, List<Mutation> mutations) {
987 for (Mutation mutation : mutations) {
988 addMutation(table, mutation);
989 }
990 }
991
992 public int getMemoryUsed() {
993 return memoryUsed;
994 }
995
996 }
997 }