1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.client.admin;
18
19 import java.io.BufferedReader;
20 import java.io.IOException;
21 import java.io.InputStreamReader;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Set;
34 import java.util.SortedSet;
35 import java.util.TreeMap;
36 import java.util.TreeSet;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import java.util.zip.ZipEntry;
43 import java.util.zip.ZipInputStream;
44
45 import org.apache.accumulo.core.Constants;
46 import org.apache.accumulo.core.client.AccumuloException;
47 import org.apache.accumulo.core.client.AccumuloSecurityException;
48 import org.apache.accumulo.core.client.Instance;
49 import org.apache.accumulo.core.client.IteratorSetting;
50 import org.apache.accumulo.core.client.Scanner;
51 import org.apache.accumulo.core.client.TableDeletedException;
52 import org.apache.accumulo.core.client.TableExistsException;
53 import org.apache.accumulo.core.client.TableNotFoundException;
54 import org.apache.accumulo.core.client.TableOfflineException;
55 import org.apache.accumulo.core.client.impl.AccumuloServerException;
56 import org.apache.accumulo.core.client.impl.ClientExec;
57 import org.apache.accumulo.core.client.impl.ClientExecReturn;
58 import org.apache.accumulo.core.client.impl.MasterClient;
59 import org.apache.accumulo.core.client.impl.ServerClient;
60 import org.apache.accumulo.core.client.impl.Tables;
61 import org.apache.accumulo.core.client.impl.TabletLocator;
62 import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
63 import org.apache.accumulo.core.client.impl.thrift.ClientService;
64 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
65 import org.apache.accumulo.core.conf.AccumuloConfiguration;
66 import org.apache.accumulo.core.conf.ConfigurationCopy;
67 import org.apache.accumulo.core.conf.Property;
68 import org.apache.accumulo.core.data.ByteSequence;
69 import org.apache.accumulo.core.data.KeyExtent;
70 import org.apache.accumulo.core.data.Range;
71 import org.apache.accumulo.core.file.FileUtil;
72 import org.apache.accumulo.core.iterators.IteratorUtil;
73 import org.apache.accumulo.core.master.state.tables.TableState;
74 import org.apache.accumulo.core.master.thrift.MasterClientService;
75 import org.apache.accumulo.core.master.thrift.TableOperation;
76 import org.apache.accumulo.core.security.Authorizations;
77 import org.apache.accumulo.core.security.CredentialHelper;
78 import org.apache.accumulo.core.security.thrift.Credential;
79 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
80 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
81 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
82 import org.apache.accumulo.core.util.ArgumentChecker;
83 import org.apache.accumulo.core.util.CachedConfiguration;
84 import org.apache.accumulo.core.util.LocalityGroupUtil;
85 import org.apache.accumulo.core.util.MetadataTable;
86 import org.apache.accumulo.core.util.NamingThreadFactory;
87 import org.apache.accumulo.core.util.OpTimer;
88 import org.apache.accumulo.core.util.StringUtil;
89 import org.apache.accumulo.core.util.TextUtil;
90 import org.apache.accumulo.core.util.ThriftUtil;
91 import org.apache.accumulo.core.util.UtilWaitThread;
92 import org.apache.accumulo.trace.instrument.Tracer;
93 import org.apache.hadoop.fs.FileStatus;
94 import org.apache.hadoop.fs.FileSystem;
95 import org.apache.hadoop.fs.Path;
96 import org.apache.hadoop.io.Text;
97 import org.apache.log4j.Level;
98 import org.apache.log4j.Logger;
99 import org.apache.thrift.TApplicationException;
100 import org.apache.thrift.TException;
101 import org.apache.thrift.transport.TTransportException;
102
103 /**
104 * Provides a class for administering tables
105 *
106 */
107 public class TableOperationsImpl extends TableOperationsHelper {
108 private Instance instance;
109 private Credential credentials;
110
111 private static final Logger log = Logger.getLogger(TableOperations.class);
112
/**
 * Creates a table-operations handle bound to one instance and one set of credentials.
 *
 * @param instance
 *          the connection information for this instance
 * @param credentials
 *          the username/password for this connection
 */
public TableOperationsImpl(Instance instance, Credential credentials) {
  // Validate both arguments before any state is stored.
  ArgumentChecker.notNull(instance, credentials);
  this.credentials = credentials;
  this.instance = instance;
}
124
125 /**
126 * Retrieve a list of tables in Accumulo.
127 *
128 * @return List of tables in accumulo
129 */
130 @Override
131 public SortedSet<String> list() {
132 OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Fetching list of tables...");
133 TreeSet<String> tableNames = new TreeSet<String>(Tables.getNameToIdMap(instance).keySet());
134 opTimer.stop("Fetched " + tableNames.size() + " table names in %DURATION%");
135 return tableNames;
136 }
137
138 /**
139 * A method to check if a table exists in Accumulo.
140 *
141 * @param tableName
142 * the name of the table
143 * @return true if the table exists
144 */
145 @Override
146 public boolean exists(String tableName) {
147 ArgumentChecker.notNull(tableName);
148 if (tableName.equals(Constants.METADATA_TABLE_NAME))
149 return true;
150
151 OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Checking if table " + tableName + "exists...");
152 boolean exists = Tables.getNameToIdMap(instance).containsKey(tableName);
153 opTimer.stop("Checked existance of " + exists + " in %DURATION%");
154 return exists;
155 }
156
157 /**
158 * Create a table with no special configuration
159 *
160 * @param tableName
161 * the name of the table
162 * @throws AccumuloException
163 * if a general error occurs
164 * @throws AccumuloSecurityException
165 * if the user does not have permission
166 * @throws TableExistsException
167 * if the table already exists
168 */
169 @Override
170 public void create(String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException {
171 create(tableName, true, TimeType.MILLIS);
172 }
173
174 /**
175 * @param tableName
176 * the name of the table
177 * @param limitVersion
178 * Enables/disables the versioning iterator, which will limit the number of Key versions kept.
179 */
180 @Override
181 public void create(String tableName, boolean limitVersion) throws AccumuloException, AccumuloSecurityException, TableExistsException {
182 create(tableName, limitVersion, TimeType.MILLIS);
183 }
184
185 /**
186 * @param tableName
187 * the name of the table
188 * @param timeType
189 * specifies logical or real-time based time recording for entries in the table
190 * @param limitVersion
191 * Enables/disables the versioning iterator, which will limit the number of Key versions kept.
192 */
193 @Override
194 public void create(String tableName, boolean limitVersion, TimeType timeType) throws AccumuloException, AccumuloSecurityException, TableExistsException {
195 ArgumentChecker.notNull(tableName, timeType);
196
197 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(timeType.name().getBytes()));
198
199 Map<String,String> opts = IteratorUtil.generateInitialTableProperties(limitVersion);
200
201 try {
202 doTableOperation(TableOperation.CREATE, args, opts);
203 } catch (TableNotFoundException e1) {
204
205 throw new RuntimeException(e1);
206 }
207 }
208
209 private long beginTableOperation() throws ThriftSecurityException, TException {
210 while (true) {
211 MasterClientService.Iface client = null;
212 try {
213 client = MasterClient.getConnectionWithRetry(instance);
214 return client.beginTableOperation(Tracer.traceInfo(), credentials);
215 } catch (TTransportException tte) {
216 log.debug("Failed to call beginTableOperation(), retrying ... ", tte);
217 UtilWaitThread.sleep(100);
218 } finally {
219 MasterClient.close(client);
220 }
221 }
222 }
223
224 private void executeTableOperation(long opid, TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp)
225 throws ThriftSecurityException, TException, ThriftTableOperationException {
226 while (true) {
227 MasterClientService.Iface client = null;
228 try {
229 client = MasterClient.getConnectionWithRetry(instance);
230 client.executeTableOperation(Tracer.traceInfo(), credentials, opid, op, args, opts, autoCleanUp);
231 break;
232 } catch (TTransportException tte) {
233 log.debug("Failed to call executeTableOperation(), retrying ... ", tte);
234 UtilWaitThread.sleep(100);
235 } finally {
236 MasterClient.close(client);
237 }
238 }
239 }
240
241 private String waitForTableOperation(long opid) throws ThriftSecurityException, TException, ThriftTableOperationException {
242 while (true) {
243 MasterClientService.Iface client = null;
244 try {
245 client = MasterClient.getConnectionWithRetry(instance);
246 return client.waitForTableOperation(Tracer.traceInfo(), credentials, opid);
247 } catch (TTransportException tte) {
248 log.debug("Failed to call waitForTableOperation(), retrying ... ", tte);
249 UtilWaitThread.sleep(100);
250 } finally {
251 MasterClient.close(client);
252 }
253 }
254 }
255
256 private void finishTableOperation(long opid) throws ThriftSecurityException, TException {
257 while (true) {
258 MasterClientService.Iface client = null;
259 try {
260 client = MasterClient.getConnectionWithRetry(instance);
261 client.finishTableOperation(Tracer.traceInfo(), credentials, opid);
262 break;
263 } catch (TTransportException tte) {
264 log.debug("Failed to call finishTableOperation(), retrying ... ", tte);
265 UtilWaitThread.sleep(100);
266 } finally {
267 MasterClient.close(client);
268 }
269 }
270 }
271
272 private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, TableExistsException,
273 TableNotFoundException, AccumuloException {
274 return doTableOperation(op, args, opts, true);
275 }
276
277 private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean wait) throws AccumuloSecurityException,
278 TableExistsException, TableNotFoundException, AccumuloException {
279 Long opid = null;
280
281 try {
282 opid = beginTableOperation();
283 executeTableOperation(opid, op, args, opts, !wait);
284 if (!wait) {
285 opid = null;
286 return null;
287 }
288 String ret = waitForTableOperation(opid);
289 Tables.clearCache(instance);
290 return ret;
291 } catch (ThriftSecurityException e) {
292 throw new AccumuloSecurityException(e.user, e.code, e);
293 } catch (ThriftTableOperationException e) {
294 switch (e.getType()) {
295 case EXISTS:
296 throw new TableExistsException(e);
297 case NOTFOUND:
298 throw new TableNotFoundException(e);
299 case OFFLINE:
300 throw new TableOfflineException(instance, null);
301 case OTHER:
302 default:
303 throw new AccumuloException(e.description, e);
304 }
305 } catch (Exception e) {
306 throw new AccumuloException(e.getMessage(), e);
307 } finally {
308
309 if (opid != null)
310 try {
311 finishTableOperation(opid);
312 } catch (Exception e) {
313 log.warn(e.getMessage(), e);
314 }
315 }
316 }
317
318 private static class SplitEnv {
319 private String tableName;
320 private String tableId;
321 private ExecutorService executor;
322 private CountDownLatch latch;
323 private AtomicReference<Exception> exception;
324
325 SplitEnv(String tableName, String tableId, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) {
326 this.tableName = tableName;
327 this.tableId = tableId;
328 this.executor = executor;
329 this.latch = latch;
330 this.exception = exception;
331 }
332 }
333
334 private class SplitTask implements Runnable {
335
336 private List<Text> splits;
337 private SplitEnv env;
338
339 SplitTask(SplitEnv env, List<Text> splits) {
340 this.env = env;
341 this.splits = splits;
342 }
343
344 @Override
345 public void run() {
346 try {
347 if (env.exception.get() != null)
348 return;
349
350 if (splits.size() <= 2) {
351 addSplits(env.tableName, new TreeSet<Text>(splits), env.tableId);
352 for (int i = 0; i < splits.size(); i++)
353 env.latch.countDown();
354 return;
355 }
356
357 int mid = splits.size() / 2;
358
359
360
361 addSplits(env.tableName, new TreeSet<Text>(splits.subList(mid, mid + 1)), env.tableId);
362 env.latch.countDown();
363
364 env.executor.submit(new SplitTask(env, splits.subList(0, mid)));
365 env.executor.submit(new SplitTask(env, splits.subList(mid + 1, splits.size())));
366
367 } catch (Exception e) {
368 env.exception.compareAndSet(null, e);
369 }
370 }
371
372 }
373
374 /**
375 * @param tableName
376 * the name of the table
377 * @param partitionKeys
378 * a sorted set of row key values to pre-split the table on
379 * @throws AccumuloException
380 * if a general error occurs
381 * @throws AccumuloSecurityException
382 * if the user does not have permission
383 * @throws TableNotFoundException
384 * if the table does not exist
385 */
386 @Override
387 public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
388 String tableId = Tables.getTableId(instance, tableName);
389
390 List<Text> splits = new ArrayList<Text>(partitionKeys);
391
392
393 Collections.sort(splits);
394
395 CountDownLatch latch = new CountDownLatch(splits.size());
396 AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
397
398 ExecutorService executor = Executors.newFixedThreadPool(16, new NamingThreadFactory("addSplits"));
399 try {
400 executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
401
402 while (!latch.await(100, TimeUnit.MILLISECONDS)) {
403 if (exception.get() != null) {
404 executor.shutdownNow();
405 Exception excep = exception.get();
406 if (excep instanceof TableNotFoundException)
407 throw (TableNotFoundException) excep;
408 else if (excep instanceof AccumuloException)
409 throw (AccumuloException) excep;
410 else if (excep instanceof AccumuloSecurityException)
411 throw (AccumuloSecurityException) excep;
412 else if (excep instanceof RuntimeException)
413 throw (RuntimeException) excep;
414 else
415 throw new RuntimeException(excep);
416 }
417 }
418 } catch (InterruptedException e) {
419 throw new RuntimeException(e);
420 } finally {
421 executor.shutdown();
422 }
423 }
424
425 private void addSplits(String tableName, SortedSet<Text> partitionKeys, String tableId) throws AccumuloException, AccumuloSecurityException,
426 TableNotFoundException, AccumuloServerException {
427 TabletLocator tabLocator = TabletLocator.getInstance(instance, credentials, new Text(tableId));
428
429 for (Text split : partitionKeys) {
430 boolean successful = false;
431 int attempt = 0;
432
433 while (!successful) {
434
435 if (attempt > 0)
436 UtilWaitThread.sleep(100);
437
438 attempt++;
439
440 TabletLocation tl = tabLocator.locateTablet(split, false, false);
441
442 if (tl == null) {
443 if (!Tables.exists(instance, tableId))
444 throw new TableNotFoundException(tableId, tableName, null);
445 else if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
446 throw new TableOfflineException(instance, tableId);
447 continue;
448 }
449
450 try {
451 TabletClientService.Client client = ThriftUtil.getTServerClient(tl.tablet_location, instance.getConfiguration());
452 try {
453 OpTimer opTimer = null;
454 if (log.isTraceEnabled())
455 opTimer = new OpTimer(log, Level.TRACE).start("Splitting tablet " + tl.tablet_extent + " on " + tl.tablet_location + " at " + split);
456
457 client.splitTablet(Tracer.traceInfo(), credentials, tl.tablet_extent.toThrift(), TextUtil.getByteBuffer(split));
458
459
460 tabLocator.invalidateCache(tl.tablet_extent);
461
462 if (opTimer != null)
463 opTimer.stop("Split tablet in %DURATION%");
464 } finally {
465 ThriftUtil.returnClient(client);
466 }
467
468 } catch (TApplicationException tae) {
469 throw new AccumuloServerException(tl.tablet_location, tae);
470 } catch (TTransportException e) {
471 tabLocator.invalidateCache(tl.tablet_location);
472 continue;
473 } catch (ThriftSecurityException e) {
474 Tables.clearCache(instance);
475 if (!Tables.exists(instance, tableId))
476 throw new TableNotFoundException(tableId, tableName, null);
477 throw new AccumuloSecurityException(e.user, e.code, e);
478 } catch (NotServingTabletException e) {
479 tabLocator.invalidateCache(tl.tablet_extent);
480 continue;
481 } catch (TException e) {
482 tabLocator.invalidateCache(tl.tablet_location);
483 continue;
484 }
485
486 successful = true;
487 }
488 }
489 }
490
491 @Override
492 public void merge(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
493
494 ArgumentChecker.notNull(tableName);
495 ByteBuffer EMPTY = ByteBuffer.allocate(0);
496 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
497 : TextUtil.getByteBuffer(end));
498 Map<String,String> opts = new HashMap<String,String>();
499 try {
500 doTableOperation(TableOperation.MERGE, args, opts);
501 } catch (TableExistsException e) {
502
503 throw new RuntimeException(e);
504 }
505 }
506
507 @Override
508 public void deleteRows(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
509
510 ArgumentChecker.notNull(tableName);
511 ByteBuffer EMPTY = ByteBuffer.allocate(0);
512 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
513 : TextUtil.getByteBuffer(end));
514 Map<String,String> opts = new HashMap<String,String>();
515 try {
516 doTableOperation(TableOperation.DELETE_RANGE, args, opts);
517 } catch (TableExistsException e) {
518
519 throw new RuntimeException(e);
520 }
521 }
522
523 /**
524 * @param tableName
525 * the name of the table
526 * @return the split points (end-row names) for the table's current split profile
527 */
528 @Override
529 public Collection<Text> getSplits(String tableName) throws TableNotFoundException {
530
531 ArgumentChecker.notNull(tableName);
532
533 if (!exists(tableName)) {
534 throw new TableNotFoundException(null, tableName, "Unknown table for getSplits");
535 }
536
537 SortedSet<KeyExtent> tablets = new TreeSet<KeyExtent>();
538 Map<KeyExtent,String> locations = new TreeMap<KeyExtent,String>();
539
540 while (true) {
541 try {
542 tablets.clear();
543 locations.clear();
544 MetadataTable.getEntries(instance, credentials, tableName, false, locations, tablets);
545 break;
546 } catch (Throwable t) {
547 log.info(t.getMessage() + " ... retrying ...");
548 UtilWaitThread.sleep(3000);
549 }
550 }
551
552 ArrayList<Text> endRows = new ArrayList<Text>(tablets.size());
553
554 for (KeyExtent ke : tablets)
555 if (ke.getEndRow() != null)
556 endRows.add(ke.getEndRow());
557
558 return endRows;
559 }
560
561 /**
562 * @param tableName
563 * the name of the table
564 * @param maxSplits
565 * specifies the maximum number of splits to return
566 * @return the split points (end-row names) for the table's current split profile, grouped into fewer splits so as not to exceed maxSplits
567 * @throws TableNotFoundException
568 */
569 @Override
570 public Collection<Text> getSplits(String tableName, int maxSplits) throws TableNotFoundException {
571 Collection<Text> endRows = getSplits(tableName);
572
573 if (endRows.size() <= maxSplits)
574 return endRows;
575
576 double r = (maxSplits + 1) / (double) (endRows.size());
577 double pos = 0;
578
579 ArrayList<Text> subset = new ArrayList<Text>(maxSplits);
580
581 int j = 0;
582 for (int i = 0; i < endRows.size() && j < maxSplits; i++) {
583 pos += r;
584 while (pos > 1) {
585 subset.add(((ArrayList<Text>) endRows).get(i));
586 j++;
587 pos -= 1;
588 }
589 }
590
591 return subset;
592 }
593
594 /**
595 * Delete a table
596 *
597 * @param tableName
598 * the name of the table
599 * @throws AccumuloException
600 * if a general error occurs
601 * @throws AccumuloSecurityException
602 * if the user does not have permission
603 * @throws TableNotFoundException
604 * if the table does not exist
605 */
606 @Override
607 public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
608 ArgumentChecker.notNull(tableName);
609
610 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()));
611 Map<String,String> opts = new HashMap<String,String>();
612
613 try {
614 doTableOperation(TableOperation.DELETE, args, opts);
615 } catch (TableExistsException e) {
616
617 throw new RuntimeException(e);
618 }
619
620 }
621
622 @Override
623 public void clone(String srcTableName, String newTableName, boolean flush, Map<String,String> propertiesToSet, Set<String> propertiesToExclude)
624 throws AccumuloSecurityException, TableNotFoundException, AccumuloException, TableExistsException {
625
626 ArgumentChecker.notNull(srcTableName, newTableName);
627
628 String srcTableId = Tables.getTableId(instance, srcTableName);
629
630 if (flush)
631 _flush(srcTableId, null, null, true);
632
633 if (propertiesToExclude == null)
634 propertiesToExclude = Collections.emptySet();
635
636 if (propertiesToSet == null)
637 propertiesToSet = Collections.emptyMap();
638
639 if (!Collections.disjoint(propertiesToExclude, propertiesToSet.keySet()))
640 throw new IllegalArgumentException("propertiesToSet and propertiesToExclude not disjoint");
641
642 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(srcTableId.getBytes()), ByteBuffer.wrap(newTableName.getBytes()));
643 Map<String,String> opts = new HashMap<String,String>();
644 opts.putAll(propertiesToSet);
645 for (String prop : propertiesToExclude)
646 opts.put(prop, null);
647
648 doTableOperation(TableOperation.CLONE, args, opts);
649 }
650
651 /**
652 * Rename a table
653 *
654 * @param oldTableName
655 * the old table name
656 * @param newTableName
657 * the new table name
658 * @throws AccumuloException
659 * if a general error occurs
660 * @throws AccumuloSecurityException
661 * if the user does not have permission
662 * @throws TableNotFoundException
663 * if the old table name does not exist
664 * @throws TableExistsException
665 * if the new table name already exists
666 */
667 @Override
668 public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
669 TableExistsException {
670
671 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes()), ByteBuffer.wrap(newTableName.getBytes()));
672 Map<String,String> opts = new HashMap<String,String>();
673 doTableOperation(TableOperation.RENAME, args, opts);
674 }
675
676 /**
677 * @deprecated since 1.4 {@link #flush(String, Text, Text, boolean)}
678 */
679 @Override
680 @Deprecated
681 public void flush(String tableName) throws AccumuloException, AccumuloSecurityException {
682 try {
683 flush(tableName, null, null, false);
684 } catch (TableNotFoundException e) {
685 throw new AccumuloException(e.getMessage(), e);
686 }
687 }
688
689 /**
690 * Flush a table
691 *
692 * @param tableName
693 * the name of the table
694 * @throws AccumuloException
695 * if a general error occurs
696 * @throws AccumuloSecurityException
697 * if the user does not have permission
698 * @throws TableNotFoundException
699 */
700 @Override
701 public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
702 ArgumentChecker.notNull(tableName);
703
704 String tableId = Tables.getTableId(instance, tableName);
705 _flush(tableId, start, end, wait);
706 }
707
708 @Override
709 public void compact(String tableName, Text start, Text end, boolean flush, boolean wait) throws AccumuloSecurityException, TableNotFoundException,
710 AccumuloException {
711 compact(tableName, start, end, new ArrayList<IteratorSetting>(), flush, wait);
712 }
713
714 @Override
715 public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, boolean flush, boolean wait) throws AccumuloSecurityException,
716 TableNotFoundException, AccumuloException {
717 ArgumentChecker.notNull(tableName);
718 ByteBuffer EMPTY = ByteBuffer.allocate(0);
719
720 String tableId = Tables.getTableId(instance, tableName);
721
722 if (flush)
723 _flush(tableId, start, end, true);
724
725 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
726 : TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(iterators)));
727
728 Map<String,String> opts = new HashMap<String,String>();
729 try {
730 doTableOperation(TableOperation.COMPACT, args, opts, wait);
731 } catch (TableExistsException e) {
732
733 throw new RuntimeException(e);
734 }
735 }
736
737 @Override
738 public void cancelCompaction(String tableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
739 String tableId = Tables.getTableId(instance, tableName);
740
741 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()));
742
743 Map<String,String> opts = new HashMap<String,String>();
744 try {
745 doTableOperation(TableOperation.COMPACT_CANCEL, args, opts, true);
746 } catch (TableExistsException e) {
747
748 throw new RuntimeException(e);
749 }
750
751 }
752
753 private void _flush(String tableId, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
754
755 try {
756 long flushID;
757
758
759
760
761 while (true) {
762 MasterClientService.Iface client = null;
763 try {
764 client = MasterClient.getConnectionWithRetry(instance);
765 flushID = client.initiateFlush(Tracer.traceInfo(), credentials, tableId);
766 break;
767 } catch (TTransportException tte) {
768 log.debug("Failed to call initiateFlush, retrying ... ", tte);
769 UtilWaitThread.sleep(100);
770 } finally {
771 MasterClient.close(client);
772 }
773 }
774
775 while (true) {
776 MasterClientService.Iface client = null;
777 try {
778 client = MasterClient.getConnectionWithRetry(instance);
779 client.waitForFlush(Tracer.traceInfo(), credentials, tableId, TextUtil.getByteBuffer(start), TextUtil.getByteBuffer(end), flushID,
780 wait ? Long.MAX_VALUE : 1);
781 break;
782 } catch (TTransportException tte) {
783 log.debug("Failed to call initiateFlush, retrying ... ", tte);
784 UtilWaitThread.sleep(100);
785 } finally {
786 MasterClient.close(client);
787 }
788 }
789 } catch (ThriftSecurityException e) {
790 log.debug("flush security exception on table id " + tableId);
791 throw new AccumuloSecurityException(e.user, e.code, e);
792 } catch (ThriftTableOperationException e) {
793 switch (e.getType()) {
794 case NOTFOUND:
795 throw new TableNotFoundException(e);
796 case OTHER:
797 default:
798 throw new AccumuloException(e.description, e);
799 }
800 } catch (Exception e) {
801 throw new AccumuloException(e);
802 }
803 }
804
805 /**
806 * Sets a property on a table
807 *
808 * @param tableName
809 * the name of the table
810 * @param property
811 * the name of a per-table property
812 * @param value
813 * the value to set a per-table property to
814 * @throws AccumuloException
815 * if a general error occurs
816 * @throws AccumuloSecurityException
817 * if the user does not have permission
818 */
819 @Override
820 public void setProperty(final String tableName, final String property, final String value) throws AccumuloException, AccumuloSecurityException {
821 ArgumentChecker.notNull(tableName, property, value);
822 MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
823 @Override
824 public void execute(MasterClientService.Client client) throws Exception {
825 client.setTableProperty(Tracer.traceInfo(), credentials, tableName, property, value);
826 }
827 });
828 }
829
830 /**
831 * Removes a property from a table
832 *
833 * @param tableName
834 * the name of the table
835 * @param property
836 * the name of a per-table property
837 * @throws AccumuloException
838 * if a general error occurs
839 * @throws AccumuloSecurityException
840 * if the user does not have permission
841 */
842 @Override
843 public void removeProperty(final String tableName, final String property) throws AccumuloException, AccumuloSecurityException {
844 ArgumentChecker.notNull(tableName, property);
845 MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
846 @Override
847 public void execute(MasterClientService.Client client) throws Exception {
848 client.removeTableProperty(Tracer.traceInfo(), credentials, tableName, property);
849 }
850 });
851 }
852
853 /**
854 * Gets properties of a table
855 *
856 * @param tableName
857 * the name of the table
858 * @return all properties visible by this table (system and per-table properties)
859 * @throws TableNotFoundException
860 * if the table does not exist
861 */
862 @Override
863 public Iterable<Entry<String,String>> getProperties(final String tableName) throws AccumuloException, TableNotFoundException {
864 ArgumentChecker.notNull(tableName);
865 try {
866 return ServerClient.executeRaw(instance, new ClientExecReturn<Map<String,String>,ClientService.Client>() {
867 @Override
868 public Map<String,String> execute(ClientService.Client client) throws Exception {
869 return client.getTableConfiguration(tableName);
870 }
871 }).entrySet();
872 } catch (ThriftTableOperationException e) {
873 switch (e.getType()) {
874 case NOTFOUND:
875 throw new TableNotFoundException(e);
876 case OTHER:
877 default:
878 throw new AccumuloException(e.description, e);
879 }
880 } catch (AccumuloException e) {
881 throw e;
882 } catch (Exception e) {
883 throw new AccumuloException(e);
884 }
885
886 }
887
888 /**
889 * Sets a tables locality groups. A tables locality groups can be changed at any time.
890 *
891 * @param tableName
892 * the name of the table
893 * @param groups
894 * mapping of locality group names to column families in the locality group
895 * @throws AccumuloException
896 * if a general error occurs
897 * @throws AccumuloSecurityException
898 * if the user does not have permission
899 * @throws TableNotFoundException
900 * if the table does not exist
901 */
902 @Override
903 public void setLocalityGroups(String tableName, Map<String,Set<Text>> groups) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
904
905 HashSet<Text> all = new HashSet<Text>();
906 for (Entry<String,Set<Text>> entry : groups.entrySet()) {
907
908 if (!Collections.disjoint(all, entry.getValue())) {
909 throw new IllegalArgumentException("Group " + entry.getKey() + " overlaps with another group");
910 }
911
912 all.addAll(entry.getValue());
913 }
914
915 for (Entry<String,Set<Text>> entry : groups.entrySet()) {
916 Set<Text> colFams = entry.getValue();
917 String value = LocalityGroupUtil.encodeColumnFamilies(colFams);
918 setProperty(tableName, Property.TABLE_LOCALITY_GROUP_PREFIX + entry.getKey(), value);
919 }
920
921 setProperty(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(), StringUtil.join(groups.keySet(), ","));
922
923
924 String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey();
925 for (Entry<String,String> entry : getProperties(tableName)) {
926 String property = entry.getKey();
927 if (property.startsWith(prefix)) {
928
929
930 String[] parts = property.split("\\.");
931 String group = parts[parts.length - 1];
932
933 if (!groups.containsKey(group)) {
934 removeProperty(tableName, property);
935 }
936 }
937 }
938 }
939
940 /**
941 *
942 * Gets the locality groups currently set for a table.
943 *
944 * @param tableName
945 * the name of the table
946 * @return mapping of locality group names to column families in the locality group
947 * @throws AccumuloException
948 * if a general error occurs
949 * @throws TableNotFoundException
950 * if the table does not exist
951 */
952 @Override
953 public Map<String,Set<Text>> getLocalityGroups(String tableName) throws AccumuloException, TableNotFoundException {
954 AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName));
955 Map<String,Set<ByteSequence>> groups = LocalityGroupUtil.getLocalityGroups(conf);
956
957 Map<String,Set<Text>> groups2 = new HashMap<String,Set<Text>>();
958 for (Entry<String,Set<ByteSequence>> entry : groups.entrySet()) {
959
960 HashSet<Text> colFams = new HashSet<Text>();
961
962 for (ByteSequence bs : entry.getValue()) {
963 colFams.add(new Text(bs.toArray()));
964 }
965
966 groups2.put(entry.getKey(), colFams);
967 }
968
969 return groups2;
970 }
971
/**
 * Splits a range into at most {@code maxSplits} smaller ranges whose boundaries fall on the table's tablet boundaries.
 * <p>
 * When {@code maxSplits} is 1 the given range is returned unchanged as a single-element set. Otherwise the range is binned by tablet (retrying until binning
 * succeeds), neighbouring extents are merged pairwise until no more than {@code maxSplits} remain, and each resulting extent is clipped to the requested
 * range.
 *
 * @param tableName
 *          the name of the table
 * @param range
 *          a range to split
 * @param maxSplits
 *          the maximum number of splits
 * @return the range, split into smaller ranges that fall on boundaries of the table's split points as evenly as possible
 * @throws AccumuloException
 *           if a general error occurs
 * @throws AccumuloSecurityException
 *           if the user does not have permission
 * @throws TableNotFoundException
 *           if the table does not exist
 * @throws IllegalArgumentException
 *           if {@code maxSplits} is less than 1
 */
@Override
public Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits) throws AccumuloException, AccumuloSecurityException,
    TableNotFoundException {
  ArgumentChecker.notNull(tableName, range);
  if (maxSplits < 1)
    throw new IllegalArgumentException("maximum splits must be >= 1");
  if (maxSplits == 1)
    return Collections.singleton(range);

  Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
  String tableId = Tables.getTableId(instance, tableName);
  TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(tableId));

  // start from an invalidated locator cache
  tl.invalidateCache();
  // keep retrying while binRanges returns a non-empty result (presumably the ranges it could not bin -- confirm against TabletLocator)
  while (!tl.binRanges(Collections.singletonList(range), binnedRanges).isEmpty()) {
    // give up if the table has disappeared or been taken offline in the meantime
    if (!Tables.exists(instance, tableId))
      throw new TableDeletedException(tableId);
    if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
      throw new TableOfflineException(instance, tableId);

    log.warn("Unable to locate bins for specified range. Retrying.");
    // sleep for a random 100-199 ms before the next attempt
    UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
    // discard partial results and cached locations before binning again
    binnedRanges.clear();
    tl.invalidateCache();
  }

  // group key extents to get <= maxSplits
  LinkedList<KeyExtent> unmergedExtents = new LinkedList<KeyExtent>();
  List<KeyExtent> mergedExtents = new ArrayList<KeyExtent>();

  // collect the extents from every entry of the binned result
  for (Map<KeyExtent,List<Range>> map : binnedRanges.values())
    unmergedExtents.addAll(map.keySet());

  // the sort method is efficient for linked list
  Collections.sort(unmergedExtents);

  while (unmergedExtents.size() + mergedExtents.size() > maxSplits) {
    if (unmergedExtents.size() >= 2) {
      // merge the next two extents by stretching the first one's end row to the second one's (mutates the KeyExtent in place)
      KeyExtent first = unmergedExtents.removeFirst();
      KeyExtent second = unmergedExtents.removeFirst();
      first.setEndRow(second.getEndRow());
      mergedExtents.add(first);
    } else {
      // fewer than two unmerged extents remain: append the leftover to the merged list and start another pass over all of them
      mergedExtents.addAll(unmergedExtents);
      unmergedExtents.clear();
      unmergedExtents.addAll(mergedExtents);
      mergedExtents.clear();
    }

  }

  // whatever was not merged in the final pass is kept as-is
  mergedExtents.addAll(unmergedExtents);

  // convert each extent to a data range, clipped to the caller's range
  Set<Range> ranges = new HashSet<Range>();
  for (KeyExtent k : mergedExtents)
    ranges.add(k.toDataRange().clip(range));

  return ranges;
}
1047
1048 @Override
1049 public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloSecurityException,
1050 TableNotFoundException, AccumuloException {
1051 ArgumentChecker.notNull(tableName, dir, failureDir);
1052 FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
1053 Path failPath = fs.makeQualified(new Path(failureDir));
1054 if (!fs.exists(new Path(dir)))
1055 throw new AccumuloException("Bulk import directory " + dir + " does not exist!");
1056 if (!fs.exists(failPath))
1057 throw new AccumuloException("Bulk import failure directory " + failureDir + " does not exist!");
1058 FileStatus[] listStatus = fs.listStatus(failPath);
1059 if (listStatus != null && listStatus.length != 0) {
1060 if (listStatus.length == 1 && listStatus[0].isDir())
1061 throw new AccumuloException("Bulk import directory " + failPath + " is a file");
1062 throw new AccumuloException("Bulk import failure directory " + failPath + " is not empty");
1063 }
1064
1065 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(dir.getBytes()), ByteBuffer.wrap(failureDir.getBytes()),
1066 ByteBuffer.wrap((setTime + "").getBytes()));
1067 Map<String,String> opts = new HashMap<String,String>();
1068
1069 try {
1070 doTableOperation(TableOperation.BULK_IMPORT, args, opts);
1071 } catch (TableExistsException e) {
1072
1073 throw new RuntimeException(e);
1074 }
1075
1076
1077 }
1078
1079 /**
1080 *
1081 * @param tableName
1082 * the table to take offline
1083 * @throws AccumuloException
1084 * when there is a general accumulo error
1085 * @throws AccumuloSecurityException
1086 * when the user does not have the proper permissions
1087 * @throws TableNotFoundException
1088 */
1089 @Override
1090 public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
1091
1092 ArgumentChecker.notNull(tableName);
1093 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()));
1094 Map<String,String> opts = new HashMap<String,String>();
1095
1096 try {
1097 doTableOperation(TableOperation.OFFLINE, args, opts);
1098 } catch (TableExistsException e) {
1099
1100 throw new RuntimeException(e);
1101 }
1102 }
1103
1104 /**
1105 *
1106 * @param tableName
1107 * the table to take online
1108 * @throws AccumuloException
1109 * when there is a general accumulo error
1110 * @throws AccumuloSecurityException
1111 * when the user does not have the proper permissions
1112 * @throws TableNotFoundException
1113 */
1114 @Override
1115 public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
1116 ArgumentChecker.notNull(tableName);
1117 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()));
1118 Map<String,String> opts = new HashMap<String,String>();
1119
1120 try {
1121 doTableOperation(TableOperation.ONLINE, args, opts);
1122 } catch (TableExistsException e) {
1123
1124 throw new RuntimeException(e);
1125 }
1126 }
1127
1128 /**
1129 * Clears the tablet locator cache for a specified table
1130 *
1131 * @param tableName
1132 * the name of the table
1133 * @throws TableNotFoundException
1134 * if table does not exist
1135 */
1136 @Override
1137 public void clearLocatorCache(String tableName) throws TableNotFoundException {
1138 ArgumentChecker.notNull(tableName);
1139 TabletLocator tabLocator = TabletLocator.getInstance(instance, credentials, new Text(Tables.getTableId(instance, tableName)));
1140 tabLocator.invalidateCache();
1141 }
1142
1143 /**
1144 * Get a mapping of table name to internal table id.
1145 *
1146 * @return the map from table name to internal table id
1147 */
1148 @Override
1149 public Map<String,String> tableIdMap() {
1150 return Tables.getNameToIdMap(instance);
1151 }
1152
1153 @Override
1154 public Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive)
1155 throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
1156 ArgumentChecker.notNull(tableName, auths);
1157 Scanner scanner = instance.getConnector(credentials.getPrincipal(), CredentialHelper.extractToken(credentials)).createScanner(tableName, auths);
1158 return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive);
1159 }
1160
1161 public static Map<String,String> getExportedProps(FileSystem fs, Path path) throws IOException {
1162 HashMap<String,String> props = new HashMap<String,String>();
1163
1164 ZipInputStream zis = new ZipInputStream(fs.open(path));
1165 try {
1166 ZipEntry zipEntry;
1167 while ((zipEntry = zis.getNextEntry()) != null) {
1168 if (zipEntry.getName().equals(Constants.EXPORT_TABLE_CONFIG_FILE)) {
1169 BufferedReader in = new BufferedReader(new InputStreamReader(zis));
1170 String line;
1171 while ((line = in.readLine()) != null) {
1172 String sa[] = line.split("=", 2);
1173 props.put(sa[0], sa[1]);
1174 }
1175
1176 break;
1177 }
1178 }
1179 } finally {
1180 zis.close();
1181 }
1182 return props;
1183 }
1184
1185 @Override
1186 public void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException {
1187 ArgumentChecker.notNull(tableName, importDir);
1188
1189 try {
1190 FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
1191 ;
1192 Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE));
1193
1194 for (String propKey : props.keySet()) {
1195 if (Property.isClassProperty(propKey) && !props.get(propKey).contains(Constants.CORE_PACKAGE_NAME)) {
1196 Logger.getLogger(this.getClass()).info(
1197 "Imported table sets '" + propKey + "' to '" + props.get(propKey) + "'. Ensure this class is on Accumulo classpath.");
1198 }
1199 }
1200
1201 } catch (IOException ioe) {
1202 Logger.getLogger(this.getClass()).warn("Failed to check if imported table references external java classes : " + ioe.getMessage());
1203 }
1204
1205 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(importDir.getBytes()));
1206
1207 Map<String,String> opts = Collections.emptyMap();
1208
1209 try {
1210 doTableOperation(TableOperation.IMPORT, args, opts);
1211 } catch (TableNotFoundException e1) {
1212
1213 throw new RuntimeException(e1);
1214 }
1215
1216 }
1217
1218 @Override
1219 public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
1220 ArgumentChecker.notNull(tableName, exportDir);
1221
1222 List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(exportDir.getBytes()));
1223
1224 Map<String,String> opts = Collections.emptyMap();
1225
1226 try {
1227 doTableOperation(TableOperation.EXPORT, args, opts);
1228 } catch (TableExistsException e1) {
1229
1230 throw new RuntimeException(e1);
1231 }
1232 }
1233 }