/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.transport.vmpipe;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;

import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.DefaultConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.AbstractIoConnector;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.service.TransportMetadata;
import org.apache.mina.core.session.IdleStatusChecker;
import org.apache.mina.core.session.IoSessionInitializer;
import org.apache.mina.util.ExceptionMonitor;

/**
 * Connects to {@link IoHandler}s which are bound on the specified
 * {@link VmPipeAddress}.
 * <p>
 * Every connection gets a dynamically assigned local {@link VmPipeAddress}
 * with a negative port number. The set of addresses currently in use is
 * shared by all connector instances and is guarded by its own monitor.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public final class VmPipeConnector extends AbstractIoConnector {

    /** Local addresses currently handed out; always accessed under its own monitor. */
    private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();

    /** Next candidate local port; only negative values are handed out. */
    private static int nextLocalPort = -1;

    /** Shared listener that frees a session's local address once the session is closed. */
    private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();

    /** Object used for checking session idleness; assigned once in the constructor. */
    private final IdleStatusChecker idleChecker;

    /**
     * Creates a new instance without an explicit executor ({@code null} is
     * passed on to {@link #VmPipeConnector(Executor)}).
     */
    public VmPipeConnector() {
        this(null);
    }

    /**
     * Creates a new instance.
     *
     * @param executor the executor handed to the parent constructor; may be {@code null}
     */
    public VmPipeConnector(Executor executor) {
        super(new DefaultVmPipeSessionConfig(), executor);
        idleChecker = new IdleStatusChecker();
        // Schedule the idle status checking task through this service's worker
        // facility (per the original note, it wakes up about every second).
        executeWorker(idleChecker.getNotifyingTask(), "idleStatusChecker");
    }

    /**
     * {@inheritDoc}
     */
    public TransportMetadata getTransportMetadata() {
        return VmPipeSession.METADATA;
    }

    /**
     * {@inheritDoc}
     */
    public VmPipeSessionConfig getSessionConfig() {
        return (VmPipeSessionConfig) sessionConfig;
    }

    /**
     * Connects to the handler bound on {@code remoteAddress}.
     * <p>
     * The local address argument is not used: a fresh local address is always
     * assigned dynamically. The returned future is failed when no endpoint is
     * bound on {@code remoteAddress}, when no local address can be assigned, or
     * when initializing the connector-side session throws.
     *
     * @param remoteAddress the address to look up among the bound acceptors
     * @param localAddress ignored by this implementation
     * @param sessionInitializer passed on to {@code initSession}
     * @return the future associated with this connection attempt
     */
    @Override
    protected ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
            IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
        VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
        if (entry == null) {
            return DefaultConnectFuture.newFailedFuture(new IOException("Endpoint unavailable: " + remoteAddress));
        }

        DefaultConnectFuture future = new DefaultConnectFuture();

        // Assign the local address dynamically,
        VmPipeAddress actualLocalAddress;
        try {
            actualLocalAddress = nextLocalAddress();
        } catch (IOException e) {
            return DefaultConnectFuture.newFailedFuture(e);
        }

        VmPipeSession localSession = new VmPipeSession(this, getListeners(), actualLocalAddress, getHandler(), entry);

        initSession(localSession, future, sessionInitializer);

        // and reclaim the local address when the connection is closed.
        localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);

        // initialize connector session
        try {
            IoFilterChain filterChain = localSession.getFilterChain();
            this.getFilterChainBuilder().buildFilterChain(filterChain);

            // The following statements don't throw any exceptions.
            getListeners().fireSessionCreated(localSession);
            idleChecker.addSession(localSession);
        } catch (Exception e) {
            // The session is abandoned here without being closed, so the close
            // listener will not run; free the reserved local address explicitly
            // instead of leaking it.
            releaseLocalAddress(actualLocalAddress);
            future.setException(e);
            return future;
        }

        // initialize acceptor session
        VmPipeSession remoteSession = localSession.getRemoteSession();
        ((VmPipeAcceptor) remoteSession.getService()).doFinishSessionInitialization(remoteSession, null);
        try {
            IoFilterChain filterChain = remoteSession.getFilterChain();
            entry.getAcceptor().getFilterChainBuilder().buildFilterChain(filterChain);

            // The following statements don't throw any exceptions.
            entry.getListeners().fireSessionCreated(remoteSession);
            idleChecker.addSession(remoteSession);
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            remoteSession.close(true);
        }

        // Start the chains only now, and then allow any messages read/written
        // to be processed. This is to ensure that sessionOpened gets received
        // before a messageReceived.
        ((VmPipeFilterChain) localSession.getFilterChain()).start();
        ((VmPipeFilterChain) remoteSession.getFilterChain()).start();

        return future;
    }

    /**
     * Stops the idle checking task when this connector is disposed.
     */
    @Override
    protected void dispose0() throws Exception {
        // stop the idle checking task
        idleChecker.getNotifyingTask().cancel();
    }

    /**
     * Reserves and returns the next free local address.
     * <p>
     * Candidate ports are taken from a decrementing counter. The counter is
     * reset to {@code -1} whenever it is no longer negative, including when the
     * decrement wraps around past {@link Integer#MIN_VALUE} in the middle of
     * the scan, so only negative ports are ever handed out.
     *
     * @return an address that has been added to the set of taken addresses
     * @throws IOException if no free address was found after scanning
     */
    private static VmPipeAddress nextLocalAddress() throws IOException {
        synchronized (TAKEN_LOCAL_ADDRESSES) {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                // Checked on every iteration: nextLocalPort-- overflows to a
                // positive value right after Integer.MIN_VALUE has been used.
                if (nextLocalPort >= 0) {
                    nextLocalPort = -1;
                }

                VmPipeAddress answer = new VmPipeAddress(nextLocalPort--);
                // Set.add returns true only when the address was not taken yet.
                if (TAKEN_LOCAL_ADDRESSES.add(answer)) {
                    return answer;
                }
            }
        }

        throw new IOException("Can't assign a local VM pipe port.");
    }

    /**
     * Removes the given address from the set of taken local addresses.
     * Removing an address that is not in the set has no effect.
     *
     * @param address the local address to free
     */
    private static void releaseLocalAddress(SocketAddress address) {
        synchronized (TAKEN_LOCAL_ADDRESSES) {
            TAKEN_LOCAL_ADDRESSES.remove(address);
        }
    }

    /**
     * Frees the local address of the session attached to the completed future.
     * It is registered on each connector session's close future.
     */
    private static class LocalAddressReclaimer implements IoFutureListener<IoFuture> {
        public void operationComplete(IoFuture future) {
            releaseLocalAddress(future.getSession().getLocalAddress());
        }
    }
}