View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.iterators.user;
18  
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
37  
38  /**
39   * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of
40   * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index.
41   * 
42   * The table structure should have the following form:
43   * 
44   * row: shardID, colfam: term, colqual: docID
45   * 
46   * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The
47   * result will have an empty column family, as follows:
48   * 
49   * row: shardID, colfam: (empty), colqual: docID
50   * 
51   * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
52   * 
53   * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes
54   * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
55   * 
56   * README.shard in docs/examples shows an example of using the IntersectingIterator.
57   */
58  public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
59    
60    protected Text nullText = new Text();
61    
62    protected Text getPartition(Key key) {
63      return key.getRow();
64    }
65    
66    protected Text getTerm(Key key) {
67      return key.getColumnFamily();
68    }
69    
70    protected Text getDocID(Key key) {
71      return key.getColumnQualifier();
72    }
73    
74    protected Key buildKey(Text partition, Text term) {
75      return new Key(partition, (term == null) ? nullText : term);
76    }
77    
78    protected Key buildKey(Text partition, Text term, Text docID) {
79      return new Key(partition, (term == null) ? nullText : term, docID);
80    }
81    
82    protected Key buildFollowingPartitionKey(Key key) {
83      return key.followingKey(PartialKey.ROW);
84    }
85    
86    protected static final Logger log = Logger.getLogger(IntersectingIterator.class);
87    
88    public static class TermSource {
89      public SortedKeyValueIterator<Key,Value> iter;
90      public Text term;
91      public Collection<ByteSequence> seekColfams;
92      public boolean notFlag;
93      
94      public TermSource(TermSource other) {
95        this.iter = other.iter;
96        this.term = other.term;
97        this.notFlag = other.notFlag;
98        this.seekColfams = other.seekColfams;
99      }
100     
101     public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
102       this(iter, term, false);
103     }
104     
105     public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
106       this.iter = iter;
107       this.term = term;
108       this.notFlag = notFlag;
109       // The desired column families for this source is the term itself
110       this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
111     }
112     
113     public String getTermString() {
114       return (this.term == null) ? new String("Iterator") : this.term.toString();
115     }
116   }
117   
118   TermSource[] sources;
119   int sourcesCount = 0;
120   
121   Range overallRange;
122   
123   // query-time settings
124   protected Text currentPartition = null;
125   protected Text currentDocID = new Text(emptyByteArray);
126   static final byte[] emptyByteArray = new byte[0];
127   
128   protected Key topKey = null;
129   protected Value value = new Value(emptyByteArray);
130   
131   public IntersectingIterator() {}
132   
133   @Override
134   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
135     return new IntersectingIterator(this, env);
136   }
137   
138   private IntersectingIterator(IntersectingIterator other, IteratorEnvironment env) {
139     if (other.sources != null) {
140       sourcesCount = other.sourcesCount;
141       sources = new TermSource[sourcesCount];
142       for (int i = 0; i < sourcesCount; i++) {
143         sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].term);
144       }
145     }
146   }
147   
148   @Override
149   public Key getTopKey() {
150     return topKey;
151   }
152   
153   @Override
154   public Value getTopValue() {
155     // we don't really care about values
156     return value;
157   }
158   
159   @Override
160   public boolean hasTop() {
161     return currentPartition != null;
162   }
163   
164   // precondition: currentRow is not null
165   private boolean seekOneSource(int sourceID) throws IOException {
166     // find the next key in the appropriate column family that is at or beyond the cursor (currentRow, currentCQ)
167     // advance the cursor if this source goes beyond it
168     // return whether we advanced the cursor
169     
170     // within this loop progress must be made in one of the following forms:
171     // - currentRow or currentCQ must be increased
172     // - the given source must advance its iterator
173     // this loop will end when any of the following criteria are met
174     // - the iterator for the given source is pointing to the key (currentRow, columnFamilies[sourceID], currentCQ)
175     // - the given source is out of data and currentRow is set to null
176     // - the given source has advanced beyond the endRow and currentRow is set to null
177     boolean advancedCursor = false;
178     
179     if (sources[sourceID].notFlag) {
180       while (true) {
181         if (sources[sourceID].iter.hasTop() == false) {
182           // an empty column that you are negating is a valid condition
183           break;
184         }
185         // check if we're past the end key
186         int endCompare = -1;
187         // we should compare the row to the end of the range
188         if (overallRange.getEndKey() != null) {
189           endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
190           if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
191             // an empty column that you are negating is a valid condition
192             break;
193           }
194         }
195         int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
196         // check if this source is already at or beyond currentRow
197         // if not, then seek to at least the current row
198         
199         if (partitionCompare > 0) {
200           // seek to at least the currentRow
201           Key seekKey = buildKey(currentPartition, sources[sourceID].term);
202           sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
203           continue;
204         }
205         // check if this source has gone beyond currentRow
206         // if so, this is a valid condition for negation
207         if (partitionCompare < 0) {
208           break;
209         }
210         // we have verified that the current source is positioned in currentRow
211         // now we must make sure we're in the right columnFamily in the current row
212         // Note: Iterators are auto-magically set to the correct columnFamily
213         if (sources[sourceID].term != null) {
214           int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
215           // check if this source is already on the right columnFamily
216           // if not, then seek forwards to the right columnFamily
217           if (termCompare > 0) {
218             Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
219             sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
220             continue;
221           }
222           // check if this source is beyond the right columnFamily
223           // if so, then this is a valid condition for negating
224           if (termCompare < 0) {
225             break;
226           }
227         }
228         
229         // we have verified that we are in currentRow and the correct column family
230         // make sure we are at or beyond columnQualifier
231         Text docID = getDocID(sources[sourceID].iter.getTopKey());
232         int docIDCompare = currentDocID.compareTo(docID);
233         // If we are past the target, this is a valid result
234         if (docIDCompare < 0) {
235           break;
236         }
237         // if this source is not yet at the currentCQ then advance in this source
238         if (docIDCompare > 0) {
239           // seek forwards
240           Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
241           sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
242           continue;
243         }
244         // if we are equal to the target, this is an invalid result.
245         // Force the entire process to go to the next row.
246         // We are advancing column 0 because we forced that column to not contain a !
247         // when we did the init()
248         if (docIDCompare == 0) {
249           sources[0].iter.next();
250           advancedCursor = true;
251           break;
252         }
253       }
254     } else {
255       while (true) {
256         if (sources[sourceID].iter.hasTop() == false) {
257           currentPartition = null;
258           // setting currentRow to null counts as advancing the cursor
259           return true;
260         }
261         // check if we're past the end key
262         int endCompare = -1;
263         // we should compare the row to the end of the range
264         
265         if (overallRange.getEndKey() != null) {
266           endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
267           if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
268             currentPartition = null;
269             // setting currentRow to null counts as advancing the cursor
270             return true;
271           }
272         }
273         int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
274         // check if this source is already at or beyond currentRow
275         // if not, then seek to at least the current row
276         if (partitionCompare > 0) {
277           // seek to at least the currentRow
278           Key seekKey = buildKey(currentPartition, sources[sourceID].term);
279           sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
280           continue;
281         }
282         // check if this source has gone beyond currentRow
283         // if so, advance currentRow
284         if (partitionCompare < 0) {
285           currentPartition.set(getPartition(sources[sourceID].iter.getTopKey()));
286           currentDocID.set(emptyByteArray);
287           advancedCursor = true;
288           continue;
289         }
290         // we have verified that the current source is positioned in currentRow
291         // now we must make sure we're in the right columnFamily in the current row
292         // Note: Iterators are auto-magically set to the correct columnFamily
293         
294         if (sources[sourceID].term != null) {
295           int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
296           // check if this source is already on the right columnFamily
297           // if not, then seek forwards to the right columnFamily
298           if (termCompare > 0) {
299             Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
300             sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
301             continue;
302           }
303           // check if this source is beyond the right columnFamily
304           // if so, then seek to the next row
305           if (termCompare < 0) {
306             // we're out of entries in the current row, so seek to the next one
307             // byte[] currentRowBytes = currentRow.getBytes();
308             // byte[] nextRow = new byte[currentRowBytes.length + 1];
309             // System.arraycopy(currentRowBytes, 0, nextRow, 0, currentRowBytes.length);
310             // nextRow[currentRowBytes.length] = (byte)0;
311             // // we should reuse text objects here
312             // sources[sourceID].seek(new Key(new Text(nextRow),columnFamilies[sourceID]));
313             if (endCompare == 0) {
314               // we're done
315               currentPartition = null;
316               // setting currentRow to null counts as advancing the cursor
317               return true;
318             }
319             Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
320             sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
321             continue;
322           }
323         }
324         // we have verified that we are in currentRow and the correct column family
325         // make sure we are at or beyond columnQualifier
326         Text docID = getDocID(sources[sourceID].iter.getTopKey());
327         int docIDCompare = currentDocID.compareTo(docID);
328         // if this source has advanced beyond the current column qualifier then advance currentCQ and return true
329         if (docIDCompare < 0) {
330           currentDocID.set(docID);
331           advancedCursor = true;
332           break;
333         }
334         // if this source is not yet at the currentCQ then seek in this source
335         if (docIDCompare > 0) {
336           // seek forwards
337           Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
338           sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
339           continue;
340         }
341         // this source is at the current row, in its column family, and at currentCQ
342         break;
343       }
344     }
345     return advancedCursor;
346   }
347   
348   @Override
349   public void next() throws IOException {
350     if (currentPartition == null) {
351       return;
352     }
353     // precondition: the current row is set up and the sources all have the same column qualifier
354     // while we don't have a match, seek in the source with the smallest column qualifier
355     sources[0].iter.next();
356     advanceToIntersection();
357   }
358   
359   protected void advanceToIntersection() throws IOException {
360     boolean cursorChanged = true;
361     while (cursorChanged) {
362       // seek all of the sources to at least the highest seen column qualifier in the current row
363       cursorChanged = false;
364       for (int i = 0; i < sourcesCount; i++) {
365         if (currentPartition == null) {
366           topKey = null;
367           return;
368         }
369         if (seekOneSource(i)) {
370           cursorChanged = true;
371           break;
372         }
373       }
374     }
375     topKey = buildKey(currentPartition, nullText, currentDocID);
376   }
377   
378   public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
379     if (iter.hasTop())
380       return iter.getTopKey().toString();
381     return "";
382   }
383   
384   private static final String columnFamiliesOptionName = "columnFamilies";
385   private static final String notFlagOptionName = "notFlag";
386   
387   /**
388    * @param columns
389    * @return encoded columns
390    */
391   protected static String encodeColumns(Text[] columns) {
392     StringBuilder sb = new StringBuilder();
393     for (int i = 0; i < columns.length; i++) {
394       sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i]))));
395       sb.append('\n');
396     }
397     return sb.toString();
398   }
399   
400   /**
401    * @param flags
402    * @return encoded flags
403    */
404   protected static String encodeBooleans(boolean[] flags) {
405     byte[] bytes = new byte[flags.length];
406     for (int i = 0; i < flags.length; i++) {
407       if (flags[i])
408         bytes[i] = 1;
409       else
410         bytes[i] = 0;
411     }
412     return new String(Base64.encodeBase64(bytes));
413   }
414   
415   protected static Text[] decodeColumns(String columns) {
416     String[] columnStrings = columns.split("\n");
417     Text[] columnTexts = new Text[columnStrings.length];
418     for (int i = 0; i < columnStrings.length; i++) {
419       columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes()));
420     }
421     return columnTexts;
422   }
423   
424   /**
425    * @param flags
426    * @return decoded flags
427    */
428   protected static boolean[] decodeBooleans(String flags) {
429     // return null of there were no flags
430     if (flags == null)
431       return null;
432     
433     byte[] bytes = Base64.decodeBase64(flags.getBytes());
434     boolean[] bFlags = new boolean[bytes.length];
435     for (int i = 0; i < bytes.length; i++) {
436       if (bytes[i] == 1)
437         bFlags[i] = true;
438       else
439         bFlags[i] = false;
440     }
441     return bFlags;
442   }
443   
444   @Override
445   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
446     Text[] terms = decodeColumns(options.get(columnFamiliesOptionName));
447     boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName));
448     
449     if (terms.length < 2) {
450       throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
451     }
452     
453     // Scan the not flags.
454     // There must be at least one term that isn't negated
455     // And we are going to re-order such that the first term is not a ! term
456     if (notFlag == null) {
457       notFlag = new boolean[terms.length];
458       for (int i = 0; i < terms.length; i++)
459         notFlag[i] = false;
460     }
461     if (notFlag[0]) {
462       for (int i = 1; i < notFlag.length; i++) {
463         if (notFlag[i] == false) {
464           Text swapFamily = new Text(terms[0]);
465           terms[0].set(terms[i]);
466           terms[i].set(swapFamily);
467           notFlag[0] = false;
468           notFlag[i] = true;
469           break;
470         }
471       }
472       if (notFlag[0]) {
473         throw new IllegalArgumentException("IntersectionIterator requires at lest one column family without not");
474       }
475     }
476     
477     sources = new TermSource[terms.length];
478     sources[0] = new TermSource(source, terms[0]);
479     for (int i = 1; i < terms.length; i++) {
480       sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]);
481     }
482     sourcesCount = terms.length;
483   }
484   
485   @Override
486   public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
487     overallRange = new Range(range);
488     currentPartition = new Text();
489     currentDocID.set(emptyByteArray);
490     
491     // seek each of the sources to the right column family within the row given by key
492     for (int i = 0; i < sourcesCount; i++) {
493       Key sourceKey;
494       if (range.getStartKey() != null) {
495         if (range.getStartKey().getColumnQualifier() != null) {
496           sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term, range.getStartKey().getColumnQualifier());
497         } else {
498           sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term);
499         }
500         // Seek only to the term for this source as a column family
501         sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColfams, true);
502       } else {
503         // Seek only to the term for this source as a column family
504         sources[i].iter.seek(range, sources[i].seekColfams, true);
505       }
506     }
507     advanceToIntersection();
508   }
509   
510   public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
511     // Check if we have space for the added Source
512     if (sources == null) {
513       sources = new TermSource[1];
514     } else {
515       // allocate space for node, and copy current tree.
516       // TODO: Should we change this to an ArrayList so that we can just add() ?
517       TermSource[] localSources = new TermSource[sources.length + 1];
518       int currSource = 0;
519       for (TermSource myTerm : sources) {
520         // TODO: Do I need to call new here? or can I just re-use the term?
521         localSources[currSource] = new TermSource(myTerm);
522         currSource++;
523       }
524       sources = localSources;
525     }
526     sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag);
527     sourcesCount++;
528   }
529   
530   /**
531    * Encode the columns to be used when iterating.
532    * 
533    * @param cfg
534    * @param columns
535    */
536   public static void setColumnFamilies(IteratorSetting cfg, Text[] columns) {
537     if (columns.length < 2)
538       throw new IllegalArgumentException("Must supply at least two terms to intersect");
539     cfg.addOption(IntersectingIterator.columnFamiliesOptionName, IntersectingIterator.encodeColumns(columns));
540   }
541   
542   /**
543    * Encode columns and NOT flags indicating which columns should be negated (docIDs will be excluded if matching negated columns, instead of included).
544    * 
545    * @param cfg
546    * @param columns
547    * @param notFlags
548    */
549   public static void setColumnFamilies(IteratorSetting cfg, Text[] columns, boolean[] notFlags) {
550     if (columns.length < 2)
551       throw new IllegalArgumentException("Must supply at least two terms to intersect");
552     if (columns.length != notFlags.length)
553       throw new IllegalArgumentException("columns and notFlags arrays must be the same length");
554     setColumnFamilies(cfg, columns);
555     cfg.addOption(IntersectingIterator.notFlagOptionName, IntersectingIterator.encodeBooleans(notFlags));
556   }
557 }