View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.core.iterators.user;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.SortedMap;
29  import java.util.TreeMap;
30  
31  import org.apache.accumulo.core.data.ByteSequence;
32  import org.apache.accumulo.core.data.Key;
33  import org.apache.accumulo.core.data.PartialKey;
34  import org.apache.accumulo.core.data.Range;
35  import org.apache.accumulo.core.data.Value;
36  import org.apache.accumulo.core.iterators.IteratorEnvironment;
37  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
38  import org.apache.hadoop.io.Text;
39  
40  /**
41   * 
42   * The WholeRowIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value
43   * pairs into a single key/value pair, which is returned through the client as an atomic operation.
44   * 
45   * <p>
46   * One caveat is that when seeking in the WholeRowIterator using a range that starts at a non-inclusive first key in a row, (e.g. seek(new Range(new Key(new
47   * Text("row")),false,...),...)) this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when system
48   * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a
49   * scan after swapping out sources.
50   * 
51   * <p>
52   * To regain the original key/value pairs of the row, call the decodeRow function on the key/value pair that this iterator returned.
53   * 
54   * @see RowFilter
55   */
56  public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> {
57    
58    private SortedKeyValueIterator<Key,Value> sourceIter;
59    private Key topKey = null;
60    private Value topValue = null;
61    
62    public WholeRowIterator() {
63      
64    }
65    
66    WholeRowIterator(SortedKeyValueIterator<Key,Value> source) {
67      this.sourceIter = source;
68    }
69    
70    // decode a bunch of key value pairs that have been encoded into a single value
71    public static final SortedMap<Key,Value> decodeRow(Key rowKey, Value rowValue) throws IOException {
72      SortedMap<Key,Value> map = new TreeMap<Key,Value>();
73      ByteArrayInputStream in = new ByteArrayInputStream(rowValue.get());
74      DataInputStream din = new DataInputStream(in);
75      int numKeys = din.readInt();
76      for (int i = 0; i < numKeys; i++) {
77        byte[] cf;
78        byte[] cq;
79        byte[] cv;
80        byte[] valBytes;
81        // read the col fam
82        {
83          int len = din.readInt();
84          cf = new byte[len];
85          din.read(cf);
86        }
87        // read the col qual
88        {
89          int len = din.readInt();
90          cq = new byte[len];
91          din.read(cq);
92        }
93        // read the col visibility
94        {
95          int len = din.readInt();
96          cv = new byte[len];
97          din.read(cv);
98        }
99        // read the timestamp
100       long timestamp = din.readLong();
101       // read the value
102       {
103         int len = din.readInt();
104         valBytes = new byte[len];
105         din.read(valBytes);
106       }
107       map.put(new Key(rowKey.getRowData().toArray(), cf, cq, cv, timestamp, false, false), new Value(valBytes, false));
108     }
109     return map;
110   }
111   
112   // take a stream of keys and values and output a value that encodes everything but their row
113   // keys and values must be paired one for one
114   public static final Value encodeRow(List<Key> keys, List<Value> values) throws IOException {
115     ByteArrayOutputStream out = new ByteArrayOutputStream();
116     DataOutputStream dout = new DataOutputStream(out);
117     dout.writeInt(keys.size());
118     for (int i = 0; i < keys.size(); i++) {
119       Key k = keys.get(i);
120       Value v = values.get(i);
121       // write the colfam
122       {
123         ByteSequence bs = k.getColumnFamilyData();
124         dout.writeInt(bs.length());
125         dout.write(bs.getBackingArray(), bs.offset(), bs.length());
126       }
127       // write the colqual
128       {
129         ByteSequence bs = k.getColumnQualifierData();
130         dout.writeInt(bs.length());
131         dout.write(bs.getBackingArray(), bs.offset(), bs.length());
132       }
133       // write the column visibility
134       {
135         ByteSequence bs = k.getColumnVisibilityData();
136         dout.writeInt(bs.length());
137         dout.write(bs.getBackingArray(), bs.offset(), bs.length());
138       }
139       // write the timestamp
140       dout.writeLong(k.getTimestamp());
141       // write the value
142       byte[] valBytes = v.get();
143       dout.writeInt(valBytes.length);
144       dout.write(valBytes);
145     }
146     
147     return new Value(out.toByteArray());
148   }
149   
150   List<Key> keys = new ArrayList<Key>();
151   List<Value> values = new ArrayList<Value>();
152   
153   private void prepKeys() throws IOException {
154     if (topKey != null)
155       return;
156     Text currentRow;
157     do {
158       if (sourceIter.hasTop() == false)
159         return;
160       currentRow = new Text(sourceIter.getTopKey().getRow());
161       keys.clear();
162       values.clear();
163       while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
164         keys.add(new Key(sourceIter.getTopKey()));
165         values.add(new Value(sourceIter.getTopValue()));
166         sourceIter.next();
167       }
168     } while (!filter(currentRow, keys, values));
169     
170     topKey = new Key(currentRow);
171     topValue = encodeRow(keys, values);
172     
173   }
174   
175   /**
176    * 
177    * @param currentRow
178    *          All keys have this in their row portion (do not modify!).
179    * @param keys
180    *          One key for each key in the row, ordered as they are given by the source iterator (do not modify!).
181    * @param values
182    *          One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!).
183    * @return true if we want to keep the row, false if we want to skip it
184    */
185   protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
186     return true;
187   }
188   
189   @Override
190   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
191     if (sourceIter != null)
192       return new WholeRowIterator(sourceIter.deepCopy(env));
193     return new WholeRowIterator();
194   }
195   
196   @Override
197   public Key getTopKey() {
198     return topKey;
199   }
200   
201   @Override
202   public Value getTopValue() {
203     return topValue;
204   }
205   
206   @Override
207   public boolean hasTop() {
208     return topKey != null;
209   }
210   
211   @Override
212   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
213     sourceIter = source;
214   }
215   
216   @Override
217   public void next() throws IOException {
218     topKey = null;
219     topValue = null;
220     prepKeys();
221   }
222   
223   @Override
224   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
225     topKey = null;
226     topValue = null;
227     
228     Key sk = range.getStartKey();
229     
230     if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0
231         && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) {
232       // assuming that we are seeking using a key previously returned by this iterator
233       // therefore go to the next row
234       Key followingRowKey = sk.followingKey(PartialKey.ROW);
235       if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0)
236         return;
237       
238       range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive());
239     }
240     
241     sourceIter.seek(range, columnFamilies, inclusive);
242     prepKeys();
243   }
244   
245 }