1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.client.impl;
18
19 import java.util.HashMap;
20
21 import org.apache.accumulo.core.client.AccumuloException;
22 import org.apache.accumulo.core.client.AccumuloSecurityException;
23 import org.apache.accumulo.core.client.BatchWriter;
24 import org.apache.accumulo.core.client.BatchWriterConfig;
25 import org.apache.accumulo.core.client.Instance;
26 import org.apache.accumulo.core.client.MultiTableBatchWriter;
27 import org.apache.accumulo.core.client.MutationsRejectedException;
28 import org.apache.accumulo.core.client.TableNotFoundException;
29 import org.apache.accumulo.core.client.TableOfflineException;
30 import org.apache.accumulo.core.data.Mutation;
31 import org.apache.accumulo.core.master.state.tables.TableState;
32 import org.apache.accumulo.core.security.thrift.Credential;
33 import org.apache.accumulo.core.util.ArgumentChecker;
34 import org.apache.log4j.Logger;
35
36 public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
37 static final Logger log = Logger.getLogger(MultiTableBatchWriterImpl.class);
38 private boolean closed;
39
40 private class TableBatchWriter implements BatchWriter {
41
42 private String table;
43
44 TableBatchWriter(String table) {
45 this.table = table;
46 }
47
48 @Override
49 public void addMutation(Mutation m) throws MutationsRejectedException {
50 ArgumentChecker.notNull(m);
51 bw.addMutation(table, m);
52 }
53
54 @Override
55 public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException {
56 bw.addMutation(table, iterable.iterator());
57 }
58
59 @Override
60 public void close() {
61 throw new UnsupportedOperationException("Must close all tables, can not close an individual table");
62 }
63
64 @Override
65 public void flush() {
66 throw new UnsupportedOperationException("Must flush all tables, can not flush an individual table");
67 }
68
69 }
70
71 private TabletServerBatchWriter bw;
72 private HashMap<String,BatchWriter> tableWriters;
73 private Instance instance;
74
75 public MultiTableBatchWriterImpl(Instance instance, Credential credentials, BatchWriterConfig config) {
76 ArgumentChecker.notNull(instance, credentials);
77 this.instance = instance;
78 this.bw = new TabletServerBatchWriter(instance, credentials, config);
79 tableWriters = new HashMap<String,BatchWriter>();
80 this.closed = false;
81 }
82
83 public boolean isClosed() {
84 return this.closed;
85 }
86
87 public void close() throws MutationsRejectedException {
88 bw.close();
89 this.closed = true;
90 }
91
92 /**
93 * Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called.
94 */
95 @Override
96 protected void finalize() {
97 if (!closed) {
98 log.warn(MultiTableBatchWriterImpl.class.getSimpleName() + " not shutdown; did you forget to call close()?");
99 try {
100 close();
101 } catch (MutationsRejectedException mre) {
102 log.error(MultiTableBatchWriterImpl.class.getSimpleName() + " internal error.", mre);
103 throw new RuntimeException("Exception when closing " + MultiTableBatchWriterImpl.class.getSimpleName(), mre);
104 }
105 }
106 }
107
108 @Override
109 public synchronized BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
110 ArgumentChecker.notNull(tableName);
111 String tableId = Tables.getNameToIdMap(instance).get(tableName);
112 if (tableId == null)
113 throw new TableNotFoundException(tableId, tableName, null);
114
115 if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
116 throw new TableOfflineException(instance, tableId);
117
118 BatchWriter tbw = tableWriters.get(tableId);
119 if (tbw == null) {
120 tbw = new TableBatchWriter(tableId);
121 tableWriters.put(tableId, tbw);
122 }
123 return tbw;
124 }
125
126 @Override
127 public void flush() throws MutationsRejectedException {
128 bw.flush();
129 }
130
131 }