View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.iterators.system;
18  
19  import java.io.IOException;
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.apache.accumulo.core.data.ByteSequence;
28  import org.apache.accumulo.core.data.Key;
29  import org.apache.accumulo.core.data.PartialKey;
30  import org.apache.accumulo.core.data.Range;
31  import org.apache.accumulo.core.data.Value;
32  import org.apache.accumulo.core.iterators.IteratorEnvironment;
33  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
34  
35  public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator {
36    
37    public interface DataSource {
38      boolean isCurrent();
39      
40      DataSource getNewDataSource();
41      
42      DataSource getDeepCopyDataSource(IteratorEnvironment env);
43      
44      SortedKeyValueIterator<Key,Value> iterator() throws IOException;
45    }
46    
47    private DataSource source;
48    private SortedKeyValueIterator<Key,Value> iter;
49    
50    private Key key;
51    private Value val;
52    
53    private Range range;
54    private boolean inclusive;
55    private Collection<ByteSequence> columnFamilies;
56    
57    private boolean onlySwitchAfterRow;
58    private AtomicBoolean iflag;
59    
60    private final List<SourceSwitchingIterator> copies;
61    
62    private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator> copies, AtomicBoolean iflag) {
63      this.source = source;
64      this.onlySwitchAfterRow = onlySwitchAfterRow;
65      this.copies = copies;
66      this.iflag = iflag;
67      copies.add(this);
68    }
69    
70    public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) {
71      this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()), null);
72    }
73    
74    public SourceSwitchingIterator(DataSource source) {
75      this(source, false);
76    }
77    
78    @Override
79    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
80      return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies, iflag);
81    }
82    
83    @Override
84    public Key getTopKey() {
85      return key;
86    }
87    
88    @Override
89    public Value getTopValue() {
90      return val;
91    }
92    
93    @Override
94    public boolean hasTop() {
95      return key != null;
96    }
97    
98    @Override
99    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
100     throw new UnsupportedOperationException();
101   }
102   
103   @Override
104   public void next() throws IOException {
105     readNext(false);
106   }
107   
108   private synchronized void readNext(boolean initialSeek) throws IOException {
109     
110     // check of initialSeek second is intentional so that it does not short
111     // circuit the call to switchSource
112     boolean seekNeeded = (!onlySwitchAfterRow && switchSource()) || initialSeek;
113     
114     if (seekNeeded)
115       if (initialSeek)
116         iter.seek(range, columnFamilies, inclusive);
117       else
118         iter.seek(new Range(key, false, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
119     else {
120       iter.next();
121       if (onlySwitchAfterRow && iter.hasTop() && !source.isCurrent() && !key.getRowData().equals(iter.getTopKey().getRowData())) {
122         switchSource();
123         iter.seek(new Range(key.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
124       }
125     }
126     
127     if (iter.hasTop()) {
128       Key nextKey = iter.getTopKey();
129       Value nextVal = iter.getTopValue();
130       
131       try {
132         key = (Key) nextKey.clone();
133       } catch (CloneNotSupportedException e) {
134         throw new IOException(e);
135       }
136       val = nextVal;
137     } else {
138       key = null;
139       val = null;
140     }
141   }
142   
143   private boolean switchSource() throws IOException {
144     while (!source.isCurrent()) {
145       source = source.getNewDataSource();
146       iter = source.iterator();
147       if (iflag != null)
148         ((InterruptibleIterator) iter).setInterruptFlag(iflag);
149       
150       return true;
151     }
152     
153     return false;
154   }
155   
156   @Override
157   public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
158     this.range = range;
159     this.inclusive = inclusive;
160     this.columnFamilies = columnFamilies;
161     
162     if (iter == null) {
163       iter = source.iterator();
164       if (iflag != null)
165         ((InterruptibleIterator) iter).setInterruptFlag(iflag);
166     }
167     
168     readNext(true);
169   }
170   
171   private synchronized void _switchNow() throws IOException {
172     if (onlySwitchAfterRow)
173       throw new IllegalStateException("Can only switch on row boundries");
174     
175     if (switchSource()) {
176       if (key != null) {
177         iter.seek(new Range(key, true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
178       }
179     }
180   }
181   
182   public void switchNow() throws IOException {
183     synchronized (copies) {
184       for (SourceSwitchingIterator ssi : copies)
185         ssi._switchNow();
186     }
187   }
188   
189   @Override
190   public synchronized void setInterruptFlag(AtomicBoolean flag) {
191     if (copies.size() != 1)
192       throw new IllegalStateException("setInterruptFlag() called after deep copies made " + copies.size());
193     
194     this.iflag = flag;
195     if (iter != null)
196       ((InterruptibleIterator) iter).setInterruptFlag(flag);
197     
198   }
199   
200 }