1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.iterators.system;
18
19 import java.io.IOException;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.apache.accumulo.core.data.ByteSequence;
28 import org.apache.accumulo.core.data.Key;
29 import org.apache.accumulo.core.data.PartialKey;
30 import org.apache.accumulo.core.data.Range;
31 import org.apache.accumulo.core.data.Value;
32 import org.apache.accumulo.core.iterators.IteratorEnvironment;
33 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
34
35 public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator {
36
37 public interface DataSource {
38 boolean isCurrent();
39
40 DataSource getNewDataSource();
41
42 DataSource getDeepCopyDataSource(IteratorEnvironment env);
43
44 SortedKeyValueIterator<Key,Value> iterator() throws IOException;
45 }
46
47 private DataSource source;
48 private SortedKeyValueIterator<Key,Value> iter;
49
50 private Key key;
51 private Value val;
52
53 private Range range;
54 private boolean inclusive;
55 private Collection<ByteSequence> columnFamilies;
56
57 private boolean onlySwitchAfterRow;
58 private AtomicBoolean iflag;
59
60 private final List<SourceSwitchingIterator> copies;
61
62 private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator> copies, AtomicBoolean iflag) {
63 this.source = source;
64 this.onlySwitchAfterRow = onlySwitchAfterRow;
65 this.copies = copies;
66 this.iflag = iflag;
67 copies.add(this);
68 }
69
70 public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) {
71 this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()), null);
72 }
73
74 public SourceSwitchingIterator(DataSource source) {
75 this(source, false);
76 }
77
78 @Override
79 public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
80 return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies, iflag);
81 }
82
83 @Override
84 public Key getTopKey() {
85 return key;
86 }
87
88 @Override
89 public Value getTopValue() {
90 return val;
91 }
92
93 @Override
94 public boolean hasTop() {
95 return key != null;
96 }
97
98 @Override
99 public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
100 throw new UnsupportedOperationException();
101 }
102
103 @Override
104 public void next() throws IOException {
105 readNext(false);
106 }
107
108 private synchronized void readNext(boolean initialSeek) throws IOException {
109
110
111
112 boolean seekNeeded = (!onlySwitchAfterRow && switchSource()) || initialSeek;
113
114 if (seekNeeded)
115 if (initialSeek)
116 iter.seek(range, columnFamilies, inclusive);
117 else
118 iter.seek(new Range(key, false, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
119 else {
120 iter.next();
121 if (onlySwitchAfterRow && iter.hasTop() && !source.isCurrent() && !key.getRowData().equals(iter.getTopKey().getRowData())) {
122 switchSource();
123 iter.seek(new Range(key.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
124 }
125 }
126
127 if (iter.hasTop()) {
128 Key nextKey = iter.getTopKey();
129 Value nextVal = iter.getTopValue();
130
131 try {
132 key = (Key) nextKey.clone();
133 } catch (CloneNotSupportedException e) {
134 throw new IOException(e);
135 }
136 val = nextVal;
137 } else {
138 key = null;
139 val = null;
140 }
141 }
142
143 private boolean switchSource() throws IOException {
144 while (!source.isCurrent()) {
145 source = source.getNewDataSource();
146 iter = source.iterator();
147 if (iflag != null)
148 ((InterruptibleIterator) iter).setInterruptFlag(iflag);
149
150 return true;
151 }
152
153 return false;
154 }
155
156 @Override
157 public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
158 this.range = range;
159 this.inclusive = inclusive;
160 this.columnFamilies = columnFamilies;
161
162 if (iter == null) {
163 iter = source.iterator();
164 if (iflag != null)
165 ((InterruptibleIterator) iter).setInterruptFlag(iflag);
166 }
167
168 readNext(true);
169 }
170
171 private synchronized void _switchNow() throws IOException {
172 if (onlySwitchAfterRow)
173 throw new IllegalStateException("Can only switch on row boundries");
174
175 if (switchSource()) {
176 if (key != null) {
177 iter.seek(new Range(key, true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive);
178 }
179 }
180 }
181
182 public void switchNow() throws IOException {
183 synchronized (copies) {
184 for (SourceSwitchingIterator ssi : copies)
185 ssi._switchNow();
186 }
187 }
188
189 @Override
190 public synchronized void setInterruptFlag(AtomicBoolean flag) {
191 if (copies.size() != 1)
192 throw new IllegalStateException("setInterruptFlag() called after deep copies made " + copies.size());
193
194 this.iflag = flag;
195 if (iter != null)
196 ((InterruptibleIterator) iter).setInterruptFlag(flag);
197
198 }
199
200 }