/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.transport.vmpipe;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;

import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.DefaultConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.AbstractIoConnector;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.service.TransportMetadata;
import org.apache.mina.core.session.IdleStatusChecker;
import org.apache.mina.core.session.IoSessionInitializer;
import org.apache.mina.util.ExceptionMonitor;

/**
 * Connects to {@link IoHandler}s which are bound on the specified
 * {@link VmPipeAddress}.
 * <p>
 * Each successful connection creates a pair of {@link VmPipeSession}s: a
 * connector-side ("local") session and the acceptor-side ("remote") session
 * obtained from it. The local session is given an automatically assigned
 * {@link VmPipeAddress} with a negative port, which is released again when
 * the session's close future completes.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public final class VmPipeConnector extends AbstractIoConnector {

    /** Local addresses currently handed out by {@link #nextLocalAddress()}; also used as the lock guarding {@link #nextLocalPort}. */
    private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();

    /** Next candidate local port. Always negative when used; guarded by {@link #TAKEN_LOCAL_ADDRESSES}. */
    private static int nextLocalPort = -1;

    /** Shared listener that releases a session's local address once its close future completes. */
    private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();

    /** Object used for checking session idleness. */
    private final IdleStatusChecker idleChecker;

    /**
     * Creates a new instance.
     */
    public VmPipeConnector() {
        this(null);
    }

    /**
     * Creates a new instance.
     *
     * @param executor The executor to use
     */
    public VmPipeConnector(Executor executor) {
        super(new DefaultVmPipeSessionConfig(), executor);
        idleChecker = new IdleStatusChecker();
        // We schedule the idle status checking task in this service's executor;
        // it wakes up every second.
        executeWorker(idleChecker.getNotifyingTask(), "idleStatusChecker");
    }

    /**
     * {@inheritDoc}
     */
    public TransportMetadata getTransportMetadata() {
        return VmPipeSession.METADATA;
    }

    /**
     * {@inheritDoc}
     */
    public VmPipeSessionConfig getSessionConfig() {
        return (VmPipeSessionConfig) sessionConfig;
    }

    /**
     * Connects to the {@link VmPipe} registered for the given remote address.
     * <p>
     * If nothing is bound on {@code remoteAddress}, or no local address can be
     * assigned, an already-failed future carrying the {@link IOException} is
     * returned. If building the connector-side filter chain fails, the
     * exception is set on the returned future. A failure while setting up the
     * acceptor-side session is reported to the {@link ExceptionMonitor} and
     * the acceptor-side session is closed.
     *
     * @param remoteAddress      the address to look up in the acceptor's bound handlers
     * @param localAddress       not used by this implementation; the local address is
     *                           always assigned dynamically
     * @param sessionInitializer passed through to {@code initSession} for the
     *                           connector-side session
     * @return the connect future associated with the connector-side session
     */
    @Override
    protected ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
            IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
        VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
        if (entry == null) {
            return DefaultConnectFuture.newFailedFuture(new IOException("Endpoint unavailable: " + remoteAddress));
        }

        DefaultConnectFuture future = new DefaultConnectFuture();

        // Assign the local address dynamically,
        VmPipeAddress actualLocalAddress;
        try {
            actualLocalAddress = nextLocalAddress();
        } catch (IOException e) {
            return DefaultConnectFuture.newFailedFuture(e);
        }

        VmPipeSession localSession = new VmPipeSession(this, getListeners(), actualLocalAddress, getHandler(), entry);

        initSession(localSession, future, sessionInitializer);

        // and reclaim the local address when the connection is closed.
        localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);

        // Initialize the connector session.
        try {
            IoFilterChain filterChain = localSession.getFilterChain();
            this.getFilterChainBuilder().buildFilterChain(filterChain);

            // The following statements don't throw any exceptions.
            getListeners().fireSessionCreated(localSession);
            idleChecker.addSession(localSession);
        } catch (Exception e) {
            // NOTE(review): the local session is not closed on this path, so the
            // assigned local address appears to stay reserved - confirm whether
            // it should be released here.
            future.setException(e);
            return future;
        }

        // Initialize the acceptor session.
        VmPipeSession remoteSession = localSession.getRemoteSession();
        ((VmPipeAcceptor) remoteSession.getService()).doFinishSessionInitialization(remoteSession, null);
        try {
            IoFilterChain filterChain = remoteSession.getFilterChain();
            entry.getAcceptor().getFilterChainBuilder().buildFilterChain(filterChain);

            // The following statements don't throw any exceptions.
            entry.getListeners().fireSessionCreated(remoteSession);
            idleChecker.addSession(remoteSession);
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            remoteSession.close(true);
        }

        // Start the chains, and only then allow messages read/written to be processed.
        // This is to ensure that sessionOpened gets received before a messageReceived.
        ((VmPipeFilterChain) localSession.getFilterChain()).start();
        ((VmPipeFilterChain) remoteSession.getFilterChain()).start();

        return future;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void dispose0() throws Exception {
        // Stop the idle checking task.
        idleChecker.getNotifyingTask().cancel();
    }

    /**
     * Reserves and returns a free local address with a negative port.
     * <p>
     * Candidate ports are tried in decreasing order starting from the current
     * value of {@link #nextLocalPort}. The counter is reset to {@code -1}
     * whenever it is non-negative, including when the decrement underflows
     * past {@link Integer#MIN_VALUE} in the middle of a search, so that a
     * non-negative port is never handed out.
     *
     * @return a newly reserved local address
     * @throws IOException if no free address was found after
     *                     {@link Integer#MAX_VALUE} attempts
     */
    private static VmPipeAddress nextLocalAddress() throws IOException {
        synchronized (TAKEN_LOCAL_ADDRESSES) {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                // Checked on every iteration: the post-decrement below can wrap
                // from Integer.MIN_VALUE to Integer.MAX_VALUE during the search.
                if (nextLocalPort >= 0) {
                    nextLocalPort = -1;
                }

                VmPipeAddress answer = new VmPipeAddress(nextLocalPort--);

                // Set.add returns false when the address is already reserved.
                if (TAKEN_LOCAL_ADDRESSES.add(answer)) {
                    return answer;
                }
            }
        }

        throw new IOException("Can't assign a local VM pipe port.");
    }

    /**
     * Removes the local address of the future's session from
     * {@link #TAKEN_LOCAL_ADDRESSES} when the observed future completes.
     */
    private static class LocalAddressReclaimer implements IoFutureListener<IoFuture> {
        public void operationComplete(IoFuture future) {
            synchronized (TAKEN_LOCAL_ADDRESSES) {
                TAKEN_LOCAL_ADDRESSES.remove(future.getSession().getLocalAddress());
            }
        }
    }
}