1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.iterators.user;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.SortedMap;
29 import java.util.TreeMap;
30
31 import org.apache.accumulo.core.data.ByteSequence;
32 import org.apache.accumulo.core.data.Key;
33 import org.apache.accumulo.core.data.PartialKey;
34 import org.apache.accumulo.core.data.Range;
35 import org.apache.accumulo.core.data.Value;
36 import org.apache.accumulo.core.iterators.IteratorEnvironment;
37 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
38 import org.apache.hadoop.io.Text;
39
40 /**
41 *
42 * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
43 * pairs into a single key/value pair, which is returned through the client as an atomic operation.
44 *
45 * <p>
46 * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive first key in a row, (e.g. seek(new Range(new Key(new
47 * Text("row")),false,...),...)) this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when system
48 * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a
49 * scan after swapping out sources.
50 *
51 * <p>
52 * To regain the original key/value pairs of the row, call the decodeRow function on the key/value pair that this iterator returned.
53 *
54 * @see RowFilter
55 */
56 public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> {
57
58 private SortedKeyValueIterator<Key,Value> sourceIter;
59 private Key topKey = null;
60 private Value topValue = null;
61
62 public WholeRowIterator() {
63
64 }
65
66 WholeRowIterator(SortedKeyValueIterator<Key,Value> source) {
67 this.sourceIter = source;
68 }
69
70
71 public static final SortedMap<Key,Value> decodeRow(Key rowKey, Value rowValue) throws IOException {
72 SortedMap<Key,Value> map = new TreeMap<Key,Value>();
73 ByteArrayInputStream in = new ByteArrayInputStream(rowValue.get());
74 DataInputStream din = new DataInputStream(in);
75 int numKeys = din.readInt();
76 for (int i = 0; i < numKeys; i++) {
77 byte[] cf;
78 byte[] cq;
79 byte[] cv;
80 byte[] valBytes;
81
82 {
83 int len = din.readInt();
84 cf = new byte[len];
85 din.read(cf);
86 }
87
88 {
89 int len = din.readInt();
90 cq = new byte[len];
91 din.read(cq);
92 }
93
94 {
95 int len = din.readInt();
96 cv = new byte[len];
97 din.read(cv);
98 }
99
100 long timestamp = din.readLong();
101
102 {
103 int len = din.readInt();
104 valBytes = new byte[len];
105 din.read(valBytes);
106 }
107 map.put(new Key(rowKey.getRowData().toArray(), cf, cq, cv, timestamp, false, false), new Value(valBytes, false));
108 }
109 return map;
110 }
111
112
113
114 public static final Value encodeRow(List<Key> keys, List<Value> values) throws IOException {
115 ByteArrayOutputStream out = new ByteArrayOutputStream();
116 DataOutputStream dout = new DataOutputStream(out);
117 dout.writeInt(keys.size());
118 for (int i = 0; i < keys.size(); i++) {
119 Key k = keys.get(i);
120 Value v = values.get(i);
121
122 {
123 ByteSequence bs = k.getColumnFamilyData();
124 dout.writeInt(bs.length());
125 dout.write(bs.getBackingArray(), bs.offset(), bs.length());
126 }
127
128 {
129 ByteSequence bs = k.getColumnQualifierData();
130 dout.writeInt(bs.length());
131 dout.write(bs.getBackingArray(), bs.offset(), bs.length());
132 }
133
134 {
135 ByteSequence bs = k.getColumnVisibilityData();
136 dout.writeInt(bs.length());
137 dout.write(bs.getBackingArray(), bs.offset(), bs.length());
138 }
139
140 dout.writeLong(k.getTimestamp());
141
142 byte[] valBytes = v.get();
143 dout.writeInt(valBytes.length);
144 dout.write(valBytes);
145 }
146
147 return new Value(out.toByteArray());
148 }
149
150 List<Key> keys = new ArrayList<Key>();
151 List<Value> values = new ArrayList<Value>();
152
153 private void prepKeys() throws IOException {
154 if (topKey != null)
155 return;
156 Text currentRow;
157 do {
158 if (sourceIter.hasTop() == false)
159 return;
160 currentRow = new Text(sourceIter.getTopKey().getRow());
161 keys.clear();
162 values.clear();
163 while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
164 keys.add(new Key(sourceIter.getTopKey()));
165 values.add(new Value(sourceIter.getTopValue()));
166 sourceIter.next();
167 }
168 } while (!filter(currentRow, keys, values));
169
170 topKey = new Key(currentRow);
171 topValue = encodeRow(keys, values);
172
173 }
174
175 /**
176 *
177 * @param currentRow
178 * All keys have this in their row portion (do not modify!).
179 * @param keys
180 * One key for each key in the row, ordered as they are given by the source iterator (do not modify!).
181 * @param values
182 * One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!).
183 * @return true if we want to keep the row, false if we want to skip it
184 */
185 protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
186 return true;
187 }
188
189 @Override
190 public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
191 if (sourceIter != null)
192 return new WholeRowIterator(sourceIter.deepCopy(env));
193 return new WholeRowIterator();
194 }
195
196 @Override
197 public Key getTopKey() {
198 return topKey;
199 }
200
201 @Override
202 public Value getTopValue() {
203 return topValue;
204 }
205
206 @Override
207 public boolean hasTop() {
208 return topKey != null;
209 }
210
211 @Override
212 public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
213 sourceIter = source;
214 }
215
216 @Override
217 public void next() throws IOException {
218 topKey = null;
219 topValue = null;
220 prepKeys();
221 }
222
223 @Override
224 public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
225 topKey = null;
226 topValue = null;
227
228 Key sk = range.getStartKey();
229
230 if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0
231 && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) {
232
233
234 Key followingRowKey = sk.followingKey(PartialKey.ROW);
235 if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0)
236 return;
237
238 range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
239 }
240
241 sourceIter.seek(range, columnFamilies, inclusive);
242 prepKeys();
243 }
244
245 }