1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.util.LinkedList;
22 import java.util.concurrent.atomic.AtomicLong;
23
24 import com.google.common.annotations.VisibleForTesting;
25
26 import com.google.common.base.Objects;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.hbase.util.ClassSize;
32
33
34
35
36
37
38
39 @InterfaceAudience.Private
40 public class MultiVersionConcurrencyControl {
41 private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class);
42
43 final AtomicLong readPoint = new AtomicLong(0);
44 final AtomicLong writePoint = new AtomicLong(0);
45 private final Object readWaiters = new Object();
46
47
48
49 public static final long NONE = -1;
50
51
52
53
54
55
56
57 private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
58
59 public MultiVersionConcurrencyControl() {
60 super();
61 }
62
63
64
65
66 public MultiVersionConcurrencyControl(long startPoint) {
67 tryAdvanceTo(startPoint, NONE);
68 }
69
70
71
72
73
74 public void advanceTo(long newStartPoint) {
75 while (true) {
76 long seqId = this.getWritePoint();
77 if (seqId >= newStartPoint) break;
78 if (this.tryAdvanceTo(
79 }
80 }
81
82
83
84
85
86
87
88
89
90 boolean tryAdvanceTo(long newStartPoint, long expected) {
91 synchronized (writeQueue) {
92 long currentRead = this.readPoint.get();
93 long currentWrite = this.writePoint.get();
94 if (currentRead != currentWrite) {
95 throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
96 ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
97 }
98 if (expected != NONE && expected != currentRead) {
99 return false;
100 }
101
102 if (newStartPoint < currentRead) {
103 return false;
104 }
105
106 readPoint.set(newStartPoint);
107 writePoint.set(newStartPoint);
108 }
109 return true;
110 }
111
112
113
114
115
116
117
118
119
120
121
122 public WriteEntry begin() {
123 synchronized (writeQueue) {
124 long nextWriteNumber = writePoint.incrementAndGet();
125 WriteEntry e = new WriteEntry(nextWriteNumber);
126 writeQueue.add(e);
127 return e;
128 }
129 }
130
131
132
133
134
135 public void await() {
136
137 completeAndWait(begin());
138 }
139
140
141
142
143
144
145
146
147 public void completeAndWait(WriteEntry e) {
148 complete(e);
149 waitForRead(e);
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 public boolean complete(WriteEntry writeEntry) {
167 synchronized (writeQueue) {
168 writeEntry.markCompleted();
169
170 long nextReadValue = NONE;
171 boolean ranOnce = false;
172 while (!writeQueue.isEmpty()) {
173 ranOnce = true;
174 WriteEntry queueFirst = writeQueue.getFirst();
175
176 if (nextReadValue > 0) {
177 if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
178 throw new RuntimeException("Invariant in complete violated, nextReadValue="
179 + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
180 }
181 }
182
183 if (queueFirst.isCompleted()) {
184 nextReadValue = queueFirst.getWriteNumber();
185 writeQueue.removeFirst();
186 } else {
187 break;
188 }
189 }
190
191 if (!ranOnce) {
192 throw new RuntimeException("There is no first!");
193 }
194
195 if (nextReadValue > 0) {
196 synchronized (readWaiters) {
197 readPoint.set(nextReadValue);
198 readWaiters.notifyAll();
199 }
200 }
201 return readPoint.get() >= writeEntry.getWriteNumber();
202 }
203 }
204
205
206
207
208 void waitForRead(WriteEntry e) {
209 boolean interrupted = false;
210 int count = 0;
211 synchronized (readWaiters) {
212 while (readPoint.get() < e.getWriteNumber()) {
213 if (count % 100 == 0 && count > 0) {
214 LOG.warn("STUCK: " + this);
215 }
216 count++;
217 try {
218 readWaiters.wait(10);
219 } catch (InterruptedException ie) {
220
221
222 interrupted = true;
223 }
224 }
225 }
226 if (interrupted) {
227 Thread.currentThread().interrupt();
228 }
229 }
230
231 @VisibleForTesting
232 public String toString() {
233 return Objects.toStringHelper(this)
234 .add("readPoint", readPoint)
235 .add("writePoint", writePoint).toString();
236 }
237
238 public long getReadPoint() {
239 return readPoint.get();
240 }
241
242 @VisibleForTesting
243 public long getWritePoint() {
244 return writePoint.get();
245 }
246
247
248
249
250
251 @InterfaceAudience.Private
252 public static class WriteEntry {
253 private final long writeNumber;
254 private boolean completed = false;
255
256 WriteEntry(long writeNumber) {
257 this.writeNumber = writeNumber;
258 }
259
260 void markCompleted() {
261 this.completed = true;
262 }
263
264 boolean isCompleted() {
265 return this.completed;
266 }
267
268 public long getWriteNumber() {
269 return this.writeNumber;
270 }
271
272 @Override
273 public String toString() {
274 return this.writeNumber + ", " + this.completed;
275 }
276 }
277
278 public static final long FIXED_SIZE = ClassSize.align(
279 ClassSize.OBJECT +
280 2 * Bytes.SIZEOF_LONG +
281 2 * ClassSize.REFERENCE);
282 }