View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.impl;
18  
19  import java.util.ArrayList;
20  import java.util.HashMap;
21  import java.util.Map;
22  
23  import org.apache.accumulo.core.Constants;
24  import org.apache.accumulo.core.client.AccumuloException;
25  import org.apache.accumulo.core.client.AccumuloSecurityException;
26  import org.apache.accumulo.core.client.Instance;
27  import org.apache.accumulo.core.client.impl.thrift.ClientService;
28  import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
29  import org.apache.accumulo.core.conf.AccumuloConfiguration;
30  import org.apache.accumulo.core.conf.Property;
31  import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
32  import org.apache.accumulo.core.util.ArgumentChecker;
33  import org.apache.accumulo.core.util.Pair;
34  import org.apache.accumulo.core.util.ServerServices;
35  import org.apache.accumulo.core.util.ServerServices.Service;
36  import org.apache.accumulo.core.util.ThriftUtil;
37  import org.apache.accumulo.core.util.UtilWaitThread;
38  import org.apache.accumulo.core.zookeeper.ZooUtil;
39  import org.apache.accumulo.fate.zookeeper.ZooCache;
40  import org.apache.log4j.Logger;
41  import org.apache.thrift.transport.TTransport;
42  import org.apache.thrift.transport.TTransportException;
43  
44  public class ServerClient {
45    private static final Logger log = Logger.getLogger(ServerClient.class);
46    private static final Map<String,ZooCache> zooCaches = new HashMap<String,ZooCache>();
47    
48    private synchronized static ZooCache getZooCache(Instance instance) {
49      ZooCache result = zooCaches.get(instance.getZooKeepers());
50      if (result == null) {
51        result = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), null);
52        zooCaches.put(instance.getZooKeepers(), result);
53      }
54      return result;
55    }
56    
57    public static <T> T execute(Instance instance, ClientExecReturn<T,ClientService.Client> exec) throws AccumuloException, AccumuloSecurityException {
58      try {
59        return executeRaw(instance, exec);
60      } catch (ThriftSecurityException e) {
61        throw new AccumuloSecurityException(e.user, e.code, e);
62      } catch (AccumuloException e) {
63        throw e;
64      } catch (Exception e) {
65        throw new AccumuloException(e);
66      }
67    }
68    
69    public static void execute(Instance instance, ClientExec<ClientService.Client> exec) throws AccumuloException, AccumuloSecurityException {
70      try {
71        executeRaw(instance, exec);
72      } catch (ThriftSecurityException e) {
73        throw new AccumuloSecurityException(e.user, e.code, e);
74      } catch (AccumuloException e) {
75        throw e;
76      } catch (Exception e) {
77        throw new AccumuloException(e);
78      }
79    }
80    
81    public static <T> T executeRaw(Instance instance, ClientExecReturn<T,ClientService.Client> exec) throws Exception {
82      while (true) {
83        ClientService.Client client = null;
84        String server = null;
85        try {
86          Pair<String,Client> pair = ServerClient.getConnection(instance);
87          server = pair.getFirst();
88          client = pair.getSecond();
89          return exec.execute(client);
90        } catch (TTransportException tte) {
91          log.debug("ClientService request failed " + server + ", retrying ... ", tte);
92          UtilWaitThread.sleep(100);
93        } finally {
94          if (client != null)
95            ServerClient.close(client);
96        }
97      }
98    }
99    
100   public static void executeRaw(Instance instance, ClientExec<ClientService.Client> exec) throws Exception {
101     while (true) {
102       ClientService.Client client = null;
103       String server = null;
104       try {
105         Pair<String,Client> pair = ServerClient.getConnection(instance);
106         server = pair.getFirst();
107         client = pair.getSecond();
108         exec.execute(client);
109         break;
110       } catch (TTransportException tte) {
111         log.debug("ClientService request failed " + server + ", retrying ... ", tte);
112         UtilWaitThread.sleep(100);
113       } finally {
114         if (client != null)
115           ServerClient.close(client);
116       }
117     }
118   }
119   
120   static volatile boolean warnedAboutTServersBeingDown = false;
121 
122   public static Pair<String,ClientService.Client> getConnection(Instance instance) throws TTransportException {
123     return getConnection(instance, true);
124   }
125   
126   public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections) throws TTransportException {
127     AccumuloConfiguration conf = instance.getConfiguration();
128     return getConnection(instance, preferCachedConnections, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
129   }
130   
131   public static Pair<String,ClientService.Client> getConnection(Instance instance, boolean preferCachedConnections, long rpcTimeout) throws TTransportException {
132     ArgumentChecker.notNull(instance);
133     // create list of servers
134     ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
135     
136     // add tservers
137     
138     ZooCache zc = getZooCache(instance);
139     AccumuloConfiguration conf = instance.getConfiguration();
140     for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS)) {
141       String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + tserver;
142       byte[] data = ZooUtil.getLockData(zc, path);
143       if (data != null && !new String(data).equals("master"))
144         servers.add(new ThriftTransportKey(
145             new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT), 
146             conf.getPort(Property.TSERV_CLIENTPORT), 
147             rpcTimeout));
148     }
149     
150     boolean opened = false;
151     try {
152       Pair<String,TTransport> pair = ThriftTransportPool.getInstance().getAnyTransport(servers, preferCachedConnections);
153       ClientService.Client client = ThriftUtil.createClient(new ClientService.Client.Factory(), pair.getSecond());
154       opened = true;
155       warnedAboutTServersBeingDown = false;
156       return new Pair<String,ClientService.Client>(pair.getFirst(), client);
157     } finally {
158       if (!opened) {
159         if (!warnedAboutTServersBeingDown) {
160           if (servers.isEmpty()) {
161             log.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
162           } else {
163             log.warn("Failed to find an available server in the list of servers: " + servers);
164           }
165           warnedAboutTServersBeingDown = true;
166         }
167       }
168     }
169   }
170   
171   public static void close(ClientService.Client client) {
172     if (client != null && client.getInputProtocol() != null && client.getInputProtocol().getTransport() != null) {
173       ThriftTransportPool.getInstance().returnTransport(client.getInputProtocol().getTransport());
174     } else {
175       log.debug("Attempt to close null connection to a server", new Exception());
176     }
177   }
178 }