| 1 | package org.jtoolkit.essence.data; |
| 2 | |
| 3 | import org.apache.commons.logging.Log; |
| 4 | import static org.apache.commons.logging.LogFactory.getLog; |
| 5 | import org.jetbrains.annotations.NotNull; |
| 6 | import org.jetbrains.annotations.Nullable; |
| 7 | import org.jtoolkit.essence.concurrency.*; |
| 8 | import static org.jtoolkit.essence.concurrency.Threads.onTimeout; |
| 9 | import org.jtoolkit.essence.concurrency.impl.ThreadedCallback; |
| 10 | import org.jtoolkit.essence.utils.IOUtils; |
| 11 | import org.jtoolkit.essence.utils.Named; |
| 12 | import org.jtoolkit.essence.utils.Pair; |
| 13 | import org.jtoolkit.essence.app.Main; |
| 14 | |
| 15 | import java.io.Closeable; |
| 16 | import java.io.IOException; |
| 17 | import java.lang.ref.Reference; |
| 18 | import java.lang.ref.WeakReference; |
| 19 | import java.net.Socket; |
| 20 | import java.util.*; |
| 21 | import java.util.concurrent.*; |
| 22 | import java.util.concurrent.locks.Condition; |
| 23 | import java.util.concurrent.locks.Lock; |
| 24 | import java.util.concurrent.locks.ReentrantLock; |
| 25 | |
| 26 | /* |
| 27 | Copyright 2006 Peter Lawrey |
| 28 | |
| 29 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 30 | you may not use this file except in compliance with the License. |
| 31 | You may obtain a copy of the License at |
| 32 | |
| 33 | http://www.apache.org/licenses/LICENSE-2.0 |
| 34 | |
| 35 | Unless required by applicable law or agreed to in writing, software |
| 36 | distributed under the License is distributed on an "AS IS" BASIS, |
| 37 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 38 | See the License for the specific language governing permissions and |
| 39 | limitations under the License. |
| 40 | */ |
| 41 | |
| 42 | /** |
| 43 | * This class manages transactions. |
| 44 | * |
| 45 | * @author Peter Lawrey |
| 46 | */ |
| 47 | @ThreadSafe(Concurrency.THREAD_LOCAL) |
| 48 | @SuppressWarnings({"unchecked", "ClassWithTooManyMethods"}) |
| 49 | public class Transaction implements Named { |
| 50 | private static final Log LOG = getLog(Transaction.class); |
| 51 | private static final Map<Thread, Transaction> TRANSACTIONS = new ConcurrentHashMap<Thread, Transaction>(); |
| 52 | |
| 53 | private final Transaction parent; |
| 54 | private final String name; |
| 55 | private final Timeout timeout; |
| 56 | private final Map<Callback, Pair<Callback, Map<String, Map>>> allChanges = new LinkedHashMap<Callback, Pair<Callback, Map<String, Map>>>(); |
| 57 | |
| 58 | @Nullable private Set<Transaction> children = null; |
| 59 | @Nullable private Set<Closeable> closeableResources = null; |
| 60 | @Nullable private Set<Pair<Callback, Object>> pendingCallbacks = null; |
| 61 | private Runnable onComplete = null; |
| 62 | |
| 63 | private final Future timeoutFuture; |
| 64 | private final Lock completeLock; |
| 65 | private final Condition complete; |
| 66 | |
| 67 | private boolean commited = false; |
| 68 | private boolean rolledback = false; |
| 69 | private String rollbackReason = null; |
| 70 | private boolean commiting = false; |
| 71 | private static final String TIMEOUT_EQ = ", timeout="; |
| 72 | private static final String TRANSACTION = "Transaction "; |
| 73 | |
| 74 | private Transaction(Transaction parent, String name, Timeout timeout) throws Timeout.TimeoutException { |
| 75 | this.parent = parent; |
| 76 | this.name = name; |
| 77 | |
| 78 | completeLock = parent == null ? new ReentrantLock() : parent.completeLock; |
| 79 | complete = completeLock.newCondition(); |
| 80 | |
| 81 | if (parent == null) { |
| 82 | this.timeout = timeout; |
| 83 | } else { |
| 84 | if (parent.timeout.getMillisLeft() <= 0L) |
| 85 | throw new Timeout.TimeoutException("Transaction parent timed out " + parent.timeout + TIMEOUT_EQ + timeout); |
| 86 | this.timeout = timeout.or(parent.timeout); |
| 87 | if (parent.children == null) |
| 88 | parent.children = new LinkedHashSet<Transaction>(); |
| 89 | parent.children.add(this); |
| 90 | } |
| 91 | if (this.timeout.getMillisLeft() <= 0L) |
| 92 | throw new Timeout.TimeoutException("Transaction timed out " + this.timeout + TIMEOUT_EQ + timeout + (parent == null ? "" : ", parent.timeout=" + parent.timeout)); |
| 93 | if (this.timeout.isNever()) { |
| 94 | timeoutFuture = null; |
| 95 | } else { |
| 96 | timeoutFuture = onTimeout(this.timeout, new OnTimeoutRunnable(name, this)); |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | public static void closeOnRollbackCurrent(final Socket s) { |
| 101 | if (s != null) |
| 102 | closeOnRollbackCurrent(new Closeable() { |
| 103 | public void close() throws IOException { |
| 104 | s.close(); |
| 105 | } |
| 106 | }); |
| 107 | } |
| 108 | |
| 109 | public static void closeOnRollbackCurrent(Closeable closeable) { |
| 110 | Transaction t = getTransaction(); |
| 111 | if (t != null) |
| 112 | t.closeOnRollback(closeable); |
| 113 | } |
| 114 | |
| 115 | public void commit() throws IllegalStateException { |
| 116 | if (commiting) return; |
| 117 | |
| 118 | if (children != null) |
| 119 | for (Transaction t : children) |
| 120 | try { |
| 121 | t.waitOn(timeout); |
| 122 | } catch (InterruptedException ignored) { |
| 123 | if (LOG.isInfoEnabled()) |
| 124 | LOG.info(Thread.currentThread().getName() + ": Waiting on " + t + " was interrupted as it was rolledback."); |
| 125 | } |
| 126 | |
| 127 | Lock writeLock = completeLock; |
| 128 | try { |
| 129 | writeLock.lock(); |
| 130 | if (pendingCallbacks != null) |
| 131 | for (Pair<Callback, Object> callbackPair : pendingCallbacks) { |
| 132 | if (callbackPair.second instanceof Throwable && callbackPair.first instanceof CallbackEx) |
| 133 | ((CallbackEx) callbackPair.first).onException((Throwable) callbackPair.second); |
| 134 | else |
| 135 | try { |
| 136 | callbackPair.first.onCallback(callbackPair.second); |
| 137 | } catch (Exception e) { |
| 138 | LOG.error(name + ": "+Main.UNHANDLED_EXCEPTION, e); |
| 139 | } |
| 140 | } |
| 141 | pendingCallbacks = null; |
| 142 | if (rolledback) { |
| 143 | unnest(); |
| 144 | throw new IllegalStateException(name + ": Transaction has been rolledback. " + rollbackReason); |
| 145 | } |
| 146 | if (commited) { |
| 147 | unnest(); |
| 148 | return; |
| 149 | } |
| 150 | if (timeoutFuture != null) timeoutFuture.cancel(false); |
| 151 | |
| 152 | commiting = true; |
| 153 | commit0(); |
| 154 | commiting = false; |
| 155 | |
| 156 | commited = true; |
| 157 | complete.signalAll(); |
| 158 | } finally { |
| 159 | writeLock.unlock(); |
| 160 | } |
| 161 | unnest(); |
| 162 | } |
| 163 | |
| 164 | private void commit0() { |
| 165 | for (Callback callback : allChanges.keySet()) { |
| 166 | Pair<Callback, Map<String, Map>> pair = allChanges.get(callback); |
| 167 | try { |
| 168 | callback.onCallback(pair.second); |
| 169 | } catch (RuntimeException e) { |
| 170 | throw e; |
| 171 | } catch (Exception e) { |
| 172 | LOG.error("unhandled exception", e); |
| 173 | } |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | public static void complete(Transaction t) { |
| 178 | if (t != null) t.complete(); |
| 179 | } |
| 180 | |
| 181 | public void complete() { |
| 182 | if (!isComplete()) |
| 183 | rollback(); |
| 184 | if (onComplete != null) |
| 185 | try { |
| 186 | onComplete.run(); |
| 187 | } catch (RuntimeException e) { |
| 188 | LOG.warn(Thread.currentThread().getName() + ": onComplete " + onComplete + " threw an exception", e); |
| 189 | throw e; |
| 190 | } finally { |
| 191 | onComplete = null; |
| 192 | } |
| 193 | unnest(); |
| 194 | } |
| 195 | |
| 196 | public void closeOnRollback(Closeable closeable) { |
| 197 | Lock writeLock = completeLock; |
| 198 | try { |
| 199 | writeLock.lock(); |
| 200 | if (isComplete()) throw new IllegalStateException(name + ": Transaction is " + getStateString() + ' ' + rollbackReason); |
| 201 | if (closeableResources == null) closeableResources = new LinkedHashSet<Closeable>(); |
| 202 | closeableResources.add(closeable); |
| 203 | } finally { |
| 204 | writeLock.unlock(); |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | public static long getMillisLeft() { |
| 209 | Transaction t = getTransaction(); |
| 210 | if (t == null || t.timeout.isNever()) |
| 211 | return Long.MAX_VALUE; |
| 212 | return t.timeout.getMillisLeft(); |
| 213 | } |
| 214 | |
| 215 | public static long getMillisLeft(long timeoutMS) { |
| 216 | Transaction t = getTransaction(); |
| 217 | if (t == null || t.timeout.isNever()) |
| 218 | return timeoutMS; |
| 219 | long millisLeft2 = t.timeout.getMillisLeft(); |
| 220 | return timeoutMS < millisLeft2 ? timeoutMS : millisLeft2; |
| 221 | } |
| 222 | |
| 223 | private String getStateString() { |
| 224 | Lock readLock = completeLock; |
| 225 | try { |
| 226 | readLock.lock(); |
| 227 | return rolledback ? "rolledback" : commited ? "commited" : "is incomplete"; |
| 228 | } finally { |
| 229 | readLock.unlock(); |
| 230 | } |
| 231 | } |
| 232 | |
| 233 | public boolean isComplete() { |
| 234 | Lock readLock = completeLock; |
| 235 | try { |
| 236 | readLock.lock(); |
| 237 | return commited || rolledback; |
| 238 | } finally { |
| 239 | readLock.unlock(); |
| 240 | } |
| 241 | } |
| 242 | |
| 243 | public boolean isCommited() { |
| 244 | Lock readLock = completeLock; |
| 245 | try { |
| 246 | readLock.lock(); |
| 247 | return commited; |
| 248 | } finally { |
| 249 | readLock.unlock(); |
| 250 | } |
| 251 | } |
| 252 | |
| 253 | public boolean isRolledback() { |
| 254 | Lock readLock = completeLock; |
| 255 | try { |
| 256 | readLock.lock(); |
| 257 | return rolledback; |
| 258 | } finally { |
| 259 | readLock.unlock(); |
| 260 | } |
| 261 | } |
| 262 | |
| 263 | public static boolean isTransactional() { |
| 264 | return getTransaction() != null; |
| 265 | } |
| 266 | |
| 267 | public static void include(Callback<Map<String, Map>> onCommit, Map<String, Map> changes, Callback<Map<String, Map>> onRollback) throws IllegalStateException { |
| 268 | Transaction t = getTransaction(); |
| 269 | if (t == null) { |
| 270 | try { |
| 271 | onCommit.onCallback(changes); |
| 272 | } catch (Exception e) { |
| 273 | LOG.error(Main.UNHANDLED_EXCEPTION, e); |
| 274 | } |
| 275 | return; |
| 276 | } |
| 277 | if (t.isComplete()) |
| 278 | throw new IllegalStateException(t.name + ": Transaction has been " + t.getStateString()); |
| 279 | if (t.timeout.getMillisLeft() < 0) |
| 280 | throw new Timeout.TimeoutException(t.name + ": Transaction has timed out"); |
| 281 | synchronized (t.allChanges) { |
| 282 | Pair<Callback, Map<String, Map>> pair = t.allChanges.get(onCommit); |
| 283 | if (pair == null) { |
| 284 | t.allChanges.put(onCommit, new Pair<Callback, Map<String, Map>>(onRollback, changes)); |
| 285 | return; |
| 286 | } |
| 287 | Map<String, Map> pchanges = pair.second; |
| 288 | for (Map.Entry<String, Map> change : changes.entrySet()) { |
| 289 | String changeKey = change.getKey(); |
| 290 | Map pmap = pchanges.get(changeKey); |
| 291 | Map changeValue = change.getValue(); |
| 292 | if (pmap == null) { |
| 293 | pchanges.put(changeKey, changeValue); |
| 294 | } else { |
| 295 | pmap.putAll(changeValue); |
| 296 | } |
| 297 | } |
| 298 | } |
| 299 | } |
| 300 | |
| 301 | public static void include(Callback<Map<String, Map>> onCommit, String name, Object key, Object value) throws IllegalStateException { |
| 302 | include(onCommit, toChange(name, key, value), null); |
| 303 | } |
| 304 | |
| 305 | public static void rollbackCurrent(String rollbackReason) throws IllegalStateException { |
| 306 | Transaction t = getTransaction(); |
| 307 | if (t == null) return; |
| 308 | t.rollback(rollbackReason); |
| 309 | } |
| 310 | |
| 311 | public void rollback() throws IllegalStateException { |
| 312 | rollback(Threads.stackSnapshot()); |
| 313 | } |
| 314 | |
| 315 | public void rollback(String reason) throws IllegalStateException { |
| 316 | Lock writeLock = completeLock; |
| 317 | try { |
| 318 | writeLock.lock(); |
| 319 | if (commited) { |
| 320 | unnest(); |
| 321 | throw new IllegalStateException(name + ": Transaction has been committed."); |
| 322 | } |
| 323 | if (rolledback) { |
| 324 | unnest(); |
| 325 | return; |
| 326 | } |
| 327 | if (timeoutFuture != null) timeoutFuture.cancel(false); |
| 328 | |
| 329 | rollback0(reason); |
| 330 | |
| 331 | rolledback = true; |
| 332 | complete.signalAll(); |
| 333 | } finally { |
| 334 | writeLock.unlock(); |
| 335 | unnest(); |
| 336 | } |
| 337 | } |
| 338 | |
| 339 | private void rollback0(String rollbackReason) { |
| 340 | this.rollbackReason = rollbackReason; |
| 341 | if (children != null) |
| 342 | for (Transaction t : children) { |
| 343 | if (!t.isComplete()) |
| 344 | t.rollback(rollbackReason); |
| 345 | } |
| 346 | |
| 347 | if (closeableResources != null) |
| 348 | for (Closeable c : closeableResources) |
| 349 | IOUtils.close(c); |
| 350 | |
| 351 | for (Pair<Callback, Map<String, Map>> pair : allChanges.values()) { |
| 352 | Callback onRollback = pair.first; |
| 353 | if (onRollback != null) |
| 354 | try { |
| 355 | onRollback.onCallback(pair.second); |
| 356 | } catch (Exception e) { |
| 357 | LOG.error(Main.UNHANDLED_EXCEPTION, e); |
| 358 | } |
| 359 | } |
| 360 | } |
| 361 | |
| 362 | public static void rollbackAll() { |
| 363 | Transaction t = getTransaction(); |
| 364 | if (t == null) return; |
| 365 | while (t.parent != null) { |
| 366 | t.rollback(); |
| 367 | t = t.parent; |
| 368 | } |
| 369 | t.rollback(); |
| 370 | assert getTransaction() == null; |
| 371 | } |
| 372 | |
| 373 | private void runOnComplete(Runnable run) { |
| 374 | if (isComplete()) throw new IllegalStateException(TRANSACTION + name + " complete."); |
| 375 | if (onComplete != null) |
| 376 | throw new IllegalStateException(name + ": Multiple onComplete Runnables not supported " + onComplete + " not " + run); |
| 377 | onComplete = run; |
| 378 | } |
| 379 | |
| 380 | public static void runOnCompleteCurrent(Runnable run) { |
| 381 | Transaction t = getTransaction(); |
| 382 | if (t == null) { |
| 383 | run.run(); |
| 384 | return; |
| 385 | } |
| 386 | t.runOnComplete(run); |
| 387 | } |
| 388 | |
| 389 | public static Transaction start(String name) throws Timeout.TimeoutException { |
| 390 | return start(name, Timeout.NEVER); |
| 391 | } |
| 392 | |
| 393 | public static Transaction start(String name, Timeout timeout) throws Timeout.TimeoutException { |
| 394 | Transaction parent = getTransaction(); |
| 395 | Transaction trans = new Transaction(parent, name, timeout); |
| 396 | setTransaction(trans); |
| 397 | return trans; |
| 398 | } |
| 399 | |
| 400 | private static Map<String, Map> toChange(String name, Object key, Object value) { |
| 401 | Map innerMap = new LinkedHashMap(); |
| 402 | innerMap.put(key, value); |
| 403 | Map<String, Map> outerMap = new LinkedHashMap<String, Map>(); |
| 404 | outerMap.put(name, innerMap); |
| 405 | return outerMap; |
| 406 | } |
| 407 | |
| 408 | private void unnest() { |
| 409 | Transaction t = getTransaction(); |
| 410 | //noinspection OverlyComplexBooleanExpression,UnnecessaryParentheses,ConstantConditions |
| 411 | if (t == this || (children != null && !children.isEmpty() && children.contains(t))) { |
| 412 | Transaction value = t.parent; |
| 413 | if (value == null) |
| 414 | removeTransaction(); |
| 415 | else |
| 416 | setTransaction(value); |
| 417 | } |
| 418 | } |
| 419 | |
| 420 | private static void removeTransaction() { |
| 421 | TRANSACTIONS.remove(Thread.currentThread()); |
| 422 | } |
| 423 | |
| 424 | @SuppressWarnings("StaticNonFinalField") |
| 425 | private static int s_setCounter = 0; |
| 426 | |
| 427 | private static void setTransaction(Transaction value) { |
| 428 | if ((s_setCounter++ & 127) == 0) |
| 429 | for (Thread t : TRANSACTIONS.keySet()) |
| 430 | if (!t.isAlive()) |
| 431 | TRANSACTIONS.remove(t); |
| 432 | TRANSACTIONS.put(Thread.currentThread(), value); |
| 433 | } |
| 434 | |
| 435 | public static Transaction getTransaction() { |
| 436 | return TRANSACTIONS.get(Thread.currentThread()); |
| 437 | } |
| 438 | |
| 439 | private void waitOn(Timeout timeout) throws InterruptedException { |
| 440 | Lock writeLock = completeLock; |
| 441 | //noinspection LockAcquiredButNotSafelyReleased |
| 442 | writeLock.lockInterruptibly(); |
| 443 | try { |
| 444 | while (!isComplete()) { |
| 445 | long millisLeft = timeout.getMillisLeft(); |
| 446 | if (millisLeft <= 0) |
| 447 | rollbackAfterWaitOn(); |
| 448 | if (millisLeft > 50) |
| 449 | millisLeft = 50; |
| 450 | try { |
| 451 | complete.await(millisLeft, TimeUnit.MILLISECONDS); |
| 452 | } catch (InterruptedException e) { |
| 453 | rollbackAfterWaitOn(); |
| 454 | throw e; |
| 455 | } |
| 456 | } |
| 457 | } finally { |
| 458 | writeLock.unlock(); |
| 459 | } |
| 460 | } |
| 461 | |
| 462 | private void rollbackAfterWaitOn() { |
| 463 | try { |
| 464 | rollback("on timeout waiting for child transaction."); |
| 465 | } catch (IllegalStateException ignored) { |
| 466 | // ignored. |
| 467 | } |
| 468 | } |
| 469 | |
| 470 | @NotNull public String getName() { |
| 471 | return name; |
| 472 | } |
| 473 | |
| 474 | @Override protected void finalize() throws Throwable { |
| 475 | super.finalize(); |
| 476 | if (!isComplete()) complete(); |
| 477 | } |
| 478 | |
| 479 | private static class OnTimeoutRunnable implements Runnable { |
| 480 | private final String name; |
| 481 | private final Reference<Transaction> transactionRef; |
| 482 | |
| 483 | OnTimeoutRunnable(@NotNull String name, @NotNull Transaction transaction) { |
| 484 | this.name = name; |
| 485 | transactionRef = new WeakReference<Transaction>(transaction); |
| 486 | } |
| 487 | |
| 488 | public void run() { |
| 489 | Transaction transaction = transactionRef.get(); |
| 490 | if (transaction == null) return; |
| 491 | LOG.warn(name + ": Transaction rollback on timeout " + (transaction.parent == null ? "" : ", parent= " + transaction.parent.name)); |
| 492 | transaction.rollback("on timeout"); |
| 493 | } |
| 494 | } |
| 495 | |
| 496 | public String toString() { |
| 497 | return TRANSACTION + name + (isCommited() ? " commited" : isRolledback() ? " rolledback" : " not complete"); |
| 498 | } |
| 499 | } |