View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.util.LinkedList;
22  import java.util.concurrent.atomic.AtomicLong;
23  
24  import com.google.common.annotations.VisibleForTesting;
25  
26  import com.google.common.base.Objects;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.apache.hadoop.hbase.util.ClassSize;
32  
33  
34  /**
35   * Manages the read/write consistency. This provides an interface for readers to determine what
36   * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
37   * the new writes for readers to read (thus forming atomic transactions).
38   */
39  @InterfaceAudience.Private
40  public class MultiVersionConcurrencyControl {
41    private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class);
42  
43    final AtomicLong readPoint = new AtomicLong(0);
44    final AtomicLong writePoint = new AtomicLong(0);
45    private final Object readWaiters = new Object();
46    /**
47     * Represents no value, or not set.
48     */
49    public static final long NONE = -1;
50  
51    // This is the pending queue of writes.
52    //
53    // TODO(eclark): Should this be an array of fixed size to
54    // reduce the number of allocations on the write path?
55    // This could be equal to the number of handlers + a small number.
56    // TODO: St.Ack 20150903 Sounds good to me.
57    private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
58  
59    public MultiVersionConcurrencyControl() {
60      super();
61    }
62  
63    /**
64     * Construct and set read point. Write point is uninitialized.
65     */
66    public MultiVersionConcurrencyControl(long startPoint) {
67      tryAdvanceTo(startPoint, NONE);
68    }
69  
70    /**
71     * Step the MVCC forward on to a new read/write basis.
72     * @param newStartPoint
73     */
74    public void advanceTo(long newStartPoint) {
75      while (true) {
76        long seqId = this.getWritePoint();
77        if (seqId >= newStartPoint) break;
78        if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
79      }
80    }
81  
82    /**
83     * Step the MVCC forward on to a new read/write basis.
84     * @param newStartPoint Point to move read and write points to.
85     * @param expected If not -1 (#NONE)
86     * @return Returns false if <code>expected</code> is not equal to the
87     * current <code>readPoint</code> or if <code>startPoint</code> is less than current
88     * <code>readPoint</code>
89     */
90    boolean tryAdvanceTo(long newStartPoint, long expected) {
91      synchronized (writeQueue) {
92        long currentRead = this.readPoint.get();
93        long currentWrite = this.writePoint.get();
94        if (currentRead != currentWrite) {
95          throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
96            ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
97        }
98        if (expected != NONE && expected != currentRead) {
99          return false;
100       }
101 
102       if (newStartPoint < currentRead) {
103         return false;
104       }
105 
106       readPoint.set(newStartPoint);
107       writePoint.set(newStartPoint);
108     }
109     return true;
110   }
111 
112   /**
113    * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
114    * to our queue of ongoing writes. Return this WriteEntry instance.
115    * To complete the write transaction and wait for it to be visible, call
116    * {@link #completeAndWait(WriteEntry)}. If the write failed, call
117    * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
118    * transaction.
119    * @see #complete(WriteEntry)
120    * @see #completeAndWait(WriteEntry)
121    */
122   public WriteEntry begin() {
123     synchronized (writeQueue) {
124       long nextWriteNumber = writePoint.incrementAndGet();
125       WriteEntry e = new WriteEntry(nextWriteNumber);
126       writeQueue.add(e);
127       return e;
128     }
129   }
130 
131   /**
132    * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
133    * to complete.
134    */
135   public void await() {
136     // Add a write and then wait on reads to catch up to it.
137     completeAndWait(begin());
138   }
139 
140   /**
141    * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
142    * read point catches up to our write.
143    *
144    * At the end of this call, the global read point is at least as large as the write point
145    * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
146    */
147   public void completeAndWait(WriteEntry e) {
148     complete(e);
149     waitForRead(e);
150   }
151 
152   /**
153    * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
154    * Call this even if the write has FAILED (AFTER backing out the write transaction
155    * changes completely) so we can clean up the outstanding transaction.
156    *
157    * How much is the read point advanced?
158    * 
159    * Let S be the set of all write numbers that are completed. Set the read point to the highest
160    * numbered write of S.
161    *
162    * @param writeEntry
163    *
164    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
165    */
166   public boolean complete(WriteEntry writeEntry) {
167     synchronized (writeQueue) {
168       writeEntry.markCompleted();
169 
170       long nextReadValue = NONE;
171       boolean ranOnce = false;
172       while (!writeQueue.isEmpty()) {
173         ranOnce = true;
174         WriteEntry queueFirst = writeQueue.getFirst();
175 
176         if (nextReadValue > 0) {
177           if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
178             throw new RuntimeException("Invariant in complete violated, nextReadValue="
179                 + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
180           }
181         }
182 
183         if (queueFirst.isCompleted()) {
184           nextReadValue = queueFirst.getWriteNumber();
185           writeQueue.removeFirst();
186         } else {
187           break;
188         }
189       }
190 
191       if (!ranOnce) {
192         throw new RuntimeException("There is no first!");
193       }
194 
195       if (nextReadValue > 0) {
196         synchronized (readWaiters) {
197           readPoint.set(nextReadValue);
198           readWaiters.notifyAll();
199         }
200       }
201       return readPoint.get() >= writeEntry.getWriteNumber();
202     }
203   }
204 
205   /**
206    * Wait for the global readPoint to advance up to the passed in write entry number.
207    */
208   void waitForRead(WriteEntry e) {
209     boolean interrupted = false;
210     int count = 0;
211     synchronized (readWaiters) {
212       while (readPoint.get() < e.getWriteNumber()) {
213         if (count % 100 == 0 && count > 0) {
214           LOG.warn("STUCK: " + this);
215         }
216         count++;
217         try {
218           readWaiters.wait(10);
219         } catch (InterruptedException ie) {
220           // We were interrupted... finish the loop -- i.e. cleanup --and then
221           // on our way out, reset the interrupt flag.
222           interrupted = true;
223         }
224       }
225     }
226     if (interrupted) {
227       Thread.currentThread().interrupt();
228     }
229   }
230 
231   @VisibleForTesting
232   public String toString() {
233     return Objects.toStringHelper(this)
234         .add("readPoint", readPoint)
235         .add("writePoint", writePoint).toString();
236   }
237 
238   public long getReadPoint() {
239     return readPoint.get();
240   }
241 
242   @VisibleForTesting
243   public long getWritePoint() {
244     return writePoint.get();
245   }
246 
247   /**
248    * Write number and whether write has completed given out at start of a write transaction.
249    * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
250    */
251   @InterfaceAudience.Private
252   public static class WriteEntry {
253     private final long writeNumber;
254     private boolean completed = false;
255 
256     WriteEntry(long writeNumber) {
257       this.writeNumber = writeNumber;
258     }
259 
260     void markCompleted() {
261       this.completed = true;
262     }
263 
264     boolean isCompleted() {
265       return this.completed;
266     }
267 
268     public long getWriteNumber() {
269       return this.writeNumber;
270     }
271 
272     @Override
273     public String toString() {
274       return this.writeNumber + ", " + this.completed;
275     }
276   }
277 
278   public static final long FIXED_SIZE = ClassSize.align(
279       ClassSize.OBJECT +
280       2 * Bytes.SIZEOF_LONG +
281       2 * ClassSize.REFERENCE);
282 }