1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.nio.ByteBuffer;
28 import java.security.PrivilegedAction;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.TreeMap;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41
42 import javax.security.auth.callback.Callback;
43 import javax.security.auth.callback.UnsupportedCallbackException;
44 import javax.security.sasl.AuthorizeCallback;
45 import javax.security.sasl.Sasl;
46 import javax.security.sasl.SaslServer;
47
48 import org.apache.commons.cli.CommandLine;
49 import org.apache.commons.cli.Option;
50 import org.apache.commons.cli.OptionGroup;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.hbase.HBaseConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.HRegionLocation;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.KeyValue;
61 import org.apache.hadoop.hbase.MetaTableAccessor;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.TableNotFoundException;
65 import org.apache.hadoop.hbase.classification.InterfaceAudience;
66 import org.apache.hadoop.hbase.client.Admin;
67 import org.apache.hadoop.hbase.client.Append;
68 import org.apache.hadoop.hbase.client.Delete;
69 import org.apache.hadoop.hbase.client.Durability;
70 import org.apache.hadoop.hbase.client.Get;
71 import org.apache.hadoop.hbase.client.HBaseAdmin;
72 import org.apache.hadoop.hbase.client.Increment;
73 import org.apache.hadoop.hbase.client.OperationWithAttributes;
74 import org.apache.hadoop.hbase.client.Put;
75 import org.apache.hadoop.hbase.client.RegionLocator;
76 import org.apache.hadoop.hbase.client.Result;
77 import org.apache.hadoop.hbase.client.ResultScanner;
78 import org.apache.hadoop.hbase.client.Scan;
79 import org.apache.hadoop.hbase.client.Table;
80 import org.apache.hadoop.hbase.filter.Filter;
81 import org.apache.hadoop.hbase.filter.ParseFilter;
82 import org.apache.hadoop.hbase.filter.PrefixFilter;
83 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
84 import org.apache.hadoop.hbase.security.SecurityUtil;
85 import org.apache.hadoop.hbase.security.UserProvider;
86 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
87 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
88 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
89 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
90 import org.apache.hadoop.hbase.thrift.generated.Hbase;
91 import org.apache.hadoop.hbase.thrift.generated.IOError;
92 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
93 import org.apache.hadoop.hbase.thrift.generated.Mutation;
94 import org.apache.hadoop.hbase.thrift.generated.TAppend;
95 import org.apache.hadoop.hbase.thrift.generated.TCell;
96 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
97 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
98 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
99 import org.apache.hadoop.hbase.thrift.generated.TScan;
100 import org.apache.hadoop.hbase.util.Bytes;
101 import org.apache.hadoop.hbase.util.ConnectionCache;
102 import org.apache.hadoop.hbase.util.DNS;
103 import org.apache.hadoop.hbase.util.Strings;
104 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
105 import org.apache.hadoop.security.UserGroupInformation;
106 import org.apache.hadoop.security.authorize.ProxyUsers;
107 import org.apache.thrift.TException;
108 import org.apache.thrift.TProcessor;
109 import org.apache.thrift.protocol.TBinaryProtocol;
110 import org.apache.thrift.protocol.TCompactProtocol;
111 import org.apache.thrift.protocol.TProtocol;
112 import org.apache.thrift.protocol.TProtocolFactory;
113 import org.apache.thrift.server.THsHaServer;
114 import org.apache.thrift.server.TNonblockingServer;
115 import org.apache.thrift.server.TServer;
116 import org.apache.thrift.server.TServlet;
117 import org.apache.thrift.server.TThreadedSelectorServer;
118 import org.apache.thrift.transport.TFramedTransport;
119 import org.apache.thrift.transport.TNonblockingServerSocket;
120 import org.apache.thrift.transport.TNonblockingServerTransport;
121 import org.apache.thrift.transport.TSaslServerTransport;
122 import org.apache.thrift.transport.TServerSocket;
123 import org.apache.thrift.transport.TServerTransport;
124 import org.apache.thrift.transport.TTransportFactory;
125 import org.mortbay.jetty.Connector;
126 import org.mortbay.jetty.Server;
127 import org.mortbay.jetty.nio.SelectChannelConnector;
128 import org.mortbay.jetty.security.SslSelectChannelConnector;
129 import org.mortbay.jetty.servlet.Context;
130 import org.mortbay.jetty.servlet.ServletHolder;
131 import org.mortbay.thread.QueuedThreadPool;
132
133 import com.google.common.base.Joiner;
134 import com.google.common.base.Throwables;
135 import com.google.common.util.concurrent.ThreadFactoryBuilder;
136
137
138
139
140
141 @InterfaceAudience.Private
142 public class ThriftServerRunner implements Runnable {
143
/** Logger for the runner; also used by the nested {@code ImplType} enum. */
private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

/** Configuration key naming the server implementation; matched against {@code ImplType.option}. */
static final String SERVER_TYPE_CONF_KEY =
    "hbase.regionserver.thrift.server.type";

/** Address to bind to; {@code DEFAULT_BIND_ADDR} is used when unset. */
static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
/** Boolean: use the compact protocol instead of the binary protocol (default false). */
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
/** Boolean: use the framed transport (default false; some server types always frame). */
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
/** Maximum frame size in megabytes for the framed transport (default 2). */
static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
/** Port to listen on; defaults to {@code DEFAULT_LISTEN_PORT}. */
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
// NOTE(review): not read in the visible part of this file; presumably toggles
// increment coalescing (see IncrementCoalescer) - confirm against its consumer.
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
/** Boolean: serve Thrift over HTTP (Jetty) instead of a plain TServer (default false). */
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
/** Minimum size of the HTTP server thread pool (default 2). */
static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
/** Maximum size of the HTTP server thread pool (default 100). */
static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";

/** Boolean: use an SSL connector for the HTTP server (default false). */
static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
/** Keystore location handed to the SSL connector. */
static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
/** Keystore password, looked up through {@code HBaseConfiguration.getPassword}. */
static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
/** Key password; falls back to the keystore password when unset. */
static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";

/**
 * SASL quality of protection. When set it must be one of 'auth', 'auth-int'
 * or 'auth-conf', and the server must be running in secure mode.
 */
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
/** Accept backlog passed to the thread-pool server's socket (default 0). */
static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";

/** Bind address used when {@code BIND_CONF_KEY} is not configured. */
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
/** Listen port used when {@code PORT_CONF_KEY} is not configured. */
public static final int DEFAULT_LISTEN_PORT = 9090;
/** Value written into {@code TRegionInfo.version} by {@code getTableRegions}. */
public static final int HREGION_VERSION = 1;
/** Boolean: enable doAs/proxy-user support for the HTTP server (default false). */
static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
/** Port read once from the configuration in the constructor. */
private final int listenPort;

/** Private copy of the configuration passed to the constructor. */
private Configuration conf;
/** The plain Thrift server; set by {@code setupServer()}, cleared by {@code shutdown()}. */
volatile TServer tserver;
/** The Jetty server for HTTP mode; set by {@code setupHTTPServer()}, cleared by {@code shutdown()}. */
volatile Server httpServer;
/** Metrics-wrapping proxy around {@code hbaseHandler}; this is what the processors call. */
private final Hbase.Iface handler;
private final ThriftMetrics metrics;
/** The underlying handler, kept so the effective user can be set per call. */
private final HBaseHandler hbaseHandler;
/** The user the servers run as (see {@code run()}). */
private final UserGroupInformation realUser;

/** Configured SASL QOP, or null when SASL authentication is not requested. */
private final String qop;
/** Host name resolved in the constructor; only assigned when security is enabled. */
private String host;

/** True only when both Hadoop and HBase security are enabled. */
private final boolean securityEnabled;
/** Value of {@code THRIFT_SUPPORT_PROXYUSER}. */
private final boolean doAsEnabled;
196
197
198 enum ImplType {
199 HS_HA("hsha", true, THsHaServer.class, true),
200 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
201 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
202 THREADED_SELECTOR(
203 "threadedselector", true, TThreadedSelectorServer.class, true);
204
205 public static final ImplType DEFAULT = THREAD_POOL;
206
207 final String option;
208 final boolean isAlwaysFramed;
209 final Class<? extends TServer> serverClass;
210 final boolean canSpecifyBindIP;
211
212 ImplType(String option, boolean isAlwaysFramed,
213 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
214 this.option = option;
215 this.isAlwaysFramed = isAlwaysFramed;
216 this.serverClass = serverClass;
217 this.canSpecifyBindIP = canSpecifyBindIP;
218 }
219
220
221
222
223
224 @Override
225 public String toString() {
226 return "-" + option;
227 }
228
229 String getDescription() {
230 StringBuilder sb = new StringBuilder("Use the " +
231 serverClass.getSimpleName());
232 if (isAlwaysFramed) {
233 sb.append(" This implies the framed transport.");
234 }
235 if (this == DEFAULT) {
236 sb.append("This is the default.");
237 }
238 return sb.toString();
239 }
240
241 static OptionGroup createOptionGroup() {
242 OptionGroup group = new OptionGroup();
243 for (ImplType t : values()) {
244 group.addOption(new Option(t.option, t.getDescription()));
245 }
246 return group;
247 }
248
249 static ImplType getServerImpl(Configuration conf) {
250 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
251 for (ImplType t : values()) {
252 if (confType.equals(t.option)) {
253 return t;
254 }
255 }
256 throw new AssertionError("Unknown server ImplType.option:" + confType);
257 }
258
259 static void setServerImpl(CommandLine cmd, Configuration conf) {
260 ImplType chosenType = null;
261 int numChosen = 0;
262 for (ImplType t : values()) {
263 if (cmd.hasOption(t.option)) {
264 chosenType = t;
265 ++numChosen;
266 }
267 }
268 if (numChosen < 1) {
269 LOG.info("Using default thrift server type");
270 chosenType = DEFAULT;
271 } else if (numChosen > 1) {
272 throw new AssertionError("Exactly one option out of " +
273 Arrays.toString(values()) + " has to be specified");
274 }
275 LOG.info("Using thrift server type " + chosenType.option);
276 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
277 }
278
279 public String simpleClassName() {
280 return serverClass.getSimpleName();
281 }
282
283 public static List<String> serversThatCannotSpecifyBindIP() {
284 List<String> l = new ArrayList<String>();
285 for (ImplType t : values()) {
286 if (!t.canSpecifyBindIP) {
287 l.add(t.simpleClassName());
288 }
289 }
290 return l;
291 }
292
293 }
294
295 public ThriftServerRunner(Configuration conf) throws IOException {
296 UserProvider userProvider = UserProvider.instantiate(conf);
297
298 securityEnabled = userProvider.isHadoopSecurityEnabled()
299 && userProvider.isHBaseSecurityEnabled();
300 if (securityEnabled) {
301 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
302 conf.get("hbase.thrift.dns.interface", "default"),
303 conf.get("hbase.thrift.dns.nameserver", "default")));
304 userProvider.login("hbase.thrift.keytab.file",
305 "hbase.thrift.kerberos.principal", host);
306 }
307 this.conf = HBaseConfiguration.create(conf);
308 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
309 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
310 this.hbaseHandler = new HBaseHandler(conf, userProvider);
311 this.hbaseHandler.initMetrics(metrics);
312 this.handler = HbaseHandlerMetricsProxy.newInstance(
313 hbaseHandler, metrics, conf);
314 this.realUser = userProvider.getCurrent().getUGI();
315 qop = conf.get(THRIFT_QOP_KEY);
316 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
317 if (qop != null) {
318 if (!qop.equals("auth") && !qop.equals("auth-int")
319 && !qop.equals("auth-conf")) {
320 throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
321 + ", it must be 'auth', 'auth-int', or 'auth-conf'");
322 }
323 if (!securityEnabled) {
324 throw new IOException("Thrift server must"
325 + " run in secure mode to support authentication");
326 }
327 }
328 }
329
330
331
332
333 @Override
334 public void run() {
335 realUser.doAs(new PrivilegedAction<Object>() {
336 @Override
337 public Object run() {
338 try {
339 if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
340 setupHTTPServer();
341 httpServer.start();
342 httpServer.join();
343 } else {
344 setupServer();
345 tserver.serve();
346 }
347 } catch (Exception e) {
348 LOG.fatal("Cannot run ThriftServer", e);
349
350 System.exit(-1);
351 }
352 return null;
353 }
354 });
355
356 }
357
358 public void shutdown() {
359 if (tserver != null) {
360 tserver.stop();
361 tserver = null;
362 }
363 if (httpServer != null) {
364 try {
365 httpServer.stop();
366 httpServer = null;
367 } catch (Exception e) {
368 LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
369 }
370 httpServer = null;
371 }
372 }
373
374 private void setupHTTPServer() throws IOException {
375 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
376 TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
377 TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
378 conf, hbaseHandler, securityEnabled, doAsEnabled);
379
380 httpServer = new Server();
381
382 Context context = new Context(httpServer, "/", Context.SESSIONS);
383 context.setContextPath("/");
384 String httpPath = "/*";
385 httpServer.setHandler(context);
386 context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
387
388
389 Connector connector = new SelectChannelConnector();
390 if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
391 SslSelectChannelConnector sslConnector = new SslSelectChannelConnector();
392 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
393 String password = HBaseConfiguration.getPassword(conf,
394 THRIFT_SSL_KEYSTORE_PASSWORD, null);
395 String keyPassword = HBaseConfiguration.getPassword(conf,
396 THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
397 sslConnector.setKeystore(keystore);
398 sslConnector.setPassword(password);
399 sslConnector.setKeyPassword(keyPassword);
400 connector = sslConnector;
401 }
402 String host = getBindAddress(conf).getHostAddress();
403 connector.setPort(listenPort);
404 connector.setHost(host);
405 connector.setHeaderBufferSize(1024 * 64);
406 httpServer.addConnector(connector);
407
408 if (doAsEnabled) {
409 ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
410 }
411
412
413
414
415
416
417 int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
418 int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
419 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
420 threadPool.setMinThreads(minThreads);
421 httpServer.setThreadPool(threadPool);
422
423 httpServer.setSendServerVersion(false);
424 httpServer.setSendDateHeader(false);
425 httpServer.setStopAtShutdown(true);
426
427 LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
428 }
429
430
431
432
/**
 * Builds the plain (non-HTTP) Thrift server selected by the configuration
 * and stores it in {@code tserver}. This method only constructs the server;
 * it does not call {@code serve()}.
 *
 * Steps, in order: choose the protocol factory, choose the transport factory
 * (framed, plain, or SASL/GSSAPI), verify the bind-address setting, create
 * the server for the configured {@code ImplType}, and register filters.
 *
 * @throws Exception if any part of the setup fails
 */
private void setupServer() throws Exception {
  // Wire protocol: compact when COMPACT_CONF_KEY is true, binary otherwise.
  TProtocolFactory protocolFactory;
  if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
    LOG.debug("Using compact protocol");
    protocolFactory = new TCompactProtocol.Factory();
  } else {
    LOG.debug("Using binary protocol");
    protocolFactory = new TBinaryProtocol.Factory();
  }

  // 'p' is the real processor; 'processor' is what the server gets and is
  // replaced by a wrapper in the SASL branch below.
  final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
  ImplType implType = ImplType.getServerImpl(conf);
  TProcessor processor = p;

  // Transport: framed (requested or forced by the server type), plain, or SASL.
  TTransportFactory transportFactory;
  if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
    if (qop != null) {
      throw new RuntimeException("Thrift server authentication"
        + " doesn't work with framed transport yet");
    }
    // The configured frame size is in megabytes; convert it to bytes.
    transportFactory = new TFramedTransport.Factory(
        conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
    LOG.debug("Using framed transport");
  } else if (qop == null) {
    transportFactory = new TTransportFactory();
  } else {
    // SASL/GSSAPI transport using the configured principal's user name and
    // the host resolved in the constructor.
    String name = SecurityUtil.getUserFromPrincipal(
      conf.get("hbase.thrift.kerberos.principal"));
    Map<String, String> saslProperties = new HashMap<String, String>();
    saslProperties.put(Sasl.QOP, qop);
    TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
    saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
      new SaslGssCallbackHandler() {
        @Override
        public void handle(Callback[] callbacks)
            throws UnsupportedCallbackException {
          // Only AuthorizeCallback is accepted; anything else is rejected.
          AuthorizeCallback ac = null;
          for (Callback callback : callbacks) {
            if (callback instanceof AuthorizeCallback) {
              ac = (AuthorizeCallback) callback;
            } else {
              throw new UnsupportedCallbackException(callback,
                  "Unrecognized SASL GSSAPI Callback");
            }
          }
          if (ac != null) {
            String authid = ac.getAuthenticationID();
            String authzid = ac.getAuthorizationID();
            // Authorize only when the authentication id and the
            // authorization id are the same.
            if (!authid.equals(authzid)) {
              ac.setAuthorized(false);
            } else {
              ac.setAuthorized(true);
              String userName = SecurityUtil.getUserFromPrincipal(authzid);
              LOG.info("Effective user: " + userName);
              ac.setAuthorizedID(userName);
            }
          }
        }
      });
    transportFactory = saslFactory;

    // Wrap the processor so every call first sets the handler's effective
    // user to the SASL authorization id of the connection.
    processor = new TProcessor() {
      @Override
      public boolean process(TProtocol inProt,
          TProtocol outProt) throws TException {
        TSaslServerTransport saslServerTransport =
          (TSaslServerTransport)inProt.getTransport();
        SaslServer saslServer = saslServerTransport.getSaslServer();
        String principal = saslServer.getAuthorizationID();
        hbaseHandler.setEffectiveUser(principal);
        return p.process(inProt, outProt);
      }
    };
  }

  // Reject an explicit bind address for server types that cannot honor it.
  // Every ImplType currently declares canSpecifyBindIP = true.
  if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
    LOG.error("Server types " + Joiner.on(", ").join(
        ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
        "address binding at the moment. See " +
        "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
    throw new RuntimeException(
        "-" + BIND_CONF_KEY + " not supported with " + implType);
  }

  // Only the thread-pool branch below uses the backlog.
  int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);

  if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
      implType == ImplType.THREADED_SELECTOR) {

    // The three non-blocking server types share one non-blocking socket.
    InetAddress listenAddress = getBindAddress(conf);
    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
        new InetSocketAddress(listenAddress, listenPort));

    if (implType == ImplType.NONBLOCKING) {
      TNonblockingServer.Args serverArgs =
          new TNonblockingServer.Args(serverTransport);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new TNonblockingServer(serverArgs);
    } else if (implType == ImplType.HS_HA) {
      THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
      // Worker pool fed through a CallQueue that is given the metrics object.
      CallQueue callQueue =
          new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
      ExecutorService executorService = createExecutor(
          callQueue, serverArgs.getWorkerThreads());
      serverArgs.executorService(executorService)
                .processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new THsHaServer(serverArgs);
    } else { // THREADED_SELECTOR
      TThreadedSelectorServer.Args serverArgs =
          new HThreadedSelectorServerArgs(serverTransport, conf);
      CallQueue callQueue =
          new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
      ExecutorService executorService = createExecutor(
          callQueue, serverArgs.getWorkerThreads());
      serverArgs.executorService(executorService)
                .processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new TThreadedSelectorServer(serverArgs);
    }
    LOG.info("starting HBase " + implType.simpleClassName() +
        " server on " + Integer.toString(listenPort));
  } else if (implType == ImplType.THREAD_POOL) {
    // Blocking socket with the configured accept backlog.
    InetAddress listenAddress = getBindAddress(conf);

    TServerTransport serverTransport = new TServerSocket(
        new TServerSocket.ServerSocketTransportArgs().
            bindAddr(new InetSocketAddress(listenAddress, listenPort)).backlog(backlog));

    TBoundedThreadPoolServer.Args serverArgs =
        new TBoundedThreadPoolServer.Args(serverTransport, conf);
    serverArgs.processor(processor)
              .transportFactory(transportFactory)
              .protocolFactory(protocolFactory);
    LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
        + listenAddress + ":" + Integer.toString(listenPort)
        + "; " + serverArgs);
    // The local variable shadows the field, so the field is assigned explicitly.
    TBoundedThreadPoolServer tserver =
        new TBoundedThreadPoolServer(serverArgs, metrics);
    this.tserver = tserver;
  } else {
    throw new AssertionError("Unsupported Thrift server implementation: " +
        implType.simpleClassName());
  }

  // Sanity check: the created server must be exactly the class the ImplType declares.
  if (tserver.getClass() != implType.serverClass) {
    throw new AssertionError("Expected to create Thrift server class " +
        implType.serverClass.getName() + " but got " +
        tserver.getClass().getName());
  }

  // NOTE(review): registerFilters is defined outside the visible part of this
  // file; presumably it registers user-configured filters - confirm.
  registerFilters(conf);
}
599
600 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
601 int workerThreads) {
602 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
603 tfb.setDaemon(true);
604 tfb.setNameFormat("thrift-worker-%d");
605 return new ThreadPoolExecutor(workerThreads, workerThreads,
606 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
607 }
608
609 private InetAddress getBindAddress(Configuration conf)
610 throws UnknownHostException {
611 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
612 return InetAddress.getByName(bindAddressStr);
613 }
614
615 protected static class ResultScannerWrapper {
616
617 private final ResultScanner scanner;
618 private final boolean sortColumns;
619 public ResultScannerWrapper(ResultScanner resultScanner,
620 boolean sortResultColumns) {
621 scanner = resultScanner;
622 sortColumns = sortResultColumns;
623 }
624
625 public ResultScanner getScanner() {
626 return scanner;
627 }
628
629 public boolean isColumnSorted() {
630 return sortColumns;
631 }
632 }
633
634
635
636
637
638 public static class HBaseHandler implements Hbase.Iface {
/** Configuration handed to the constructor. */
protected Configuration conf;
protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);

// Scanner bookkeeping: ids are handed out sequentially by addScanner and map
// to the open scanners. Access goes through the synchronized
// addScanner/getScanner/removeScanner methods.
protected int nextScannerId = 0;
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
// NOTE(review): assigned outside the visible part of this file, presumably by
// initMetrics - confirm.
private ThriftMetrics metrics = null;

/** Source of the Table, Admin and RegionLocator instances used by the handler methods. */
private final ConnectionCache connectionCache;
/** Created in the constructor with a reference to this handler. */
IncrementCoalescer coalescer = null;

/** Configuration key for the connection cache cleanup interval (default 10 * 1000). */
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
/** Configuration key for the connection cache maximum idle time (default 10 * 60 * 1000). */
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
652
653
654
655
656
657
658
659 byte[][] getAllColumns(Table table) throws IOException {
660 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
661 byte[][] columns = new byte[cds.length][];
662 for (int i = 0; i < cds.length; i++) {
663 columns[i] = Bytes.add(cds[i].getName(),
664 KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
665 }
666 return columns;
667 }
668
669
670
671
672
673
674
675
676
677 public Table getTable(final byte[] tableName) throws
678 IOException {
679 String table = Bytes.toString(tableName);
680 return connectionCache.getTable(table);
681 }
682
683 public Table getTable(final ByteBuffer tableName) throws IOException {
684 return getTable(getBytes(tableName));
685 }
686
687
688
689
690
691
692
693
694 protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
695 int id = nextScannerId++;
696 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
697 scannerMap.put(id, resultScannerWrapper);
698 return id;
699 }
700
701
702
703
704
705
706
707 protected synchronized ResultScannerWrapper getScanner(int id) {
708 return scannerMap.get(id);
709 }
710
711
712
713
714
715
716
717
718 protected synchronized ResultScannerWrapper removeScanner(int id) {
719 return scannerMap.remove(id);
720 }
721
722 protected HBaseHandler(final Configuration c,
723 final UserProvider userProvider) throws IOException {
724 this.conf = c;
725 scannerMap = new HashMap<Integer, ResultScannerWrapper>();
726 this.coalescer = new IncrementCoalescer(this);
727
728 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
729 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
730 connectionCache = new ConnectionCache(
731 conf, userProvider, cleanInterval, maxIdleTime);
732 }
733
734
735
736
737 private Admin getAdmin() throws IOException {
738 return connectionCache.getAdmin();
739 }
740
741 void setEffectiveUser(String effectiveUser) {
742 connectionCache.setEffectiveUser(effectiveUser);
743 }
744
745 @Override
746 public void enableTable(ByteBuffer tableName) throws IOError {
747 try{
748 getAdmin().enableTable(getTableName(tableName));
749 } catch (IOException e) {
750 LOG.warn(e.getMessage(), e);
751 throw new IOError(Throwables.getStackTraceAsString(e));
752 }
753 }
754
755 @Override
756 public void disableTable(ByteBuffer tableName) throws IOError{
757 try{
758 getAdmin().disableTable(getTableName(tableName));
759 } catch (IOException e) {
760 LOG.warn(e.getMessage(), e);
761 throw new IOError(Throwables.getStackTraceAsString(e));
762 }
763 }
764
765 @Override
766 public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
767 try {
768 return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
769 } catch (IOException e) {
770 LOG.warn(e.getMessage(), e);
771 throw new IOError(Throwables.getStackTraceAsString(e));
772 }
773 }
774
775 @Override
776 public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
777 try {
778
779
780
781 ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
782 } catch (IOException e) {
783 LOG.warn(e.getMessage(), e);
784 throw new IOError(Throwables.getStackTraceAsString(e));
785 }
786 }
787
788 @Override
789 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
790 try {
791
792
793
794 ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
795 } catch (IOException e) {
796 LOG.warn(e.getMessage(), e);
797 throw new IOError(Throwables.getStackTraceAsString(e));
798 }
799 }
800
801 @Override
802 public List<ByteBuffer> getTableNames() throws IOError {
803 try {
804 TableName[] tableNames = this.getAdmin().listTableNames();
805 ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
806 for (int i = 0; i < tableNames.length; i++) {
807 list.add(ByteBuffer.wrap(tableNames[i].getName()));
808 }
809 return list;
810 } catch (IOException e) {
811 LOG.warn(e.getMessage(), e);
812 throw new IOError(Throwables.getStackTraceAsString(e));
813 }
814 }
815
816
817
818
819 @Override
820 public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
821 throws IOError {
822 try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
823 List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
824 List<TRegionInfo> results = new ArrayList<TRegionInfo>();
825 for (HRegionLocation regionLocation : regionLocations) {
826 HRegionInfo info = regionLocation.getRegionInfo();
827 ServerName serverName = regionLocation.getServerName();
828 TRegionInfo region = new TRegionInfo();
829 region.serverName = ByteBuffer.wrap(
830 Bytes.toBytes(serverName.getHostname()));
831 region.port = serverName.getPort();
832 region.startKey = ByteBuffer.wrap(info.getStartKey());
833 region.endKey = ByteBuffer.wrap(info.getEndKey());
834 region.id = info.getRegionId();
835 region.name = ByteBuffer.wrap(info.getRegionName());
836 region.version = HREGION_VERSION;
837 results.add(region);
838 }
839 return results;
840 } catch (TableNotFoundException e) {
841
842 return Collections.emptyList();
843 } catch (IOException e){
844 LOG.warn(e.getMessage(), e);
845 throw new IOError(Throwables.getStackTraceAsString(e));
846 }
847 }
848
849 @Override
850 public List<TCell> get(
851 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
852 Map<ByteBuffer, ByteBuffer> attributes)
853 throws IOError {
854 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
855 if (famAndQf.length == 1) {
856 return get(tableName, row, famAndQf[0], null, attributes);
857 }
858 if (famAndQf.length == 2) {
859 return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
860 }
861 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
862 }
863
864
865
866
867
868
869
870
871 protected List<TCell> get(ByteBuffer tableName,
872 ByteBuffer row,
873 byte[] family,
874 byte[] qualifier,
875 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
876 Table table = null;
877 try {
878 table = getTable(tableName);
879 Get get = new Get(getBytes(row));
880 addAttributes(get, attributes);
881 if (qualifier == null) {
882 get.addFamily(family);
883 } else {
884 get.addColumn(family, qualifier);
885 }
886 Result result = table.get(get);
887 return ThriftUtilities.cellFromHBase(result.rawCells());
888 } catch (IOException e) {
889 LOG.warn(e.getMessage(), e);
890 throw new IOError(Throwables.getStackTraceAsString(e));
891 } finally {
892 closeTable(table);
893 }
894 }
895
896 @Override
897 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
898 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
899 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
900 if(famAndQf.length == 1) {
901 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
902 }
903 if (famAndQf.length == 2) {
904 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
905 }
906 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
907
908 }
909
910
911
912
913
914
915
916
917
918 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
919 byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
920
921 Table table = null;
922 try {
923 table = getTable(tableName);
924 Get get = new Get(getBytes(row));
925 addAttributes(get, attributes);
926 if (null == qualifier) {
927 get.addFamily(family);
928 } else {
929 get.addColumn(family, qualifier);
930 }
931 get.setMaxVersions(numVersions);
932 Result result = table.get(get);
933 return ThriftUtilities.cellFromHBase(result.rawCells());
934 } catch (IOException e) {
935 LOG.warn(e.getMessage(), e);
936 throw new IOError(Throwables.getStackTraceAsString(e));
937 } finally{
938 closeTable(table);
939 }
940 }
941
942 @Override
943 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
944 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
945 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
946 if (famAndQf.length == 1) {
947 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
948 }
949 if (famAndQf.length == 2) {
950 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
951 attributes);
952 }
953 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
954 }
955
956
957
958
959
960
961
962
963 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
964 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
965 throws IOError {
966
967 Table table = null;
968 try {
969 table = getTable(tableName);
970 Get get = new Get(getBytes(row));
971 addAttributes(get, attributes);
972 if (null == qualifier) {
973 get.addFamily(family);
974 } else {
975 get.addColumn(family, qualifier);
976 }
977 get.setTimeRange(0, timestamp);
978 get.setMaxVersions(numVersions);
979 Result result = table.get(get);
980 return ThriftUtilities.cellFromHBase(result.rawCells());
981 } catch (IOException e) {
982 LOG.warn(e.getMessage(), e);
983 throw new IOError(Throwables.getStackTraceAsString(e));
984 } finally{
985 closeTable(table);
986 }
987 }
988
989 @Override
990 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
991 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
992 return getRowWithColumnsTs(tableName, row, null,
993 HConstants.LATEST_TIMESTAMP,
994 attributes);
995 }
996
997 @Override
998 public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
999 ByteBuffer row,
1000 List<ByteBuffer> columns,
1001 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1002 return getRowWithColumnsTs(tableName, row, columns,
1003 HConstants.LATEST_TIMESTAMP,
1004 attributes);
1005 }
1006
1007 @Override
1008 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1009 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1010 return getRowWithColumnsTs(tableName, row, null,
1011 timestamp, attributes);
1012 }
1013
1014 @Override
1015 public List<TRowResult> getRowWithColumnsTs(
1016 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1017 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1018
1019 Table table = null;
1020 try {
1021 table = getTable(tableName);
1022 if (columns == null) {
1023 Get get = new Get(getBytes(row));
1024 addAttributes(get, attributes);
1025 get.setTimeRange(0, timestamp);
1026 Result result = table.get(get);
1027 return ThriftUtilities.rowResultFromHBase(result);
1028 }
1029 Get get = new Get(getBytes(row));
1030 addAttributes(get, attributes);
1031 for(ByteBuffer column : columns) {
1032 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1033 if (famAndQf.length == 1) {
1034 get.addFamily(famAndQf[0]);
1035 } else {
1036 get.addColumn(famAndQf[0], famAndQf[1]);
1037 }
1038 }
1039 get.setTimeRange(0, timestamp);
1040 Result result = table.get(get);
1041 return ThriftUtilities.rowResultFromHBase(result);
1042 } catch (IOException e) {
1043 LOG.warn(e.getMessage(), e);
1044 throw new IOError(Throwables.getStackTraceAsString(e));
1045 } finally{
1046 closeTable(table);
1047 }
1048 }
1049
1050 @Override
1051 public List<TRowResult> getRows(ByteBuffer tableName,
1052 List<ByteBuffer> rows,
1053 Map<ByteBuffer, ByteBuffer> attributes)
1054 throws IOError {
1055 return getRowsWithColumnsTs(tableName, rows, null,
1056 HConstants.LATEST_TIMESTAMP,
1057 attributes);
1058 }
1059
1060 @Override
1061 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1062 List<ByteBuffer> rows,
1063 List<ByteBuffer> columns,
1064 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1065 return getRowsWithColumnsTs(tableName, rows, columns,
1066 HConstants.LATEST_TIMESTAMP,
1067 attributes);
1068 }
1069
1070 @Override
1071 public List<TRowResult> getRowsTs(ByteBuffer tableName,
1072 List<ByteBuffer> rows,
1073 long timestamp,
1074 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1075 return getRowsWithColumnsTs(tableName, rows, null,
1076 timestamp, attributes);
1077 }
1078
1079 @Override
1080 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1081 List<ByteBuffer> rows,
1082 List<ByteBuffer> columns, long timestamp,
1083 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1084
1085 Table table= null;
1086 try {
1087 List<Get> gets = new ArrayList<Get>(rows.size());
1088 table = getTable(tableName);
1089 if (metrics != null) {
1090 metrics.incNumRowKeysInBatchGet(rows.size());
1091 }
1092 for (ByteBuffer row : rows) {
1093 Get get = new Get(getBytes(row));
1094 addAttributes(get, attributes);
1095 if (columns != null) {
1096
1097 for(ByteBuffer column : columns) {
1098 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1099 if (famAndQf.length == 1) {
1100 get.addFamily(famAndQf[0]);
1101 } else {
1102 get.addColumn(famAndQf[0], famAndQf[1]);
1103 }
1104 }
1105 }
1106 get.setTimeRange(0, timestamp);
1107 gets.add(get);
1108 }
1109 Result[] result = table.get(gets);
1110 return ThriftUtilities.rowResultFromHBase(result);
1111 } catch (IOException e) {
1112 LOG.warn(e.getMessage(), e);
1113 throw new IOError(Throwables.getStackTraceAsString(e));
1114 } finally{
1115 closeTable(table);
1116 }
1117 }
1118
1119 @Override
1120 public void deleteAll(
1121 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1122 Map<ByteBuffer, ByteBuffer> attributes)
1123 throws IOError {
1124 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1125 attributes);
1126 }
1127
1128 @Override
1129 public void deleteAllTs(ByteBuffer tableName,
1130 ByteBuffer row,
1131 ByteBuffer column,
1132 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1133 Table table = null;
1134 try {
1135 table = getTable(tableName);
1136 Delete delete = new Delete(getBytes(row));
1137 addAttributes(delete, attributes);
1138 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1139 if (famAndQf.length == 1) {
1140 delete.addFamily(famAndQf[0], timestamp);
1141 } else {
1142 delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1143 }
1144 table.delete(delete);
1145
1146 } catch (IOException e) {
1147 LOG.warn(e.getMessage(), e);
1148 throw new IOError(Throwables.getStackTraceAsString(e));
1149 } finally {
1150 closeTable(table);
1151 }
1152 }
1153
1154 @Override
1155 public void deleteAllRow(
1156 ByteBuffer tableName, ByteBuffer row,
1157 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1158 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1159 }
1160
1161 @Override
1162 public void deleteAllRowTs(
1163 ByteBuffer tableName, ByteBuffer row, long timestamp,
1164 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1165 Table table = null;
1166 try {
1167 table = getTable(tableName);
1168 Delete delete = new Delete(getBytes(row), timestamp);
1169 addAttributes(delete, attributes);
1170 table.delete(delete);
1171 } catch (IOException e) {
1172 LOG.warn(e.getMessage(), e);
1173 throw new IOError(Throwables.getStackTraceAsString(e));
1174 } finally {
1175 closeTable(table);
1176 }
1177 }
1178
1179 @Override
1180 public void createTable(ByteBuffer in_tableName,
1181 List<ColumnDescriptor> columnFamilies) throws IOError,
1182 IllegalArgument, AlreadyExists {
1183 TableName tableName = getTableName(in_tableName);
1184 try {
1185 if (getAdmin().tableExists(tableName)) {
1186 throw new AlreadyExists("table name already in use");
1187 }
1188 HTableDescriptor desc = new HTableDescriptor(tableName);
1189 for (ColumnDescriptor col : columnFamilies) {
1190 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1191 desc.addFamily(colDesc);
1192 }
1193 getAdmin().createTable(desc);
1194 } catch (IOException e) {
1195 LOG.warn(e.getMessage(), e);
1196 throw new IOError(Throwables.getStackTraceAsString(e));
1197 } catch (IllegalArgumentException e) {
1198 LOG.warn(e.getMessage(), e);
1199 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1200 }
1201 }
1202
1203 private static TableName getTableName(ByteBuffer buffer) {
1204 return TableName.valueOf(getBytes(buffer));
1205 }
1206
1207 @Override
1208 public void deleteTable(ByteBuffer in_tableName) throws IOError {
1209 TableName tableName = getTableName(in_tableName);
1210 if (LOG.isDebugEnabled()) {
1211 LOG.debug("deleteTable: table=" + tableName);
1212 }
1213 try {
1214 if (!getAdmin().tableExists(tableName)) {
1215 throw new IOException("table does not exist");
1216 }
1217 getAdmin().deleteTable(tableName);
1218 } catch (IOException e) {
1219 LOG.warn(e.getMessage(), e);
1220 throw new IOError(Throwables.getStackTraceAsString(e));
1221 }
1222 }
1223
1224 @Override
1225 public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1226 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1227 throws IOError, IllegalArgument {
1228 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1229 attributes);
1230 }
1231
1232 @Override
1233 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1234 List<Mutation> mutations, long timestamp,
1235 Map<ByteBuffer, ByteBuffer> attributes)
1236 throws IOError, IllegalArgument {
1237 Table table = null;
1238 try {
1239 table = getTable(tableName);
1240 Put put = new Put(getBytes(row), timestamp);
1241 addAttributes(put, attributes);
1242
1243 Delete delete = new Delete(getBytes(row));
1244 addAttributes(delete, attributes);
1245 if (metrics != null) {
1246 metrics.incNumRowKeysInBatchMutate(mutations.size());
1247 }
1248
1249
1250 for (Mutation m : mutations) {
1251 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1252 if (m.isDelete) {
1253 if (famAndQf.length == 1) {
1254 delete.addFamily(famAndQf[0], timestamp);
1255 } else {
1256 delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1257 }
1258 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1259 : Durability.SKIP_WAL);
1260 } else {
1261 if(famAndQf.length == 1) {
1262 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1263 + "over the whole column family.");
1264 } else {
1265 put.addImmutable(famAndQf[0], famAndQf[1],
1266 m.value != null ? getBytes(m.value)
1267 : HConstants.EMPTY_BYTE_ARRAY);
1268 }
1269 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1270 }
1271 }
1272 if (!delete.isEmpty())
1273 table.delete(delete);
1274 if (!put.isEmpty())
1275 table.put(put);
1276 } catch (IOException e) {
1277 LOG.warn(e.getMessage(), e);
1278 throw new IOError(Throwables.getStackTraceAsString(e));
1279 } catch (IllegalArgumentException e) {
1280 LOG.warn(e.getMessage(), e);
1281 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1282 } finally{
1283 closeTable(table);
1284 }
1285 }
1286
1287 @Override
1288 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1289 Map<ByteBuffer, ByteBuffer> attributes)
1290 throws IOError, IllegalArgument, TException {
1291 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1292 }
1293
1294 @Override
1295 public void mutateRowsTs(
1296 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1297 Map<ByteBuffer, ByteBuffer> attributes)
1298 throws IOError, IllegalArgument, TException {
1299 List<Put> puts = new ArrayList<Put>();
1300 List<Delete> deletes = new ArrayList<Delete>();
1301
1302 for (BatchMutation batch : rowBatches) {
1303 byte[] row = getBytes(batch.row);
1304 List<Mutation> mutations = batch.mutations;
1305 Delete delete = new Delete(row);
1306 addAttributes(delete, attributes);
1307 Put put = new Put(row, timestamp);
1308 addAttributes(put, attributes);
1309 for (Mutation m : mutations) {
1310 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1311 if (m.isDelete) {
1312
1313 if (famAndQf.length == 1) {
1314 delete.addFamily(famAndQf[0], timestamp);
1315 } else {
1316 delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
1317 }
1318 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1319 : Durability.SKIP_WAL);
1320 } else {
1321 if (famAndQf.length == 1) {
1322 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1323 + "over the whole column family.");
1324 }
1325 if (famAndQf.length == 2) {
1326 put.addImmutable(famAndQf[0], famAndQf[1],
1327 m.value != null ? getBytes(m.value)
1328 : HConstants.EMPTY_BYTE_ARRAY);
1329 } else {
1330 throw new IllegalArgumentException("Invalid famAndQf provided.");
1331 }
1332 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1333 }
1334 }
1335 if (!delete.isEmpty())
1336 deletes.add(delete);
1337 if (!put.isEmpty())
1338 puts.add(put);
1339 }
1340
1341 Table table = null;
1342 try {
1343 table = getTable(tableName);
1344 if (!puts.isEmpty())
1345 table.put(puts);
1346 if (!deletes.isEmpty())
1347 table.delete(deletes);
1348
1349 } catch (IOException e) {
1350 LOG.warn(e.getMessage(), e);
1351 throw new IOError(Throwables.getStackTraceAsString(e));
1352 } catch (IllegalArgumentException e) {
1353 LOG.warn(e.getMessage(), e);
1354 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1355 } finally{
1356 closeTable(table);
1357 }
1358 }
1359
1360 @Override
1361 public long atomicIncrement(
1362 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1363 throws IOError, IllegalArgument, TException {
1364 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1365 if(famAndQf.length == 1) {
1366 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1367 }
1368 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1369 }
1370
1371 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1372 byte [] family, byte [] qualifier, long amount)
1373 throws IOError, IllegalArgument, TException {
1374 Table table = null;
1375 try {
1376 table = getTable(tableName);
1377 return table.incrementColumnValue(
1378 getBytes(row), family, qualifier, amount);
1379 } catch (IOException e) {
1380 LOG.warn(e.getMessage(), e);
1381 throw new IOError(Throwables.getStackTraceAsString(e));
1382 } finally {
1383 closeTable(table);
1384 }
1385 }
1386
1387 @Override
1388 public void scannerClose(int id) throws IOError, IllegalArgument {
1389 LOG.debug("scannerClose: id=" + id);
1390 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1391 if (resultScannerWrapper == null) {
1392 String message = "scanner ID is invalid";
1393 LOG.warn(message);
1394 throw new IllegalArgument("scanner ID is invalid");
1395 }
1396 resultScannerWrapper.getScanner().close();
1397 removeScanner(id);
1398 }
1399
1400 @Override
1401 public List<TRowResult> scannerGetList(int id,int nbRows)
1402 throws IllegalArgument, IOError {
1403 LOG.debug("scannerGetList: id=" + id);
1404 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1405 if (null == resultScannerWrapper) {
1406 String message = "scanner ID is invalid";
1407 LOG.warn(message);
1408 throw new IllegalArgument("scanner ID is invalid");
1409 }
1410
1411 Result [] results = null;
1412 try {
1413 results = resultScannerWrapper.getScanner().next(nbRows);
1414 if (null == results) {
1415 return new ArrayList<TRowResult>();
1416 }
1417 } catch (IOException e) {
1418 LOG.warn(e.getMessage(), e);
1419 throw new IOError(Throwables.getStackTraceAsString(e));
1420 }
1421 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1422 }
1423
1424 @Override
1425 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1426 return scannerGetList(id,1);
1427 }
1428
1429 @Override
1430 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1431 Map<ByteBuffer, ByteBuffer> attributes)
1432 throws IOError {
1433
1434 Table table = null;
1435 try {
1436 table = getTable(tableName);
1437 Scan scan = new Scan();
1438 addAttributes(scan, attributes);
1439 if (tScan.isSetStartRow()) {
1440 scan.setStartRow(tScan.getStartRow());
1441 }
1442 if (tScan.isSetStopRow()) {
1443 scan.setStopRow(tScan.getStopRow());
1444 }
1445 if (tScan.isSetTimestamp()) {
1446 scan.setTimeRange(0, tScan.getTimestamp());
1447 }
1448 if (tScan.isSetCaching()) {
1449 scan.setCaching(tScan.getCaching());
1450 }
1451 if (tScan.isSetBatchSize()) {
1452 scan.setBatch(tScan.getBatchSize());
1453 }
1454 if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1455 for(ByteBuffer column : tScan.getColumns()) {
1456 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1457 if(famQf.length == 1) {
1458 scan.addFamily(famQf[0]);
1459 } else {
1460 scan.addColumn(famQf[0], famQf[1]);
1461 }
1462 }
1463 }
1464 if (tScan.isSetFilterString()) {
1465 ParseFilter parseFilter = new ParseFilter();
1466 scan.setFilter(
1467 parseFilter.parseFilterString(tScan.getFilterString()));
1468 }
1469 if (tScan.isSetReversed()) {
1470 scan.setReversed(tScan.isReversed());
1471 }
1472 return addScanner(table.getScanner(scan), tScan.sortColumns);
1473 } catch (IOException e) {
1474 LOG.warn(e.getMessage(), e);
1475 throw new IOError(Throwables.getStackTraceAsString(e));
1476 } finally{
1477 closeTable(table);
1478 }
1479 }
1480
1481 @Override
1482 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1483 List<ByteBuffer> columns,
1484 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1485
1486 Table table = null;
1487 try {
1488 table = getTable(tableName);
1489 Scan scan = new Scan(getBytes(startRow));
1490 addAttributes(scan, attributes);
1491 if(columns != null && columns.size() != 0) {
1492 for(ByteBuffer column : columns) {
1493 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1494 if(famQf.length == 1) {
1495 scan.addFamily(famQf[0]);
1496 } else {
1497 scan.addColumn(famQf[0], famQf[1]);
1498 }
1499 }
1500 }
1501 return addScanner(table.getScanner(scan), false);
1502 } catch (IOException e) {
1503 LOG.warn(e.getMessage(), e);
1504 throw new IOError(Throwables.getStackTraceAsString(e));
1505 } finally{
1506 closeTable(table);
1507 }
1508 }
1509
1510 @Override
1511 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1512 ByteBuffer stopRow, List<ByteBuffer> columns,
1513 Map<ByteBuffer, ByteBuffer> attributes)
1514 throws IOError, TException {
1515
1516 Table table = null;
1517 try {
1518 table = getTable(tableName);
1519 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1520 addAttributes(scan, attributes);
1521 if(columns != null && columns.size() != 0) {
1522 for(ByteBuffer column : columns) {
1523 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1524 if(famQf.length == 1) {
1525 scan.addFamily(famQf[0]);
1526 } else {
1527 scan.addColumn(famQf[0], famQf[1]);
1528 }
1529 }
1530 }
1531 return addScanner(table.getScanner(scan), false);
1532 } catch (IOException e) {
1533 LOG.warn(e.getMessage(), e);
1534 throw new IOError(Throwables.getStackTraceAsString(e));
1535 } finally{
1536 closeTable(table);
1537 }
1538 }
1539
1540 @Override
1541 public int scannerOpenWithPrefix(ByteBuffer tableName,
1542 ByteBuffer startAndPrefix,
1543 List<ByteBuffer> columns,
1544 Map<ByteBuffer, ByteBuffer> attributes)
1545 throws IOError, TException {
1546
1547 Table table = null;
1548 try {
1549 table = getTable(tableName);
1550 Scan scan = new Scan(getBytes(startAndPrefix));
1551 addAttributes(scan, attributes);
1552 Filter f = new WhileMatchFilter(
1553 new PrefixFilter(getBytes(startAndPrefix)));
1554 scan.setFilter(f);
1555 if (columns != null && columns.size() != 0) {
1556 for(ByteBuffer column : columns) {
1557 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1558 if(famQf.length == 1) {
1559 scan.addFamily(famQf[0]);
1560 } else {
1561 scan.addColumn(famQf[0], famQf[1]);
1562 }
1563 }
1564 }
1565 return addScanner(table.getScanner(scan), false);
1566 } catch (IOException e) {
1567 LOG.warn(e.getMessage(), e);
1568 throw new IOError(Throwables.getStackTraceAsString(e));
1569 } finally{
1570 closeTable(table);
1571 }
1572 }
1573
1574 @Override
1575 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1576 List<ByteBuffer> columns, long timestamp,
1577 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1578
1579 Table table = null;
1580 try {
1581 table = getTable(tableName);
1582 Scan scan = new Scan(getBytes(startRow));
1583 addAttributes(scan, attributes);
1584 scan.setTimeRange(0, timestamp);
1585 if (columns != null && columns.size() != 0) {
1586 for (ByteBuffer column : columns) {
1587 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1588 if(famQf.length == 1) {
1589 scan.addFamily(famQf[0]);
1590 } else {
1591 scan.addColumn(famQf[0], famQf[1]);
1592 }
1593 }
1594 }
1595 return addScanner(table.getScanner(scan), false);
1596 } catch (IOException e) {
1597 LOG.warn(e.getMessage(), e);
1598 throw new IOError(Throwables.getStackTraceAsString(e));
1599 } finally{
1600 closeTable(table);
1601 }
1602 }
1603
1604 @Override
1605 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1606 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1607 Map<ByteBuffer, ByteBuffer> attributes)
1608 throws IOError, TException {
1609
1610 Table table = null;
1611 try {
1612 table = getTable(tableName);
1613 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1614 addAttributes(scan, attributes);
1615 scan.setTimeRange(0, timestamp);
1616 if (columns != null && columns.size() != 0) {
1617 for (ByteBuffer column : columns) {
1618 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1619 if(famQf.length == 1) {
1620 scan.addFamily(famQf[0]);
1621 } else {
1622 scan.addColumn(famQf[0], famQf[1]);
1623 }
1624 }
1625 }
1626 scan.setTimeRange(0, timestamp);
1627 return addScanner(table.getScanner(scan), false);
1628 } catch (IOException e) {
1629 LOG.warn(e.getMessage(), e);
1630 throw new IOError(Throwables.getStackTraceAsString(e));
1631 } finally{
1632 closeTable(table);
1633 }
1634 }
1635
1636 @Override
1637 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1638 ByteBuffer tableName) throws IOError, TException {
1639
1640 Table table = null;
1641 try {
1642 TreeMap<ByteBuffer, ColumnDescriptor> columns =
1643 new TreeMap<ByteBuffer, ColumnDescriptor>();
1644
1645 table = getTable(tableName);
1646 HTableDescriptor desc = table.getTableDescriptor();
1647
1648 for (HColumnDescriptor e : desc.getFamilies()) {
1649 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1650 columns.put(col.name, col);
1651 }
1652 return columns;
1653 } catch (IOException e) {
1654 LOG.warn(e.getMessage(), e);
1655 throw new IOError(Throwables.getStackTraceAsString(e));
1656 } finally {
1657 closeTable(table);
1658 }
1659 }
1660
1661 private void closeTable(Table table) throws IOError
1662 {
1663 try{
1664 if(table != null){
1665 table.close();
1666 }
1667 } catch (IOException e){
1668 LOG.error(e.getMessage(), e);
1669 throw new IOError(Throwables.getStackTraceAsString(e));
1670 }
1671 }
1672
1673 @Override
1674 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1675 try {
1676 byte[] row = getBytes(searchRow);
1677 Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
1678 HConstants.CATALOG_FAMILY);
1679
1680 if (startRowResult == null) {
1681 throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1682 + Bytes.toStringBinary(row));
1683 }
1684
1685
1686 HRegionInfo regionInfo = MetaTableAccessor.getHRegionInfo(startRowResult);
1687 if (regionInfo == null) {
1688 throw new IOException("HRegionInfo REGIONINFO was null or " +
1689 " empty in Meta for row="
1690 + Bytes.toStringBinary(row));
1691 }
1692 TRegionInfo region = new TRegionInfo();
1693 region.setStartKey(regionInfo.getStartKey());
1694 region.setEndKey(regionInfo.getEndKey());
1695 region.id = regionInfo.getRegionId();
1696 region.setName(regionInfo.getRegionName());
1697 region.version = HREGION_VERSION;
1698
1699
1700 ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
1701 if (serverName != null) {
1702 region.setServerName(Bytes.toBytes(serverName.getHostname()));
1703 region.port = serverName.getPort();
1704 }
1705 return region;
1706 } catch (IOException e) {
1707 LOG.warn(e.getMessage(), e);
1708 throw new IOError(Throwables.getStackTraceAsString(e));
1709 }
1710 }
1711
1712 private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1713 throws IOException {
1714 Scan scan = new Scan(row);
1715 scan.setReversed(true);
1716 scan.addFamily(family);
1717 scan.setStartRow(row);
1718 Table table = getTable(tableName);
1719 try (ResultScanner scanner = table.getScanner(scan)) {
1720 return scanner.next();
1721 } finally{
1722 if(table != null){
1723 table.close();
1724 }
1725 }
1726 }
1727
1728 private void initMetrics(ThriftMetrics metrics) {
1729 this.metrics = metrics;
1730 }
1731
1732 @Override
1733 public void increment(TIncrement tincrement) throws IOError, TException {
1734
1735 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1736 throw new TException("Must supply a table and a row key; can't increment");
1737 }
1738
1739 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1740 this.coalescer.queueIncrement(tincrement);
1741 return;
1742 }
1743
1744 Table table = null;
1745 try {
1746 table = getTable(tincrement.getTable());
1747 Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1748 table.increment(inc);
1749 } catch (IOException e) {
1750 LOG.warn(e.getMessage(), e);
1751 throw new IOError(Throwables.getStackTraceAsString(e));
1752 } finally{
1753 closeTable(table);
1754 }
1755 }
1756
1757 @Override
1758 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1759 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1760 this.coalescer.queueIncrements(tincrements);
1761 return;
1762 }
1763 for (TIncrement tinc : tincrements) {
1764 increment(tinc);
1765 }
1766 }
1767
1768 @Override
1769 public List<TCell> append(TAppend tappend) throws IOError, TException {
1770 if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1771 throw new TException("Must supply a table and a row key; can't append");
1772 }
1773
1774 Table table = null;
1775 try {
1776 table = getTable(tappend.getTable());
1777 Append append = ThriftUtilities.appendFromThrift(tappend);
1778 Result result = table.append(append);
1779 return ThriftUtilities.cellFromHBase(result.rawCells());
1780 } catch (IOException e) {
1781 LOG.warn(e.getMessage(), e);
1782 throw new IOError(Throwables.getStackTraceAsString(e));
1783 } finally{
1784 closeTable(table);
1785 }
1786 }
1787
1788 @Override
1789 public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1790 ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1791 IllegalArgument, TException {
1792 Put put;
1793 try {
1794 put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1795 addAttributes(put, attributes);
1796
1797 byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1798
1799 put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1800 : HConstants.EMPTY_BYTE_ARRAY);
1801
1802 put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1803 } catch (IllegalArgumentException e) {
1804 LOG.warn(e.getMessage(), e);
1805 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1806 }
1807
1808 Table table = null;
1809 try {
1810 table = getTable(tableName);
1811 byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1812 return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1813 value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1814 } catch (IOException e) {
1815 LOG.warn(e.getMessage(), e);
1816 throw new IOError(Throwables.getStackTraceAsString(e));
1817 } catch (IllegalArgumentException e) {
1818 LOG.warn(e.getMessage(), e);
1819 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1820 } finally {
1821 closeTable(table);
1822 }
1823 }
1824 }
1825
1826
1827
1828
1829
1830
1831 private static void addAttributes(OperationWithAttributes op,
1832 Map<ByteBuffer, ByteBuffer> attributes) {
1833 if (attributes == null || attributes.size() == 0) {
1834 return;
1835 }
1836 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1837 String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1838 byte[] value = getBytes(entry.getValue());
1839 op.setAttribute(name, value);
1840 }
1841 }
1842
1843 public static void registerFilters(Configuration conf) {
1844 String[] filters = conf.getStrings("hbase.thrift.filters");
1845 if(filters != null) {
1846 for(String filterClass: filters) {
1847 String[] filterPart = filterClass.split(":");
1848 if(filterPart.length != 2) {
1849 LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1850 } else {
1851 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1852 }
1853 }
1854 }
1855 }
1856 }