View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.iterators.user;
18  
19  import java.io.IOException;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.Map;
23  import java.util.Set;
24  
25  import org.apache.accumulo.core.client.IteratorSetting;
26  import org.apache.accumulo.core.data.ArrayByteSequence;
27  import org.apache.accumulo.core.data.ByteSequence;
28  import org.apache.accumulo.core.data.Key;
29  import org.apache.accumulo.core.data.PartialKey;
30  import org.apache.accumulo.core.data.Range;
31  import org.apache.accumulo.core.data.Value;
32  import org.apache.accumulo.core.iterators.IteratorEnvironment;
33  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
34  import org.apache.hadoop.io.Text;
35  
36  /**
37   * This iterator facilitates document-partitioned indexing. It is an example of extending the IntersectingIterator to customize the placement of the term and
38   * docID. As with the IntersectingIterator, documents are grouped together and indexed into a single row of an Accumulo table. This allows a tablet server to
39   * perform boolean AND operations on terms in the index. This iterator also stores the document contents in a separate column family in the same row so that the
40   * full document can be returned with each query.
41   * 
42   * The table structure should have the following form:
43   * 
44   * row: shardID, colfam: docColf\0doctype, colqual: docID, value: doc
45   * 
46   * row: shardID, colfam: indexColf, colqual: term\0doctype\0docID\0info, value: (empty)
47   * 
48   * When you configure this iterator with a set of terms, it will return only the docIDs and docs that appear with all of the specified terms. The result will
49   * have the following form:
50   * 
51   * row: shardID, colfam: indexColf, colqual: doctype\0docID\0info, value: doc
52   * 
53   * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
54   */
55  public class IndexedDocIterator extends IntersectingIterator {
56    public static final Text DEFAULT_INDEX_COLF = new Text("i");
57    public static final Text DEFAULT_DOC_COLF = new Text("e");
58    
59    private static final String indexFamilyOptionName = "indexFamily";
60    private static final String docFamilyOptionName = "docFamily";
61    
62    private static Text indexColf = DEFAULT_INDEX_COLF;
63    private static Text docColf = DEFAULT_DOC_COLF;
64    private static Set<ByteSequence> indexColfSet;
65    private static Set<ByteSequence> docColfSet;
66    
67    private static final byte[] nullByte = {0};
68    
69    public SortedKeyValueIterator<Key,Value> docSource;
70    
71    @Override
72    protected Key buildKey(Text partition, Text term, Text docID) {
73      Text colq = new Text(term);
74      colq.append(nullByte, 0, 1);
75      colq.append(docID.getBytes(), 0, docID.getLength());
76      colq.append(nullByte, 0, 1);
77      return new Key(partition, indexColf, colq);
78    }
79    
80    @Override
81    protected Key buildKey(Text partition, Text term) {
82      Text colq = new Text(term);
83      return new Key(partition, indexColf, colq);
84    }
85    
86    @Override
87    protected Text getDocID(Key key) {
88      return parseDocID(key);
89    }
90    
91    public static Text parseDocID(Key key) {
92      Text colq = key.getColumnQualifier();
93      int firstZeroIndex = colq.find("\0");
94      if (firstZeroIndex < 0) {
95        throw new IllegalArgumentException("bad docid: " + key.toString());
96      }
97      int secondZeroIndex = colq.find("\0", firstZeroIndex + 1);
98      if (secondZeroIndex < 0) {
99        throw new IllegalArgumentException("bad docid: " + key.toString());
100     }
101     int thirdZeroIndex = colq.find("\0", secondZeroIndex + 1);
102     if (thirdZeroIndex < 0) {
103       throw new IllegalArgumentException("bad docid: " + key.toString());
104     }
105     Text docID = new Text();
106     try {
107       docID.set(colq.getBytes(), firstZeroIndex + 1, thirdZeroIndex - 1 - firstZeroIndex);
108     } catch (ArrayIndexOutOfBoundsException e) {
109       throw new IllegalArgumentException("bad indices for docid: " + key.toString() + " " + firstZeroIndex + " " + secondZeroIndex + " " + thirdZeroIndex);
110     }
111     return docID;
112   }
113   
114   @Override
115   protected Text getTerm(Key key) {
116     if (indexColf.compareTo(key.getColumnFamily().getBytes(), 0, indexColf.getLength()) < 0) {
117       // We're past the index column family, so return a term that will sort lexicographically last.
118       // The last unicode character should suffice
119       return new Text("\uFFFD");
120     }
121     Text colq = key.getColumnQualifier();
122     int zeroIndex = colq.find("\0");
123     Text term = new Text();
124     term.set(colq.getBytes(), 0, zeroIndex);
125     return term;
126   }
127   
128   @Override
129   synchronized public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
130     super.init(source, options, env);
131     if (options.containsKey(indexFamilyOptionName))
132       indexColf = new Text(options.get(indexFamilyOptionName));
133     if (options.containsKey(docFamilyOptionName))
134       docColf = new Text(options.get(docFamilyOptionName));
135     docSource = source.deepCopy(env);
136     indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(indexColf.getBytes(), 0, indexColf.getLength()));
137     
138     for (TermSource ts : this.sources) {
139       ts.seekColfams = indexColfSet;
140     }
141   }
142   
143   @Override
144   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
145     throw new UnsupportedOperationException();
146   }
147   
148   @Override
149   public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
150     super.seek(range, null, true);
151     
152   }
153   
154   @Override
155   protected void advanceToIntersection() throws IOException {
156     super.advanceToIntersection();
157     if (topKey == null)
158       return;
159     if (log.isTraceEnabled())
160       log.trace("using top key to seek for doc: " + topKey.toString());
161     Key docKey = buildDocKey();
162     docSource.seek(new Range(docKey, true, null, false), docColfSet, true);
163     log.debug("got doc key: " + docSource.getTopKey().toString());
164     if (docSource.hasTop() && docKey.equals(docSource.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL)) {
165       value = docSource.getTopValue();
166     }
167     log.debug("got doc value: " + value.toString());
168   }
169   
170   protected Key buildDocKey() {
171     if (log.isTraceEnabled())
172       log.trace("building doc key for " + currentPartition + " " + currentDocID);
173     int zeroIndex = currentDocID.find("\0");
174     if (zeroIndex < 0)
175       throw new IllegalArgumentException("bad current docID");
176     Text colf = new Text(docColf);
177     colf.append(nullByte, 0, 1);
178     colf.append(currentDocID.getBytes(), 0, zeroIndex);
179     docColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(colf.getBytes(), 0, colf.getLength()));
180     if (log.isTraceEnabled())
181       log.trace(zeroIndex + " " + currentDocID.getLength());
182     Text colq = new Text();
183     colq.set(currentDocID.getBytes(), zeroIndex + 1, currentDocID.getLength() - zeroIndex - 1);
184     Key k = new Key(currentPartition, colf, colq);
185     if (log.isTraceEnabled())
186       log.trace("built doc key for seek: " + k.toString());
187     return k;
188   }
189   
190   /**
191    * A convenience method for setting the index column family.
192    * 
193    * @param is
194    *          IteratorSetting object to configure.
195    * @param indexColf
196    *          the index column family
197    */
198   public static void setIndexColf(IteratorSetting is, String indexColf) {
199     is.addOption(indexFamilyOptionName, indexColf);
200   }
201   
202   /**
203    * A convenience method for setting the document column family prefix.
204    * 
205    * @param is
206    *          IteratorSetting object to configure.
207    * @param docColfPrefix
208    *          the prefix of the document column family (colf will be of the form docColfPrefix\0doctype)
209    */
210   public static void setDocColfPrefix(IteratorSetting is, String docColfPrefix) {
211     is.addOption(docFamilyOptionName, docColfPrefix);
212   }
213   
214   /**
215    * A convenience method for setting the index column family and document column family prefix.
216    * 
217    * @param is
218    *          IteratorSetting object to configure.
219    * @param indexColf
220    *          the index column family
221    * @param docColfPrefix
222    *          the prefix of the document column family (colf will be of the form docColfPrefix\0doctype)
223    */
224   public static void setColfs(IteratorSetting is, String indexColf, String docColfPrefix) {
225     setIndexColf(is, indexColf);
226     setDocColfPrefix(is, docColfPrefix);
227   }
228 }