/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Diagnostics;
using System.Threading;
using Apache.NMS.Test;
using Apache.NMS.Util;
using NUnit.Framework;

namespace Apache.NMS.ActiveMQ.Test
{
    /// <summary>
    /// Tests that exercise connections created with explicit
    /// <c>wireFormat.maxInactivityDuration</c> settings in the broker URI.
    /// </summary>
    [TestFixture]
    public class MaxInactivityDurationTest : NMSTestSupport
    {
        protected static string DESTINATION_NAME = "TEST.MaxInactivityDuration";
        protected static string CORRELATION_ID = "MaxInactivityCorrelationID";

        /// <summary>
        /// Sends and receives a message, stays idle for 30 seconds (longer than the
        /// 10000 ms max inactivity duration configured in the URI), and then sends and
        /// receives a second message on the same connection.
        /// </summary>
        [Test]
        public void TestMaxInactivityDuration()
        {
            string testuri = "activemq:tcp://${activemqhost}:61616" +
                             "?wireFormat.maxInactivityDurationInitialDelay=5000" +
                             "&wireFormat.maxInactivityDuration=10000" +
                             "&connection.asyncClose=false";

            NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));

            using(IConnection connection = factory.CreateConnection("", ""))
            {
                connection.Start();
                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    IDestination destination = SessionUtil.GetDestination(session, DESTINATION_NAME);
                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
                    using(IMessageProducer producer = session.CreateProducer(destination))
                    {
                        SendMessage(producer);
                        ReceiveAndVerifyMessage(consumer);

                        // Go inactive...
                        Thread.Sleep(TimeSpan.FromSeconds(30));

                        // Send another message.
                        SendMessage(producer);
                        ReceiveAndVerifyMessage(consumer);
                    }
                }
            }
        }

        /// <summary>
        /// Creates a message tagged with <see cref="CORRELATION_ID"/> and NMS type
        /// "Test", and sends it through the given producer.
        /// </summary>
        /// <param name="producer">Producer used to create and send the message.</param>
        protected void SendMessage(IMessageProducer producer)
        {
            IMessage request = producer.CreateMessage();

            request.NMSCorrelationID = CORRELATION_ID;
            request.NMSType = "Test";
            producer.Send(request);
        }

        /// <summary>
        /// Repeatedly opens and disposes a connection, session and consumer, and fails
        /// if the process thread or handle count grows past a fixed allowance above
        /// the counts measured before the loop.
        /// </summary>
        /// <param name="inactivityDuration">
        /// Value substituted into <c>wireFormat.maxInactivityDuration</c> in the broker URI.
        /// </param>
        [Test, Sequential]
        public void TestInactivityMonitorThreadLeak(
            [Values(0, 1000)]
            int inactivityDuration)
        {
            int beginThreadCount;
            int beginHandleCount;

            GetResourceCounts(out beginThreadCount, out beginHandleCount);
            Tracer.InfoFormat("Beginning thread count: {0}, handle count: {1}", beginThreadCount, beginHandleCount);

            string testuri = string.Format("activemq:tcp://${{activemqhost}}:61616?wireFormat.maxInactivityDuration={0}", inactivityDuration);

            NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));

            // We measure the initial resource counts, and then allow a certain fudge factor for the
            // resources to fluctuate at run-time.  If the counts grow outside the safe boundaries of
            // delayed garbage collection, then we fail the test.
            GetResourceCounts(out beginThreadCount, out beginHandleCount);

            int maxThreadGrowth = 10;
            int maxHandleGrowth = 500;

            for(int i = 0; i < 200; i++)
            {
                using(IConnection connection = factory.CreateConnection("ResourceLeakTest", "Password"))
                {
                    using(ISession session = connection.CreateSession())
                    {
                        IDestination destination = SessionUtil.GetDestination(session, "topic://TEST.NMSResourceLeak");
                        using(IMessageConsumer consumer = session.CreateConsumer(destination))
                        {
                            connection.Start();
                        }
                    }
                }

                int endThreadCount;
                int endHandleCount;

                GetResourceCounts(out endThreadCount, out endHandleCount);

                Assert.Less(endThreadCount, beginThreadCount + maxThreadGrowth,
                    string.Format("Thread count grew beyond maximum of {0} on iteration #{1}.", maxThreadGrowth, i));
                Assert.Less(endHandleCount, beginHandleCount + maxHandleGrowth,
                    string.Format("Handle count grew beyond maximum of {0} on iteration #{1}.", maxHandleGrowth, i));
            }
        }

        /// <summary>
        /// Waits up to five seconds for a message and asserts that one arrived and
        /// that it carries <see cref="CORRELATION_ID"/>.
        /// </summary>
        /// <param name="consumer">Consumer to receive from.</param>
        private static void ReceiveAndVerifyMessage(IMessageConsumer consumer)
        {
            IMessage receivedMsg = consumer.Receive(TimeSpan.FromSeconds(5));

            // Check for null first so that a missing message is reported as an
            // assertion failure instead of a NullReferenceException.
            Assert.IsNotNull(receivedMsg, "No message received before the receive timeout expired.");
            Assert.AreEqual(CORRELATION_ID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
        }

        /// <summary>
        /// Reads the current process's thread and handle counts from a fresh
        /// <see cref="Process"/> instance and disposes that instance immediately,
        /// so the measurement does not itself leave an undisposed object behind.
        /// </summary>
        /// <param name="threadCount">Receives the number of threads in the current process.</param>
        /// <param name="handleCount">Receives the number of handles opened by the current process.</param>
        private static void GetResourceCounts(out int threadCount, out int handleCount)
        {
            using(Process currentProcess = Process.GetCurrentProcess())
            {
                threadCount = currentProcess.Threads.Count;
                handleCount = currentProcess.HandleCount;
            }
        }
    }
}