
1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HRegionLocation;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.MetaTableAccessor;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.classification.InterfaceAudience;
66  import org.apache.hadoop.hbase.client.Admin;
67  import org.apache.hadoop.hbase.client.Append;
68  import org.apache.hadoop.hbase.client.Delete;
69  import org.apache.hadoop.hbase.client.Durability;
70  import org.apache.hadoop.hbase.client.Get;
71  import org.apache.hadoop.hbase.client.HBaseAdmin;
72  import org.apache.hadoop.hbase.client.Increment;
73  import org.apache.hadoop.hbase.client.OperationWithAttributes;
74  import org.apache.hadoop.hbase.client.Put;
75  import org.apache.hadoop.hbase.client.RegionLocator;
76  import org.apache.hadoop.hbase.client.Result;
77  import org.apache.hadoop.hbase.client.ResultScanner;
78  import org.apache.hadoop.hbase.client.Scan;
79  import org.apache.hadoop.hbase.client.Table;
80  import org.apache.hadoop.hbase.filter.Filter;
81  import org.apache.hadoop.hbase.filter.ParseFilter;
82  import org.apache.hadoop.hbase.filter.PrefixFilter;
83  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
84  import org.apache.hadoop.hbase.security.SecurityUtil;
85  import org.apache.hadoop.hbase.security.UserProvider;
86  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
87  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
88  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
89  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
90  import org.apache.hadoop.hbase.thrift.generated.Hbase;
91  import org.apache.hadoop.hbase.thrift.generated.IOError;
92  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
93  import org.apache.hadoop.hbase.thrift.generated.Mutation;
94  import org.apache.hadoop.hbase.thrift.generated.TAppend;
95  import org.apache.hadoop.hbase.thrift.generated.TCell;
96  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
97  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
98  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
99  import org.apache.hadoop.hbase.thrift.generated.TScan;
100 import org.apache.hadoop.hbase.util.Bytes;
101 import org.apache.hadoop.hbase.util.ConnectionCache;
102 import org.apache.hadoop.hbase.util.DNS;
103 import org.apache.hadoop.hbase.util.Strings;
104 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.security.UserGroupInformation;
106 import org.apache.hadoop.security.authorize.ProxyUsers;
107 import org.apache.thrift.TException;
108 import org.apache.thrift.TProcessor;
109 import org.apache.thrift.protocol.TBinaryProtocol;
110 import org.apache.thrift.protocol.TCompactProtocol;
111 import org.apache.thrift.protocol.TProtocol;
112 import org.apache.thrift.protocol.TProtocolFactory;
113 import org.apache.thrift.server.THsHaServer;
114 import org.apache.thrift.server.TNonblockingServer;
115 import org.apache.thrift.server.TServer;
116 import org.apache.thrift.server.TServlet;
117 import org.apache.thrift.server.TThreadedSelectorServer;
118 import org.apache.thrift.transport.TFramedTransport;
119 import org.apache.thrift.transport.TNonblockingServerSocket;
120 import org.apache.thrift.transport.TNonblockingServerTransport;
121 import org.apache.thrift.transport.TSaslServerTransport;
122 import org.apache.thrift.transport.TServerSocket;
123 import org.apache.thrift.transport.TServerTransport;
124 import org.apache.thrift.transport.TTransportFactory;
125 import org.mortbay.jetty.Connector;
126 import org.mortbay.jetty.Server;
127 import org.mortbay.jetty.nio.SelectChannelConnector;
128 import org.mortbay.jetty.security.SslSelectChannelConnector;
129 import org.mortbay.jetty.servlet.Context;
130 import org.mortbay.jetty.servlet.ServletHolder;
131 import org.mortbay.thread.QueuedThreadPool;
132 
133 import com.google.common.base.Joiner;
134 import com.google.common.base.Throwables;
135 import com.google.common.util.concurrent.ThreadFactoryBuilder;
136 
137 /**
138  * ThriftServerRunner - this class starts up a Thrift server which implements
139  * the Hbase API specified in the Hbase.thrift IDL file.
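     * <p>
     * A minimal sketch of launching it in-process (normally it is started via the
     * {@code ThriftServer} command-line tool); exception handling omitted:
     * <pre>
     *   Configuration conf = HBaseConfiguration.create();
     *   ThriftServerRunner runner = new ThriftServerRunner(conf); // may throw IOException
     *   new Thread(runner).start();  // serves until shutdown() is called
     * </pre>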
140  */
141 @InterfaceAudience.Private
142 public class ThriftServerRunner implements Runnable {
143 
144   private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
145 
146   static final String SERVER_TYPE_CONF_KEY =
147       "hbase.regionserver.thrift.server.type";
148 
149   static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
150   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
151   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
152   static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
153   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
154   static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
155   static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
156   static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
157   static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";
158 
159   static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
160   static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
161   static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
162   static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
163 
164 
165   /**
166    * Thrift quality of protection configuration key. Valid values can be:
167    * auth-conf: authentication, integrity and confidentiality checking
168    * auth-int: authentication and integrity checking
169    * auth: authentication only
170    *
171    * This is used to authenticate the callers and support impersonation.
172    * The thrift server and the HBase cluster must run in secure mode.
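       * <p>
       * For example, an illustrative hbase-site.xml snippet requiring confidentiality:
       * <pre>
       *   &lt;property&gt;
       *     &lt;name&gt;hbase.thrift.security.qop&lt;/name&gt;
       *     &lt;value&gt;auth-conf&lt;/value&gt;
       *   &lt;/property&gt;
       * </pre>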
173    */
174   static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
175   static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
176 
177   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
178   public static final int DEFAULT_LISTEN_PORT = 9090;
179   public static final int HREGION_VERSION = 1;
180   static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
181   private final int listenPort;
182 
183   private Configuration conf;
184   volatile TServer tserver;
185   volatile Server httpServer;
186   private final Hbase.Iface handler;
187   private final ThriftMetrics metrics;
188   private final HBaseHandler hbaseHandler;
189   private final UserGroupInformation realUser;
190 
191   private final String qop;
192   private String host;
193 
194   private final boolean securityEnabled;
195   private final boolean doAsEnabled;
196 
197   /** An enum of server implementation selections */
198   enum ImplType {
199     HS_HA("hsha", true, THsHaServer.class, true),
200     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
201     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
202     THREADED_SELECTOR(
203         "threadedselector", true, TThreadedSelectorServer.class, true);
204 
205     public static final ImplType DEFAULT = THREAD_POOL;
206 
207     final String option;
208     final boolean isAlwaysFramed;
209     final Class<? extends TServer> serverClass;
210     final boolean canSpecifyBindIP;
211 
212     ImplType(String option, boolean isAlwaysFramed,
213         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
214       this.option = option;
215       this.isAlwaysFramed = isAlwaysFramed;
216       this.serverClass = serverClass;
217       this.canSpecifyBindIP = canSpecifyBindIP;
218     }
219 
220     /**
221      * @return <code>-option</code> so we can get the list of options from
222      *         {@link #values()}
223      */
224     @Override
225     public String toString() {
226       return "-" + option;
227     }
228 
229     String getDescription() {
230       StringBuilder sb = new StringBuilder("Use the " +
231           serverClass.getSimpleName());
232       if (isAlwaysFramed) {
233         sb.append(" This implies the framed transport.");
234       }
235       if (this == DEFAULT) {
236         sb.append(" This is the default.");
237       }
238       return sb.toString();
239     }
240 
241     static OptionGroup createOptionGroup() {
242       OptionGroup group = new OptionGroup();
243       for (ImplType t : values()) {
244         group.addOption(new Option(t.option, t.getDescription()));
245       }
246       return group;
247     }
248 
249     static ImplType getServerImpl(Configuration conf) {
250       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
251       for (ImplType t : values()) {
252         if (confType.equals(t.option)) {
253           return t;
254         }
255       }
256       throw new AssertionError("Unknown server ImplType.option:" + confType);
257     }
258 
259     static void setServerImpl(CommandLine cmd, Configuration conf) {
260       ImplType chosenType = null;
261       int numChosen = 0;
262       for (ImplType t : values()) {
263         if (cmd.hasOption(t.option)) {
264           chosenType = t;
265           ++numChosen;
266         }
267       }
268       if (numChosen < 1) {
269         LOG.info("Using default thrift server type");
270         chosenType = DEFAULT;
271       } else if (numChosen > 1) {
272         throw new AssertionError("Exactly one option out of " +
273           Arrays.toString(values()) + " has to be specified");
274       }
275       LOG.info("Using thrift server type " + chosenType.option);
276       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
277     }
278 
279     public String simpleClassName() {
280       return serverClass.getSimpleName();
281     }
282 
283     public static List<String> serversThatCannotSpecifyBindIP() {
284       List<String> l = new ArrayList<String>();
285       for (ImplType t : values()) {
286         if (!t.canSpecifyBindIP) {
287           l.add(t.simpleClassName());
288         }
289       }
290       return l;
291     }
292 
293   }
294 
295   public ThriftServerRunner(Configuration conf) throws IOException {
296     UserProvider userProvider = UserProvider.instantiate(conf);
297     // login the server principal (if using secure Hadoop)
298     securityEnabled = userProvider.isHadoopSecurityEnabled()
299       && userProvider.isHBaseSecurityEnabled();
300     if (securityEnabled) {
301       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
302         conf.get("hbase.thrift.dns.interface", "default"),
303         conf.get("hbase.thrift.dns.nameserver", "default")));
304       userProvider.login("hbase.thrift.keytab.file",
305         "hbase.thrift.kerberos.principal", host);
306     }
307     this.conf = HBaseConfiguration.create(conf);
308     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
309     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
310     this.hbaseHandler = new HBaseHandler(conf, userProvider);
311     this.hbaseHandler.initMetrics(metrics);
312     this.handler = HbaseHandlerMetricsProxy.newInstance(
313       hbaseHandler, metrics, conf);
314     this.realUser = userProvider.getCurrent().getUGI();
315     qop = conf.get(THRIFT_QOP_KEY);
316     doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
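        // If a SASL QOP is configured, it must be one of the recognized values,
        // and Kerberos security must be enabled.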
317     if (qop != null) {
318       if (!qop.equals("auth") && !qop.equals("auth-int")
319           && !qop.equals("auth-conf")) {
320         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
321           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
322       }
323       if (!securityEnabled) {
324         throw new IOException("Thrift server must"
325           + " run in secure mode to support authentication");
326       }
327     }
328   }
329 
330   /*
331    * Runs the Thrift server
332    */
333   @Override
334   public void run() {
335     realUser.doAs(new PrivilegedAction<Object>() {
336       @Override
337       public Object run() {
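            // Everything below runs as the server's logged-in principal (realUser).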
338         try {
339           if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
340             setupHTTPServer();
341             httpServer.start();
342             httpServer.join();
343           } else {
344             setupServer();
345             tserver.serve();
346           }
347         } catch (Exception e) {
348           LOG.fatal("Cannot run ThriftServer", e);
349           // Crash the process if the ThriftServer is not running
350           System.exit(-1);
351         }
352         return null;
353       }
354     });
355 
356   }
357 
358   public void shutdown() {
359     if (tserver != null) {
360       tserver.stop();
361       tserver = null;
362     }
363     if (httpServer != null) {
364       try {
365         httpServer.stop();
366         httpServer = null;
367       } catch (Exception e) {
368         LOG.error("Problem encountered in shutting down HTTP server", e);
369       }
370       httpServer = null;
371     }
372   }
373 
374   private void setupHTTPServer() throws IOException {
375     TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
376     TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
377     TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
378         conf, hbaseHandler, securityEnabled, doAsEnabled);
379 
380     httpServer = new Server();
381     // Context handler
382     Context context = new Context(httpServer, "/", Context.SESSIONS);
383     context.setContextPath("/");
384     String httpPath = "/*";
385     httpServer.setHandler(context);
386     context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
387 
388     // set up Jetty and run the embedded server
389     Connector connector = new SelectChannelConnector();
390     if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
391       SslSelectChannelConnector sslConnector = new SslSelectChannelConnector();
392       String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
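          // The passwords below are read via HBaseConfiguration.getPassword, so they may
          // come from a Hadoop credential provider rather than plain-text configuration.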
393       String password = HBaseConfiguration.getPassword(conf,
394           THRIFT_SSL_KEYSTORE_PASSWORD, null);
395       String keyPassword = HBaseConfiguration.getPassword(conf,
396           THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
397       sslConnector.setKeystore(keystore);
398       sslConnector.setPassword(password);
399       sslConnector.setKeyPassword(keyPassword);
400       connector = sslConnector;
401     }
402     String host = getBindAddress(conf).getHostAddress();
403     connector.setPort(listenPort);
404     connector.setHost(host);
405     connector.setHeaderBufferSize(1024 * 64);
406     httpServer.addConnector(connector);
407 
408     if (doAsEnabled) {
409       ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
410     }
411 
412     // Set the default max thread number to 100 to limit
413     // the number of concurrent requests so that the Thrift HTTP server doesn't OOM easily.
414     // Jetty sets the default max thread number to 250 if we don't set it.
415     //
416     // Our default min thread number of 2 is the same as that used by Jetty.
417     int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
418     int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
419     QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
420     threadPool.setMinThreads(minThreads);
421     httpServer.setThreadPool(threadPool);
422 
423     httpServer.setSendServerVersion(false);
424     httpServer.setSendDateHeader(false);
425     httpServer.setStopAtShutdown(true);
426 
427     LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
428   }
429 
430   /**
431    * Sets up the Thrift TServer.
432    */
433   private void setupServer() throws Exception {
434     // Construct correct ProtocolFactory
435     TProtocolFactory protocolFactory;
436     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
437       LOG.debug("Using compact protocol");
438       protocolFactory = new TCompactProtocol.Factory();
439     } else {
440       LOG.debug("Using binary protocol");
441       protocolFactory = new TBinaryProtocol.Factory();
442     }
443 
444     final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
445     ImplType implType = ImplType.getServerImpl(conf);
446     TProcessor processor = p;
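        // processor may be re-wrapped below so that the SASL-authenticated caller is
        // set on the handler before delegating to p.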
447 
448     // Construct correct TransportFactory
449     TTransportFactory transportFactory;
450     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
451       if (qop != null) {
452         throw new RuntimeException("Thrift server authentication"
453           + " doesn't work with framed transport yet");
454       }
455       transportFactory = new TFramedTransport.Factory(
456           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
457       LOG.debug("Using framed transport");
458     } else if (qop == null) {
459       transportFactory = new TTransportFactory();
460     } else {
461       // Extract the name from the principal
462       String name = SecurityUtil.getUserFromPrincipal(
463         conf.get("hbase.thrift.kerberos.principal"));
464       Map<String, String> saslProperties = new HashMap<String, String>();
465       saslProperties.put(Sasl.QOP, qop);
466       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
467       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
468         new SaslGssCallbackHandler() {
469           @Override
470           public void handle(Callback[] callbacks)
471               throws UnsupportedCallbackException {
472             AuthorizeCallback ac = null;
473             for (Callback callback : callbacks) {
474               if (callback instanceof AuthorizeCallback) {
475                 ac = (AuthorizeCallback) callback;
476               } else {
477                 throw new UnsupportedCallbackException(callback,
478                     "Unrecognized SASL GSSAPI Callback");
479               }
480             }
481             if (ac != null) {
482               String authid = ac.getAuthenticationID();
483               String authzid = ac.getAuthorizationID();
484               if (!authid.equals(authzid)) {
485                 ac.setAuthorized(false);
486               } else {
487                 ac.setAuthorized(true);
488                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
489                 LOG.info("Effective user: " + userName);
490                 ac.setAuthorizedID(userName);
491               }
492             }
493           }
494         });
495       transportFactory = saslFactory;
496 
497       // Create a processor wrapper, to get the caller
498       processor = new TProcessor() {
499         @Override
500         public boolean process(TProtocol inProt,
501             TProtocol outProt) throws TException {
502           TSaslServerTransport saslServerTransport =
503             (TSaslServerTransport)inProt.getTransport();
504           SaslServer saslServer = saslServerTransport.getSaslServer();
505           String principal = saslServer.getAuthorizationID();
506           hbaseHandler.setEffectiveUser(principal);
507           return p.process(inProt, outProt);
508         }
509       };
510     }
511 
512     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
513       LOG.error("Server types " + Joiner.on(", ").join(
514           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
515           "address binding at the moment. See " +
516           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
517       throw new RuntimeException(
518           "-" + BIND_CONF_KEY + " not supported with " + implType);
519     }
520 
521     // Thrift's implementation uses '0' as a placeholder for 'use the default.'
522     int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
523 
524     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
525         implType == ImplType.THREADED_SELECTOR) {
526 
527       InetAddress listenAddress = getBindAddress(conf);
528       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
529           new InetSocketAddress(listenAddress, listenPort));
530 
531       if (implType == ImplType.NONBLOCKING) {
532         TNonblockingServer.Args serverArgs =
533             new TNonblockingServer.Args(serverTransport);
534         serverArgs.processor(processor)
535                   .transportFactory(transportFactory)
536                   .protocolFactory(protocolFactory);
537         tserver = new TNonblockingServer(serverArgs);
538       } else if (implType == ImplType.HS_HA) {
539         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
540         CallQueue callQueue =
541             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
542         ExecutorService executorService = createExecutor(
543             callQueue, serverArgs.getWorkerThreads());
544         serverArgs.executorService(executorService)
545                   .processor(processor)
546                   .transportFactory(transportFactory)
547                   .protocolFactory(protocolFactory);
548         tserver = new THsHaServer(serverArgs);
549       } else { // THREADED_SELECTOR
550         TThreadedSelectorServer.Args serverArgs =
551             new HThreadedSelectorServerArgs(serverTransport, conf);
552         CallQueue callQueue =
553             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
554         ExecutorService executorService = createExecutor(
555             callQueue, serverArgs.getWorkerThreads());
556         serverArgs.executorService(executorService)
557                   .processor(processor)
558                   .transportFactory(transportFactory)
559                   .protocolFactory(protocolFactory);
560         tserver = new TThreadedSelectorServer(serverArgs);
561       }
562       LOG.info("starting HBase " + implType.simpleClassName() +
563           " server on " + Integer.toString(listenPort));
564     } else if (implType == ImplType.THREAD_POOL) {
565       // Thread pool server. Get the IP address to bind to.
566       InetAddress listenAddress = getBindAddress(conf);
567 
568       TServerTransport serverTransport = new TServerSocket(
569           new TServerSocket.ServerSocketTransportArgs().
570               bindAddr(new InetSocketAddress(listenAddress, listenPort)).backlog(backlog));
571 
572       TBoundedThreadPoolServer.Args serverArgs =
573           new TBoundedThreadPoolServer.Args(serverTransport, conf);
574       serverArgs.processor(processor)
575                 .transportFactory(transportFactory)
576                 .protocolFactory(protocolFactory);
577       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
578           + listenAddress + ":" + Integer.toString(listenPort)
579           + "; " + serverArgs);
580       TBoundedThreadPoolServer tserver =
581           new TBoundedThreadPoolServer(serverArgs, metrics);
582       this.tserver = tserver;
583     } else {
584       throw new AssertionError("Unsupported Thrift server implementation: " +
585           implType.simpleClassName());
586     }
587 
588     // A sanity check that we instantiated the right type of server.
589     if (tserver.getClass() != implType.serverClass) {
590       throw new AssertionError("Expected to create Thrift server class " +
591           implType.serverClass.getName() + " but got " +
592           tserver.getClass().getName());
593     }
594 
595 
596 
597     registerFilters(conf);
598   }
599 
600   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
601                                  int workerThreads) {
602     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
603     tfb.setDaemon(true);
604     tfb.setNameFormat("thrift-worker-%d");
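        // Fixed-size pool (core == max == workerThreads) of daemon threads that drain
        // the shared call queue.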
605     return new ThreadPoolExecutor(workerThreads, workerThreads,
606             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
607   }
608 
609   private InetAddress getBindAddress(Configuration conf)
610       throws UnknownHostException {
611     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
612     return InetAddress.getByName(bindAddressStr);
613   }
614 
615   protected static class ResultScannerWrapper {
616 
617     private final ResultScanner scanner;
618     private final boolean sortColumns;
619     public ResultScannerWrapper(ResultScanner resultScanner,
620                                 boolean sortResultColumns) {
621       scanner = resultScanner;
622       sortColumns = sortResultColumns;
623    }
624 
625     public ResultScanner getScanner() {
626       return scanner;
627     }
628 
629     public boolean isColumnSorted() {
630       return sortColumns;
631     }
632   }
633 
634   /**
635    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
636    * HBase client API primarily defined in the Admin and Table objects.
637    */
638   public static class HBaseHandler implements Hbase.Iface {
639     protected Configuration conf;
640     protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);
641 
642     // nextScannerId and scannerMap are used to manage scanner state
643     protected int nextScannerId = 0;
644     protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
645     private ThriftMetrics metrics = null;
646 
647     private final ConnectionCache connectionCache;
648     IncrementCoalescer coalescer = null;
649 
650     static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
651     static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
652 
653     /**
654      * Returns a list of all the column families for a given Table.
655      *
656      * @param table
657      * @throws IOException
658      */
659     byte[][] getAllColumns(Table table) throws IOException {
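          // Append the family delimiter (':') so each name is returned in the 'family:' form.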
660       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
661       byte[][] columns = new byte[cds.length][];
662       for (int i = 0; i < cds.length; i++) {
663         columns[i] = Bytes.add(cds[i].getName(),
664             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
665       }
666       return columns;
667     }
668 
669     /**
670      * Creates and returns a Table instance from a given table name.
671      *
672      * @param tableName
673      *          name of table
674      * @return Table object
675      * @throws IOException
676      */
677     public Table getTable(final byte[] tableName) throws
678         IOException {
679       String table = Bytes.toString(tableName);
680       return connectionCache.getTable(table);
681     }
682 
683     public Table getTable(final ByteBuffer tableName) throws IOException {
684       return getTable(getBytes(tableName));
685     }
686 
687     /**
688      * Assigns a unique ID to the scanner and adds the mapping to an internal
689      * hash-map.
690      *
691      * @param scanner
692      * @return integer scanner id
693      */
694     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
695       int id = nextScannerId++;
696       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
697       scannerMap.put(id, resultScannerWrapper);
698       return id;
699     }
700 
701     /**
702      * Returns the scanner associated with the specified ID.
703      *
704      * @param id
705      * @return a Scanner, or null if ID was invalid.
706      */
707     protected synchronized ResultScannerWrapper getScanner(int id) {
708       return scannerMap.get(id);
709     }
710 
711     /**
712      * Removes the scanner associated with the specified ID from the internal
713      * id-&gt;scanner hash-map.
714      *
715      * @param id
716      * @return a Scanner, or null if ID was invalid.
717      */
718     protected synchronized ResultScannerWrapper removeScanner(int id) {
719       return scannerMap.remove(id);
720     }
721 
722     protected HBaseHandler(final Configuration c,
723         final UserProvider userProvider) throws IOException {
724       this.conf = c;
725       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
726       this.coalescer = new IncrementCoalescer(this);
727 
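          // The connection cache periodically (every cleanInterval ms) closes connections
          // that have been idle longer than maxIdleTime ms.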
728       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
729       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
730       connectionCache = new ConnectionCache(
731         conf, userProvider, cleanInterval, maxIdleTime);
732     }
733 
734     /**
735      * Obtain HBaseAdmin. Creates the instance if it is not already created.
736      */
737     private Admin getAdmin() throws IOException {
738       return connectionCache.getAdmin();
739     }
740 
741     void setEffectiveUser(String effectiveUser) {
742       connectionCache.setEffectiveUser(effectiveUser);
743     }
744 
745     @Override
746     public void enableTable(ByteBuffer tableName) throws IOError {
747       try{
748         getAdmin().enableTable(getTableName(tableName));
749       } catch (IOException e) {
750         LOG.warn(e.getMessage(), e);
751         throw new IOError(Throwables.getStackTraceAsString(e));
752       }
753     }
754 
755     @Override
756     public void disableTable(ByteBuffer tableName) throws IOError{
757       try{
758         getAdmin().disableTable(getTableName(tableName));
759       } catch (IOException e) {
760         LOG.warn(e.getMessage(), e);
761         throw new IOError(Throwables.getStackTraceAsString(e));
762       }
763     }
764 
765     @Override
766     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
767       try {
768         return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
769       } catch (IOException e) {
770         LOG.warn(e.getMessage(), e);
771         throw new IOError(Throwables.getStackTraceAsString(e));
772       }
773     }
774 
775     @Override
776     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
777       try {
778         // TODO: HBaseAdmin.compact(byte[]) deprecated and not trivial to replace here.
779         // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
780         // table and region.
781         ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
782       } catch (IOException e) {
783         LOG.warn(e.getMessage(), e);
784         throw new IOError(Throwables.getStackTraceAsString(e));
785       }
786     }
787 
788     @Override
789     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
790       try {
791         // TODO: HBaseAdmin.majorCompact(byte[]) deprecated and not trivial to replace here.
792         // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
793         // to table and region.
794         ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
795       } catch (IOException e) {
796         LOG.warn(e.getMessage(), e);
797         throw new IOError(Throwables.getStackTraceAsString(e));
798       }
799     }
800 
801     @Override
802     public List<ByteBuffer> getTableNames() throws IOError {
803       try {
804         TableName[] tableNames = this.getAdmin().listTableNames();
805         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
806         for (int i = 0; i < tableNames.length; i++) {
807           list.add(ByteBuffer.wrap(tableNames[i].getName()));
808         }
809         return list;
810       } catch (IOException e) {
811         LOG.warn(e.getMessage(), e);
812         throw new IOError(Throwables.getStackTraceAsString(e));
813       }
814     }
815 
816     /**
817      * @return the list of regions in the given table, or an empty list if the table does not exist
818      */
819     @Override
820     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
821     throws IOError {
822       try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
823         List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
824         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
825         for (HRegionLocation regionLocation : regionLocations) {
826           HRegionInfo info = regionLocation.getRegionInfo();
827           ServerName serverName = regionLocation.getServerName();
828           TRegionInfo region = new TRegionInfo();
829           region.serverName = ByteBuffer.wrap(
830               Bytes.toBytes(serverName.getHostname()));
831           region.port = serverName.getPort();
832           region.startKey = ByteBuffer.wrap(info.getStartKey());
833           region.endKey = ByteBuffer.wrap(info.getEndKey());
834           region.id = info.getRegionId();
835           region.name = ByteBuffer.wrap(info.getRegionName());
836           region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
837           results.add(region);
838         }
839         return results;
840       } catch (TableNotFoundException e) {
841         // Return empty list for non-existing table
842         return Collections.emptyList();
843       } catch (IOException e){
844         LOG.warn(e.getMessage(), e);
845         throw new IOError(Throwables.getStackTraceAsString(e));
846       }
847     }
848 
849     @Override
850     public List<TCell> get(
851         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
852         Map<ByteBuffer, ByteBuffer> attributes)
853         throws IOError {
854       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
855       if (famAndQf.length == 1) {
856         return get(tableName, row, famAndQf[0], null, attributes);
857       }
858       if (famAndQf.length == 2) {
859         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
860       }
861       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
862     }
863 
864     /**
865      * Note: this internal interface is slightly different from public APIs in regard to handling
866      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
867      * we respect qual == null as a request for the entire column family. The caller (
868      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
869      * column is parsed as normal.
870      */
871     protected List<TCell> get(ByteBuffer tableName,
872                               ByteBuffer row,
873                               byte[] family,
874                               byte[] qualifier,
875                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
876       Table table = null;
877       try {
878         table = getTable(tableName);
879         Get get = new Get(getBytes(row));
880         addAttributes(get, attributes);
881         if (qualifier == null) {
882           get.addFamily(family);
883         } else {
884           get.addColumn(family, qualifier);
885         }
886         Result result = table.get(get);
887         return ThriftUtilities.cellFromHBase(result.rawCells());
888       } catch (IOException e) {
889         LOG.warn(e.getMessage(), e);
890         throw new IOError(Throwables.getStackTraceAsString(e));
891       } finally {
892         closeTable(table);
893       }
894     }
895 
896     @Override
897     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
898         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
899       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
900       if(famAndQf.length == 1) {
901         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
902       }
903       if (famAndQf.length == 2) {
904         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
905       }
906       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
907 
908     }
909 
910     /**
911      * Note: this public interface is slightly different from public Java APIs in regard to
912      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
913      * Rather, we respect qual == null as a request for the entire column family. If you want to
914      * access the entire column family, use
915      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
916      * that lacks a {@code ':'}.
917      */
918     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
919         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
920       
921       Table table = null;
922       try {
923         table = getTable(tableName);
924         Get get = new Get(getBytes(row));
925         addAttributes(get, attributes);
926         if (null == qualifier) {
927           get.addFamily(family);
928         } else {
929           get.addColumn(family, qualifier);
930         }
931         get.setMaxVersions(numVersions);
932         Result result = table.get(get);
933         return ThriftUtilities.cellFromHBase(result.rawCells());
934       } catch (IOException e) {
935         LOG.warn(e.getMessage(), e);
936         throw new IOError(Throwables.getStackTraceAsString(e));
937       } finally{
938         closeTable(table);
939       }
940     }
941 
942     @Override
943     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
944         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
945       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
946       if (famAndQf.length == 1) {
947         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
948       }
949       if (famAndQf.length == 2) {
950         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
951           attributes);
952       }
953       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
954     }
955 
956     /**
957      * Note: this internal interface is slightly different from public APIs in regard to handling
958      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
959      * we respect qual == null as a request for the entire column family. The caller (
960      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
961      * consistent in that the column is parsed as normal.
962      */
963     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
964         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
965         throws IOError {
966       
967       Table table = null;
968       try {
969         table = getTable(tableName);
970         Get get = new Get(getBytes(row));
971         addAttributes(get, attributes);
972         if (null == qualifier) {
973           get.addFamily(family);
974         } else {
975           get.addColumn(family, qualifier);
976         }
977         get.setTimeRange(0, timestamp);
978         get.setMaxVersions(numVersions);
979         Result result = table.get(get);
980         return ThriftUtilities.cellFromHBase(result.rawCells());
981       } catch (IOException e) {
982         LOG.warn(e.getMessage(), e);
983         throw new IOError(Throwables.getStackTraceAsString(e));
984       } finally{
985         closeTable(table);
986       }
987     }
988 
989     @Override
990     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
991         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
992       return getRowWithColumnsTs(tableName, row, null,
993                                  HConstants.LATEST_TIMESTAMP,
994                                  attributes);
995     }
996 
997     @Override
998     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
999                                               ByteBuffer row,
1000         List<ByteBuffer> columns,
1001         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1002       return getRowWithColumnsTs(tableName, row, columns,
1003                                  HConstants.LATEST_TIMESTAMP,
1004                                  attributes);
1005     }
1006 
1007     @Override
1008     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1009         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1010       return getRowWithColumnsTs(tableName, row, null,
1011                                  timestamp, attributes);
1012     }
1013 
1014     @Override
1015     public List<TRowResult> getRowWithColumnsTs(
1016         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1017         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1018       
1019       Table table = null;
1020       try {
1021         table = getTable(tableName);
1022         if (columns == null) {
1023           Get get = new Get(getBytes(row));
1024           addAttributes(get, attributes);
1025           get.setTimeRange(0, timestamp);
1026           Result result = table.get(get);
1027           return ThriftUtilities.rowResultFromHBase(result);
1028         }
1029         Get get = new Get(getBytes(row));
1030         addAttributes(get, attributes);
1031         for(ByteBuffer column : columns) {
1032           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1033           if (famAndQf.length == 1) {
1034               get.addFamily(famAndQf[0]);
1035           } else {
1036               get.addColumn(famAndQf[0], famAndQf[1]);
1037           }
1038         }
1039         get.setTimeRange(0, timestamp);
1040         Result result = table.get(get);
1041         return ThriftUtilities.rowResultFromHBase(result);
1042       } catch (IOException e) {
1043         LOG.warn(e.getMessage(), e);
1044         throw new IOError(Throwables.getStackTraceAsString(e));
1045       } finally{
1046         closeTable(table);
1047       }
1048     }
1049 
1050     @Override
1051     public List<TRowResult> getRows(ByteBuffer tableName,
1052                                     List<ByteBuffer> rows,
1053         Map<ByteBuffer, ByteBuffer> attributes)
1054         throws IOError {
1055       return getRowsWithColumnsTs(tableName, rows, null,
1056                                   HConstants.LATEST_TIMESTAMP,
1057                                   attributes);
1058     }
1059 
1060     @Override
1061     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1062                                                List<ByteBuffer> rows,
1063         List<ByteBuffer> columns,
1064         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1065       return getRowsWithColumnsTs(tableName, rows, columns,
1066                                   HConstants.LATEST_TIMESTAMP,
1067                                   attributes);
1068     }
1069 
1070     @Override
1071     public List<TRowResult> getRowsTs(ByteBuffer tableName,
1072                                       List<ByteBuffer> rows,
1073         long timestamp,
1074         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1075       return getRowsWithColumnsTs(tableName, rows, null,
1076                                   timestamp, attributes);
1077     }
1078 
1079     @Override
1080     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1081                                                  List<ByteBuffer> rows,
1082         List<ByteBuffer> columns, long timestamp,
1083         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1084       
1085       Table table= null;
1086       try {
1087         List<Get> gets = new ArrayList<Get>(rows.size());
1088         table = getTable(tableName);
1089         if (metrics != null) {
1090           metrics.incNumRowKeysInBatchGet(rows.size());
1091         }
1092         for (ByteBuffer row : rows) {
1093           Get get = new Get(getBytes(row));
1094           addAttributes(get, attributes);
1095           if (columns != null) {
1096 
1097             for(ByteBuffer column : columns) {
1098               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1099               if (famAndQf.length == 1) {
1100                 get.addFamily(famAndQf[0]);
1101               } else {
1102                 get.addColumn(famAndQf[0], famAndQf[1]);
1103               }
1104             }
1105           }
1106           get.setTimeRange(0, timestamp);
1107           gets.add(get);
1108         }
1109         Result[] result = table.get(gets);
1110         return ThriftUtilities.rowResultFromHBase(result);
1111       } catch (IOException e) {
1112         LOG.warn(e.getMessage(), e);
1113         throw new IOError(Throwables.getStackTraceAsString(e));
1114       } finally{
1115         closeTable(table);
1116       }
1117     }
1118 
1119     @Override
1120     public void deleteAll(
1121         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1122         Map<ByteBuffer, ByteBuffer> attributes)
1123         throws IOError {
1124       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1125                   attributes);
1126     }
1127 
1128     @Override
1129     public void deleteAllTs(ByteBuffer tableName,
1130                             ByteBuffer row,
1131                             ByteBuffer column,
1132         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1133       Table table = null;
1134       try {
1135         table = getTable(tableName);
1136         Delete delete  = new Delete(getBytes(row));
1137         addAttributes(delete, attributes);
1138         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1139         if (famAndQf.length == 1) {
1140           delete.addFamily(famAndQf[0], timestamp);
1141         } else {
1142           delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1143         }
1144         table.delete(delete);
1145 
1146       } catch (IOException e) {
1147         LOG.warn(e.getMessage(), e);
1148         throw new IOError(Throwables.getStackTraceAsString(e));
1149       } finally {
1150         closeTable(table);
1151       }
1152     }
1153 
1154     @Override
1155     public void deleteAllRow(
1156         ByteBuffer tableName, ByteBuffer row,
1157         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1158       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1159     }
1160 
1161     @Override
1162     public void deleteAllRowTs(
1163         ByteBuffer tableName, ByteBuffer row, long timestamp,
1164         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1165       Table table = null;
1166       try {
1167         table = getTable(tableName);
1168         Delete delete  = new Delete(getBytes(row), timestamp);
1169         addAttributes(delete, attributes);
1170         table.delete(delete);
1171       } catch (IOException e) {
1172         LOG.warn(e.getMessage(), e);
1173         throw new IOError(Throwables.getStackTraceAsString(e));
1174       } finally {
1175         closeTable(table);
1176       }
1177     }
1178 
1179     @Override
1180     public void createTable(ByteBuffer in_tableName,
1181         List<ColumnDescriptor> columnFamilies) throws IOError,
1182         IllegalArgument, AlreadyExists {
1183       TableName tableName = getTableName(in_tableName);
1184       try {
1185         if (getAdmin().tableExists(tableName)) {
1186           throw new AlreadyExists("table name already in use");
1187         }
1188         HTableDescriptor desc = new HTableDescriptor(tableName);
1189         for (ColumnDescriptor col : columnFamilies) {
1190           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1191           desc.addFamily(colDesc);
1192         }
1193         getAdmin().createTable(desc);
1194       } catch (IOException e) {
1195         LOG.warn(e.getMessage(), e);
1196         throw new IOError(Throwables.getStackTraceAsString(e));
1197       } catch (IllegalArgumentException e) {
1198         LOG.warn(e.getMessage(), e);
1199         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1200       }
1201     }
1202 
1203     private static TableName getTableName(ByteBuffer buffer) {
1204       return TableName.valueOf(getBytes(buffer));
1205     }
1206 
1207     @Override
1208     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1209       TableName tableName = getTableName(in_tableName);
1210       if (LOG.isDebugEnabled()) {
1211         LOG.debug("deleteTable: table=" + tableName);
1212       }
1213       try {
1214         if (!getAdmin().tableExists(tableName)) {
1215           throw new IOException("table does not exist");
1216         }
1217         getAdmin().deleteTable(tableName);
1218       } catch (IOException e) {
1219         LOG.warn(e.getMessage(), e);
1220         throw new IOError(Throwables.getStackTraceAsString(e));
1221       }
1222     }
1223 
1224     @Override
1225     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1226         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1227         throws IOError, IllegalArgument {
1228       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1229                   attributes);
1230     }
1231 
1232     @Override
1233     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1234         List<Mutation> mutations, long timestamp,
1235         Map<ByteBuffer, ByteBuffer> attributes)
1236         throws IOError, IllegalArgument {
1237       Table table = null;
1238       try {
1239         table = getTable(tableName);
1240         Put put = new Put(getBytes(row), timestamp);
1241         addAttributes(put, attributes);
1242 
1243         Delete delete = new Delete(getBytes(row));
1244         addAttributes(delete, attributes);
1245         if (metrics != null) {
1246           metrics.incNumRowKeysInBatchMutate(mutations.size());
1247         }
1248 
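             // A single Thrift batch may mix puts and deletes for the row; they are split
             // into one Put and one Delete below.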
1249         // I apologize for all this mess :)
1250         for (Mutation m : mutations) {
1251           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1252           if (m.isDelete) {
1253             if (famAndQf.length == 1) {
1254               delete.addFamily(famAndQf[0], timestamp);
1255             } else {
1256               delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1257             }
1258             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1259                 : Durability.SKIP_WAL);
1260           } else {
1261             if(famAndQf.length == 1) {
1262               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1263                   + "over the whole column family.");
1264             } else {
1265               put.addImmutable(famAndQf[0], famAndQf[1],
1266                   m.value != null ? getBytes(m.value)
1267                       : HConstants.EMPTY_BYTE_ARRAY);
1268             }
1269             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1270           }
1271         }
1272         if (!delete.isEmpty())
1273           table.delete(delete);
1274         if (!put.isEmpty())
1275           table.put(put);
1276       } catch (IOException e) {
1277         LOG.warn(e.getMessage(), e);
1278         throw new IOError(Throwables.getStackTraceAsString(e));
1279       } catch (IllegalArgumentException e) {
1280         LOG.warn(e.getMessage(), e);
1281         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1282       } finally{
1283         closeTable(table);
1284       }
1285     }
1286 
1287     @Override
1288     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1289         Map<ByteBuffer, ByteBuffer> attributes)
1290         throws IOError, IllegalArgument, TException {
1291       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1292     }
1293 
1294     @Override
1295     public void mutateRowsTs(
1296         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1297         Map<ByteBuffer, ByteBuffer> attributes)
1298         throws IOError, IllegalArgument, TException {
1299       List<Put> puts = new ArrayList<Put>();
1300       List<Delete> deletes = new ArrayList<Delete>();
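           // Collect one Put and one Delete per row batch, then apply them to the table
           // in two bulk calls at the end.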
1301 
1302       for (BatchMutation batch : rowBatches) {
1303         byte[] row = getBytes(batch.row);
1304         List<Mutation> mutations = batch.mutations;
1305         Delete delete = new Delete(row);
1306         addAttributes(delete, attributes);
1307         Put put = new Put(row, timestamp);
1308         addAttributes(put, attributes);
1309         for (Mutation m : mutations) {
1310           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1311           if (m.isDelete) {
1312             // no qualifier, family only.
1313             if (famAndQf.length == 1) {
1314               delete.addFamily(famAndQf[0], timestamp);
1315             } else {
1316               delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1317             }
1318             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1319                 : Durability.SKIP_WAL);
1320           } else {
1321             if (famAndQf.length == 1) {
1322               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1323                   + "over the whole column family.");
1324             }
1325             if (famAndQf.length == 2) {
1326               put.addImmutable(famAndQf[0], famAndQf[1],
1327                   m.value != null ? getBytes(m.value)
1328                       : HConstants.EMPTY_BYTE_ARRAY);
1329             } else {
1330               throw new IllegalArgumentException("Invalid famAndQf provided.");
1331             }
1332             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1333           }
1334         }
1335         if (!delete.isEmpty())
1336           deletes.add(delete);
1337         if (!put.isEmpty())
1338           puts.add(put);
1339       }
1340 
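           // All puts are applied before all deletes; the batch is not atomic across rows.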
1341       Table table = null;
1342       try {
1343         table = getTable(tableName);
1344         if (!puts.isEmpty())
1345           table.put(puts);
1346         if (!deletes.isEmpty())
1347           table.delete(deletes);
1348 
1349       } catch (IOException e) {
1350         LOG.warn(e.getMessage(), e);
1351         throw new IOError(Throwables.getStackTraceAsString(e));
1352       } catch (IllegalArgumentException e) {
1353         LOG.warn(e.getMessage(), e);
1354         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1355       } finally{
1356         closeTable(table);
1357       }
1358     }
1359 
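         /**
          * Parses the column into family and optional qualifier; when only a family is given,
          * an empty qualifier is used for the increment.
          */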
1360     @Override
1361     public long atomicIncrement(
1362         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1363             throws IOError, IllegalArgument, TException {
1364       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1365       if(famAndQf.length == 1) {
1366         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1367       }
1368       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1369     }
1370 
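         /** Thin wrapper around {@link Table#incrementColumnValue(byte[], byte[], byte[], long)}. */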
1371     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1372         byte [] family, byte [] qualifier, long amount)
1373         throws IOError, IllegalArgument, TException {
1374       Table table = null;
1375       try {
1376         table = getTable(tableName);
1377         return table.incrementColumnValue(
1378             getBytes(row), family, qualifier, amount);
1379       } catch (IOException e) {
1380         LOG.warn(e.getMessage(), e);
1381         throw new IOError(Throwables.getStackTraceAsString(e));
1382       } finally {
1383         closeTable(table);
1384       }
1385     }
1386 
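         /** Closes the scanner registered under the given id and removes it from this handler. */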
1387     @Override
1388     public void scannerClose(int id) throws IOError, IllegalArgument {
1389       LOG.debug("scannerClose: id=" + id);
1390       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1391       if (resultScannerWrapper == null) {
1392         String message = "scanner ID is invalid";
1393         LOG.warn(message);
1394         throw new IllegalArgument(message);
1395       }
1396       resultScannerWrapper.getScanner().close();
1397       removeScanner(id);
1398     }
1399 
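         /**
          * Returns up to nbRows rows from the scanner registered under the given id; an empty
          * list means the scanner is exhausted. The scanner itself is not closed here, so
          * callers should still invoke scannerClose.
          */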
1400     @Override
1401     public List<TRowResult> scannerGetList(int id,int nbRows)
1402         throws IllegalArgument, IOError {
1403       LOG.debug("scannerGetList: id=" + id);
1404       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1405       if (null == resultScannerWrapper) {
1406         String message = "scanner ID is invalid";
1407         LOG.warn(message);
1408         throw new IllegalArgument(message);
1409       }
1410 
1411       Result [] results = null;
1412       try {
1413         results = resultScannerWrapper.getScanner().next(nbRows);
1414         if (null == results) {
1415           return new ArrayList<TRowResult>();
1416         }
1417       } catch (IOException e) {
1418         LOG.warn(e.getMessage(), e);
1419         throw new IOError(Throwables.getStackTraceAsString(e));
1420       }
1421       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1422     }
1423 
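         /** Convenience form of {@link #scannerGetList(int, int)} that fetches a single row. */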
1424     @Override
1425     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1426       return scannerGetList(id,1);
1427     }
1428 
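         /**
          * Opens a scanner configured from the Thrift TScan (row range, timestamp upper bound,
          * caching, batch size, columns, filter string, reversed flag) and returns its id.
          */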
1429     @Override
1430     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1431         Map<ByteBuffer, ByteBuffer> attributes)
1432         throws IOError {
1433       
1434       Table table = null;
1435       try {
1436         table = getTable(tableName);
1437         Scan scan = new Scan();
1438         addAttributes(scan, attributes);
1439         if (tScan.isSetStartRow()) {
1440           scan.setStartRow(tScan.getStartRow());
1441         }
1442         if (tScan.isSetStopRow()) {
1443           scan.setStopRow(tScan.getStopRow());
1444         }
1445         if (tScan.isSetTimestamp()) {
1446           scan.setTimeRange(0, tScan.getTimestamp());
1447         }
1448         if (tScan.isSetCaching()) {
1449           scan.setCaching(tScan.getCaching());
1450         }
1451         if (tScan.isSetBatchSize()) {
1452           scan.setBatch(tScan.getBatchSize());
1453         }
1454         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1455           for(ByteBuffer column : tScan.getColumns()) {
1456             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1457             if(famQf.length == 1) {
1458               scan.addFamily(famQf[0]);
1459             } else {
1460               scan.addColumn(famQf[0], famQf[1]);
1461             }
1462           }
1463         }
1464         if (tScan.isSetFilterString()) {
1465           ParseFilter parseFilter = new ParseFilter();
1466           scan.setFilter(
1467               parseFilter.parseFilterString(tScan.getFilterString()));
1468         }
1469         if (tScan.isSetReversed()) {
1470           scan.setReversed(tScan.isReversed());
1471         }
1472         return addScanner(table.getScanner(scan), tScan.sortColumns);
1473       } catch (IOException e) {
1474         LOG.warn(e.getMessage(), e);
1475         throw new IOError(Throwables.getStackTraceAsString(e));
1476       } finally{
1477         closeTable(table);
1478       }
1479     }
1480 
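         /**
          * Opens a scanner starting at startRow with no stop row, restricted to the given
          * columns or whole families, and returns its id.
          */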
1481     @Override
1482     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1483         List<ByteBuffer> columns,
1484         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1485       
1486       Table table = null;
1487       try {
1488         table = getTable(tableName);
1489         Scan scan = new Scan(getBytes(startRow));
1490         addAttributes(scan, attributes);
1491         if(columns != null && columns.size() != 0) {
1492           for(ByteBuffer column : columns) {
1493             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1494             if(famQf.length == 1) {
1495               scan.addFamily(famQf[0]);
1496             } else {
1497               scan.addColumn(famQf[0], famQf[1]);
1498             }
1499           }
1500         }
1501         return addScanner(table.getScanner(scan), false);
1502       } catch (IOException e) {
1503         LOG.warn(e.getMessage(), e);
1504         throw new IOError(Throwables.getStackTraceAsString(e));
1505       } finally{
1506         closeTable(table);
1507       }
1508     }
1509 
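         /**
          * Opens a scanner over the row range [startRow, stopRow), restricted to the given
          * columns or whole families, and returns its id.
          */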
1510     @Override
1511     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1512         ByteBuffer stopRow, List<ByteBuffer> columns,
1513         Map<ByteBuffer, ByteBuffer> attributes)
1514         throws IOError, TException {
1515       
1516       Table table = null;
1517       try {
1518         table = getTable(tableName);
1519         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1520         addAttributes(scan, attributes);
1521         if(columns != null && columns.size() != 0) {
1522           for(ByteBuffer column : columns) {
1523             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1524             if(famQf.length == 1) {
1525               scan.addFamily(famQf[0]);
1526             } else {
1527               scan.addColumn(famQf[0], famQf[1]);
1528             }
1529           }
1530         }
1531         return addScanner(table.getScanner(scan), false);
1532       } catch (IOException e) {
1533         LOG.warn(e.getMessage(), e);
1534         throw new IOError(Throwables.getStackTraceAsString(e));
1535       } finally{
1536         closeTable(table);
1537       }
1538     }
1539 
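         /**
          * Opens a scanner that starts at startAndPrefix and stops as soon as a row no longer
          * matches that prefix (a WhileMatchFilter wrapping a PrefixFilter).
          */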
1540     @Override
1541     public int scannerOpenWithPrefix(ByteBuffer tableName,
1542                                      ByteBuffer startAndPrefix,
1543                                      List<ByteBuffer> columns,
1544         Map<ByteBuffer, ByteBuffer> attributes)
1545         throws IOError, TException {
1546       
1547       Table table = null;
1548       try {
1549         table = getTable(tableName);
1550         Scan scan = new Scan(getBytes(startAndPrefix));
1551         addAttributes(scan, attributes);
1552         Filter f = new WhileMatchFilter(
1553             new PrefixFilter(getBytes(startAndPrefix)));
1554         scan.setFilter(f);
1555         if (columns != null && columns.size() != 0) {
1556           for(ByteBuffer column : columns) {
1557             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1558             if(famQf.length == 1) {
1559               scan.addFamily(famQf[0]);
1560             } else {
1561               scan.addColumn(famQf[0], famQf[1]);
1562             }
1563           }
1564         }
1565         return addScanner(table.getScanner(scan), false);
1566       } catch (IOException e) {
1567         LOG.warn(e.getMessage(), e);
1568         throw new IOError(Throwables.getStackTraceAsString(e));
1569       } finally{
1570         closeTable(table);
1571       }
1572     }
1573 
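         /** Like {@link #scannerOpen} but only returning cells whose timestamps fall in [0, timestamp). */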
1574     @Override
1575     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1576         List<ByteBuffer> columns, long timestamp,
1577         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1578       
1579       Table table = null;
1580       try {
1581         table = getTable(tableName);
1582         Scan scan = new Scan(getBytes(startRow));
1583         addAttributes(scan, attributes);
1584         scan.setTimeRange(0, timestamp);
1585         if (columns != null && columns.size() != 0) {
1586           for (ByteBuffer column : columns) {
1587             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1588             if(famQf.length == 1) {
1589               scan.addFamily(famQf[0]);
1590             } else {
1591               scan.addColumn(famQf[0], famQf[1]);
1592             }
1593           }
1594         }
1595         return addScanner(table.getScanner(scan), false);
1596       } catch (IOException e) {
1597         LOG.warn(e.getMessage(), e);
1598         throw new IOError(Throwables.getStackTraceAsString(e));
1599       } finally{
1600         closeTable(table);
1601       }
1602     }
1603 
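         /** Like {@link #scannerOpenWithStop} but only returning cells whose timestamps fall in [0, timestamp). */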
1604     @Override
1605     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1606         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1607         Map<ByteBuffer, ByteBuffer> attributes)
1608         throws IOError, TException {
1609       
1610       Table table = null;
1611       try {
1612         table = getTable(tableName);
1613         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1614         addAttributes(scan, attributes);
1615         scan.setTimeRange(0, timestamp);
1616         if (columns != null && columns.size() != 0) {
1617           for (ByteBuffer column : columns) {
1618             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1619             if(famQf.length == 1) {
1620               scan.addFamily(famQf[0]);
1621             } else {
1622               scan.addColumn(famQf[0], famQf[1]);
1623             }
1624           }
1625         }
1627         return addScanner(table.getScanner(scan), false);
1628       } catch (IOException e) {
1629         LOG.warn(e.getMessage(), e);
1630         throw new IOError(Throwables.getStackTraceAsString(e));
1631       } finally{
1632         closeTable(table);
1633       }
1634     }
1635 
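         /** Returns the table's column family descriptors, keyed by descriptor name. */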
1636     @Override
1637     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1638         ByteBuffer tableName) throws IOError, TException {
1639       
1640       Table table = null;
1641       try {
1642         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1643           new TreeMap<ByteBuffer, ColumnDescriptor>();
1644 
1645         table = getTable(tableName);
1646         HTableDescriptor desc = table.getTableDescriptor();
1647 
1648         for (HColumnDescriptor e : desc.getFamilies()) {
1649           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1650           columns.put(col.name, col);
1651         }
1652         return columns;
1653       } catch (IOException e) {
1654         LOG.warn(e.getMessage(), e);
1655         throw new IOError(Throwables.getStackTraceAsString(e));
1656       } finally {
1657         closeTable(table);
1658       }
1659     }
1660     
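         /** Closes the given table if it is non-null; a failure to close is rethrown as an IOError. */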
1661     private void closeTable(Table table) throws IOError {
1663       try {
1664         if (table != null) {
1665           table.close();
1666         }
1667       } catch (IOException e) {
1668         LOG.error(e.getMessage(), e);
1669         throw new IOError(Throwables.getStackTraceAsString(e));
1670       }
1671     }
1672     
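         /**
          * Locates the region containing searchRow through a reverse scan of hbase:meta and
          * returns its start/end keys, name, id and, when known, the hosting server.
          */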
1673     @Override
1674     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1675       try {
1676         byte[] row = getBytes(searchRow);
1677         Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
1678           HConstants.CATALOG_FAMILY);
1679 
1680         if (startRowResult == null) {
1681           throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1682                                 + Bytes.toStringBinary(row));
1683         }
1684 
1685         // find region start and end keys
1686         HRegionInfo regionInfo = MetaTableAccessor.getHRegionInfo(startRowResult);
1687         if (regionInfo == null) {
1688           throw new IOException("HRegionInfo REGIONINFO was null or " +
1689                                 "empty in Meta for row="
1690                                 + Bytes.toStringBinary(row));
1691         }
1692         TRegionInfo region = new TRegionInfo();
1693         region.setStartKey(regionInfo.getStartKey());
1694         region.setEndKey(regionInfo.getEndKey());
1695         region.id = regionInfo.getRegionId();
1696         region.setName(regionInfo.getRegionName());
1697         region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1698 
1699         // find region assignment to server
1700         ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
1701         if (serverName != null) {
1702           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1703           region.port = serverName.getPort();
1704         }
1705         return region;
1706       } catch (IOException e) {
1707         LOG.warn(e.getMessage(), e);
1708         throw new IOError(Throwables.getStackTraceAsString(e));
1709       }
1710     }
1711 
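         /**
          * Runs a reversed scan over the given table beginning at row, restricted to the given
          * family, and returns the first result, or null if there is none.
          */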
1712     private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1713         throws IOException {
1714       Scan scan = new Scan(row);
1715       scan.setReversed(true);
1716       scan.addFamily(family);
1718       Table table = getTable(tableName);
1719       try (ResultScanner scanner = table.getScanner(scan)) {
1720         return scanner.next();
1721       } finally {
1722         if (table != null) {
1723           table.close();
1724         }
1725       }
1726     }
1727 
1728     private void initMetrics(ThriftMetrics metrics) {
1729       this.metrics = metrics;
1730     }
1731 
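         /**
          * Applies a single increment. When COALESCE_INC_KEY is enabled in the configuration,
          * the increment is queued with the coalescer instead of being written immediately.
          */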
1732     @Override
1733     public void increment(TIncrement tincrement) throws IOError, TException {
1734 
1735       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1736         throw new TException("Must supply a table and a row key; can't increment");
1737       }
1738 
1739       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1740         this.coalescer.queueIncrement(tincrement);
1741         return;
1742       }
1743 
1744       Table table = null;
1745       try {
1746         table = getTable(tincrement.getTable());
1747         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1748         table.increment(inc);
1749       } catch (IOException e) {
1750         LOG.warn(e.getMessage(), e);
1751         throw new IOError(Throwables.getStackTraceAsString(e));
1752       } finally{
1753         closeTable(table);
1754       }
1755     }
1756 
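         /**
          * Batch form of {@link #increment(TIncrement)}. When COALESCE_INC_KEY is enabled the
          * whole batch is handed to the coalescer; otherwise each increment is applied in turn.
          */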
1757     @Override
1758     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1759       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1760         this.coalescer.queueIncrements(tincrements);
1761         return;
1762       }
1763       for (TIncrement tinc : tincrements) {
1764         increment(tinc);
1765       }
1766     }
1767 
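         /** Appends the values described by the TAppend to their row and returns the cells produced. */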
1768     @Override
1769     public List<TCell> append(TAppend tappend) throws IOError, TException {
1770       if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1771         throw new TException("Must supply a table and a row key; can't append");
1772       }
1773 
1774       Table table = null;
1775       try {
1776         table = getTable(tappend.getTable());
1777         Append append = ThriftUtilities.appendFromThrift(tappend);
1778         Result result = table.append(append);
1779         return ThriftUtilities.cellFromHBase(result.rawCells());
1780       } catch (IOException e) {
1781         LOG.warn(e.getMessage(), e);
1782         throw new IOError(Throwables.getStackTraceAsString(e));
1783       } finally {
1784         closeTable(table);
1785       }
1786     }
1787 
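         /**
          * Atomically applies the Put described by mput only if the current value of the check
          * column matches the supplied value. Both columns must be given as family:qualifier.
          */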
1788     @Override
1789     public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1790         ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1791         IllegalArgument, TException {
1792       Put put;
1793       try {
1794         put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1795         addAttributes(put, attributes);
1796 
1797         byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1798 
1799         put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1800             : HConstants.EMPTY_BYTE_ARRAY);
1801 
1802         put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1803       } catch (IllegalArgumentException e) {
1804         LOG.warn(e.getMessage(), e);
1805         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1806       }
1807 
1808       Table table = null;
1809       try {
1810         table = getTable(tableName);
1811         byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1812         return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1813           value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1814       } catch (IOException e) {
1815         LOG.warn(e.getMessage(), e);
1816         throw new IOError(Throwables.getStackTraceAsString(e));
1817       } catch (IllegalArgumentException e) {
1818         LOG.warn(e.getMessage(), e);
1819         throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1820       } finally {
1821         closeTable(table);
1822       }
1823     }
1824   }
1825 
1828   /**
1829    * Adds all the attributes into the Operation object
1830    */
1831   private static void addAttributes(OperationWithAttributes op,
1832     Map<ByteBuffer, ByteBuffer> attributes) {
1833     if (attributes == null || attributes.size() == 0) {
1834       return;
1835     }
1836     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1837       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1838       byte[] value =  getBytes(entry.getValue());
1839       op.setAttribute(name, value);
1840     }
1841   }
1842 
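       /**
        * Registers the custom filters listed in the hbase.thrift.filters property; each entry
        * must take the form name:class, and malformed entries are logged and skipped.
        */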
1843   public static void registerFilters(Configuration conf) {
1844     String[] filters = conf.getStrings("hbase.thrift.filters");
1845     if(filters != null) {
1846       for(String filterClass: filters) {
1847         String[] filterPart = filterClass.split(":");
1848         if(filterPart.length != 2) {
1849           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1850         } else {
1851           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1852         }
1853       }
1854     }
1855   }
1856 }