1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.iterators.user;
18
19 import java.io.IOException;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Map;
23 import java.util.Set;
24
25 import org.apache.accumulo.core.client.IteratorSetting;
26 import org.apache.accumulo.core.data.ArrayByteSequence;
27 import org.apache.accumulo.core.data.ByteSequence;
28 import org.apache.accumulo.core.data.Key;
29 import org.apache.accumulo.core.data.PartialKey;
30 import org.apache.accumulo.core.data.Range;
31 import org.apache.accumulo.core.data.Value;
32 import org.apache.accumulo.core.iterators.IteratorEnvironment;
33 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
34 import org.apache.hadoop.io.Text;
35
36 /**
37 * This iterator facilitates document-partitioned indexing. It is an example of extending the IntersectingIterator to customize the placement of the term and
38 * docID. As with the IntersectingIterator, documents are grouped together and indexed into a single row of an Accumulo table. This allows a tablet server to
39 * perform boolean AND operations on terms in the index. This iterator also stores the document contents in a separate column family in the same row so that the
40 * full document can be returned with each query.
41 *
42 * The table structure should have the following form:
43 *
44 * row: shardID, colfam: docColf\0doctype, colqual: docID, value: doc
45 *
46 * row: shardID, colfam: indexColf, colqual: term\0doctype\0docID\0info, value: (empty)
47 *
48 * When you configure this iterator with a set of terms, it will return only the docIDs and docs that appear with all of the specified terms. The result will
49 * have the following form:
50 *
51 * row: shardID, colfam: indexColf, colqual: doctype\0docID\0info, value: doc
52 *
53 * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
54 */
55 public class IndexedDocIterator extends IntersectingIterator {
56 public static final Text DEFAULT_INDEX_COLF = new Text("i");
57 public static final Text DEFAULT_DOC_COLF = new Text("e");
58
59 private static final String indexFamilyOptionName = "indexFamily";
60 private static final String docFamilyOptionName = "docFamily";
61
62 private static Text indexColf = DEFAULT_INDEX_COLF;
63 private static Text docColf = DEFAULT_DOC_COLF;
64 private static Set<ByteSequence> indexColfSet;
65 private static Set<ByteSequence> docColfSet;
66
67 private static final byte[] nullByte = {0};
68
69 public SortedKeyValueIterator<Key,Value> docSource;
70
71 @Override
72 protected Key buildKey(Text partition, Text term, Text docID) {
73 Text colq = new Text(term);
74 colq.append(nullByte, 0, 1);
75 colq.append(docID.getBytes(), 0, docID.getLength());
76 colq.append(nullByte, 0, 1);
77 return new Key(partition, indexColf, colq);
78 }
79
80 @Override
81 protected Key buildKey(Text partition, Text term) {
82 Text colq = new Text(term);
83 return new Key(partition, indexColf, colq);
84 }
85
86 @Override
87 protected Text getDocID(Key key) {
88 return parseDocID(key);
89 }
90
91 public static Text parseDocID(Key key) {
92 Text colq = key.getColumnQualifier();
93 int firstZeroIndex = colq.find("\0");
94 if (firstZeroIndex < 0) {
95 throw new IllegalArgumentException("bad docid: " + key.toString());
96 }
97 int secondZeroIndex = colq.find("\0", firstZeroIndex + 1);
98 if (secondZeroIndex < 0) {
99 throw new IllegalArgumentException("bad docid: " + key.toString());
100 }
101 int thirdZeroIndex = colq.find("\0", secondZeroIndex + 1);
102 if (thirdZeroIndex < 0) {
103 throw new IllegalArgumentException("bad docid: " + key.toString());
104 }
105 Text docID = new Text();
106 try {
107 docID.set(colq.getBytes(), firstZeroIndex + 1, thirdZeroIndex - 1 - firstZeroIndex);
108 } catch (ArrayIndexOutOfBoundsException e) {
109 throw new IllegalArgumentException("bad indices for docid: " + key.toString() + " " + firstZeroIndex + " " + secondZeroIndex + " " + thirdZeroIndex);
110 }
111 return docID;
112 }
113
114 @Override
115 protected Text getTerm(Key key) {
116 if (indexColf.compareTo(key.getColumnFamily().getBytes(), 0, indexColf.getLength()) < 0) {
117
118
119 return new Text("\uFFFD");
120 }
121 Text colq = key.getColumnQualifier();
122 int zeroIndex = colq.find("\0");
123 Text term = new Text();
124 term.set(colq.getBytes(), 0, zeroIndex);
125 return term;
126 }
127
128 @Override
129 synchronized public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
130 super.init(source, options, env);
131 if (options.containsKey(indexFamilyOptionName))
132 indexColf = new Text(options.get(indexFamilyOptionName));
133 if (options.containsKey(docFamilyOptionName))
134 docColf = new Text(options.get(docFamilyOptionName));
135 docSource = source.deepCopy(env);
136 indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(indexColf.getBytes(), 0, indexColf.getLength()));
137
138 for (TermSource ts : this.sources) {
139 ts.seekColfams = indexColfSet;
140 }
141 }
142
143 @Override
144 public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
145 throw new UnsupportedOperationException();
146 }
147
148 @Override
149 public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
150 super.seek(range, null, true);
151
152 }
153
154 @Override
155 protected void advanceToIntersection() throws IOException {
156 super.advanceToIntersection();
157 if (topKey == null)
158 return;
159 if (log.isTraceEnabled())
160 log.trace("using top key to seek for doc: " + topKey.toString());
161 Key docKey = buildDocKey();
162 docSource.seek(new Range(docKey, true, null, false), docColfSet, true);
163 log.debug("got doc key: " + docSource.getTopKey().toString());
164 if (docSource.hasTop() && docKey.equals(docSource.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL)) {
165 value = docSource.getTopValue();
166 }
167 log.debug("got doc value: " + value.toString());
168 }
169
170 protected Key buildDocKey() {
171 if (log.isTraceEnabled())
172 log.trace("building doc key for " + currentPartition + " " + currentDocID);
173 int zeroIndex = currentDocID.find("\0");
174 if (zeroIndex < 0)
175 throw new IllegalArgumentException("bad current docID");
176 Text colf = new Text(docColf);
177 colf.append(nullByte, 0, 1);
178 colf.append(currentDocID.getBytes(), 0, zeroIndex);
179 docColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(colf.getBytes(), 0, colf.getLength()));
180 if (log.isTraceEnabled())
181 log.trace(zeroIndex + " " + currentDocID.getLength());
182 Text colq = new Text();
183 colq.set(currentDocID.getBytes(), zeroIndex + 1, currentDocID.getLength() - zeroIndex - 1);
184 Key k = new Key(currentPartition, colf, colq);
185 if (log.isTraceEnabled())
186 log.trace("built doc key for seek: " + k.toString());
187 return k;
188 }
189
190 /**
191 * A convenience method for setting the index column family.
192 *
193 * @param is
194 * IteratorSetting object to configure.
195 * @param indexColf
196 * the index column family
197 */
198 public static void setIndexColf(IteratorSetting is, String indexColf) {
199 is.addOption(indexFamilyOptionName, indexColf);
200 }
201
202 /**
203 * A convenience method for setting the document column family prefix.
204 *
205 * @param is
206 * IteratorSetting object to configure.
207 * @param docColfPrefix
208 * the prefix of the document column family (colf will be of the form docColfPrefix\0doctype)
209 */
210 public static void setDocColfPrefix(IteratorSetting is, String docColfPrefix) {
211 is.addOption(docFamilyOptionName, docColfPrefix);
212 }
213
214 /**
215 * A convenience method for setting the index column family and document column family prefix.
216 *
217 * @param is
218 * IteratorSetting object to configure.
219 * @param indexColf
220 * the index column family
221 * @param docColfPrefix
222 * the prefix of the document column family (colf will be of the form docColfPrefix\0doctype)
223 */
224 public static void setColfs(IteratorSetting is, String indexColf, String docColfPrefix) {
225 setIndexColf(is, indexColf);
226 setDocColfPrefix(is, docColfPrefix);
227 }
228 }