View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.admin;
18  
19  import java.io.BufferedReader;
20  import java.io.IOException;
21  import java.io.InputStreamReader;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  import java.util.SortedSet;
35  import java.util.TreeMap;
36  import java.util.TreeSet;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Executors;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicReference;
42  import java.util.zip.ZipEntry;
43  import java.util.zip.ZipInputStream;
44  
45  import org.apache.accumulo.core.Constants;
46  import org.apache.accumulo.core.client.AccumuloException;
47  import org.apache.accumulo.core.client.AccumuloSecurityException;
48  import org.apache.accumulo.core.client.Instance;
49  import org.apache.accumulo.core.client.IteratorSetting;
50  import org.apache.accumulo.core.client.Scanner;
51  import org.apache.accumulo.core.client.TableDeletedException;
52  import org.apache.accumulo.core.client.TableExistsException;
53  import org.apache.accumulo.core.client.TableNotFoundException;
54  import org.apache.accumulo.core.client.TableOfflineException;
55  import org.apache.accumulo.core.client.impl.AccumuloServerException;
56  import org.apache.accumulo.core.client.impl.ClientExec;
57  import org.apache.accumulo.core.client.impl.ClientExecReturn;
58  import org.apache.accumulo.core.client.impl.MasterClient;
59  import org.apache.accumulo.core.client.impl.ServerClient;
60  import org.apache.accumulo.core.client.impl.Tables;
61  import org.apache.accumulo.core.client.impl.TabletLocator;
62  import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
63  import org.apache.accumulo.core.client.impl.thrift.ClientService;
64  import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
65  import org.apache.accumulo.core.conf.AccumuloConfiguration;
66  import org.apache.accumulo.core.conf.ConfigurationCopy;
67  import org.apache.accumulo.core.conf.Property;
68  import org.apache.accumulo.core.data.ByteSequence;
69  import org.apache.accumulo.core.data.KeyExtent;
70  import org.apache.accumulo.core.data.Range;
71  import org.apache.accumulo.core.file.FileUtil;
72  import org.apache.accumulo.core.iterators.IteratorUtil;
73  import org.apache.accumulo.core.master.state.tables.TableState;
74  import org.apache.accumulo.core.master.thrift.MasterClientService;
75  import org.apache.accumulo.core.master.thrift.TableOperation;
76  import org.apache.accumulo.core.security.Authorizations;
77  import org.apache.accumulo.core.security.CredentialHelper;
78  import org.apache.accumulo.core.security.thrift.Credential;
79  import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
80  import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
81  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
82  import org.apache.accumulo.core.util.ArgumentChecker;
83  import org.apache.accumulo.core.util.CachedConfiguration;
84  import org.apache.accumulo.core.util.LocalityGroupUtil;
85  import org.apache.accumulo.core.util.MetadataTable;
86  import org.apache.accumulo.core.util.NamingThreadFactory;
87  import org.apache.accumulo.core.util.OpTimer;
88  import org.apache.accumulo.core.util.StringUtil;
89  import org.apache.accumulo.core.util.TextUtil;
90  import org.apache.accumulo.core.util.ThriftUtil;
91  import org.apache.accumulo.core.util.UtilWaitThread;
92  import org.apache.accumulo.trace.instrument.Tracer;
93  import org.apache.hadoop.fs.FileStatus;
94  import org.apache.hadoop.fs.FileSystem;
95  import org.apache.hadoop.fs.Path;
96  import org.apache.hadoop.io.Text;
97  import org.apache.log4j.Level;
98  import org.apache.log4j.Logger;
99  import org.apache.thrift.TApplicationException;
100 import org.apache.thrift.TException;
101 import org.apache.thrift.transport.TTransportException;
102 
103 /**
104  * Provides a class for administering tables
105  * 
106  */
107 public class TableOperationsImpl extends TableOperationsHelper {
108   private Instance instance;
109   private Credential credentials;
110   
111   private static final Logger log = Logger.getLogger(TableOperations.class);
112   
113   /**
114    * @param instance
115    *          the connection information for this instance
116    * @param credentials
117    *          the username/password for this connection
118    */
119   public TableOperationsImpl(Instance instance, Credential credentials) {
120     ArgumentChecker.notNull(instance, credentials);
121     this.instance = instance;
122     this.credentials = credentials;
123   }
124   
125   /**
126    * Retrieve a list of tables in Accumulo.
127    * 
128    * @return List of tables in accumulo
129    */
130   @Override
131   public SortedSet<String> list() {
132     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Fetching list of tables...");
133     TreeSet<String> tableNames = new TreeSet<String>(Tables.getNameToIdMap(instance).keySet());
134     opTimer.stop("Fetched " + tableNames.size() + " table names in %DURATION%");
135     return tableNames;
136   }
137   
138   /**
139    * A method to check if a table exists in Accumulo.
140    * 
141    * @param tableName
142    *          the name of the table
143    * @return true if the table exists
144    */
145   @Override
146   public boolean exists(String tableName) {
147     ArgumentChecker.notNull(tableName);
148     if (tableName.equals(Constants.METADATA_TABLE_NAME))
149       return true;
150     
151     OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Checking if table " + tableName + "exists...");
152     boolean exists = Tables.getNameToIdMap(instance).containsKey(tableName);
153     opTimer.stop("Checked existance of " + exists + " in %DURATION%");
154     return exists;
155   }
156   
157   /**
158    * Create a table with no special configuration
159    * 
160    * @param tableName
161    *          the name of the table
162    * @throws AccumuloException
163    *           if a general error occurs
164    * @throws AccumuloSecurityException
165    *           if the user does not have permission
166    * @throws TableExistsException
167    *           if the table already exists
168    */
169   @Override
170   public void create(String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException {
171     create(tableName, true, TimeType.MILLIS);
172   }
173   
174   /**
175    * @param tableName
176    *          the name of the table
177    * @param limitVersion
178    *          Enables/disables the versioning iterator, which will limit the number of Key versions kept.
179    */
180   @Override
181   public void create(String tableName, boolean limitVersion) throws AccumuloException, AccumuloSecurityException, TableExistsException {
182     create(tableName, limitVersion, TimeType.MILLIS);
183   }
184   
185   /**
186    * @param tableName
187    *          the name of the table
188    * @param timeType
189    *          specifies logical or real-time based time recording for entries in the table
190    * @param limitVersion
191    *          Enables/disables the versioning iterator, which will limit the number of Key versions kept.
192    */
193   @Override
194   public void create(String tableName, boolean limitVersion, TimeType timeType) throws AccumuloException, AccumuloSecurityException, TableExistsException {
195     ArgumentChecker.notNull(tableName, timeType);
196     
197     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(timeType.name().getBytes()));
198     
199     Map<String,String> opts = IteratorUtil.generateInitialTableProperties(limitVersion);
200     
201     try {
202       doTableOperation(TableOperation.CREATE, args, opts);
203     } catch (TableNotFoundException e1) {
204       // should not happen
205       throw new RuntimeException(e1);
206     }
207   }
208   
209   private long beginTableOperation() throws ThriftSecurityException, TException {
210     while (true) {
211       MasterClientService.Iface client = null;
212       try {
213         client = MasterClient.getConnectionWithRetry(instance);
214         return client.beginTableOperation(Tracer.traceInfo(), credentials);
215       } catch (TTransportException tte) {
216         log.debug("Failed to call beginTableOperation(), retrying ... ", tte);
217         UtilWaitThread.sleep(100);
218       } finally {
219         MasterClient.close(client);
220       }
221     }
222   }
223   
224   private void executeTableOperation(long opid, TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp)
225       throws ThriftSecurityException, TException, ThriftTableOperationException {
226     while (true) {
227       MasterClientService.Iface client = null;
228       try {
229         client = MasterClient.getConnectionWithRetry(instance);
230         client.executeTableOperation(Tracer.traceInfo(), credentials, opid, op, args, opts, autoCleanUp);
231         break;
232       } catch (TTransportException tte) {
233         log.debug("Failed to call executeTableOperation(), retrying ... ", tte);
234         UtilWaitThread.sleep(100);
235       } finally {
236         MasterClient.close(client);
237       }
238     }
239   }
240   
241   private String waitForTableOperation(long opid) throws ThriftSecurityException, TException, ThriftTableOperationException {
242     while (true) {
243       MasterClientService.Iface client = null;
244       try {
245         client = MasterClient.getConnectionWithRetry(instance);
246         return client.waitForTableOperation(Tracer.traceInfo(), credentials, opid);
247       } catch (TTransportException tte) {
248         log.debug("Failed to call waitForTableOperation(), retrying ... ", tte);
249         UtilWaitThread.sleep(100);
250       } finally {
251         MasterClient.close(client);
252       }
253     }
254   }
255   
256   private void finishTableOperation(long opid) throws ThriftSecurityException, TException {
257     while (true) {
258       MasterClientService.Iface client = null;
259       try {
260         client = MasterClient.getConnectionWithRetry(instance);
261         client.finishTableOperation(Tracer.traceInfo(), credentials, opid);
262         break;
263       } catch (TTransportException tte) {
264         log.debug("Failed to call finishTableOperation(), retrying ... ", tte);
265         UtilWaitThread.sleep(100);
266       } finally {
267         MasterClient.close(client);
268       }
269     }
270   }
271   
272   private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, TableExistsException,
273       TableNotFoundException, AccumuloException {
274     return doTableOperation(op, args, opts, true);
275   }
276   
277   private String doTableOperation(TableOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean wait) throws AccumuloSecurityException,
278       TableExistsException, TableNotFoundException, AccumuloException {
279     Long opid = null;
280     
281     try {
282       opid = beginTableOperation();
283       executeTableOperation(opid, op, args, opts, !wait);
284       if (!wait) {
285         opid = null;
286         return null;
287       }
288       String ret = waitForTableOperation(opid);
289       Tables.clearCache(instance);
290       return ret;
291     } catch (ThriftSecurityException e) {
292       throw new AccumuloSecurityException(e.user, e.code, e);
293     } catch (ThriftTableOperationException e) {
294       switch (e.getType()) {
295         case EXISTS:
296           throw new TableExistsException(e);
297         case NOTFOUND:
298           throw new TableNotFoundException(e);
299         case OFFLINE:
300           throw new TableOfflineException(instance, null);
301         case OTHER:
302         default:
303           throw new AccumuloException(e.description, e);
304       }
305     } catch (Exception e) {
306       throw new AccumuloException(e.getMessage(), e);
307     } finally {
308       // always finish table op, even when exception
309       if (opid != null)
310         try {
311           finishTableOperation(opid);
312         } catch (Exception e) {
313           log.warn(e.getMessage(), e);
314         }
315     }
316   }
317   
318   private static class SplitEnv {
319     private String tableName;
320     private String tableId;
321     private ExecutorService executor;
322     private CountDownLatch latch;
323     private AtomicReference<Exception> exception;
324     
325     SplitEnv(String tableName, String tableId, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) {
326       this.tableName = tableName;
327       this.tableId = tableId;
328       this.executor = executor;
329       this.latch = latch;
330       this.exception = exception;
331     }
332   }
333   
334   private class SplitTask implements Runnable {
335     
336     private List<Text> splits;
337     private SplitEnv env;
338     
339     SplitTask(SplitEnv env, List<Text> splits) {
340       this.env = env;
341       this.splits = splits;
342     }
343     
344     @Override
345     public void run() {
346       try {
347         if (env.exception.get() != null)
348           return;
349         
350         if (splits.size() <= 2) {
351           addSplits(env.tableName, new TreeSet<Text>(splits), env.tableId);
352           for (int i = 0; i < splits.size(); i++)
353             env.latch.countDown();
354           return;
355         }
356         
357         int mid = splits.size() / 2;
358         
359         // split the middle split point to ensure that child task split different tablets and can therefore
360         // run in parallel
361         addSplits(env.tableName, new TreeSet<Text>(splits.subList(mid, mid + 1)), env.tableId);
362         env.latch.countDown();
363         
364         env.executor.submit(new SplitTask(env, splits.subList(0, mid)));
365         env.executor.submit(new SplitTask(env, splits.subList(mid + 1, splits.size())));
366         
367       } catch (Exception e) {
368         env.exception.compareAndSet(null, e);
369       }
370     }
371     
372   }
373   
374   /**
375    * @param tableName
376    *          the name of the table
377    * @param partitionKeys
378    *          a sorted set of row key values to pre-split the table on
379    * @throws AccumuloException
380    *           if a general error occurs
381    * @throws AccumuloSecurityException
382    *           if the user does not have permission
383    * @throws TableNotFoundException
384    *           if the table does not exist
385    */
386   @Override
387   public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
388     String tableId = Tables.getTableId(instance, tableName);
389     
390     List<Text> splits = new ArrayList<Text>(partitionKeys);
391     // should be sorted because we copied from a sorted set, but that makes assumptions about
392     // how the copy was done so resort to be sure.
393     Collections.sort(splits);
394     
395     CountDownLatch latch = new CountDownLatch(splits.size());
396     AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
397     
398     ExecutorService executor = Executors.newFixedThreadPool(16, new NamingThreadFactory("addSplits"));
399     try {
400       executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
401       
402       while (!latch.await(100, TimeUnit.MILLISECONDS)) {
403         if (exception.get() != null) {
404           executor.shutdownNow();
405           Exception excep = exception.get();
406           if (excep instanceof TableNotFoundException)
407             throw (TableNotFoundException) excep;
408           else if (excep instanceof AccumuloException)
409             throw (AccumuloException) excep;
410           else if (excep instanceof AccumuloSecurityException)
411             throw (AccumuloSecurityException) excep;
412           else if (excep instanceof RuntimeException)
413             throw (RuntimeException) excep;
414           else
415             throw new RuntimeException(excep);
416         }
417       }
418     } catch (InterruptedException e) {
419       throw new RuntimeException(e);
420     } finally {
421       executor.shutdown();
422     }
423   }
424   
425   private void addSplits(String tableName, SortedSet<Text> partitionKeys, String tableId) throws AccumuloException, AccumuloSecurityException,
426       TableNotFoundException, AccumuloServerException {
427     TabletLocator tabLocator = TabletLocator.getInstance(instance, credentials, new Text(tableId));
428     
429     for (Text split : partitionKeys) {
430       boolean successful = false;
431       int attempt = 0;
432       
433       while (!successful) {
434         
435         if (attempt > 0)
436           UtilWaitThread.sleep(100);
437         
438         attempt++;
439         
440         TabletLocation tl = tabLocator.locateTablet(split, false, false);
441         
442         if (tl == null) {
443           if (!Tables.exists(instance, tableId))
444             throw new TableNotFoundException(tableId, tableName, null);
445           else if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
446             throw new TableOfflineException(instance, tableId);
447           continue;
448         }
449         
450         try {
451           TabletClientService.Client client = ThriftUtil.getTServerClient(tl.tablet_location, instance.getConfiguration());
452           try {
453             OpTimer opTimer = null;
454             if (log.isTraceEnabled())
455               opTimer = new OpTimer(log, Level.TRACE).start("Splitting tablet " + tl.tablet_extent + " on " + tl.tablet_location + " at " + split);
456             
457             client.splitTablet(Tracer.traceInfo(), credentials, tl.tablet_extent.toThrift(), TextUtil.getByteBuffer(split));
458             
459             // just split it, might as well invalidate it in the cache
460             tabLocator.invalidateCache(tl.tablet_extent);
461             
462             if (opTimer != null)
463               opTimer.stop("Split tablet in %DURATION%");
464           } finally {
465             ThriftUtil.returnClient(client);
466           }
467           
468         } catch (TApplicationException tae) {
469           throw new AccumuloServerException(tl.tablet_location, tae);
470         } catch (TTransportException e) {
471           tabLocator.invalidateCache(tl.tablet_location);
472           continue;
473         } catch (ThriftSecurityException e) {
474           Tables.clearCache(instance);
475           if (!Tables.exists(instance, tableId))
476             throw new TableNotFoundException(tableId, tableName, null);
477           throw new AccumuloSecurityException(e.user, e.code, e);
478         } catch (NotServingTabletException e) {
479           tabLocator.invalidateCache(tl.tablet_extent);
480           continue;
481         } catch (TException e) {
482           tabLocator.invalidateCache(tl.tablet_location);
483           continue;
484         }
485         
486         successful = true;
487       }
488     }
489   }
490   
491   @Override
492   public void merge(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
493     
494     ArgumentChecker.notNull(tableName);
495     ByteBuffer EMPTY = ByteBuffer.allocate(0);
496     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
497         : TextUtil.getByteBuffer(end));
498     Map<String,String> opts = new HashMap<String,String>();
499     try {
500       doTableOperation(TableOperation.MERGE, args, opts);
501     } catch (TableExistsException e) {
502       // should not happen
503       throw new RuntimeException(e);
504     }
505   }
506   
507   @Override
508   public void deleteRows(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
509     
510     ArgumentChecker.notNull(tableName);
511     ByteBuffer EMPTY = ByteBuffer.allocate(0);
512     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
513         : TextUtil.getByteBuffer(end));
514     Map<String,String> opts = new HashMap<String,String>();
515     try {
516       doTableOperation(TableOperation.DELETE_RANGE, args, opts);
517     } catch (TableExistsException e) {
518       // should not happen
519       throw new RuntimeException(e);
520     }
521   }
522   
523   /**
524    * @param tableName
525    *          the name of the table
526    * @return the split points (end-row names) for the table's current split profile
527    */
528   @Override
529   public Collection<Text> getSplits(String tableName) throws TableNotFoundException {
530     
531     ArgumentChecker.notNull(tableName);
532     
533     if (!exists(tableName)) {
534       throw new TableNotFoundException(null, tableName, "Unknown table for getSplits");
535     }
536     
537     SortedSet<KeyExtent> tablets = new TreeSet<KeyExtent>();
538     Map<KeyExtent,String> locations = new TreeMap<KeyExtent,String>();
539     
540     while (true) {
541       try {
542         tablets.clear();
543         locations.clear();
544         MetadataTable.getEntries(instance, credentials, tableName, false, locations, tablets);
545         break;
546       } catch (Throwable t) {
547         log.info(t.getMessage() + " ... retrying ...");
548         UtilWaitThread.sleep(3000);
549       }
550     }
551     
552     ArrayList<Text> endRows = new ArrayList<Text>(tablets.size());
553     
554     for (KeyExtent ke : tablets)
555       if (ke.getEndRow() != null)
556         endRows.add(ke.getEndRow());
557     
558     return endRows;
559   }
560   
561   /**
562    * @param tableName
563    *          the name of the table
564    * @param maxSplits
565    *          specifies the maximum number of splits to return
566    * @return the split points (end-row names) for the table's current split profile, grouped into fewer splits so as not to exceed maxSplits
567    * @throws TableNotFoundException
568    */
569   @Override
570   public Collection<Text> getSplits(String tableName, int maxSplits) throws TableNotFoundException {
571     Collection<Text> endRows = getSplits(tableName);
572     
573     if (endRows.size() <= maxSplits)
574       return endRows;
575     
576     double r = (maxSplits + 1) / (double) (endRows.size());
577     double pos = 0;
578     
579     ArrayList<Text> subset = new ArrayList<Text>(maxSplits);
580     
581     int j = 0;
582     for (int i = 0; i < endRows.size() && j < maxSplits; i++) {
583       pos += r;
584       while (pos > 1) {
585         subset.add(((ArrayList<Text>) endRows).get(i));
586         j++;
587         pos -= 1;
588       }
589     }
590     
591     return subset;
592   }
593   
594   /**
595    * Delete a table
596    * 
597    * @param tableName
598    *          the name of the table
599    * @throws AccumuloException
600    *           if a general error occurs
601    * @throws AccumuloSecurityException
602    *           if the user does not have permission
603    * @throws TableNotFoundException
604    *           if the table does not exist
605    */
606   @Override
607   public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
608     ArgumentChecker.notNull(tableName);
609     
610     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()));
611     Map<String,String> opts = new HashMap<String,String>();
612     
613     try {
614       doTableOperation(TableOperation.DELETE, args, opts);
615     } catch (TableExistsException e) {
616       // should not happen
617       throw new RuntimeException(e);
618     }
619     
620   }
621   
622   @Override
623   public void clone(String srcTableName, String newTableName, boolean flush, Map<String,String> propertiesToSet, Set<String> propertiesToExclude)
624       throws AccumuloSecurityException, TableNotFoundException, AccumuloException, TableExistsException {
625     
626     ArgumentChecker.notNull(srcTableName, newTableName);
627     
628     String srcTableId = Tables.getTableId(instance, srcTableName);
629     
630     if (flush)
631       _flush(srcTableId, null, null, true);
632     
633     if (propertiesToExclude == null)
634       propertiesToExclude = Collections.emptySet();
635     
636     if (propertiesToSet == null)
637       propertiesToSet = Collections.emptyMap();
638     
639     if (!Collections.disjoint(propertiesToExclude, propertiesToSet.keySet()))
640       throw new IllegalArgumentException("propertiesToSet and propertiesToExclude not disjoint");
641     
642     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(srcTableId.getBytes()), ByteBuffer.wrap(newTableName.getBytes()));
643     Map<String,String> opts = new HashMap<String,String>();
644     opts.putAll(propertiesToSet);
645     for (String prop : propertiesToExclude)
646       opts.put(prop, null);
647     
648     doTableOperation(TableOperation.CLONE, args, opts);
649   }
650   
651   /**
652    * Rename a table
653    * 
654    * @param oldTableName
655    *          the old table name
656    * @param newTableName
657    *          the new table name
658    * @throws AccumuloException
659    *           if a general error occurs
660    * @throws AccumuloSecurityException
661    *           if the user does not have permission
662    * @throws TableNotFoundException
663    *           if the old table name does not exist
664    * @throws TableExistsException
665    *           if the new table name already exists
666    */
667   @Override
668   public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
669       TableExistsException {
670     
671     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes()), ByteBuffer.wrap(newTableName.getBytes()));
672     Map<String,String> opts = new HashMap<String,String>();
673     doTableOperation(TableOperation.RENAME, args, opts);
674   }
675   
676   /**
677    * @deprecated since 1.4 {@link #flush(String, Text, Text, boolean)}
678    */
679   @Override
680   @Deprecated
681   public void flush(String tableName) throws AccumuloException, AccumuloSecurityException {
682     try {
683       flush(tableName, null, null, false);
684     } catch (TableNotFoundException e) {
685       throw new AccumuloException(e.getMessage(), e);
686     }
687   }
688   
689   /**
690    * Flush a table
691    * 
692    * @param tableName
693    *          the name of the table
694    * @throws AccumuloException
695    *           if a general error occurs
696    * @throws AccumuloSecurityException
697    *           if the user does not have permission
698    * @throws TableNotFoundException
699    */
700   @Override
701   public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
702     ArgumentChecker.notNull(tableName);
703     
704     String tableId = Tables.getTableId(instance, tableName);
705     _flush(tableId, start, end, wait);
706   }
707   
708   @Override
709   public void compact(String tableName, Text start, Text end, boolean flush, boolean wait) throws AccumuloSecurityException, TableNotFoundException,
710       AccumuloException {
711     compact(tableName, start, end, new ArrayList<IteratorSetting>(), flush, wait);
712   }
713   
714   @Override
715   public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, boolean flush, boolean wait) throws AccumuloSecurityException,
716       TableNotFoundException, AccumuloException {
717     ArgumentChecker.notNull(tableName);
718     ByteBuffer EMPTY = ByteBuffer.allocate(0);
719     
720     String tableId = Tables.getTableId(instance, tableName);
721     
722     if (flush)
723       _flush(tableId, start, end, true);
724     
725     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()), start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
726         : TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(iterators)));
727     
728     Map<String,String> opts = new HashMap<String,String>();
729     try {
730       doTableOperation(TableOperation.COMPACT, args, opts, wait);
731     } catch (TableExistsException e) {
732       // should not happen
733       throw new RuntimeException(e);
734     }
735   }
736   
737   @Override
738   public void cancelCompaction(String tableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
739     String tableId = Tables.getTableId(instance, tableName);
740     
741     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes()));
742     
743     Map<String,String> opts = new HashMap<String,String>();
744     try {
745       doTableOperation(TableOperation.COMPACT_CANCEL, args, opts, true);
746     } catch (TableExistsException e) {
747       // should not happen
748       throw new RuntimeException(e);
749     }
750     
751   }
752   
753   private void _flush(String tableId, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
754     
755     try {
756       long flushID;
757       
758       // used to pass the table name. but the tableid associated with a table name could change between calls.
759       // so pass the tableid to both calls
760       
761       while (true) {
762         MasterClientService.Iface client = null;
763         try {
764           client = MasterClient.getConnectionWithRetry(instance);
765           flushID = client.initiateFlush(Tracer.traceInfo(), credentials, tableId);
766           break;
767         } catch (TTransportException tte) {
768           log.debug("Failed to call initiateFlush, retrying ... ", tte);
769           UtilWaitThread.sleep(100);
770         } finally {
771           MasterClient.close(client);
772         }
773       }
774       
775       while (true) {
776         MasterClientService.Iface client = null;
777         try {
778           client = MasterClient.getConnectionWithRetry(instance);
779           client.waitForFlush(Tracer.traceInfo(), credentials, tableId, TextUtil.getByteBuffer(start), TextUtil.getByteBuffer(end), flushID,
780               wait ? Long.MAX_VALUE : 1);
781           break;
782         } catch (TTransportException tte) {
783           log.debug("Failed to call initiateFlush, retrying ... ", tte);
784           UtilWaitThread.sleep(100);
785         } finally {
786           MasterClient.close(client);
787         }
788       }
789     } catch (ThriftSecurityException e) {
790       log.debug("flush security exception on table id " + tableId);
791       throw new AccumuloSecurityException(e.user, e.code, e);
792     } catch (ThriftTableOperationException e) {
793       switch (e.getType()) {
794         case NOTFOUND:
795           throw new TableNotFoundException(e);
796         case OTHER:
797         default:
798           throw new AccumuloException(e.description, e);
799       }
800     } catch (Exception e) {
801       throw new AccumuloException(e);
802     }
803   }
804   
805   /**
806    * Sets a property on a table
807    * 
808    * @param tableName
809    *          the name of the table
810    * @param property
811    *          the name of a per-table property
812    * @param value
813    *          the value to set a per-table property to
814    * @throws AccumuloException
815    *           if a general error occurs
816    * @throws AccumuloSecurityException
817    *           if the user does not have permission
818    */
819   @Override
820   public void setProperty(final String tableName, final String property, final String value) throws AccumuloException, AccumuloSecurityException {
821     ArgumentChecker.notNull(tableName, property, value);
822     MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
823       @Override
824       public void execute(MasterClientService.Client client) throws Exception {
825         client.setTableProperty(Tracer.traceInfo(), credentials, tableName, property, value);
826       }
827     });
828   }
829   
830   /**
831    * Removes a property from a table
832    * 
833    * @param tableName
834    *          the name of the table
835    * @param property
836    *          the name of a per-table property
837    * @throws AccumuloException
838    *           if a general error occurs
839    * @throws AccumuloSecurityException
840    *           if the user does not have permission
841    */
842   @Override
843   public void removeProperty(final String tableName, final String property) throws AccumuloException, AccumuloSecurityException {
844     ArgumentChecker.notNull(tableName, property);
845     MasterClient.execute(instance, new ClientExec<MasterClientService.Client>() {
846       @Override
847       public void execute(MasterClientService.Client client) throws Exception {
848         client.removeTableProperty(Tracer.traceInfo(), credentials, tableName, property);
849       }
850     });
851   }
852   
853   /**
854    * Gets properties of a table
855    * 
856    * @param tableName
857    *          the name of the table
858    * @return all properties visible by this table (system and per-table properties)
859    * @throws TableNotFoundException
860    *           if the table does not exist
861    */
862   @Override
863   public Iterable<Entry<String,String>> getProperties(final String tableName) throws AccumuloException, TableNotFoundException {
864     ArgumentChecker.notNull(tableName);
865     try {
866       return ServerClient.executeRaw(instance, new ClientExecReturn<Map<String,String>,ClientService.Client>() {
867         @Override
868         public Map<String,String> execute(ClientService.Client client) throws Exception {
869           return client.getTableConfiguration(tableName);
870         }
871       }).entrySet();
872     } catch (ThriftTableOperationException e) {
873       switch (e.getType()) {
874         case NOTFOUND:
875           throw new TableNotFoundException(e);
876         case OTHER:
877         default:
878           throw new AccumuloException(e.description, e);
879       }
880     } catch (AccumuloException e) {
881       throw e;
882     } catch (Exception e) {
883       throw new AccumuloException(e);
884     }
885     
886   }
887   
888   /**
889    * Sets a tables locality groups. A tables locality groups can be changed at any time.
890    * 
891    * @param tableName
892    *          the name of the table
893    * @param groups
894    *          mapping of locality group names to column families in the locality group
895    * @throws AccumuloException
896    *           if a general error occurs
897    * @throws AccumuloSecurityException
898    *           if the user does not have permission
899    * @throws TableNotFoundException
900    *           if the table does not exist
901    */
902   @Override
903   public void setLocalityGroups(String tableName, Map<String,Set<Text>> groups) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
904     // ensure locality groups do not overlap
905     HashSet<Text> all = new HashSet<Text>();
906     for (Entry<String,Set<Text>> entry : groups.entrySet()) {
907       
908       if (!Collections.disjoint(all, entry.getValue())) {
909         throw new IllegalArgumentException("Group " + entry.getKey() + " overlaps with another group");
910       }
911       
912       all.addAll(entry.getValue());
913     }
914     
915     for (Entry<String,Set<Text>> entry : groups.entrySet()) {
916       Set<Text> colFams = entry.getValue();
917       String value = LocalityGroupUtil.encodeColumnFamilies(colFams);
918       setProperty(tableName, Property.TABLE_LOCALITY_GROUP_PREFIX + entry.getKey(), value);
919     }
920     
921     setProperty(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(), StringUtil.join(groups.keySet(), ","));
922     
923     // remove anything extraneous
924     String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey();
925     for (Entry<String,String> entry : getProperties(tableName)) {
926       String property = entry.getKey();
927       if (property.startsWith(prefix)) {
928         // this property configures a locality group, find out which
929         // one:
930         String[] parts = property.split("\\.");
931         String group = parts[parts.length - 1];
932         
933         if (!groups.containsKey(group)) {
934           removeProperty(tableName, property);
935         }
936       }
937     }
938   }
939   
940   /**
941    * 
942    * Gets the locality groups currently set for a table.
943    * 
944    * @param tableName
945    *          the name of the table
946    * @return mapping of locality group names to column families in the locality group
947    * @throws AccumuloException
948    *           if a general error occurs
949    * @throws TableNotFoundException
950    *           if the table does not exist
951    */
952   @Override
953   public Map<String,Set<Text>> getLocalityGroups(String tableName) throws AccumuloException, TableNotFoundException {
954     AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName));
955     Map<String,Set<ByteSequence>> groups = LocalityGroupUtil.getLocalityGroups(conf);
956     
957     Map<String,Set<Text>> groups2 = new HashMap<String,Set<Text>>();
958     for (Entry<String,Set<ByteSequence>> entry : groups.entrySet()) {
959       
960       HashSet<Text> colFams = new HashSet<Text>();
961       
962       for (ByteSequence bs : entry.getValue()) {
963         colFams.add(new Text(bs.toArray()));
964       }
965       
966       groups2.put(entry.getKey(), colFams);
967     }
968     
969     return groups2;
970   }
971   
972   /**
973    * @param tableName
974    *          the name of the table
975    * @param range
976    *          a range to split
977    * @param maxSplits
978    *          the maximum number of splits
979    * @return the range, split into smaller ranges that fall on boundaries of the table's split points as evenly as possible
980    * @throws AccumuloException
981    *           if a general error occurs
982    * @throws AccumuloSecurityException
983    *           if the user does not have permission
984    * @throws TableNotFoundException
985    *           if the table does not exist
986    */
987   @Override
988   public Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits) throws AccumuloException, AccumuloSecurityException,
989       TableNotFoundException {
990     ArgumentChecker.notNull(tableName, range);
991     if (maxSplits < 1)
992       throw new IllegalArgumentException("maximum splits must be >= 1");
993     if (maxSplits == 1)
994       return Collections.singleton(range);
995     
996     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
997     String tableId = Tables.getTableId(instance, tableName);
998     TabletLocator tl = TabletLocator.getInstance(instance, credentials, new Text(tableId));
999     // its possible that the cache could contain complete, but old information about a tables tablets... so clear it
1000     tl.invalidateCache();
1001     while (!tl.binRanges(Collections.singletonList(range), binnedRanges).isEmpty()) {
1002       if (!Tables.exists(instance, tableId))
1003         throw new TableDeletedException(tableId);
1004       if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
1005         throw new TableOfflineException(instance, tableId);
1006       
1007       log.warn("Unable to locate bins for specified range. Retrying.");
1008       // sleep randomly between 100 and 200ms
1009       UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
1010       binnedRanges.clear();
1011       tl.invalidateCache();
1012     }
1013     
1014     // group key extents to get <= maxSplits
1015     LinkedList<KeyExtent> unmergedExtents = new LinkedList<KeyExtent>();
1016     List<KeyExtent> mergedExtents = new ArrayList<KeyExtent>();
1017     
1018     for (Map<KeyExtent,List<Range>> map : binnedRanges.values())
1019       unmergedExtents.addAll(map.keySet());
1020     
1021     // the sort method is efficient for linked list
1022     Collections.sort(unmergedExtents);
1023     
1024     while (unmergedExtents.size() + mergedExtents.size() > maxSplits) {
1025       if (unmergedExtents.size() >= 2) {
1026         KeyExtent first = unmergedExtents.removeFirst();
1027         KeyExtent second = unmergedExtents.removeFirst();
1028         first.setEndRow(second.getEndRow());
1029         mergedExtents.add(first);
1030       } else {
1031         mergedExtents.addAll(unmergedExtents);
1032         unmergedExtents.clear();
1033         unmergedExtents.addAll(mergedExtents);
1034         mergedExtents.clear();
1035       }
1036       
1037     }
1038     
1039     mergedExtents.addAll(unmergedExtents);
1040     
1041     Set<Range> ranges = new HashSet<Range>();
1042     for (KeyExtent k : mergedExtents)
1043       ranges.add(k.toDataRange().clip(range));
1044     
1045     return ranges;
1046   }
1047   
1048   @Override
1049   public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloSecurityException,
1050       TableNotFoundException, AccumuloException {
1051     ArgumentChecker.notNull(tableName, dir, failureDir);
1052     FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
1053     Path failPath = fs.makeQualified(new Path(failureDir));
1054     if (!fs.exists(new Path(dir)))
1055       throw new AccumuloException("Bulk import directory " + dir + " does not exist!");
1056     if (!fs.exists(failPath))
1057       throw new AccumuloException("Bulk import failure directory " + failureDir + " does not exist!");
1058     FileStatus[] listStatus = fs.listStatus(failPath);
1059     if (listStatus != null && listStatus.length != 0) {
1060       if (listStatus.length == 1 && listStatus[0].isDir())
1061         throw new AccumuloException("Bulk import directory " + failPath + " is a file");
1062       throw new AccumuloException("Bulk import failure directory " + failPath + " is not empty");
1063     }
1064     
1065     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(dir.getBytes()), ByteBuffer.wrap(failureDir.getBytes()),
1066         ByteBuffer.wrap((setTime + "").getBytes()));
1067     Map<String,String> opts = new HashMap<String,String>();
1068     
1069     try {
1070       doTableOperation(TableOperation.BULK_IMPORT, args, opts);
1071     } catch (TableExistsException e) {
1072       // should not happen
1073       throw new RuntimeException(e);
1074     }
1075     // return new BulkImportHelper(instance, credentials, tableName).importDirectory(new Path(dir), new Path(failureDir), numThreads, numAssignThreads,
1076     // disableGC);
1077   }
1078   
1079   /**
1080    * 
1081    * @param tableName
1082    *          the table to take offline
1083    * @throws AccumuloException
1084    *           when there is a general accumulo error
1085    * @throws AccumuloSecurityException
1086    *           when the user does not have the proper permissions
1087    * @throws TableNotFoundException
1088    */
1089   @Override
1090   public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
1091     
1092     ArgumentChecker.notNull(tableName);
1093     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()));
1094     Map<String,String> opts = new HashMap<String,String>();
1095     
1096     try {
1097       doTableOperation(TableOperation.OFFLINE, args, opts);
1098     } catch (TableExistsException e) {
1099       // should not happen
1100       throw new RuntimeException(e);
1101     }
1102   }
1103   
1104   /**
1105    * 
1106    * @param tableName
1107    *          the table to take online
1108    * @throws AccumuloException
1109    *           when there is a general accumulo error
1110    * @throws AccumuloSecurityException
1111    *           when the user does not have the proper permissions
1112    * @throws TableNotFoundException
1113    */
1114   @Override
1115   public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
1116     ArgumentChecker.notNull(tableName);
1117     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()));
1118     Map<String,String> opts = new HashMap<String,String>();
1119     
1120     try {
1121       doTableOperation(TableOperation.ONLINE, args, opts);
1122     } catch (TableExistsException e) {
1123       // should not happen
1124       throw new RuntimeException(e);
1125     }
1126   }
1127   
1128   /**
1129    * Clears the tablet locator cache for a specified table
1130    * 
1131    * @param tableName
1132    *          the name of the table
1133    * @throws TableNotFoundException
1134    *           if table does not exist
1135    */
1136   @Override
1137   public void clearLocatorCache(String tableName) throws TableNotFoundException {
1138     ArgumentChecker.notNull(tableName);
1139     TabletLocator tabLocator = TabletLocator.getInstance(instance, credentials, new Text(Tables.getTableId(instance, tableName)));
1140     tabLocator.invalidateCache();
1141   }
1142   
1143   /**
1144    * Get a mapping of table name to internal table id.
1145    * 
1146    * @return the map from table name to internal table id
1147    */
1148   @Override
1149   public Map<String,String> tableIdMap() {
1150     return Tables.getNameToIdMap(instance);
1151   }
1152   
1153   @Override
1154   public Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive)
1155       throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
1156     ArgumentChecker.notNull(tableName, auths);
1157     Scanner scanner = instance.getConnector(credentials.getPrincipal(), CredentialHelper.extractToken(credentials)).createScanner(tableName, auths);
1158     return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive);
1159   }
1160   
1161   public static Map<String,String> getExportedProps(FileSystem fs, Path path) throws IOException {
1162     HashMap<String,String> props = new HashMap<String,String>();
1163     
1164     ZipInputStream zis = new ZipInputStream(fs.open(path));
1165     try {
1166       ZipEntry zipEntry;
1167       while ((zipEntry = zis.getNextEntry()) != null) {
1168         if (zipEntry.getName().equals(Constants.EXPORT_TABLE_CONFIG_FILE)) {
1169           BufferedReader in = new BufferedReader(new InputStreamReader(zis));
1170           String line;
1171           while ((line = in.readLine()) != null) {
1172             String sa[] = line.split("=", 2);
1173             props.put(sa[0], sa[1]);
1174           }
1175           
1176           break;
1177         }
1178       }
1179     } finally {
1180       zis.close();
1181     }
1182     return props;
1183   }
1184   
1185   @Override
1186   public void importTable(String tableName, String importDir) throws TableExistsException, AccumuloException, AccumuloSecurityException {
1187     ArgumentChecker.notNull(tableName, importDir);
1188     
1189     try {
1190       FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
1191       ;
1192       Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE));
1193       
1194       for (String propKey : props.keySet()) {
1195         if (Property.isClassProperty(propKey) && !props.get(propKey).contains(Constants.CORE_PACKAGE_NAME)) {
1196           Logger.getLogger(this.getClass()).info(
1197               "Imported table sets '" + propKey + "' to '" + props.get(propKey) + "'.  Ensure this class is on Accumulo classpath.");
1198         }
1199       }
1200       
1201     } catch (IOException ioe) {
1202       Logger.getLogger(this.getClass()).warn("Failed to check if imported table references external java classes : " + ioe.getMessage());
1203     }
1204     
1205     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(importDir.getBytes()));
1206     
1207     Map<String,String> opts = Collections.emptyMap();
1208     
1209     try {
1210       doTableOperation(TableOperation.IMPORT, args, opts);
1211     } catch (TableNotFoundException e1) {
1212       // should not happen
1213       throw new RuntimeException(e1);
1214     }
1215     
1216   }
1217   
1218   @Override
1219   public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
1220     ArgumentChecker.notNull(tableName, exportDir);
1221     
1222     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes()), ByteBuffer.wrap(exportDir.getBytes()));
1223     
1224     Map<String,String> opts = Collections.emptyMap();
1225     
1226     try {
1227       doTableOperation(TableOperation.EXPORT, args, opts);
1228     } catch (TableExistsException e1) {
1229       // should not happen
1230       throw new RuntimeException(e1);
1231     }
1232   }
1233 }