1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.accumulo.server.tabletserver.log;
18  
19  import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_FINISH;
20  import static org.apache.accumulo.server.logger.LogEvents.COMPACTION_START;
21  import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET;
22  import static org.apache.accumulo.server.logger.LogEvents.MUTATION;
23  import static org.apache.accumulo.server.logger.LogEvents.OPEN;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Collections;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.Map.Entry;
35  
36  import org.apache.accumulo.core.data.KeyExtent;
37  import org.apache.accumulo.core.data.Mutation;
38  import org.apache.accumulo.core.data.Value;
39  import org.apache.accumulo.core.util.CachedConfiguration;
40  import org.apache.accumulo.server.data.ServerMutation;
41  import org.apache.accumulo.server.logger.LogEvents;
42  import org.apache.accumulo.server.logger.LogFileKey;
43  import org.apache.accumulo.server.logger.LogFileValue;
44  import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
45  import org.apache.accumulo.server.tabletserver.log.SortedLogRecovery;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.io.MapFile;
50  import org.apache.hadoop.io.Text;
51  import org.apache.hadoop.io.MapFile.Writer;
52  import org.junit.Assert;
53  import org.junit.Test;
54  
55  public class SortedLogRecoveryTest {
56    
57    static final KeyExtent extent = new KeyExtent(new Text("table"), null, null);
58    static final Text cf = new Text("cf");
59    static final Text cq = new Text("cq");
60    static final Value value = new Value("value".getBytes());
61    
62    static class KeyValue implements Comparable<KeyValue> {
63      public final LogFileKey key;
64      public final LogFileValue value;
65      
66      KeyValue() {
67        key = new LogFileKey();
68        value = new LogFileValue();
69      }
70      
71      @Override
72      public int compareTo(KeyValue o) {
73        return key.compareTo(o.key);
74      }
75    }
76    
77    private static KeyValue createKeyValue(LogEvents type, long seq, int tid, Object fileExtentMutation) {
78      KeyValue result = new KeyValue();
79      result.key.event = type;
80      result.key.seq = seq;
81      result.key.tid = tid;
82      switch (type) {
83        case OPEN:
84          result.key.tserverSession = (String) fileExtentMutation;
85          break;
86        case COMPACTION_FINISH:
87          break;
88        case COMPACTION_START:
89          result.key.filename = (String) fileExtentMutation;
90          break;
91        case DEFINE_TABLET:
92          result.key.tablet = (KeyExtent) fileExtentMutation;
93          break;
94        case MUTATION:
95          result.value.mutations = Arrays.asList((Mutation) fileExtentMutation);
96          break;
97        case MANY_MUTATIONS:
98          result.value.mutations = Arrays.asList((Mutation[])fileExtentMutation);
99      }
100     return result;
101   }
102   
103   private static class CaptureMutations implements MutationReceiver {
104     public ArrayList<Mutation> result = new ArrayList<Mutation>();
105     
106     @Override
107     public void receive(Mutation m) {
108       // make a copy of Mutation:
109       result.add(m);
110     }
111   }
112   
113   private static List<Mutation> recover(Map<String,KeyValue[]> logs, KeyExtent extent) throws IOException {
114     return recover(logs, new HashSet<String>(), extent);
115   }
116   
117   private static List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent) throws IOException {
118     final String workdir = "workdir";
119     Configuration conf = CachedConfiguration.getInstance();
120     FileSystem local = FileSystem.getLocal(conf).getRaw();
121     local.delete(new Path(workdir), true);
122     ArrayList<String> dirs = new ArrayList<String>();
123     try {
124       for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
125         String path = workdir + "/" + entry.getKey();
126         Writer map = new MapFile.Writer(conf, local, path + "/log1", LogFileKey.class, LogFileValue.class);
127         for (KeyValue lfe : entry.getValue()) {
128           map.append(lfe.key, lfe.value);
129         }
130         map.close();
131         local.create(new Path(path, "finished")).close();
132         dirs.add(path);
133       }
134       // Recover
135       SortedLogRecovery recovery = new SortedLogRecovery();
136       CaptureMutations capture = new CaptureMutations();
137       recovery.recover(extent, dirs, files, capture);
138       return capture.result;
139     } finally {
140       local.delete(new Path(workdir), true);
141     }
142   }
143   
144   @Test
145   public void testCompactionCrossesLogs() throws IOException {
146     Mutation ignored = new ServerMutation(new Text("ignored"));
147     ignored.put(cf, cq, value);
148     Mutation m = new ServerMutation(new Text("row1"));
149     m.put(cf, cq, value);
150     Mutation m2 = new ServerMutation(new Text("row2"));
151     m2.put(cf, cq, value);
152     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
153         createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
154     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
155         createKeyValue(COMPACTION_START, 4, 1, "somefile"), createKeyValue(MUTATION, 7, 1, m),};
156     KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 0, 2, "23"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
157         createKeyValue(COMPACTION_START, 5, 2, "newfile"), createKeyValue(COMPACTION_FINISH, 6, 2, null), createKeyValue(MUTATION, 3, 2, ignored),
158         createKeyValue(MUTATION, 4, 2, ignored),};
159     KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 0, 3, "69"), createKeyValue(DEFINE_TABLET, 1, 3, extent),
160         createKeyValue(MUTATION, 2, 3, ignored), createKeyValue(MUTATION, 3, 3, ignored), createKeyValue(MUTATION, 4, 3, ignored),};
161     KeyValue entries5[] = new KeyValue[] {createKeyValue(OPEN, 0, 4, "70"), createKeyValue(DEFINE_TABLET, 1, 4, extent),
162         createKeyValue(COMPACTION_START, 3, 4, "thisfile"), createKeyValue(MUTATION, 2, 4, ignored), createKeyValue(MUTATION, 6, 4, m2),};
163     
164     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
165     logs.put("entries", entries);
166     logs.put("entries2", entries2);
167     logs.put("entries3", entries3);
168     logs.put("entries4", entries4);
169     logs.put("entries5", entries5);
170     
171     // Recover
172     List<Mutation> mutations = recover(logs, extent);
173     
174     // Verify recovered data
175     Assert.assertEquals(2, mutations.size());
176     Assert.assertEquals(m, mutations.get(0));
177     Assert.assertEquals(m2, mutations.get(1));
178   }
179   
180   @Test
181   public void testCompactionCrossesLogs5() throws IOException {
182     // Create a test log
183     Mutation ignored = new ServerMutation(new Text("ignored"));
184     ignored.put(cf, cq, value);
185     Mutation m = new ServerMutation(new Text("row1"));
186     m.put(cf, cq, value);
187     Mutation m2 = new ServerMutation(new Text("row2"));
188     m2.put(cf, cq, value);
189     Mutation m3 = new ServerMutation(new Text("row3"));
190     m3.put(cf, cq, value);
191     Mutation m4 = new ServerMutation(new Text("row4"));
192     m4.put(cf, cq, value);
193     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
194         createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
195     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "2"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
196         createKeyValue(MUTATION, 7, 1, ignored),};
197     KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 8, -1, "3"), createKeyValue(DEFINE_TABLET, 9, 1, extent),
198         createKeyValue(COMPACTION_FINISH, 10, 1, null), createKeyValue(COMPACTION_START, 12, 1, "newfile"), createKeyValue(COMPACTION_FINISH, 13, 1, null),
199         // createKeyValue(COMPACTION_FINISH, 14, 1, null),
200         createKeyValue(MUTATION, 11, 1, ignored), createKeyValue(MUTATION, 15, 1, m), createKeyValue(MUTATION, 16, 1, m2),};
201     KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 17, -1, "4"), createKeyValue(DEFINE_TABLET, 18, 1, extent),
202         createKeyValue(COMPACTION_START, 20, 1, "file"), createKeyValue(MUTATION, 19, 1, m3), createKeyValue(MUTATION, 21, 1, m4),};
203     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
204     logs.put("entries", entries);
205     logs.put("entries2", entries2);
206     logs.put("entries3", entries3);
207     logs.put("entries4", entries4);
208     // Recover
209     List<Mutation> mutations = recover(logs, extent);
210     // Verify recovered data
211     Assert.assertEquals(4, mutations.size());
212     Assert.assertEquals(m, mutations.get(0));
213     Assert.assertEquals(m2, mutations.get(1));
214     Assert.assertEquals(m3, mutations.get(2));
215     Assert.assertEquals(m4, mutations.get(3));
216   }
217   
218   @Test
219   public void testCompactionCrossesLogs6() throws IOException {
220     // Create a test log
221     Mutation ignored = new ServerMutation(new Text("ignored"));
222     ignored.put(cf, cq, value);
223     Mutation m = new ServerMutation(new Text("row1"));
224     m.put(cf, cq, value);
225     Mutation m2 = new ServerMutation(new Text("row2"));
226     m2.put(cf, cq, value);
227     Mutation m3 = new ServerMutation(new Text("row3"));
228     m3.put(cf, cq, value);
229     Mutation m4 = new ServerMutation(new Text("row4"));
230     m4.put(cf, cq, value);
231     Mutation m5 = new ServerMutation(new Text("row5"));
232     m5.put(cf, cq, value);
233     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent), createKeyValue(MUTATION, 1, 1, ignored),
234         createKeyValue(MUTATION, 3, 1, m),};
235     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
236         createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 3, 1, "somefile"), createKeyValue(MUTATION, 3, 1, m2),};
237     
238     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
239     logs.put("entries", entries);
240     logs.put("entries2", entries2);
241     
242     // Recover
243     List<Mutation> mutations = recover(logs, extent);
244     
245     // Verify recovered data
246     Assert.assertEquals(2, mutations.size());
247     Assert.assertEquals(m, mutations.get(0));
248     Assert.assertEquals(m2, mutations.get(1));
249   }
250   
251   @Test
252   public void testEmpty() throws IOException {
253     // Create a test log
254     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),};
255     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
256     logs.put("testlog", entries);
257     // Recover
258     List<Mutation> mutations = recover(logs, extent);
259     // Verify recovered data
260     Assert.assertEquals(0, mutations.size());
261     
262   }
263   
264   @Test
265   public void testMissingDefinition() {
266     // Create a test log
267     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),};
268     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
269     logs.put("testlog", entries);
270     // Recover
271     try {
272       recover(logs, extent);
273       Assert.fail("tablet should not have been found");
274     } catch (Throwable t) {}
275   }
276   
277   @Test
278   public void testSimple() throws IOException {
279     // Create a test log
280     Mutation m = new ServerMutation(new Text("row1"));
281     m.put(cf, cq, value);
282     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent), createKeyValue(MUTATION, 2, 1, m),};
283     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
284     logs.put("testlog", entries);
285     // Recover
286     List<Mutation> mutations = recover(logs, extent);
287     // Verify recovered data
288     Assert.assertEquals(1, mutations.size());
289     Assert.assertEquals(m, mutations.get(0));
290   }
291   
292   @Test
293   public void testSkipSuccessfulCompaction() throws IOException {
294     // Create a test log
295     Mutation ignored = new ServerMutation(new Text("ignored"));
296     ignored.put(cf, cq, value);
297     Mutation m = new ServerMutation(new Text("row1"));
298     m.put(cf, cq, value);
299     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
300         createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 2, 1, ignored),
301         createKeyValue(MUTATION, 5, 1, m),};
302     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
303     logs.put("testlog", entries);
304     // Recover
305     List<Mutation> mutations = recover(logs, extent);
306     // Verify recovered data
307     Assert.assertEquals(1, mutations.size());
308     Assert.assertEquals(m, mutations.get(0));
309   }
310   
311   @Test
312   public void testSkipSuccessfulCompactionAcrossFiles() throws IOException {
313     // Create a test log
314     Mutation ignored = new ServerMutation(new Text("ignored"));
315     ignored.put(cf, cq, value);
316     Mutation m = new ServerMutation(new Text("row1"));
317     m.put(cf, cq, value);
318     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
319         createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored),};
320     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 4, -1, "1"), createKeyValue(DEFINE_TABLET, 5, 1, extent),
321         createKeyValue(COMPACTION_FINISH, 6, 1, null), createKeyValue(MUTATION, 7, 1, m),};
322     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
323     logs.put("entries", entries);
324     logs.put("entries2", entries2);
325     // Recover
326     List<Mutation> mutations = recover(logs, extent);
327     // Verify recovered data
328     Assert.assertEquals(1, mutations.size());
329     Assert.assertEquals(m, mutations.get(0));
330   }
331   
332   @Test
333   public void testGetMutationsAfterCompactionStart() throws IOException {
334     // Create a test log
335     Mutation ignored = new ServerMutation(new Text("ignored"));
336     ignored.put(cf, cq, value);
337     Mutation m = new ServerMutation(new Text("row1"));
338     m.put(cf, cq, value);
339     Mutation m2 = new ServerMutation(new Text("row2"));
340     m2.put(cf, cq, new Value("123".getBytes()));
341     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
342         createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
343     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
344         createKeyValue(COMPACTION_FINISH, 7, 1, null), createKeyValue(MUTATION, 8, 1, m2),};
345     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
346     logs.put("entries", entries);
347     logs.put("entries2", entries2);
348     // Recover
349     List<Mutation> mutations = recover(logs, extent);
350     // Verify recovered data
351     Assert.assertEquals(2, mutations.size());
352     Assert.assertEquals(m, mutations.get(0));
353     Assert.assertEquals(m2, mutations.get(1));
354   }
355   
356   @Test
357   public void testDoubleFinish() throws IOException {
358     // Create a test log
359     Mutation ignored = new ServerMutation(new Text("ignored"));
360     ignored.put(cf, cq, value);
361     Mutation m = new ServerMutation(new Text("row1"));
362     m.put(cf, cq, value);
363     Mutation m2 = new ServerMutation(new Text("row2"));
364     m2.put(cf, cq, new Value("123".getBytes()));
365     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
366         createKeyValue(COMPACTION_FINISH, 2, 1, null), createKeyValue(COMPACTION_START, 4, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 6, 1, null),
367         createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 5, 1, m), createKeyValue(MUTATION, 7, 1, m2),};
368     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
369     logs.put("entries", entries);
370     // Recover
371     List<Mutation> mutations = recover(logs, extent);
372     // Verify recovered data
373     Assert.assertEquals(2, mutations.size());
374     Assert.assertEquals(m, mutations.get(0));
375     Assert.assertEquals(m2, mutations.get(1));
376   }
377   
378   @Test
379   public void testCompactionCrossesLogs2() throws IOException {
380     // Create a test log
381     Mutation ignored = new ServerMutation(new Text("ignored"));
382     ignored.put(cf, cq, value);
383     Mutation m = new ServerMutation(new Text("row1"));
384     m.put(cf, cq, value);
385     Mutation m2 = new ServerMutation(new Text("row2"));
386     m2.put(cf, cq, value);
387     Mutation m3 = new ServerMutation(new Text("row3"));
388     m3.put(cf, cq, value);
389     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
390         createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
391     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent), createKeyValue(MUTATION, 7, 1, m2),};
392     KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 8, -1, "1"), createKeyValue(DEFINE_TABLET, 9, 1, extent),
393         createKeyValue(COMPACTION_FINISH, 10, 1, null), createKeyValue(MUTATION, 11, 1, m3),};
394     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
395     logs.put("entries", entries);
396     logs.put("entries2", entries2);
397     logs.put("entries3", entries3);
398     // Recover
399     List<Mutation> mutations = recover(logs, extent);
400     // Verify recovered data
401     Assert.assertEquals(3, mutations.size());
402     Assert.assertEquals(m, mutations.get(0));
403     Assert.assertEquals(m2, mutations.get(1));
404     Assert.assertEquals(m3, mutations.get(2));
405   }
406   
407   @Test
408   public void testBug1() throws IOException {
409     // this unit test reproduces a bug that occurred, nothing should recover
410     Mutation m1 = new ServerMutation(new Text("row1"));
411     m1.put(cf, cq, value);
412     Mutation m2 = new ServerMutation(new Text("row2"));
413     m2.put(cf, cq, value);
414     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
415         createKeyValue(COMPACTION_START, 30, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 32, 1, "somefile"), createKeyValue(MUTATION, 29, 1, m1),
416         createKeyValue(MUTATION, 30, 1, m2),};
417     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
418     logs.put("testlog", entries);
419     // Recover
420     List<Mutation> mutations = recover(logs, extent);
421     // Verify recovered data
422     Assert.assertEquals(0, mutations.size());
423   }
424   
425   @Test
426   public void testBug2() throws IOException {
427     // Create a test log
428     Mutation ignored = new ServerMutation(new Text("ignored"));
429     ignored.put(cf, cq, value);
430     Mutation m = new ServerMutation(new Text("row1"));
431     m.put(cf, cq, value);
432     Mutation m2 = new ServerMutation(new Text("row2"));
433     m2.put(cf, cq, value);
434     Mutation m3 = new ServerMutation(new Text("row3"));
435     m3.put(cf, cq, value);
436     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
437         createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 3, 1, m),};
438     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
439         createKeyValue(COMPACTION_START, 8, 1, "somefile"), createKeyValue(MUTATION, 7, 1, m2), createKeyValue(MUTATION, 9, 1, m3),};
440     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
441     logs.put("entries", entries);
442     logs.put("entries2", entries2);
443     // Recover
444     List<Mutation> mutations = recover(logs, extent);
445     // Verify recovered data
446     Assert.assertEquals(3, mutations.size());
447     Assert.assertEquals(m, mutations.get(0));
448     Assert.assertEquals(m2, mutations.get(1));
449     Assert.assertEquals(m3, mutations.get(2));
450   }
451   
452   @Test
453   public void testCompactionCrossesLogs4() throws IOException {
454     // Create a test log
455     Mutation ignored = new ServerMutation(new Text("ignored"));
456     ignored.put(cf, cq, value);
457     Mutation m = new ServerMutation(new Text("row1"));
458     m.put(cf, cq, value);
459     Mutation m2 = new ServerMutation(new Text("row2"));
460     m2.put(cf, cq, value);
461     Mutation m3 = new ServerMutation(new Text("row3"));
462     m3.put(cf, cq, value);
463     Mutation m4 = new ServerMutation(new Text("row4"));
464     m4.put(cf, cq, value);
465     Mutation m5 = new ServerMutation(new Text("row5"));
466     m5.put(cf, cq, value);
467     Mutation m6 = new ServerMutation(new Text("row6"));
468     m6.put(cf, cq, value);
469     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
470         createKeyValue(COMPACTION_START, 4, 1, "somefile"),
471         // createKeyValue(COMPACTION_FINISH, 5, 1, null),
472         createKeyValue(MUTATION, 2, 1, m), createKeyValue(MUTATION, 3, 1, m2),};
473     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "2"), createKeyValue(DEFINE_TABLET, 6, 1, extent), createKeyValue(MUTATION, 7, 1, m3),
474         createKeyValue(MUTATION, 8, 1, m4),};
475     KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 9, -1, "3"), createKeyValue(DEFINE_TABLET, 10, 1, extent),
476         // createKeyValue(COMPACTION_FINISH, 11, 1, null),
477         createKeyValue(COMPACTION_START, 12, 1, "somefile"),
478         // createKeyValue(COMPACTION_FINISH, 14, 1, null),
479         // createKeyValue(COMPACTION_START, 15, 1, "somefile"),
480         // createKeyValue(COMPACTION_FINISH, 17, 1, null),
481         // createKeyValue(COMPACTION_START, 18, 1, "somefile"),
482         // createKeyValue(COMPACTION_FINISH, 19, 1, null),
483         createKeyValue(MUTATION, 8, 1, m5), createKeyValue(MUTATION, 20, 1, m6),};
484     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
485     logs.put("entries", entries);
486     logs.put("entries2", entries2);
487     logs.put("entries3", entries3);
488     // Recover
489     
490     List<Mutation> mutations = recover(logs, extent);
491     
492     // Verify recovered data
493     Assert.assertEquals(6, mutations.size());
494     Assert.assertEquals(m, mutations.get(0));
495     Assert.assertEquals(m2, mutations.get(1));
496     Assert.assertEquals(m3, mutations.get(2));
497     Assert.assertEquals(m4, mutations.get(3));
498     Assert.assertEquals(m5, mutations.get(4));
499     Assert.assertEquals(m6, mutations.get(5));
500   }
501   
502   @Test
503   public void testLookingForBug3() throws IOException {
504     Mutation ignored = new ServerMutation(new Text("ignored"));
505     ignored.put(cf, cq, value);
506     Mutation m = new ServerMutation(new Text("row1"));
507     m.put(cf, cq, value);
508     Mutation m2 = new ServerMutation(new Text("row2"));
509     m2.put(cf, cq, value);
510     Mutation m3 = new ServerMutation(new Text("row3"));
511     m3.put(cf, cq, value);
512     Mutation m4 = new ServerMutation(new Text("row4"));
513     m4.put(cf, cq, value);
514     Mutation m5 = new ServerMutation(new Text("row5"));
515     m5.put(cf, cq, value);
516     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
517         createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 3, 1, null), createKeyValue(MUTATION, 1, 1, ignored),
518         createKeyValue(MUTATION, 3, 1, m), createKeyValue(MUTATION, 3, 1, m2), createKeyValue(MUTATION, 3, 1, m3),};
519     KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
520         createKeyValue(COMPACTION_START, 2, 1, "somefile2"), createKeyValue(MUTATION, 3, 1, m4), createKeyValue(MUTATION, 3, 1, m5),};
521     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
522     logs.put("entries", entries);
523     logs.put("entries2", entries2);
524     // Recover
525     List<Mutation> mutations = recover(logs, extent);
526     // Verify recovered data
527     Assert.assertEquals(5, mutations.size());
528     Assert.assertEquals(m, mutations.get(0));
529     Assert.assertEquals(m2, mutations.get(1));
530     Assert.assertEquals(m3, mutations.get(2));
531     Assert.assertEquals(m4, mutations.get(3));
532     Assert.assertEquals(m5, mutations.get(4));
533   }
534   
535   @Test
536   public void testMultipleTabletDefinition() throws Exception {
537     // test for a tablet defined multiple times in a log file
538     // there was a bug where the oldest tablet id was used instead
539     // of the newest
540     
541     Mutation ignored = new ServerMutation(new Text("row1"));
542     ignored.put("foo", "bar", "v1");
543     Mutation m = new ServerMutation(new Text("row1"));
544     m.put("foo", "bar", "v1");
545     
546     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
547         createKeyValue(DEFINE_TABLET, 1, 2, extent), createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "somefile"),
548         createKeyValue(MUTATION, 4, 2, m), createKeyValue(COMPACTION_FINISH, 6, 2, null),};
549     
550     Arrays.sort(entries);
551     
552     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
553     logs.put("entries", entries);
554     
555     List<Mutation> mutations = recover(logs, extent);
556     
557     Assert.assertEquals(1, mutations.size());
558     Assert.assertEquals(m, mutations.get(0));
559   }
560   
561   @Test
562   public void testNoFinish0() throws Exception {
563     // its possible that a minor compaction finishes successfully, but the process dies before writing the compaction event
564     
565     Mutation ignored = new ServerMutation(new Text("row1"));
566     ignored.put("foo", "bar", "v1");
567     
568     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
569         createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "/t/f1")};
570     
571     Arrays.sort(entries);
572     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
573     logs.put("entries", entries);
574     
575     List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
576     
577     Assert.assertEquals(0, mutations.size());
578   }
579   
580   @Test
581   public void testNoFinish1() throws Exception {
582     // its possible that a minor compaction finishes successfully, but the process dies before writing the compaction event
583     
584     Mutation ignored = new ServerMutation(new Text("row1"));
585     ignored.put("foo", "bar", "v1");
586     Mutation m = new ServerMutation(new Text("row1"));
587     m.put("foo", "bar", "v2");
588     
589     KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
590         createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "/t/f1"), createKeyValue(MUTATION, 4, 2, m),};
591     
592     Arrays.sort(entries);
593     Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
594     logs.put("entries", entries);
595     
596     List<Mutation> mutations = recover(logs, Collections.singleton("/t/f1"), extent);
597     
598     Assert.assertEquals(1, mutations.size());
599     Assert.assertEquals(m, mutations.get(0));
600   }
601 }