1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.iterators;
18
19 import java.io.IOException;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.PriorityQueue;
26
27 import org.apache.accumulo.core.data.ArrayByteSequence;
28 import org.apache.accumulo.core.data.ByteSequence;
29 import org.apache.accumulo.core.data.Key;
30 import org.apache.accumulo.core.data.Range;
31 import org.apache.accumulo.core.data.Value;
32 import org.apache.hadoop.io.Text;
33 import org.apache.log4j.Logger;
34
35 /**
36 * An iterator that handles "OR" query constructs on the server side. This code has been adapted/merged from Heap and Multi Iterators.
37 */
38
39 public class OrIterator implements SortedKeyValueIterator<Key,Value> {
40
41 private TermSource currentTerm;
42 private ArrayList<TermSource> sources;
43 private PriorityQueue<TermSource> sorted = new PriorityQueue<TermSource>(5);
44 private static final Text nullText = new Text();
45 private static final Key nullKey = new Key();
46
47 protected static final Logger log = Logger.getLogger(OrIterator.class);
48
49 protected static class TermSource implements Comparable<TermSource> {
50 public SortedKeyValueIterator<Key,Value> iter;
51 public Text term;
52 public Collection<ByteSequence> seekColfams;
53
54 public TermSource(TermSource other) {
55 this.iter = other.iter;
56 this.term = other.term;
57 this.seekColfams = other.seekColfams;
58 }
59
60 public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
61 this.iter = iter;
62 this.term = term;
63
64 this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
65 }
66
67 public int compareTo(TermSource o) {
68
69
70
71
72 return this.iter.getTopKey().compareColumnQualifier(o.iter.getTopKey().getColumnQualifier());
73 }
74 }
75
76 public OrIterator() {
77 this.sources = new ArrayList<TermSource>();
78 }
79
80 private OrIterator(OrIterator other, IteratorEnvironment env) {
81 this.sources = new ArrayList<TermSource>();
82
83 for (TermSource TS : other.sources)
84 this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.term));
85 }
86
87 @Override
88 public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
89 return new OrIterator(this, env);
90 }
91
92 public void addTerm(SortedKeyValueIterator<Key,Value> source, Text term, IteratorEnvironment env) {
93 this.sources.add(new TermSource(source.deepCopy(env), term));
94 }
95
96 @Override
97 final public void next() throws IOException {
98
99 if (currentTerm == null)
100 return;
101
102
103 currentTerm.iter.next();
104
105
106 if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
107 currentTerm = null;
108
109
110
111
112
113 if (sorted.size() > 0) {
114
115 if (currentTerm != null)
116 sorted.add(currentTerm);
117
118 currentTerm = sorted.poll();
119 }
120 }
121
122 @Override
123 public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
124
125
126 if (sources.size() == 0) {
127 currentTerm = null;
128 return;
129 }
130
131
132
133
134 if (sources.size() == 1) {
135
136 if (currentTerm == null)
137 currentTerm = sources.get(0);
138 Range newRange = null;
139
140 if (range != null) {
141 if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
142 newRange = range;
143 else {
144 Key newKey = null;
145 if (range.getStartKey().getColumnQualifier() == null)
146 newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term);
147 else
148 newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term, range.getStartKey().getColumnQualifier());
149 newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
150 }
151 }
152 currentTerm.iter.seek(newRange, currentTerm.seekColfams, true);
153
154
155
156
157
158
159 if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
160 currentTerm = null;
161
162
163 return;
164 }
165
166
167 sorted.clear();
168
169
170
171
172
173 if (currentTerm == null) {
174 for (TermSource TS : sources) {
175 TS.iter.seek(range, TS.seekColfams, true);
176
177 if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0)))
178 sorted.add(TS);
179 }
180 currentTerm = sorted.poll();
181 return;
182 }
183
184 TermSource TS = null;
185 Iterator<TermSource> iter = sources.iterator();
186
187
188 while (iter.hasNext()) {
189 TS = iter.next();
190 Range newRange = null;
191
192 if (range != null) {
193 if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
194 newRange = range;
195 else {
196 Key newKey = null;
197 if (range.getStartKey().getColumnQualifier() == null)
198 newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term);
199 else
200 newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term, range.getStartKey().getColumnQualifier());
201 newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
202 }
203 }
204
205
206 TS.iter.seek(newRange, TS.seekColfams, true);
207
208
209
210
211
212
213 if (!(TS.iter.hasTop()) || ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) != 0)))
214 iter.remove();
215
216
217 sorted.add(TS);
218 }
219
220
221 currentTerm = sorted.poll();
222 }
223
224 @Override
225 final public Key getTopKey() {
226 return currentTerm.iter.getTopKey();
227 }
228
229 @Override
230 final public Value getTopValue() {
231 return currentTerm.iter.getTopValue();
232 }
233
234 @Override
235 final public boolean hasTop() {
236 return currentTerm != null;
237 }
238
239 @Override
240 public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
241 throw new UnsupportedOperationException();
242 }
243 }