// /*
// * Licensed to the Apache Software Foundation (ASF) under one or more
// * contributor license agreements. See the NOTICE file distributed with
// * this work for additional information regarding copyright ownership.
// * The ASF licenses this file to You under the Apache License, Version 2.0
// * (the "License"); you may not use this file except in compliance with
// * the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//

using System;
using System.Collections.Generic;
using System.Threading;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Transport.Mock;
using NUnit.Framework;

namespace Apache.NMS.ActiveMQ.Test
{
    /// <summary>
    /// Tests for <see cref="InactivityMonitor"/> layered over a <see cref="MockTransport"/>.
    /// The fixture records every command and exception the monitor raises so the
    /// individual tests can assert on what was (or was not) reported.
    /// </summary>
    [TestFixture]
    public class InactivityMonitorTest
    {
        // Commands delivered through the monitor's Command event.
        private List<Command> received;

        // Exceptions delivered through the monitor's Exception event.
        private List<Exception> exceptions;

        private MockTransport transport = null;
        private WireFormatInfo localWireFormatInfo = null;

        // Counted down once per reported exception; lets a test block until the
        // first asynchronous error arrives instead of sleeping a fixed time.
        private CountDownLatch asyncErrorLatch;

        /// <summary>
        /// Exception event handler: logs the exception, records it and counts
        /// down <c>asyncErrorLatch</c>.
        /// </summary>
        /// <param name="transport">The transport that raised the event.</param>
        /// <param name="exception">The exception that was reported.</param>
        public void OnException(ITransport transport, Exception exception)
        {
            Tracer.Debug("Test: Received Exception from Transport: " + exception );
            exceptions.Add( exception );
            asyncErrorLatch.countDown();
        }

        /// <summary>
        /// Command event handler: logs the command and records it.
        /// </summary>
        /// <param name="transport">The transport that raised the event.</param>
        /// <param name="command">The command that was received.</param>
        public void OnCommand(ITransport transport, Command command)
        {
            Tracer.Debug("Test: Received Command from Transport: " + command );
            received.Add( command );
        }

        /// <summary>
        /// Creates fresh recording lists, a mock transport, the local
        /// <see cref="WireFormatInfo"/> and a single-count error latch before each test.
        /// </summary>
        [SetUp]
        public void SetUp()
        {
            this.received = new List<Command>();
            this.exceptions = new List<Exception>();

            Uri uri = new Uri("mock://mock?wireformat=openwire");
            MockTransportFactory factory = new MockTransportFactory();

            this.transport = factory.CompositeConnect( uri ) as MockTransport;

            // The 'as' cast yields null when the factory returns a different
            // transport type; fail here rather than with a NullReferenceException
            // somewhere inside a test.
            Assert.IsNotNull( this.transport, "Mock factory did not return a MockTransport" );

            this.localWireFormatInfo = new WireFormatInfo();

            this.localWireFormatInfo.Version = 5;
            // NOTE(review): presumably milliseconds - confirm against WireFormatInfo.
            this.localWireFormatInfo.MaxInactivityDuration = 3000;
            this.localWireFormatInfo.TightEncodingEnabled = false;

            this.asyncErrorLatch = new CountDownLatch(1);
        }

        /// <summary>
        /// A newly constructed monitor reports zeroed timing values, does not
        /// require keep-alive responses and is not disposed.
        /// </summary>
        [Test]
        public void TestCreate()
        {
            InactivityMonitor monitor = new InactivityMonitor( this.transport );

            Assert.IsTrue( monitor.InitialDelayTime == 0 );
            Assert.IsTrue( monitor.ReadCheckTime == 0 );
            Assert.IsTrue( monitor.WriteCheckTime == 0 );
            Assert.IsFalse( monitor.KeepAliveResponseRequired );
            Assert.IsFalse( monitor.IsDisposed );
        }

        /// <summary>
        /// After the local wire format info is sent and nothing further happens
        /// on the channel, the monitor is expected to report an exception within
        /// the ten second wait.
        /// </summary>
        [Test]
        public void TestReadTimeout()
        {
            InactivityMonitor monitor = new InactivityMonitor( this.transport );

            monitor.Exception += new ExceptionHandler(OnException);
            monitor.Command += new CommandHandler(OnCommand);

            // Send the local one for the monitor to record.
            monitor.Oneway( this.localWireFormatInfo );

            // Should not have timed out on Read yet.
            Assert.AreEqual( 0, this.exceptions.Count );

            this.asyncErrorLatch.await(TimeSpan.FromSeconds(10));

            // Channel should have been inactive for too long.
            Assert.IsTrue( this.exceptions.Count > 0 );
        }

        /// <summary>
        /// With the mock transport configured to fail KeepAliveInfo sends, the
        /// monitor is expected to report an exception and then reject further sends.
        /// </summary>
        [Test]
        public void TestWriteMessageFail()
        {
            // NOTE(review): presumably the mock starts failing once four
            // KeepAliveInfo commands have been sent - confirm against MockTransport.
            this.transport.FailOnKeepAliveInfoSends = true ;
            this.transport.NumSentKeepAliveInfosBeforeFail = 4;

            InactivityMonitor monitor = new InactivityMonitor( this.transport );

            monitor.Exception += new ExceptionHandler(OnException);
            monitor.Command += new CommandHandler(OnCommand);
            monitor.Start();

            // Send the local one for the monitor to record.
            monitor.Oneway( this.localWireFormatInfo );

            this.asyncErrorLatch.await(TimeSpan.FromSeconds(10));

            // An error should have been reported through the Exception event.
            Assert.IsTrue( this.exceptions.Count > 0 );

            // Record whether the send threw and assert outside the try block.
            // Calling Assert.Fail inside a try with a catch-all swallows the
            // assertion failure itself, so the check could never fail.
            bool sendThrew = false;
            try
            {
                monitor.Oneway(new ActiveMQMessage());
            }
            catch( Exception )
            {
                sendThrew = true;
            }

            Assert.IsTrue( sendThrew, "Should have thrown an exception" );
        }

        /// <summary>
        /// While messages are regularly sent and received, the monitor must not
        /// report any exception.
        /// </summary>
        [Test]
        public void TestNonFailureSendCase()
        {
            InactivityMonitor monitor = new InactivityMonitor( this.transport );

            monitor.Exception += new ExceptionHandler(OnException);
            monitor.Command += new CommandHandler(OnCommand);
            monitor.Start();

            // Send the local one for the monitor to record.
            monitor.Oneway( this.localWireFormatInfo );

            ActiveMQMessage message = new ActiveMQMessage();

            // Alternate an outbound send and an injected inbound command, with a
            // half second pause after each, for twenty rounds.
            for( int ix = 0; ix < 20; ++ix )
            {
                monitor.Oneway( message );
                Thread.Sleep( 500 );
                this.transport.InjectCommand( message );
                Thread.Sleep( 500 );
            }

            // The channel was kept busy, so no inactivity error should have been reported.
            Assert.AreEqual( 0, this.exceptions.Count );
        }
    }
}