View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.iterators;
18  
19  import java.io.IOException;
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.Iterator;
24  import java.util.Map;
25  import java.util.PriorityQueue;
26  
27  import org.apache.accumulo.core.data.ArrayByteSequence;
28  import org.apache.accumulo.core.data.ByteSequence;
29  import org.apache.accumulo.core.data.Key;
30  import org.apache.accumulo.core.data.Range;
31  import org.apache.accumulo.core.data.Value;
32  import org.apache.hadoop.io.Text;
33  import org.apache.log4j.Logger;
34  
35  /**
36   * An iterator that handles "OR" query constructs on the server side. This code has been adapted/merged from Heap and Multi Iterators.
37   */
38  
39  public class OrIterator implements SortedKeyValueIterator<Key,Value> {
40    
41    private TermSource currentTerm;
42    private ArrayList<TermSource> sources;
43    private PriorityQueue<TermSource> sorted = new PriorityQueue<TermSource>(5);
44    private static final Text nullText = new Text();
45    private static final Key nullKey = new Key();
46    
47    protected static final Logger log = Logger.getLogger(OrIterator.class);
48    
49    protected static class TermSource implements Comparable<TermSource> {
50      public SortedKeyValueIterator<Key,Value> iter;
51      public Text term;
52      public Collection<ByteSequence> seekColfams;
53      
54      public TermSource(TermSource other) {
55        this.iter = other.iter;
56        this.term = other.term;
57        this.seekColfams = other.seekColfams;
58      }
59      
60      public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
61        this.iter = iter;
62        this.term = term;
63        // The desired column families for this source is the term itself
64        this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
65      }
66      
67      public int compareTo(TermSource o) {
68        // NOTE: If your implementation can have more than one row in a tablet,
69        // you must compare row key here first, then column qualifier.
70        // NOTE2: A null check is not needed because things are only added to the
71        // sorted after they have been determined to be valid.
72        return this.iter.getTopKey().compareColumnQualifier(o.iter.getTopKey().getColumnQualifier());
73      }
74    }
75    
76    public OrIterator() {
77      this.sources = new ArrayList<TermSource>();
78    }
79    
80    private OrIterator(OrIterator other, IteratorEnvironment env) {
81      this.sources = new ArrayList<TermSource>();
82      
83      for (TermSource TS : other.sources)
84        this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.term));
85    }
86    
87    @Override
88    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
89      return new OrIterator(this, env);
90    }
91    
92    public void addTerm(SortedKeyValueIterator<Key,Value> source, Text term, IteratorEnvironment env) {
93      this.sources.add(new TermSource(source.deepCopy(env), term));
94    }
95    
96    @Override
97    final public void next() throws IOException {
98      
99      if (currentTerm == null)
100       return;
101     
102     // Advance currentTerm
103     currentTerm.iter.next();
104     
105     // See if currentTerm is still valid, remove if not
106     if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
107       currentTerm = null;
108     
109     // optimization.
110     // if size == 0, currentTerm is the only item left,
111     // OR there are no items left.
112     // In either case, we don't need to use the PriorityQueue
113     if (sorted.size() > 0) {
114       // sort the term back in
115       if (currentTerm != null)
116         sorted.add(currentTerm);
117       // and get the current top item out.
118       currentTerm = sorted.poll();
119     }
120   }
121   
122   @Override
123   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
124     
125     // If sources.size is 0, there is nothing to process, so just return.
126     if (sources.size() == 0) {
127       currentTerm = null;
128       return;
129     }
130     
131     // Optimization for when there is only one term.
132     // Yes, this is lots of duplicate code, but the speed works...
133     // and we don't have a priority queue of size 0 or 1.
134     if (sources.size() == 1) {
135       
136       if (currentTerm == null)
137         currentTerm = sources.get(0);
138       Range newRange = null;
139       
140       if (range != null) {
141         if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
142           newRange = range;
143         else {
144           Key newKey = null;
145           if (range.getStartKey().getColumnQualifier() == null)
146             newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term);
147           else
148             newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term, range.getStartKey().getColumnQualifier());
149           newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
150         }
151       }
152       currentTerm.iter.seek(newRange, currentTerm.seekColfams, true);
153       
154       // If there is no top key
155       // OR we are:
156       // 1) NOT an iterator
157       // 2) we have seeked into the next term (ie: seek man, get man001)
158       // then ignore it as a valid source
159       if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0)))
160         currentTerm = null;
161       
162       // Otherwise, source is valid.
163       return;
164     }
165     
166     // Clear the PriorityQueue so that we can re-populate it.
167     sorted.clear();
168     
169     // This check is put in here to guard against the "initial seek"
170     // crashing us because the topkey term does not match.
171     // Note: It is safe to do the "sources.size() == 1" above
172     // because an Or must have at least two elements.
173     if (currentTerm == null) {
174       for (TermSource TS : sources) {
175         TS.iter.seek(range, TS.seekColfams, true);
176         
177         if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0)))
178           sorted.add(TS);
179       }
180       currentTerm = sorted.poll();
181       return;
182     }
183     
184     TermSource TS = null;
185     Iterator<TermSource> iter = sources.iterator();
186     // For each term, seek forward.
187     // if a hit is not found, delete it from future searches.
188     while (iter.hasNext()) {
189       TS = iter.next();
190       Range newRange = null;
191       
192       if (range != null) {
193         if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null))
194           newRange = range;
195         else {
196           Key newKey = null;
197           if (range.getStartKey().getColumnQualifier() == null)
198             newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term);
199           else
200             newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term, range.getStartKey().getColumnQualifier());
201           newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false);
202         }
203       }
204       
205       // Seek only to the term for this source as a column family
206       TS.iter.seek(newRange, TS.seekColfams, true);
207       
208       // If there is no top key
209       // OR we are:
210       // 1) NOT an iterator
211       // 2) we have seeked into the next term (ie: seek man, get man001)
212       // then ignore it as a valid source
213       if (!(TS.iter.hasTop()) || ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) != 0)))
214         iter.remove();
215       
216       // Otherwise, source is valid. Add it to the sources.
217       sorted.add(TS);
218     }
219     
220     // And set currentTerm = the next valid key/term.
221     currentTerm = sorted.poll();
222   }
223   
224   @Override
225   final public Key getTopKey() {
226     return currentTerm.iter.getTopKey();
227   }
228   
229   @Override
230   final public Value getTopValue() {
231     return currentTerm.iter.getTopValue();
232   }
233   
234   @Override
235   final public boolean hasTop() {
236     return currentTerm != null;
237   }
238   
239   @Override
240   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
241     throw new UnsupportedOperationException();
242   }
243 }