View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.client.impl;
18  
19  import org.apache.accumulo.trace.instrument.Tracer;
20  import org.apache.accumulo.core.client.AccumuloException;
21  import org.apache.accumulo.core.client.AccumuloSecurityException;
22  import org.apache.accumulo.core.client.Instance;
23  import org.apache.accumulo.core.client.TableNotFoundException;
24  import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
25  import org.apache.accumulo.core.conf.AccumuloConfiguration;
26  import org.apache.accumulo.core.data.KeyExtent;
27  import org.apache.accumulo.core.data.Mutation;
28  import org.apache.accumulo.core.security.thrift.Credential;
29  import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
30  import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
31  import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
32  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
33  import org.apache.accumulo.core.util.ArgumentChecker;
34  import org.apache.accumulo.core.util.ThriftUtil;
35  import org.apache.accumulo.core.util.UtilWaitThread;
36  import org.apache.hadoop.io.Text;
37  import org.apache.log4j.Logger;
38  import org.apache.thrift.TException;
39  import org.apache.thrift.TServiceClient;
40  import org.apache.thrift.transport.TTransportException;
41  
42  public class Writer {
43    
44    private static final Logger log = Logger.getLogger(Writer.class);
45    
46    private Instance instance;
47    private Credential credentials;
48    private Text table;
49    
50    public Writer(Instance instance, Credential credentials, Text table) {
51      ArgumentChecker.notNull(instance, credentials, table);
52      this.instance = instance;
53      this.credentials = credentials;
54      this.table = table;
55    }
56    
57    public Writer(Instance instance, Credential credentials, String table) {
58      this(instance, credentials, new Text(table));
59    }
60    
61    private static void updateServer(Mutation m, KeyExtent extent, String server, Credential ai, AccumuloConfiguration configuration) throws TException,
62        NotServingTabletException, ConstraintViolationException, AccumuloSecurityException {
63      ArgumentChecker.notNull(m, extent, server, ai);
64      
65      TabletClientService.Iface client = null;
66      try {
67        client = ThriftUtil.getTServerClient(server, configuration);
68        client.update(Tracer.traceInfo(), ai, extent.toThrift(), m.toThrift());
69        return;
70      } catch (ThriftSecurityException e) {
71        throw new AccumuloSecurityException(e.user, e.code);
72      } catch (TTransportException e) {
73        log.warn("Error connecting to " + server + ": " + e);
74        throw e;
75      } finally {
76        ThriftUtil.returnClient((TServiceClient) client);
77      }
78    }
79    
80    public void update(Mutation m) throws AccumuloException, AccumuloSecurityException, ConstraintViolationException, TableNotFoundException {
81      ArgumentChecker.notNull(m);
82      
83      if (m.size() == 0)
84        throw new IllegalArgumentException("Can not add empty mutations");
85      
86      while (true) {
87        TabletLocation tabLoc = TabletLocator.getInstance(instance, credentials, table).locateTablet(new Text(m.getRow()), false, true);
88        
89        if (tabLoc == null) {
90          log.trace("No tablet location found for row " + new String(m.getRow()));
91          UtilWaitThread.sleep(500);
92          continue;
93        }
94        
95        try {
96          updateServer(m, tabLoc.tablet_extent, tabLoc.tablet_location, credentials, instance.getConfiguration());
97          return;
98        } catch (NotServingTabletException e) {
99          log.trace("Not serving tablet, server = " + tabLoc.tablet_location);
100         TabletLocator.getInstance(instance, credentials, table).invalidateCache(tabLoc.tablet_extent);
101       } catch (TException e) {
102         log.error("server = " + tabLoc.tablet_location, e);
103         TabletLocator.getInstance(instance, credentials, table).invalidateCache(tabLoc.tablet_extent);
104       } 
105       
106       UtilWaitThread.sleep(500);
107     }
108     
109   }
110 }