EMMA Coverage Report (generated Tue Apr 17 08:51:20 BST 2007)
[all classes][org.jtoolkit.essence.data]

COVERAGE SUMMARY FOR SOURCE FILE [Transaction.java]

nameclass, %method, %block, %line, %
Transaction.java100% (3/3)89%  (39/44)67%  (828/1237)76%  (212.6/280)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class Transaction100% (1/1)88%  (35/40)65%  (768/1176)75%  (200.7/268)
getMillisLeft (): long 0%   (0/1)0%   (0/14)0%   (0/4)
getName (): String 0%   (0/1)0%   (0/3)0%   (0/1)
getStateString (): String 0%   (0/1)0%   (0/26)0%   (0/4)
rollbackAfterWaitOn (): void 0%   (0/1)0%   (0/6)0%   (0/4)
rollbackCurrent (String): void 0%   (0/1)0%   (0/9)0%   (0/4)
rollbackAll (): void 100% (1/1)20%  (5/25)25%  (2/8)
runOnComplete (Runnable): void 100% (1/1)22%  (10/45)63%  (3.2/5)
waitOn (Timeout): void 100% (1/1)27%  (12/45)32%  (5.4/17)
complete (): void 100% (1/1)41%  (18/44)68%  (7.5/11)
closeOnRollback (Closeable): void 100% (1/1)49%  (25/51)86%  (6.9/8)
rollback (String): void 100% (1/1)54%  (34/63)73%  (11/15)
commit (): void 100% (1/1)66%  (110/166)75%  (25.6/34)
Transaction (Transaction, String, Timeout): void 100% (1/1)69%  (108/156)93%  (26/28)
include (Callback, Map, Callback): void 100% (1/1)69%  (88/127)83%  (22.4/27)
isCommited (): boolean 100% (1/1)71%  (12/17)92%  (3.7/4)
isRolledback (): boolean 100% (1/1)71%  (12/17)92%  (3.7/4)
finalize (): void 100% (1/1)75%  (6/8)90%  (2.7/3)
isComplete (): boolean 100% (1/1)79%  (19/24)88%  (3.5/4)
commit0 (): void 100% (1/1)85%  (28/33)80%  (8/10)
setTransaction (Transaction): void 100% (1/1)88%  (29/33)87%  (5.2/6)
<static initializer> 100% (1/1)88%  (15/17)97%  (3.9/4)
rollback0 (String): void 100% (1/1)89%  (63/71)84%  (14.2/17)
getMillisLeft (long): long 100% (1/1)95%  (21/22)98%  (4.9/5)
toString (): String 100% (1/1)95%  (21/22)95%  (1/1)
access$000 (Transaction): Transaction 100% (1/1)100% (3/3)100% (1/1)
access$100 (Transaction): String 100% (1/1)100% (3/3)100% (1/1)
access$200 (): Log 100% (1/1)100% (2/2)100% (1/1)
closeOnRollbackCurrent (Closeable): void 100% (1/1)100% (8/8)100% (4/4)
closeOnRollbackCurrent (Socket): void 100% (1/1)100% (8/8)100% (3/3)
complete (Transaction): void 100% (1/1)100% (5/5)100% (2/2)
getTransaction (): Transaction 100% (1/1)100% (5/5)100% (1/1)
include (Callback, String, Object, Object): void 100% (1/1)100% (8/8)100% (2/2)
isTransactional (): boolean 100% (1/1)100% (6/6)100% (1/1)
removeTransaction (): void 100% (1/1)100% (5/5)100% (2/2)
rollback (): void 100% (1/1)100% (4/4)100% (2/2)
runOnCompleteCurrent (Runnable): void 100% (1/1)100% (11/11)100% (6/6)
start (String): Transaction 100% (1/1)100% (4/4)100% (1/1)
start (String, Timeout): Transaction 100% (1/1)100% (13/13)100% (4/4)
toChange (String, Object, Object): Map 100% (1/1)100% (20/20)100% (5/5)
unnest (): void 100% (1/1)100% (27/27)100% (7/7)
     
class Transaction$OnTimeoutRunnable100% (1/1)100% (2/2)98%  (50/51)99%  (8.9/9)
run (): void 100% (1/1)97%  (38/39)97%  (4.9/5)
Transaction$OnTimeoutRunnable (String, Transaction): void 100% (1/1)100% (12/12)100% (4/4)
     
class Transaction$1100% (1/1)100% (2/2)100% (10/10)100% (3/3)
Transaction$1 (Socket): void 100% (1/1)100% (6/6)100% (1/1)
close (): void 100% (1/1)100% (4/4)100% (2/2)

1package org.jtoolkit.essence.data;
2 
3import org.apache.commons.logging.Log;
4import static org.apache.commons.logging.LogFactory.getLog;
5import org.jetbrains.annotations.NotNull;
6import org.jetbrains.annotations.Nullable;
7import org.jtoolkit.essence.concurrency.*;
8import static org.jtoolkit.essence.concurrency.Threads.onTimeout;
9import org.jtoolkit.essence.concurrency.impl.ThreadedCallback;
10import org.jtoolkit.essence.utils.IOUtils;
11import org.jtoolkit.essence.utils.Named;
12import org.jtoolkit.essence.utils.Pair;
13import org.jtoolkit.essence.app.Main;
14 
15import java.io.Closeable;
16import java.io.IOException;
17import java.lang.ref.Reference;
18import java.lang.ref.WeakReference;
19import java.net.Socket;
20import java.util.*;
21import java.util.concurrent.*;
22import java.util.concurrent.locks.Condition;
23import java.util.concurrent.locks.Lock;
24import java.util.concurrent.locks.ReentrantLock;
25 
26/*
27   Copyright 2006 Peter Lawrey
28 
29   Licensed under the Apache License, Version 2.0 (the "License");
30   you may not use this file except in compliance with the License.
31   You may obtain a copy of the License at
32 
33       http://www.apache.org/licenses/LICENSE-2.0
34 
35   Unless required by applicable law or agreed to in writing, software
36   distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
37   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
38   See the License for the specific language governing permissions and
39   limitations under the License.
40*/
41 
42/**
43 * This class manages transactions.
44 *
45 * @author Peter Lawrey
46 */
47@ThreadSafe(Concurrency.THREAD_LOCAL)
48@SuppressWarnings({"unchecked", "ClassWithTooManyMethods"})
49public class Transaction implements Named {
50    private static final Log LOG = getLog(Transaction.class);
51    private static final Map<Thread, Transaction> TRANSACTIONS = new ConcurrentHashMap<Thread, Transaction>();
52 
53    private final Transaction parent;
54    private final String name;
55    private final Timeout timeout;
56    private final Map<Callback, Pair<Callback, Map<String, Map>>> allChanges = new LinkedHashMap<Callback, Pair<Callback, Map<String, Map>>>();
57 
58    @Nullable private Set<Transaction> children = null;
59    @Nullable private Set<Closeable> closeableResources = null;
60    @Nullable private Set<Pair<Callback, Object>> pendingCallbacks = null;
61    private Runnable onComplete = null;
62 
63    private final Future timeoutFuture;
64    private final Lock completeLock;
65    private final Condition complete;
66 
67    private boolean commited = false;
68    private boolean rolledback = false;
69    private String rollbackReason = null;
70    private boolean commiting = false;
71    private static final String TIMEOUT_EQ = ",  timeout=";
72    private static final String TRANSACTION = "Transaction ";
73 
74    private Transaction(Transaction parent, String name, Timeout timeout) throws Timeout.TimeoutException {
75        this.parent = parent;
76        this.name = name;
77 
78        completeLock = parent == null ? new ReentrantLock() : parent.completeLock;
79        complete = completeLock.newCondition();
80 
81        if (parent == null) {
82            this.timeout = timeout;
83        } else {
84            if (parent.timeout.getMillisLeft() <= 0L)
85                throw new Timeout.TimeoutException("Transaction parent timed out " + parent.timeout + TIMEOUT_EQ + timeout);
86            this.timeout = timeout.or(parent.timeout);
87            if (parent.children == null)
88                parent.children = new LinkedHashSet<Transaction>();
89            parent.children.add(this);
90        }
91        if (this.timeout.getMillisLeft() <= 0L)
92            throw new Timeout.TimeoutException("Transaction timed out " + this.timeout + TIMEOUT_EQ + timeout + (parent == null ? "" : ", parent.timeout=" + parent.timeout));
93        if (this.timeout.isNever()) {
94            timeoutFuture = null;
95        } else {
96            timeoutFuture = onTimeout(this.timeout, new OnTimeoutRunnable(name, this));
97        }
98    }
99 
100    public static void closeOnRollbackCurrent(final Socket s) {
101        if (s != null)
102            closeOnRollbackCurrent(new Closeable() {
103                public void close() throws IOException {
104                    s.close();
105                }
106            });
107    }
108 
109    public static void closeOnRollbackCurrent(Closeable closeable) {
110        Transaction t = getTransaction();
111        if (t != null)
112            t.closeOnRollback(closeable);
113    }
114 
115    public void commit() throws IllegalStateException {
116        if (commiting) return;
117 
118        if (children != null)
119            for (Transaction t : children)
120                try {
121                    t.waitOn(timeout);
122                } catch (InterruptedException ignored) {
123                    if (LOG.isInfoEnabled())
124                        LOG.info(Thread.currentThread().getName() + ": Waiting on " + t + " was interrupted as it was rolledback.");
125                }
126 
127        Lock writeLock = completeLock;
128        try {
129            writeLock.lock();
130            if (pendingCallbacks != null)
131                for (Pair<Callback, Object> callbackPair : pendingCallbacks) {
132                    if (callbackPair.second instanceof Throwable && callbackPair.first instanceof CallbackEx)
133                        ((CallbackEx) callbackPair.first).onException((Throwable) callbackPair.second);
134                    else
135                        try {
136                            callbackPair.first.onCallback(callbackPair.second);
137                        } catch (Exception e) {
138                            LOG.error(name + ": "+Main.UNHANDLED_EXCEPTION, e);
139                        }
140                }
141            pendingCallbacks = null;
142            if (rolledback) {
143                unnest();
144                throw new IllegalStateException(name + ": Transaction has been rolledback. " + rollbackReason);
145            }
146            if (commited) {
147                unnest();
148                return;
149            }
150            if (timeoutFuture != null) timeoutFuture.cancel(false);
151 
152            commiting = true;
153            commit0();
154            commiting = false;
155 
156            commited = true;
157            complete.signalAll();
158        } finally {
159            writeLock.unlock();
160        }
161        unnest();
162    }
163 
164    private void commit0() {
165        for (Callback callback : allChanges.keySet()) {
166            Pair<Callback, Map<String, Map>> pair = allChanges.get(callback);
167            try {
168                callback.onCallback(pair.second);
169            } catch (RuntimeException e) {
170                throw e;
171            } catch (Exception e) {
172                LOG.error("unhandled exception", e);
173            }
174        }
175    }
176 
177    public static void complete(Transaction t) {
178        if (t != null) t.complete();
179    }
180 
181    public void complete() {
182        if (!isComplete())
183            rollback();
184        if (onComplete != null)
185            try {
186                onComplete.run();
187            } catch (RuntimeException e) {
188                LOG.warn(Thread.currentThread().getName() + ": onComplete " + onComplete + " threw an exception", e);
189                throw e;
190            } finally {
191                onComplete = null;
192            }
193        unnest();
194    }
195 
196    public void closeOnRollback(Closeable closeable) {
197        Lock writeLock = completeLock;
198        try {
199            writeLock.lock();
200            if (isComplete()) throw new IllegalStateException(name + ": Transaction is " + getStateString() + ' ' + rollbackReason);
201            if (closeableResources == null) closeableResources = new LinkedHashSet<Closeable>();
202            closeableResources.add(closeable);
203        } finally {
204            writeLock.unlock();
205        }
206    }
207 
208    public static long getMillisLeft() {
209        Transaction t = getTransaction();
210        if (t == null || t.timeout.isNever())
211            return Long.MAX_VALUE;
212        return t.timeout.getMillisLeft();
213    }
214 
215    public static long getMillisLeft(long timeoutMS) {
216        Transaction t = getTransaction();
217        if (t == null || t.timeout.isNever())
218            return timeoutMS;
219        long millisLeft2 = t.timeout.getMillisLeft();
220        return timeoutMS < millisLeft2 ? timeoutMS : millisLeft2;
221    }
222 
223    private String getStateString() {
224        Lock readLock = completeLock;
225        try {
226            readLock.lock();
227            return rolledback ? "rolledback" : commited ? "commited" : "is incomplete";
228        } finally {
229            readLock.unlock();
230        }
231    }
232 
233    public boolean isComplete() {
234        Lock readLock = completeLock;
235        try {
236            readLock.lock();
237            return commited || rolledback;
238        } finally {
239            readLock.unlock();
240        }
241    }
242 
243    public boolean isCommited() {
244        Lock readLock = completeLock;
245        try {
246            readLock.lock();
247            return commited;
248        } finally {
249            readLock.unlock();
250        }
251    }
252 
253    public boolean isRolledback() {
254        Lock readLock = completeLock;
255        try {
256            readLock.lock();
257            return rolledback;
258        } finally {
259            readLock.unlock();
260        }
261    }
262 
263    public static boolean isTransactional() {
264        return getTransaction() != null;
265    }
266 
267    public static void include(Callback<Map<String, Map>> onCommit, Map<String, Map> changes, Callback<Map<String, Map>> onRollback) throws IllegalStateException {
268        Transaction t = getTransaction();
269        if (t == null) {
270            try {
271                onCommit.onCallback(changes);
272            } catch (Exception e) {
273                LOG.error(Main.UNHANDLED_EXCEPTION, e);
274            }
275            return;
276        }
277        if (t.isComplete())
278            throw new IllegalStateException(t.name + ": Transaction has been " + t.getStateString());
279        if (t.timeout.getMillisLeft() < 0)
280            throw new Timeout.TimeoutException(t.name + ": Transaction has timed out");
281        synchronized (t.allChanges) {
282            Pair<Callback, Map<String, Map>> pair = t.allChanges.get(onCommit);
283            if (pair == null) {
284                t.allChanges.put(onCommit, new Pair<Callback, Map<String, Map>>(onRollback, changes));
285                return;
286            }
287            Map<String, Map> pchanges = pair.second;
288            for (Map.Entry<String, Map> change : changes.entrySet()) {
289                String changeKey = change.getKey();
290                Map pmap = pchanges.get(changeKey);
291                Map changeValue = change.getValue();
292                if (pmap == null) {
293                    pchanges.put(changeKey, changeValue);
294                } else {
295                    pmap.putAll(changeValue);
296                }
297            }
298        }
299    }
300 
301    public static void include(Callback<Map<String, Map>> onCommit, String name, Object key, Object value) throws IllegalStateException {
302        include(onCommit, toChange(name, key, value), null);
303    }
304 
305    public static void rollbackCurrent(String rollbackReason) throws IllegalStateException {
306        Transaction t = getTransaction();
307        if (t == null) return;
308        t.rollback(rollbackReason);
309    }
310 
311    public void rollback() throws IllegalStateException {
312        rollback(Threads.stackSnapshot());
313    }
314 
315    public void rollback(String reason) throws IllegalStateException {
316        Lock writeLock = completeLock;
317        try {
318            writeLock.lock();
319            if (commited) {
320                unnest();
321                throw new IllegalStateException(name + ": Transaction has been committed.");
322            }
323            if (rolledback) {
324                unnest();
325                return;
326            }
327            if (timeoutFuture != null) timeoutFuture.cancel(false);
328 
329            rollback0(reason);
330 
331            rolledback = true;
332            complete.signalAll();
333        } finally {
334            writeLock.unlock();
335            unnest();
336        }
337    }
338 
339    private void rollback0(String rollbackReason) {
340        this.rollbackReason = rollbackReason;
341        if (children != null)
342            for (Transaction t : children) {
343                if (!t.isComplete())
344                    t.rollback(rollbackReason);
345            }
346 
347        if (closeableResources != null)
348            for (Closeable c : closeableResources)
349                IOUtils.close(c);
350 
351        for (Pair<Callback, Map<String, Map>> pair : allChanges.values()) {
352            Callback onRollback = pair.first;
353            if (onRollback != null)
354                try {
355                    onRollback.onCallback(pair.second);
356                } catch (Exception e) {
357                    LOG.error(Main.UNHANDLED_EXCEPTION, e);
358                }
359        }
360    }
361 
362    public static void rollbackAll() {
363        Transaction t = getTransaction();
364        if (t == null) return;
365        while (t.parent != null) {
366            t.rollback();
367            t = t.parent;
368        }
369        t.rollback();
370        assert getTransaction() == null;
371    }
372 
373    private void runOnComplete(Runnable run) {
374        if (isComplete()) throw new IllegalStateException(TRANSACTION + name + " complete.");
375        if (onComplete != null)
376            throw new IllegalStateException(name + ": Multiple onComplete Runnables not supported " + onComplete + " not " + run);
377        onComplete = run;
378    }
379 
380    public static void runOnCompleteCurrent(Runnable run) {
381        Transaction t = getTransaction();
382        if (t == null) {
383            run.run();
384            return;
385        }
386        t.runOnComplete(run);
387    }
388 
389    public static Transaction start(String name) throws Timeout.TimeoutException {
390        return start(name, Timeout.NEVER);
391    }
392 
393    public static Transaction start(String name, Timeout timeout) throws Timeout.TimeoutException {
394        Transaction parent = getTransaction();
395        Transaction trans = new Transaction(parent, name, timeout);
396        setTransaction(trans);
397        return trans;
398    }
399 
400    private static Map<String, Map> toChange(String name, Object key, Object value) {
401        Map innerMap = new LinkedHashMap();
402        innerMap.put(key, value);
403        Map<String, Map> outerMap = new LinkedHashMap<String, Map>();
404        outerMap.put(name, innerMap);
405        return outerMap;
406    }
407 
408    private void unnest() {
409        Transaction t = getTransaction();
410        //noinspection OverlyComplexBooleanExpression,UnnecessaryParentheses,ConstantConditions
411        if (t == this || (children != null && !children.isEmpty() && children.contains(t))) {
412            Transaction value = t.parent;
413            if (value == null)
414                removeTransaction();
415            else
416                setTransaction(value);
417        }
418    }
419 
420    private static void removeTransaction() {
421        TRANSACTIONS.remove(Thread.currentThread());
422    }
423 
424    @SuppressWarnings("StaticNonFinalField")
425    private static int s_setCounter = 0;
426 
427    private static void setTransaction(Transaction value) {
428        if ((s_setCounter++ & 127) == 0)
429            for (Thread t : TRANSACTIONS.keySet())
430                if (!t.isAlive())
431                    TRANSACTIONS.remove(t);
432        TRANSACTIONS.put(Thread.currentThread(), value);
433    }
434 
435    public static Transaction getTransaction() {
436        return TRANSACTIONS.get(Thread.currentThread());
437    }
438 
439    private void waitOn(Timeout timeout) throws InterruptedException {
440        Lock writeLock = completeLock;
441        //noinspection LockAcquiredButNotSafelyReleased
442        writeLock.lockInterruptibly();
443        try {
444            while (!isComplete()) {
445                long millisLeft = timeout.getMillisLeft();
446                if (millisLeft <= 0)
447                    rollbackAfterWaitOn();
448                if (millisLeft > 50)
449                    millisLeft = 50;
450                try {
451                    complete.await(millisLeft, TimeUnit.MILLISECONDS);
452                } catch (InterruptedException e) {
453                    rollbackAfterWaitOn();
454                    throw e;
455                }
456            }
457        } finally {
458            writeLock.unlock();
459        }
460    }
461 
462    private void rollbackAfterWaitOn() {
463        try {
464            rollback("on timeout waiting for child transaction.");
465        } catch (IllegalStateException ignored) {
466            // ignored.
467        }
468    }
469 
470    @NotNull public String getName() {
471        return name;
472    }
473 
474    @Override protected void finalize() throws Throwable {
475        super.finalize();
476        if (!isComplete()) complete();
477    }
478 
479    private static class OnTimeoutRunnable implements Runnable {
480        private final String name;
481        private final Reference<Transaction> transactionRef;
482 
483        OnTimeoutRunnable(@NotNull String name, @NotNull Transaction transaction) {
484            this.name = name;
485            transactionRef = new WeakReference<Transaction>(transaction);
486        }
487 
488        public void run() {
489            Transaction transaction = transactionRef.get();
490            if (transaction == null) return;
491            LOG.warn(name + ": Transaction rollback on timeout " + (transaction.parent == null ? "" : ", parent= " + transaction.parent.name));
492            transaction.rollback("on timeout");
493        }
494    }
495 
496    public String toString() {
497        return TRANSACTION + name + (isCommited() ? " commited" : isRolledback() ? " rolledback" : " not complete");
498    }
499}

[all classes][org.jtoolkit.essence.data]
EMMA 2.0.5312 (C) Vladimir Roubtsov