/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Transactions;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transactions;
using Apache.NMS.Util;

namespace Apache.NMS.ActiveMQ
{
    /// <summary>
    /// Transaction context that enlists a session as a durable resource in a
    /// .NET System.Transactions transaction and maps the enlistment callbacks
    /// (Prepare / Commit / Rollback / InDoubt / SinglePhaseCommit) onto XA style
    /// TransactionInfo commands sent to the broker.  Local (non distributed)
    /// transactions are not supported by this context.
    /// </summary>
    public sealed class NetTxTransactionContext : TransactionContext, ISinglePhaseNotification
    {
        private const int XA_OK = 0;
        private const int XA_READONLY = 3;

        private Enlistment currentEnlistment;

        // Resource Manager Ids for which recovery has already been attempted.  All access
        // from InitializeDtcTxContext happens while holding a lock on this dictionary.
        private static readonly Dictionary<string, bool> recoveredResourceManagerIds =
            new Dictionary<string, bool>();

        public NetTxTransactionContext(Session session) : base(session)
        {
        }

        /// <summary>
        /// DTC recovery is performed once for each AppDomain per default. In case you want to perform
        /// it again during execution of the application you can call this method before.
        /// But ensure in this case that no connection is active anymore.
        /// </summary>
        public static void ResetDtcRecovery()
        {
            recoveredResourceManagerIds.Clear();
        }

        /// <summary>
        /// True when a transaction id is set but this context holds no DTC enlistment.
        /// </summary>
        public override bool InLocalTransaction
        {
            get { return this.transactionId != null && this.currentEnlistment == null; }
        }

        /// <summary>
        /// Not supported: always throws <see cref="IllegalStateException"/>.
        /// </summary>
        public override void Begin()
        {
            throw new IllegalStateException("Local Transactions not supported in NetTx resources");
        }

        /// <summary>
        /// Not supported: always throws <see cref="IllegalStateException"/>.
        /// </summary>
        public override void Commit()
        {
            throw new IllegalStateException("Local Transactions not supported in NetTx resources");
        }

        /// <summary>
        /// Not supported: always throws <see cref="IllegalStateException"/>.
        /// </summary>
        public override void Rollback()
        {
            throw new IllegalStateException("Local Transactions not supported in NetTx resources");
        }

        #region Transaction Members used when dealing with .NET System Transactions.

        // When DTC calls prepare we must then wait for either the TX to commit, rollback or
        // be canceled because its in doubt.
        private readonly ManualResetEvent dtcControlEvent = new ManualResetEvent(true);

        // Once the DTC calls prepare we lock this object and don't unlock it again until
        // the TX has either completed or terminated, the users of this class should use
        // this sync point when the TX is a DTC version as opposed to a local one.
        //
        // This is only ever used as the target of a lock statement, so a plain object is
        // sufficient; no OS level synchronization handle is needed.
        private readonly object syncObject = new object();

        /// <summary>
        /// State of the .NET transaction as tracked by this context.
        /// </summary>
        public enum TxState
        {
            None = 0,
            Active = 1,
            Pending = 2
        }

        private TxState netTxState = TxState.None;

        /// <summary>
        /// The object this context locks on inside every enlistment callback.
        /// </summary>
        public object SyncRoot
        {
            get { return this.syncObject; }
        }

        /// <summary>
        /// True when the current transaction id is an <see cref="XATransactionId"/>.
        /// </summary>
        public bool InNetTransaction
        {
            // The 'is' test already evaluates to false for a null reference.
            get { return this.transactionId is XATransactionId; }
        }

        /// <summary>
        /// The current state of the .NET transaction.
        /// </summary>
        public TxState NetTxState
        {
            get { return this.netTxState; }
        }

        /// <summary>
        /// Handle that is reset when a transaction begins and set again once the
        /// transaction has completed or terminated.
        /// </summary>
        public WaitHandle DtcWaitHandle
        {
            get { return dtcControlEvent; }
        }

        /// <summary>
        /// Enlists this context as a durable resource in the given transaction, creates a
        /// new XA transaction id for it and notifies the broker that the transaction began.
        /// </summary>
        /// <param name="transaction">The ambient .NET transaction to enlist in.</param>
        /// <exception cref="TransactionInProgressException">
        /// Thrown when this context already has an XA transaction id assigned.
        /// </exception>
        public void Begin(Transaction transaction)
        {
            lock (syncObject)
            {
                dtcControlEvent.Reset();

                Tracer.Debug("Begin notification received");

                if (InNetTransaction)
                {
                    throw new TransactionInProgressException("A Transaction is already in Progress");
                }

                try
                {
                    Guid rmId = ResourceManagerGuid;

                    // Enlist this object in the transaction.
                    this.currentEnlistment =
                        transaction.EnlistDurable(rmId, this, EnlistmentOptions.None);

                    // In case of a exception in the current method the transaction will be rolled back.
                    // Until Begin Transaction is completed we consider to be in a rollback scenario.
                    this.netTxState = TxState.Pending;

                    Tracer.Debug("Enlisted in Durable Transaction with RM Id: " + rmId);

                    TransactionInformation txInfo = transaction.TransactionInformation;

                    XATransactionId xaId = new XATransactionId();
                    this.transactionId = xaId;

                    // Use the distributed identifier when one has been assigned, otherwise fall
                    // back to the local identifier.  The branch qualifier is always a fresh Guid.
                    if (txInfo.DistributedIdentifier != Guid.Empty)
                    {
                        xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
                    }
                    else
                    {
                        xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
                        xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
                    }

                    // Now notify the broker that a new XA'ish transaction has started.
                    TransactionInfo info = new TransactionInfo();
                    info.ConnectionId = this.connection.ConnectionId;
                    info.TransactionId = this.transactionId;
                    info.Type = (int)TransactionType.Begin;

                    this.session.Connection.Oneway(info);

                    // Begin Transaction is completed successfully. Change to transaction active state now.
                    this.netTxState = TxState.Active;

                    SignalTransactionStarted();

                    if (Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("Began XA'ish Transaction:" + xaId);
                    }
                }
                catch (Exception)
                {
                    // When in pending state the rollback will signal that a new transaction can be started. Otherwise do it here.
                    if (netTxState != TxState.Pending)
                    {
                        netTxState = TxState.None;
                        dtcControlEvent.Set();
                    }
                    throw;
                }
            }
        }

        /// <summary>
        /// First phase of the two phase commit: logs the recovery information, ends the
        /// transaction branch on the broker and asks the broker to prepare it.  Replies
        /// Done for a read-only result, Prepared otherwise, and forces a rollback on error.
        /// </summary>
        /// <param name="preparingEnlistment">Enlistment used to answer the prepare request.</param>
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            lock (this.syncObject)
            {
                this.netTxState = TxState.Pending;

                try
                {
                    Tracer.Debug("Prepare notification received for TX id: " + this.transactionId);

                    BeforeEnd();

                    // Before sending the request to the broker, log the recovery bits, if
                    // this fails we can't prepare and the TX should be rolled back.
                    RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
                                                   preparingEnlistment.RecoveryInformation());

                    // Inform the broker that work on the XA'sh TX Branch is complete.
                    TransactionInfo info = new TransactionInfo();
                    info.ConnectionId = this.connection.ConnectionId;
                    info.TransactionId = this.transactionId;
                    info.Type = (int)TransactionType.End;

                    this.connection.CheckConnected();
                    this.connection.SyncRequest((TransactionInfo) info.Clone());

                    // Prepare the Transaction for commit.
                    info.Type = (int)TransactionType.Prepare;
                    IntegerResponse response = (IntegerResponse)this.connection.SyncRequest(info);

                    if (response.Result == XA_READONLY)
                    {
                        Tracer.Debug("Transaction Prepare done and doesn't need a commit, TX id: " + this.transactionId);

                        // Remember the id before the field is cleared so that the recovery
                        // logger is told which transaction no longer needs recovery data.
                        XATransactionId completedId = this.transactionId as XATransactionId;

                        this.transactionId = null;
                        this.currentEnlistment = null;

                        // Read Only means there's nothing to recover because there was no
                        // change on the broker.
                        RecoveryLogger.LogRecovered(completedId);

                        // if server responds that nothing needs to be done, then reply done.
                        // otherwise the DTC will call Commit or Rollback but another transaction
                        // can already be in progress and this one would be commited or rolled back
                        // immediately.
                        preparingEnlistment.Done();

                        // Done so commit won't be called.
                        AfterCommit();

                        // A Read-Only TX is considered closed at this point, DTC won't call us again.
                        // Reset the state like every other terminal path does, otherwise a stale
                        // Pending state would keep a later failed Begin from signalling the handle.
                        this.netTxState = TxState.None;
                        this.dtcControlEvent.Set();
                    }
                    else
                    {
                        Tracer.Debug("Transaction Prepare succeeded TX id: " + this.transactionId);

                        // If work finished correctly, reply prepared
                        preparingEnlistment.Prepared();
                    }
                }
                catch (Exception ex)
                {
                    Tracer.DebugFormat("Transaction[{0}] Prepare failed with error: {1}",
                                       this.transactionId, ex.Message);

                    AfterRollback();
                    preparingEnlistment.ForceRollback();

                    try
                    {
                        this.connection.OnException(ex);
                    }
                    catch (Exception error)
                    {
                        Tracer.Error(error.ToString());
                    }

                    this.currentEnlistment = null;
                    this.transactionId = null;
                    this.netTxState = TxState.None;
                    this.dtcControlEvent.Set();
                }
            }
        }

        /// <summary>
        /// Second phase of the two phase commit: asks the broker to commit the prepared
        /// transaction.  Failures are reported through the connection's exception handler.
        /// The transaction state is always cleared and waiters are always released.
        /// </summary>
        /// <param name="enlistment">Enlistment used to report completion.</param>
        public void Commit(Enlistment enlistment)
        {
            lock (this.syncObject)
            {
                try
                {
                    Tracer.Debug("Commit notification received for TX id: " + this.transactionId);

                    if (this.transactionId != null)
                    {
                        // Now notify the broker that the prepared XA'ish transaction should be committed.
                        TransactionInfo info = new TransactionInfo();
                        info.ConnectionId = this.connection.ConnectionId;
                        info.TransactionId = this.transactionId;
                        info.Type = (int)TransactionType.CommitTwoPhase;

                        this.connection.CheckConnected();
                        this.connection.SyncRequest(info);

                        Tracer.Debug("Transaction Commit Done TX id: " + this.transactionId);

                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);

                        // The broker accepted the commit, tell the transaction manager we are done.
                        enlistment.Done();

                        AfterCommit();
                    }
                }
                catch (Exception ex)
                {
                    Tracer.DebugFormat("Transaction[{0}] Commit failed with error: {1}",
                                       this.transactionId, ex.Message);
                    try
                    {
                        this.connection.OnException(ex);
                    }
                    catch (Exception error)
                    {
                        Tracer.Error(error.ToString());
                    }
                }
                finally
                {
                    this.currentEnlistment = null;
                    this.transactionId = null;
                    this.netTxState = TxState.None;

                    // Recovery waits on this latch until every re-enlisted TX has reported in.
                    CountDownLatch latch = this.recoveryComplete;
                    if (latch != null)
                    {
                        latch.countDown();
                    }

                    this.dtcControlEvent.Set();
                }
            }
        }

        /// <summary>
        /// Single phase commit: asks the broker to commit the transaction in one step.
        /// On failure the local rollback hooks run and the error is reported through the
        /// connection's exception handler.
        /// </summary>
        /// <param name="enlistment">Enlistment used to report completion.</param>
        public void SinglePhaseCommit(SinglePhaseEnlistment enlistment)
        {
            lock (this.syncObject)
            {
                try
                {
                    Tracer.Debug("Single Phase Commit notification received for TX id: " + this.transactionId);

                    if (this.transactionId != null)
                    {
                        BeforeEnd();

                        // Now notify the broker that the XA'ish transaction should be committed in one phase.
                        TransactionInfo info = new TransactionInfo();
                        info.ConnectionId = this.connection.ConnectionId;
                        info.TransactionId = this.transactionId;
                        info.Type = (int)TransactionType.CommitOnePhase;

                        this.connection.CheckConnected();
                        this.connection.SyncRequest(info);

                        Tracer.Debug("Transaction Single Phase Commit Done TX id: " + this.transactionId);

                        // The broker accepted the commit, tell the transaction manager we are done.
                        enlistment.Done();

                        AfterCommit();
                    }
                }
                catch (Exception ex)
                {
                    Tracer.DebugFormat("Transaction[{0}] Single Phase Commit failed with error: {1}",
                                       this.transactionId, ex.Message);

                    AfterRollback();
                    enlistment.Done();

                    try
                    {
                        this.connection.OnException(ex);
                    }
                    catch (Exception error)
                    {
                        Tracer.Error(error.ToString());
                    }
                }
                finally
                {
                    this.currentEnlistment = null;
                    this.transactionId = null;
                    this.netTxState = TxState.None;

                    this.dtcControlEvent.Set();
                }
            }
        }

        /// <summary>
        /// Rolls the transaction back on the broker by sending End followed by Rollback.
        /// Failures are reported through the connection's exception handler.  The
        /// transaction state is always cleared and waiters are always released.
        /// </summary>
        /// <param name="enlistment">Enlistment used to report completion.</param>
        public void Rollback(Enlistment enlistment)
        {
            lock (this.syncObject)
            {
                try
                {
                    Tracer.Debug("Rollback notification received for TX id: " + this.transactionId);

                    if (this.transactionId != null)
                    {
                        BeforeEnd();

                        // Now notify the broker that work on the TX branch has ended ...
                        TransactionInfo info = new TransactionInfo();
                        info.ConnectionId = this.connection.ConnectionId;
                        info.TransactionId = this.transactionId;
                        info.Type = (int)TransactionType.End;

                        this.connection.CheckConnected();
                        this.connection.SyncRequest((TransactionInfo) info.Clone());

                        // ... and that it should be rolled back.
                        info.Type = (int)TransactionType.Rollback;
                        this.connection.CheckConnected();
                        this.connection.SyncRequest((TransactionInfo) info.Clone());

                        Tracer.Debug("Transaction Rollback Done TX id: " + this.transactionId);

                        RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);

                        // The broker accepted the rollback, tell the transaction manager we are done.
                        enlistment.Done();

                        AfterRollback();
                    }
                }
                catch (Exception ex)
                {
                    Tracer.DebugFormat("Transaction[{0}] Rollback failed with error: {1}",
                                       this.transactionId, ex.Message);

                    AfterRollback();

                    try
                    {
                        this.connection.OnException(ex);
                    }
                    catch (Exception error)
                    {
                        Tracer.Error(error.ToString());
                    }
                }
                finally
                {
                    this.currentEnlistment = null;
                    this.transactionId = null;
                    this.netTxState = TxState.None;

                    // Recovery waits on this latch until every re-enlisted TX has reported in.
                    CountDownLatch latch = this.recoveryComplete;
                    if (latch != null)
                    {
                        latch.countDown();
                    }

                    this.dtcControlEvent.Set();
                }
            }
        }

        /// <summary>
        /// Handles an in-doubt notification by rolling the transaction back on the broker
        /// (End followed by Rollback).  Unlike Commit and Rollback there is no catch block
        /// here, so an exception from the broker requests propagates to the caller after
        /// the transaction state has been cleared.
        /// </summary>
        /// <param name="enlistment">Enlistment used to report completion.</param>
        public void InDoubt(Enlistment enlistment)
        {
            lock (syncObject)
            {
                try
                {
                    Tracer.Debug("In Doubt notification received for TX id: " + this.transactionId);

                    BeforeEnd();

                    // Now notify the broker that Rollback should be performed.
                    TransactionInfo info = new TransactionInfo();
                    info.ConnectionId = this.connection.ConnectionId;
                    info.TransactionId = this.transactionId;
                    info.Type = (int)TransactionType.End;

                    this.connection.CheckConnected();
                    this.connection.SyncRequest((TransactionInfo) info.Clone());

                    info.Type = (int)TransactionType.Rollback;
                    this.connection.CheckConnected();
                    this.connection.SyncRequest((TransactionInfo) info.Clone());

                    Tracer.Debug("InDoubt Transaction Rollback Done TX id: " + this.transactionId);

                    RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);

                    // The broker accepted the rollback, tell the transaction manager we are done.
                    enlistment.Done();

                    AfterRollback();
                }
                finally
                {
                    this.currentEnlistment = null;
                    this.transactionId = null;
                    this.netTxState = TxState.None;

                    // Recovery waits on this latch until every re-enlisted TX has reported in.
                    CountDownLatch latch = this.recoveryComplete;
                    if (latch != null)
                    {
                        latch.countDown();
                    }

                    this.dtcControlEvent.Set();
                }
            }
        }

        #endregion

        #region Distributed Transaction Recovery Bits

        // Counts the re-enlisted transactions that still have to report Commit, Rollback
        // or InDoubt.  Null unless a recovery found matching transactions.
        private volatile CountDownLatch recoveryComplete;

        /// <summary>
        /// Should be called from NetTxSession when created to check if any TX
        /// data is stored for recovery and whether the Broker has matching info
        /// stored.  If an Transaction is found that belongs to this client and is
        /// still alive on the Broker it will be recovered, otherwise the stored
        /// data should be cleared.
        /// </summary>
        public void InitializeDtcTxContext()
        {
            string resourceManagerId = ResourceManagerId;

            // initialize the logger with the current Resource Manager Id
            RecoveryLogger.Initialize(resourceManagerId);

            lock (recoveredResourceManagerIds)
            {
                // Recovery is attempted only once per Resource Manager Id.
                if (recoveredResourceManagerIds.ContainsKey(resourceManagerId))
                {
                    return;
                }

                recoveredResourceManagerIds[resourceManagerId] = true;

                KeyValuePair<XATransactionId, byte[]>[] localRecoverables = RecoveryLogger.GetRecoverables();
                if (localRecoverables.Length == 0)
                {
                    Tracer.Debug("Did not detect any open DTC transaction records on disk.");
                    // No local data so anything stored on the broker can't be recovered here.
                    return;
                }

                XATransactionId[] recoverables = TryRecoverBrokerTXIds();
                if (recoverables.Length == 0)
                {
                    Tracer.Debug("Did not detect any recoverable transactions at Broker.");
                    // Broker has no recoverable data so nothing to do here, delete the
                    // old recovery log as its stale.
                    RecoveryLogger.Purge();
                    return;
                }

                // Pair up every broker side transaction with the locally stored recovery data.
                List<KeyValuePair<XATransactionId, byte[]>> matches =
                    new List<KeyValuePair<XATransactionId, byte[]>>();

                foreach (XATransactionId recoverable in recoverables)
                {
                    foreach (KeyValuePair<XATransactionId, byte[]> entry in localRecoverables)
                    {
                        if (entry.Key.Equals(recoverable))
                        {
                            Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", entry.Key);
                            matches.Add(entry);
                        }
                    }
                }

                if (matches.Count != 0)
                {
                    this.recoveryComplete = new CountDownLatch(matches.Count);

                    // NOTE(review): every iteration overwrites transactionId and currentEnlistment,
                    // so with more than one match these fields only reflect the latest re-enlistment
                    // - confirm that this is intended.
                    foreach (KeyValuePair<XATransactionId, byte[]> recoverable in matches)
                    {
                        this.transactionId = recoverable.Key;
                        Tracer.Info("Reenlisting recovered TX with Id: " + this.transactionId);
                        this.currentEnlistment =
                            TransactionManager.Reenlist(ResourceManagerGuid, recoverable.Value, this);
                    }

                    // Block until Commit, Rollback or InDoubt has counted the latch down to zero.
                    this.recoveryComplete.await();
                    Tracer.Debug("All Recovered TX enlistments Reports complete, Recovery Complete.");
                    TransactionManager.RecoveryComplete(ResourceManagerGuid);
                    return;
                }

                // The old recovery information doesn't match what's on the broker so we
                // should discard it as its stale now.
                RecoveryLogger.Purge();
            }
        }

        /// <summary>
        /// Asks the broker for the XA transaction ids it considers recoverable.
        /// </summary>
        /// <returns>
        /// The ids reported by the broker, or an empty array when the response is not a
        /// data array response or contains no entries.
        /// </returns>
        private XATransactionId[] TryRecoverBrokerTXIds()
        {
            Tracer.Debug("Checking for Recoverable Transactions on Broker.");

            TransactionInfo info = new TransactionInfo();
            info.ConnectionId = this.session.Connection.ConnectionId;
            info.Type = (int)TransactionType.Recover;

            this.connection.CheckConnected();
            DataArrayResponse response = this.connection.SyncRequest(info) as DataArrayResponse;

            if (response != null && response.Data.Length > 0)
            {
                Tracer.DebugFormat("Broker reports there are {0} recoverable XA Transactions", response.Data.Length);

                List<XATransactionId> recovered = new List<XATransactionId>();

                // Entries that are not XA transaction ids are skipped.
                foreach (DataStructure ds in response.Data)
                {
                    XATransactionId xid = ds as XATransactionId;
                    if (xid != null)
                    {
                        recovered.Add(xid);
                    }
                }

                return recovered.ToArray();
            }

            return new XATransactionId[0];
        }

        #endregion

        /// <summary>
        /// The recovery logger configured on the owning connection's recovery policy.
        /// </summary>
        internal IRecoveryLogger RecoveryLogger
        {
            get { return (this.connection as NetTxConnection).RecoveryPolicy.RecoveryLogger; }
        }

        /// <summary>
        /// String form of the owning connection's Resource Manager Guid.
        /// </summary>
        internal string ResourceManagerId
        {
            get { return (this.connection as NetTxConnection).ResourceManagerGuid.ToString(); }
        }

        /// <summary>
        /// The owning connection's Resource Manager Guid.
        /// </summary>
        internal Guid ResourceManagerGuid
        {
            get { return (this.connection as NetTxConnection).ResourceManagerGuid; }
        }
    }
}