/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Collections; using Apache.NMS.Stomp.Commands; namespace Apache.NMS.Stomp.Transport { /// /// A Transport that correlates asynchronous send/receive messages into single request/response. /// public class ResponseCorrelator : TransportFilter { private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable()); private int nextCommandId; private Exception error; public ResponseCorrelator(ITransport next) : base(next) { } protected override void OnException(ITransport sender, Exception command) { Dispose(command); base.OnException(sender, command); } internal int GetNextCommandId() { return Interlocked.Increment(ref nextCommandId); } public override void Oneway(Command command) { command.CommandId = GetNextCommandId(); command.ResponseRequired = false; next.Oneway(command); } public override FutureResponse AsyncRequest(Command command) { int commandId = GetNextCommandId(); command.CommandId = commandId; command.ResponseRequired = true; FutureResponse future = new FutureResponse(); Exception priorError = null; lock(requestMap.SyncRoot) { priorError = this.error; if(priorError == null) { requestMap[commandId] = future; } } if(priorError != null) { BrokerError brError = new BrokerError(); brError.Message = priorError.Message; ExceptionResponse response = new ExceptionResponse(); response.Exception = brError; future.Response = response; throw priorError; } next.Oneway(command); return future; } public override Response Request(Command command, TimeSpan timeout) { FutureResponse future = AsyncRequest(command); future.ResponseTimeout = timeout; Response response = future.Response; if(response != null && response is ExceptionResponse) { ExceptionResponse er = response as ExceptionResponse; BrokerError brokerError = er.Exception; if(brokerError == null) { throw new BrokerException(); } else { throw new BrokerException(brokerError); } } return response; } protected override void OnCommand(ITransport sender, Command command) { if(command is Response) { Response response = (Response) command; int correlationId = response.CorrelationId; FutureResponse future = (FutureResponse) requestMap[correlationId]; if(future != null) { requestMap.Remove(correlationId); future.Response = response; if(response is ExceptionResponse) { ExceptionResponse er = response as ExceptionResponse; BrokerError brokerError = er.Exception; BrokerException exception = new BrokerException(brokerError); this.exceptionHandler(this, exception); } } else { if(Tracer.IsDebugEnabled) { Tracer.Debug("Unknown response ID: " + response.CommandId + " for response: " + response); } } } else { this.commandHandler(sender, command); } } public override void Stop() { this.Dispose(new IOException("Stopped")); base.Stop(); } private void Dispose(Exception error) { ArrayList requests = null; lock(requestMap.SyncRoot) { if(this.error == null) { this.error = error; requests = new ArrayList(requestMap.Values); requestMap.Clear(); } } if(requests != null) { foreach(FutureResponse future in requests) { BrokerError brError = new BrokerError(); brError.Message = error.Message; ExceptionResponse response = new ExceptionResponse(); response.Exception = brError; future.Response = response; } } } } }