1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.transaction.memory;
18
19 import java.io.PrintWriter;
20 import java.util.HashSet;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.Collections;
25
26 import org.apache.commons.transaction.locking.ReadWriteLock;
27 import org.apache.commons.transaction.util.LoggerFacade;
28 import org.apache.commons.transaction.util.PrintWriterLogger;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class OptimisticMapWrapper extends TransactionalMapWrapper {
52
53 protected static final int COMMIT_TIMEOUT = 1000 * 60;
54 protected static final int ACCESS_TIMEOUT = 1000 * 30;
55
56 protected Set activeTransactions;
57
58 protected LoggerFacade logger;
59
60 protected ReadWriteLock commitLock;
61
62
63
64
65
66
67
68 public OptimisticMapWrapper(Map wrapped) {
69 this(wrapped, new HashMapFactory(), new HashSetFactory());
70 }
71
72
73
74
75
76
77
78
79
80 public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) {
81 this(wrapped, mapFactory, setFactory, new PrintWriterLogger(new PrintWriter(System.out),
82 OptimisticMapWrapper.class.getName(), false));
83 }
84
85
86
87
88
89
90
91
92
93
94
95 public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory, LoggerFacade logger) {
96 super(wrapped, mapFactory, setFactory);
97 activeTransactions = Collections.synchronizedSet(new HashSet());
98 this.logger = logger;
99 commitLock = new ReadWriteLock("COMMIT", logger);
100 }
101
102 public void startTransaction() {
103 if (getActiveTx() != null) {
104 throw new IllegalStateException(
105 "Active thread " + Thread.currentThread() + " already associated with a transaction!");
106 }
107 CopyingTxContext context = new CopyingTxContext();
108 activeTransactions.add(context);
109 setActiveTx(context);
110 }
111
112 public void rollbackTransaction() {
113 TxContext txContext = getActiveTx();
114 super.rollbackTransaction();
115 activeTransactions.remove(txContext);
116 }
117
118 public void commitTransaction() throws ConflictException {
119 commitTransaction(false);
120 }
121
122 public void commitTransaction(boolean force) throws ConflictException {
123 TxContext txContext = getActiveTx();
124
125 if (txContext == null) {
126 throw new IllegalStateException(
127 "Active thread " + Thread.currentThread() + " not associated with a transaction!");
128 }
129
130 if (txContext.status == STATUS_MARKED_ROLLBACK) {
131 throw new IllegalStateException("Active thread " + Thread.currentThread() + " is marked for rollback!");
132 }
133
134 try {
135
136
137 commitLock.acquireWrite(txContext, COMMIT_TIMEOUT);
138
139 if (!force) {
140 Object conflictKey = checkForConflicts();
141 if (conflictKey != null) {
142 throw new ConflictException(conflictKey);
143 }
144 }
145
146 activeTransactions.remove(txContext);
147 copyChangesToConcurrentTransactions();
148 super.commitTransaction();
149
150 } catch (InterruptedException e) {
151
152 throw new ConflictException(e);
153 } finally {
154 commitLock.release(txContext);
155 }
156 }
157
158
159 public Object checkForConflicts() {
160 CopyingTxContext txContext = (CopyingTxContext) getActiveTx();
161
162 Set keys = txContext.changedKeys();
163 Set externalKeys = txContext.externalChangedKeys();
164
165 for (Iterator it2 = keys.iterator(); it2.hasNext();) {
166 Object key = it2.next();
167 if (externalKeys.contains(key)) {
168 return key;
169 }
170 }
171 return null;
172 }
173
174 protected void copyChangesToConcurrentTransactions() {
175 CopyingTxContext thisTxContext = (CopyingTxContext) getActiveTx();
176
177 synchronized (activeTransactions) {
178 for (Iterator it = activeTransactions.iterator(); it.hasNext();) {
179 CopyingTxContext otherTxContext = (CopyingTxContext) it.next();
180
181
182 if (otherTxContext.cleared)
183 continue;
184
185 if (thisTxContext.cleared) {
186
187 otherTxContext.externalChanges.putAll(wrapped);
188 } else
189 {
190 for (Iterator it2 = thisTxContext.changes.entrySet().iterator(); it2.hasNext();) {
191 Map.Entry entry = (Map.Entry) it2.next();
192 Object value = wrapped.get(entry.getKey());
193 if (value != null) {
194
195 otherTxContext.externalChanges.put(entry.getKey(), value);
196 } else {
197
198 otherTxContext.externalDeletes.add(entry.getKey());
199 }
200 }
201
202 for (Iterator it2 = thisTxContext.deletes.iterator(); it2.hasNext();) {
203
204 Object key = it2.next();
205 Object value = wrapped.get(key);
206 otherTxContext.externalChanges.put(key, value);
207 }
208 }
209 }
210 }
211 }
212
213 public class CopyingTxContext extends TxContext {
214 protected Map externalChanges;
215 protected Map externalAdds;
216 protected Set externalDeletes;
217
218 protected CopyingTxContext() {
219 super();
220 externalChanges = mapFactory.createMap();
221 externalDeletes = setFactory.createSet();
222 externalAdds = mapFactory.createMap();
223 }
224
225 protected Set externalChangedKeys() {
226 Set keySet = new HashSet();
227 keySet.addAll(externalDeletes);
228 keySet.addAll(externalChanges.keySet());
229 keySet.addAll(externalAdds.keySet());
230 return keySet;
231 }
232
233 protected Set changedKeys() {
234 Set keySet = new HashSet();
235 keySet.addAll(deletes);
236 keySet.addAll(changes.keySet());
237 keySet.addAll(adds.keySet());
238 return keySet;
239 }
240
241 protected Set keys() {
242 try {
243 commitLock.acquireRead(this, ACCESS_TIMEOUT);
244 Set keySet = super.keys();
245 keySet.removeAll(externalDeletes);
246 keySet.addAll(externalAdds.keySet());
247 return keySet;
248 } catch (InterruptedException e) {
249 return null;
250 } finally {
251 commitLock.release(this);
252 }
253 }
254
255 protected Object get(Object key) {
256 try {
257 commitLock.acquireRead(this, ACCESS_TIMEOUT);
258
259 if (deletes.contains(key)) {
260
261 return null;
262 }
263
264 Object changed = changes.get(key);
265 if (changed != null) {
266 return changed;
267 }
268
269 Object added = adds.get(key);
270 if (added != null) {
271 return added;
272 }
273
274 if (cleared) {
275 return null;
276 } else {
277 if (externalDeletes.contains(key)) {
278
279 return null;
280 }
281
282 changed = externalChanges.get(key);
283 if (changed != null) {
284 return changed;
285 }
286
287 added = externalAdds.get(key);
288 if (added != null) {
289 return added;
290 }
291
292
293 return wrapped.get(key);
294 }
295 } catch (InterruptedException e) {
296 return null;
297 } finally {
298 commitLock.release(this);
299 }
300 }
301
302 protected void put(Object key, Object value) {
303 try {
304 commitLock.acquireRead(this, ACCESS_TIMEOUT);
305 super.put(key, value);
306 } catch (InterruptedException e) {
307 } finally {
308 commitLock.release(this);
309 }
310 }
311
312 protected void remove(Object key) {
313 try {
314 commitLock.acquireRead(this, ACCESS_TIMEOUT);
315 super.remove(key);
316 } catch (InterruptedException e) {
317 } finally {
318 commitLock.release(this);
319 }
320 }
321
322 protected int size() {
323 try {
324 commitLock.acquireRead(this, ACCESS_TIMEOUT);
325 int size = super.size();
326
327 size -= externalDeletes.size();
328 size += externalAdds.size();
329
330 return size;
331 } catch (InterruptedException e) {
332 return -1;
333 } finally {
334 commitLock.release(this);
335 }
336 }
337
338 protected void clear() {
339 try {
340 commitLock.acquireRead(this, ACCESS_TIMEOUT);
341 super.clear();
342 externalDeletes.clear();
343 externalChanges.clear();
344 externalAdds.clear();
345 } catch (InterruptedException e) {
346 } finally {
347 commitLock.release(this);
348 }
349 }
350
351 protected void merge() {
352 try {
353 commitLock.acquireRead(this, ACCESS_TIMEOUT);
354 super.merge();
355 } catch (InterruptedException e) {
356 } finally {
357 commitLock.release(this);
358 }
359 }
360
361 protected void dispose() {
362 try {
363 commitLock.acquireRead(this, ACCESS_TIMEOUT);
364 super.dispose();
365 setFactory.disposeSet(externalDeletes);
366 externalDeletes = null;
367 mapFactory.disposeMap(externalChanges);
368 externalChanges = null;
369 mapFactory.disposeMap(externalAdds);
370 externalAdds = null;
371 } catch (InterruptedException e) {
372 } finally {
373 commitLock.release(this);
374 }
375 }
376
377 protected void finalize() throws Throwable {
378 activeTransactions.remove(this);
379 super.finalize();
380 }
381 }
382 }