1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.client.impl;
18
19 import org.apache.accumulo.trace.instrument.Tracer;
20 import org.apache.accumulo.core.client.AccumuloException;
21 import org.apache.accumulo.core.client.AccumuloSecurityException;
22 import org.apache.accumulo.core.client.Instance;
23 import org.apache.accumulo.core.client.TableNotFoundException;
24 import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
25 import org.apache.accumulo.core.conf.AccumuloConfiguration;
26 import org.apache.accumulo.core.data.KeyExtent;
27 import org.apache.accumulo.core.data.Mutation;
28 import org.apache.accumulo.core.security.thrift.Credential;
29 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
30 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
31 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
32 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
33 import org.apache.accumulo.core.util.ArgumentChecker;
34 import org.apache.accumulo.core.util.ThriftUtil;
35 import org.apache.accumulo.core.util.UtilWaitThread;
36 import org.apache.hadoop.io.Text;
37 import org.apache.log4j.Logger;
38 import org.apache.thrift.TException;
39 import org.apache.thrift.TServiceClient;
40 import org.apache.thrift.transport.TTransportException;
41
42 public class Writer {
43
44 private static final Logger log = Logger.getLogger(Writer.class);
45
46 private Instance instance;
47 private Credential credentials;
48 private Text table;
49
50 public Writer(Instance instance, Credential credentials, Text table) {
51 ArgumentChecker.notNull(instance, credentials, table);
52 this.instance = instance;
53 this.credentials = credentials;
54 this.table = table;
55 }
56
57 public Writer(Instance instance, Credential credentials, String table) {
58 this(instance, credentials, new Text(table));
59 }
60
61 private static void updateServer(Mutation m, KeyExtent extent, String server, Credential ai, AccumuloConfiguration configuration) throws TException,
62 NotServingTabletException, ConstraintViolationException, AccumuloSecurityException {
63 ArgumentChecker.notNull(m, extent, server, ai);
64
65 TabletClientService.Iface client = null;
66 try {
67 client = ThriftUtil.getTServerClient(server, configuration);
68 client.update(Tracer.traceInfo(), ai, extent.toThrift(), m.toThrift());
69 return;
70 } catch (ThriftSecurityException e) {
71 throw new AccumuloSecurityException(e.user, e.code);
72 } catch (TTransportException e) {
73 log.warn("Error connecting to " + server + ": " + e);
74 throw e;
75 } finally {
76 ThriftUtil.returnClient((TServiceClient) client);
77 }
78 }
79
80 public void update(Mutation m) throws AccumuloException, AccumuloSecurityException, ConstraintViolationException, TableNotFoundException {
81 ArgumentChecker.notNull(m);
82
83 if (m.size() == 0)
84 throw new IllegalArgumentException("Can not add empty mutations");
85
86 while (true) {
87 TabletLocation tabLoc = TabletLocator.getInstance(instance, credentials, table).locateTablet(new Text(m.getRow()), false, true);
88
89 if (tabLoc == null) {
90 log.trace("No tablet location found for row " + new String(m.getRow()));
91 UtilWaitThread.sleep(500);
92 continue;
93 }
94
95 try {
96 updateServer(m, tabLoc.tablet_extent, tabLoc.tablet_location, credentials, instance.getConfiguration());
97 return;
98 } catch (NotServingTabletException e) {
99 log.trace("Not serving tablet, server = " + tabLoc.tablet_location);
100 TabletLocator.getInstance(instance, credentials, table).invalidateCache(tabLoc.tablet_extent);
101 } catch (TException e) {
102 log.error("server = " + tabLoc.tablet_location, e);
103 TabletLocator.getInstance(instance, credentials, table).invalidateCache(tabLoc.tablet_extent);
104 }
105
106 UtilWaitThread.sleep(500);
107 }
108
109 }
110 }