1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.iterators.user;
18
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
37
38 /**
39 * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of
40 * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index.
41 *
42 * The table structure should have the following form:
43 *
44 * row: shardID, colfam: term, colqual: docID
45 *
46 * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The
47 * result will have an empty column family, as follows:
48 *
49 * row: shardID, colfam: (empty), colqual: docID
50 *
51 * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
52 *
53 * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes
54 * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
55 *
56 * README.shard in docs/examples shows an example of using the IntersectingIterator.
57 */
58 public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
59
60 protected Text nullText = new Text();
61
62 protected Text getPartition(Key key) {
63 return key.getRow();
64 }
65
66 protected Text getTerm(Key key) {
67 return key.getColumnFamily();
68 }
69
70 protected Text getDocID(Key key) {
71 return key.getColumnQualifier();
72 }
73
74 protected Key buildKey(Text partition, Text term) {
75 return new Key(partition, (term == null) ? nullText : term);
76 }
77
78 protected Key buildKey(Text partition, Text term, Text docID) {
79 return new Key(partition, (term == null) ? nullText : term, docID);
80 }
81
82 protected Key buildFollowingPartitionKey(Key key) {
83 return key.followingKey(PartialKey.ROW);
84 }
85
86 protected static final Logger log = Logger.getLogger(IntersectingIterator.class);
87
88 public static class TermSource {
89 public SortedKeyValueIterator<Key,Value> iter;
90 public Text term;
91 public Collection<ByteSequence> seekColfams;
92 public boolean notFlag;
93
94 public TermSource(TermSource other) {
95 this.iter = other.iter;
96 this.term = other.term;
97 this.notFlag = other.notFlag;
98 this.seekColfams = other.seekColfams;
99 }
100
101 public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
102 this(iter, term, false);
103 }
104
105 public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
106 this.iter = iter;
107 this.term = term;
108 this.notFlag = notFlag;
109
110 this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
111 }
112
113 public String getTermString() {
114 return (this.term == null) ? new String("Iterator") : this.term.toString();
115 }
116 }
117
118 TermSource[] sources;
119 int sourcesCount = 0;
120
121 Range overallRange;
122
123
124 protected Text currentPartition = null;
125 protected Text currentDocID = new Text(emptyByteArray);
126 static final byte[] emptyByteArray = new byte[0];
127
128 protected Key topKey = null;
129 protected Value value = new Value(emptyByteArray);
130
/**
 * Creates an iterator without any term sources; they are supplied afterwards through {@code init} or {@code addSource}.
 */
public IntersectingIterator() {
  super();
}
132
133 @Override
134 public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
135 return new IntersectingIterator(this, env);
136 }
137
/**
 * Copy constructor backing {@code deepCopy}: duplicates every term source of {@code other}, giving each copy its own deep-copied iterator.
 *
 * @param other
 *          the iterator to copy; if it has no sources yet, the new iterator has none either
 * @param env
 *          environment forwarded to each source's {@code deepCopy}
 */
private IntersectingIterator(IntersectingIterator other, IteratorEnvironment env) {
  if (other.sources != null) {
    sourcesCount = other.sourcesCount;
    sources = new TermSource[sourcesCount];
    for (int i = 0; i < sourcesCount; i++) {
      // Start from a field-by-field copy so that term, notFlag and any customized seekColfams survive;
      // rebuilding from (iter, term) alone silently turned negated terms into required ones.
      TermSource copy = new TermSource(other.sources[i]);
      copy.iter = other.sources[i].iter.deepCopy(env);
      sources[i] = copy;
    }
  }
}
147
148 @Override
149 public Key getTopKey() {
150 return topKey;
151 }
152
153 @Override
154 public Value getTopValue() {
155
156 return value;
157 }
158
159 @Override
160 public boolean hasTop() {
161 return currentPartition != null;
162 }
163
164
165 private boolean seekOneSource(int sourceID) throws IOException {
166
167
168
169
170
171
172
173
174
175
176
177 boolean advancedCursor = false;
178
179 if (sources[sourceID].notFlag) {
180 while (true) {
181 if (sources[sourceID].iter.hasTop() == false) {
182
183 break;
184 }
185
186 int endCompare = -1;
187
188 if (overallRange.getEndKey() != null) {
189 endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
190 if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
191
192 break;
193 }
194 }
195 int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
196
197
198
199 if (partitionCompare > 0) {
200
201 Key seekKey = buildKey(currentPartition, sources[sourceID].term);
202 sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
203 continue;
204 }
205
206
207 if (partitionCompare < 0) {
208 break;
209 }
210
211
212
213 if (sources[sourceID].term != null) {
214 int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
215
216
217 if (termCompare > 0) {
218 Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
219 sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
220 continue;
221 }
222
223
224 if (termCompare < 0) {
225 break;
226 }
227 }
228
229
230
231 Text docID = getDocID(sources[sourceID].iter.getTopKey());
232 int docIDCompare = currentDocID.compareTo(docID);
233
234 if (docIDCompare < 0) {
235 break;
236 }
237
238 if (docIDCompare > 0) {
239
240 Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
241 sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
242 continue;
243 }
244
245
246
247
248 if (docIDCompare == 0) {
249 sources[0].iter.next();
250 advancedCursor = true;
251 break;
252 }
253 }
254 } else {
255 while (true) {
256 if (sources[sourceID].iter.hasTop() == false) {
257 currentPartition = null;
258
259 return true;
260 }
261
262 int endCompare = -1;
263
264
265 if (overallRange.getEndKey() != null) {
266 endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
267 if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
268 currentPartition = null;
269
270 return true;
271 }
272 }
273 int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
274
275
276 if (partitionCompare > 0) {
277
278 Key seekKey = buildKey(currentPartition, sources[sourceID].term);
279 sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
280 continue;
281 }
282
283
284 if (partitionCompare < 0) {
285 currentPartition.set(getPartition(sources[sourceID].iter.getTopKey()));
286 currentDocID.set(emptyByteArray);
287 advancedCursor = true;
288 continue;
289 }
290
291
292
293
294 if (sources[sourceID].term != null) {
295 int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
296
297
298 if (termCompare > 0) {
299 Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
300 sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
301 continue;
302 }
303
304
305 if (termCompare < 0) {
306
307
308
309
310
311
312
313 if (endCompare == 0) {
314
315 currentPartition = null;
316
317 return true;
318 }
319 Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
320 sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
321 continue;
322 }
323 }
324
325
326 Text docID = getDocID(sources[sourceID].iter.getTopKey());
327 int docIDCompare = currentDocID.compareTo(docID);
328
329 if (docIDCompare < 0) {
330 currentDocID.set(docID);
331 advancedCursor = true;
332 break;
333 }
334
335 if (docIDCompare > 0) {
336
337 Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
338 sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
339 continue;
340 }
341
342 break;
343 }
344 }
345 return advancedCursor;
346 }
347
348 @Override
349 public void next() throws IOException {
350 if (currentPartition == null) {
351 return;
352 }
353
354
355 sources[0].iter.next();
356 advanceToIntersection();
357 }
358
359 protected void advanceToIntersection() throws IOException {
360 boolean cursorChanged = true;
361 while (cursorChanged) {
362
363 cursorChanged = false;
364 for (int i = 0; i < sourcesCount; i++) {
365 if (currentPartition == null) {
366 topKey = null;
367 return;
368 }
369 if (seekOneSource(i)) {
370 cursorChanged = true;
371 break;
372 }
373 }
374 }
375 topKey = buildKey(currentPartition, nullText, currentDocID);
376 }
377
378 public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
379 if (iter.hasTop())
380 return iter.getTopKey().toString();
381 return "";
382 }
383
384 private static final String columnFamiliesOptionName = "columnFamilies";
385 private static final String notFlagOptionName = "notFlag";
386
387 /**
388 * @param columns
389 * @return encoded columns
390 */
391 protected static String encodeColumns(Text[] columns) {
392 StringBuilder sb = new StringBuilder();
393 for (int i = 0; i < columns.length; i++) {
394 sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
395 sb.append('\n');
396 }
397 return sb.toString();
398 }
399
400 /**
401 * @param flags
402 * @return encoded flags
403 */
404 protected static String encodeBooleans(boolean[] flags) {
405 byte[] bytes = new byte[flags.length];
406 for (int i = 0; i < flags.length; i++) {
407 if (flags[i])
408 bytes[i] = 1;
409 else
410 bytes[i] = 0;
411 }
412 return new String(Base64.encodeBase64(bytes));
413 }
414
415 protected static Text[] decodeColumns(String columns) {
416 String[] columnStrings = columns.split("\n");
417 Text[] columnTexts = new Text[columnStrings.length];
418 for (int i = 0; i < columnStrings.length; i++) {
419 columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
420 }
421 return columnTexts;
422 }
423
424 /**
425 * @param flags
426 * @return decoded flags
427 */
428 protected static boolean[] decodeBooleans(String flags) {
429
430 if (flags == null)
431 return null;
432
433 byte[] bytes = Base64.decodeBase64(flags.getBytes());
434 boolean[] bFlags = new boolean[bytes.length];
435 for (int i = 0; i < bytes.length; i++) {
436 if (bytes[i] == 1)
437 bFlags[i] = true;
438 else
439 bFlags[i] = false;
440 }
441 return bFlags;
442 }
443
444 @Override
445 public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
446 Text[] terms = decodeColumns(options.get(columnFamiliesOptionName));
447 boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName));
448
449 if (terms.length < 2) {
450 throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
451 }
452
453
454
455
456 if (notFlag == null) {
457 notFlag = new boolean[terms.length];
458 for (int i = 0; i < terms.length; i++)
459 notFlag[i] = false;
460 }
461 if (notFlag[0]) {
462 for (int i = 1; i < notFlag.length; i++) {
463 if (notFlag[i] == false) {
464 Text swapFamily = new Text(terms[0]);
465 terms[0].set(terms[i]);
466 terms[i].set(swapFamily);
467 notFlag[0] = false;
468 notFlag[i] = true;
469 break;
470 }
471 }
472 if (notFlag[0]) {
473 throw new IllegalArgumentException("IntersectionIterator requires at lest one column family without not");
474 }
475 }
476
477 sources = new TermSource[terms.length];
478 sources[0] = new TermSource(source, terms[0]);
479 for (int i = 1; i < terms.length; i++) {
480 sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]);
481 }
482 sourcesCount = terms.length;
483 }
484
485 @Override
486 public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
487 overallRange = new Range(range);
488 currentPartition = new Text();
489 currentDocID.set(emptyByteArray);
490
491
492 for (int i = 0; i < sourcesCount; i++) {
493 Key sourceKey;
494 if (range.getStartKey() != null) {
495 if (range.getStartKey().getColumnQualifier() != null) {
496 sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term, range.getStartKey().getColumnQualifier());
497 } else {
498 sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term);
499 }
500
501 sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColfams, true);
502 } else {
503
504 sources[i].iter.seek(range, sources[i].seekColfams, true);
505 }
506 }
507 advanceToIntersection();
508 }
509
510 public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
511
512 if (sources == null) {
513 sources = new TermSource[1];
514 } else {
515
516
517 TermSource[] localSources = new TermSource[sources.length + 1];
518 int currSource = 0;
519 for (TermSource myTerm : sources) {
520
521 localSources[currSource] = new TermSource(myTerm);
522 currSource++;
523 }
524 sources = localSources;
525 }
526 sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag);
527 sourcesCount++;
528 }
529
530 /**
531 * Encode the columns to be used when iterating.
532 *
533 * @param cfg
534 * @param columns
535 */
536 public static void setColumnFamilies(IteratorSetting cfg, Text[] columns) {
537 if (columns.length < 2)
538 throw new IllegalArgumentException("Must supply at least two terms to intersect");
539 cfg.addOption(IntersectingIterator.columnFamiliesOptionName, IntersectingIterator.encodeColumns(columns));
540 }
541
542 /**
543 * Encode columns and NOT flags indicating which columns should be negated (docIDs will be excluded if matching negated columns, instead of included).
544 *
545 * @param cfg
546 * @param columns
547 * @param notFlags
548 */
549 public static void setColumnFamilies(IteratorSetting cfg, Text[] columns, boolean[] notFlags) {
550 if (columns.length < 2)
551 throw new IllegalArgumentException("Must supply at least two terms to intersect");
552 if (columns.length != notFlags.length)
553 throw new IllegalArgumentException("columns and notFlags arrays must be the same length");
554 setColumnFamilies(cfg, columns);
555 cfg.addOption(IntersectingIterator.notFlagOptionName, IntersectingIterator.encodeBooleans(notFlags));
556 }
557 }