/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using Apache.NMS.Stomp.Commands; using Apache.NMS.Stomp.Threads; using Apache.NMS.Stomp.Util; using Apache.NMS.Util; namespace Apache.NMS.Stomp.Transport { /// /// This class make sure that the connection is still alive, /// by monitoring the reception of commands from the peer of /// the transport. /// public class InactivityMonitor : TransportFilter { private readonly Atomic monitorStarted = new Atomic(false); private readonly Atomic commandSent = new Atomic(false); private readonly Atomic commandReceived = new Atomic(false); private readonly Atomic failed = new Atomic(false); private readonly Atomic inRead = new Atomic(false); private readonly Atomic inWrite = new Atomic(false); private DedicatedTaskRunner asyncTask; private AsyncWriteTask asyncWriteTask; private readonly Mutex monitor = new Mutex(); private Timer connectionCheckTimer; private long maxInactivityDuration = 10000; public long MaxInactivityDuration { get { return this.maxInactivityDuration; } set { this.maxInactivityDuration = value; } } private long maxInactivityDurationInitialDelay = 10000; public long MaxInactivityDurationInitialDelay { get { return this.maxInactivityDurationInitialDelay; } set { this.maxInactivityDurationInitialDelay = value; } } /// /// Constructor or the Inactivity Monitor /// /// public InactivityMonitor(ITransport next) : base(next) { Tracer.Debug("Creating Inactivity Monitor"); } ~InactivityMonitor() { Dispose(false); } protected override void Dispose(bool disposing) { if(disposing) { // get rid of unmanaged stuff } StopMonitorThreads(); base.Dispose(disposing); } #region WriteCheck Related /// /// Check the write to the broker /// public void WriteCheck(object unused) { if(this.inWrite.Value || this.failed.Value) { Tracer.Debug("Inactivity Monitor is in write or already failed."); return; } if(!commandSent.Value) { Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo"); this.asyncTask.Wakeup(); } else { Tracer.Debug("Message sent since last write check. Resetting flag"); } commandSent.Value = false; } #endregion public override void Stop() { StopMonitorThreads(); next.Stop(); } protected override void OnCommand(ITransport sender, Command command) { commandReceived.Value = true; inRead.Value = true; try { try { StartMonitorThreads(); } catch(IOException ex) { OnException(this, ex); } base.OnCommand(sender, command); } finally { inRead.Value = false; } } public override void Oneway(Command command) { // Disable inactivity monitoring while processing a command. // synchronize this method - its not synchronized // further down the transport stack and gets called by more // than one thread by this class lock(inWrite) { inWrite.Value = true; try { if(failed.Value) { throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString()); } next.Oneway(command); } finally { commandSent.Value = true; inWrite.Value = false; } } } protected override void OnException(ITransport sender, Exception command) { if(failed.CompareAndSet(false, true)) { Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString()); StopMonitorThreads(); base.OnException(sender, command); } } private void StartMonitorThreads() { lock(monitor) { if(monitorStarted.Value || maxInactivityDuration == 0) { return; } Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", maxInactivityDuration ); Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", maxInactivityDurationInitialDelay ); this.asyncWriteTask = new AsyncWriteTask(this); this.asyncTask = new DedicatedTaskRunner(this.asyncWriteTask); monitorStarted.Value = true; this.connectionCheckTimer = new Timer( new TimerCallback(WriteCheck), null, maxInactivityDurationInitialDelay, maxInactivityDuration ); } } private void StopMonitorThreads() { lock(monitor) { if(monitorStarted.CompareAndSet(true, false)) { // Attempt to wait for the Timer to shutdown, but don't wait // forever, if they don't shutdown after two seconds, just quit. ThreadUtil.DisposeTimer(connectionCheckTimer, 2000); this.asyncTask.Shutdown(); this.asyncTask = null; this.asyncWriteTask = null; } } } #region Async Tasks // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task. class AsyncWriteTask : Task { private readonly InactivityMonitor parent; public AsyncWriteTask(InactivityMonitor parent) { this.parent = parent; } public bool Iterate() { Tracer.Debug("AsyncWriteTask perparing for another Write Check"); if(this.parent.monitorStarted.Value) { try { Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive."); KeepAliveInfo info = new KeepAliveInfo(); this.parent.Oneway(info); } catch(IOException e) { this.parent.OnException(parent, e); } } return false; } } #endregion } }