1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.io.IOException;
23 import java.net.SocketAddress;
24 import java.util.HashSet;
25 import java.util.Set;
26 import java.util.concurrent.Executor;
27
28 import org.apache.mina.core.filterchain.IoFilterChain;
29 import org.apache.mina.core.future.ConnectFuture;
30 import org.apache.mina.core.future.DefaultConnectFuture;
31 import org.apache.mina.core.future.IoFuture;
32 import org.apache.mina.core.future.IoFutureListener;
33 import org.apache.mina.core.service.AbstractIoConnector;
34 import org.apache.mina.core.service.IoHandler;
35 import org.apache.mina.core.service.TransportMetadata;
36 import org.apache.mina.core.session.IoSessionInitializer;
37 import org.apache.mina.util.ExceptionMonitor;
38
39
40
41
42
43
44
45
46 public final class VmPipeConnector extends AbstractIoConnector {
47
48
49
50
51 public VmPipeConnector() {
52 this(null);
53 }
54
55
56
57
58 public VmPipeConnector(Executor executor) {
59 super(new DefaultVmPipeSessionConfig(), executor);
60 }
61
62 public TransportMetadata getTransportMetadata() {
63 return VmPipeSession.METADATA;
64 }
65
66 @Override
67 public VmPipeSessionConfig getSessionConfig() {
68 return (VmPipeSessionConfig) super.getSessionConfig();
69 }
70
71 @Override
72 protected ConnectFuture connect0(SocketAddress remoteAddress,
73 SocketAddress localAddress,
74 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
75 VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
76 if (entry == null) {
77 return DefaultConnectFuture.newFailedFuture(new IOException(
78 "Endpoint unavailable: " + remoteAddress));
79 }
80
81 DefaultConnectFuture future = new DefaultConnectFuture();
82
83
84 VmPipeAddress actualLocalAddress;
85 try {
86 actualLocalAddress = nextLocalAddress();
87 } catch (IOException e) {
88 return DefaultConnectFuture.newFailedFuture(e);
89 }
90
91 VmPipeSession localSession = new VmPipeSession(this,
92 getListeners(), actualLocalAddress, getHandler(), entry);
93
94 finishSessionInitialization(localSession, future, sessionInitializer);
95
96
97 localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);
98
99
100 try {
101 IoFilterChain filterChain = localSession.getFilterChain();
102 this.getFilterChainBuilder().buildFilterChain(filterChain);
103
104
105 getListeners().fireSessionCreated(localSession);
106 getIdleStatusChecker().addSession(localSession);
107 } catch (Throwable t) {
108 future.setException(t);
109 return future;
110 }
111
112
113 VmPipeSession remoteSession = localSession.getRemoteSession();
114 ((VmPipeAcceptor) remoteSession.getService()).doFinishSessionInitialization(remoteSession, null);
115 try {
116 IoFilterChain filterChain = remoteSession.getFilterChain();
117 entry.getAcceptor().getFilterChainBuilder().buildFilterChain(
118 filterChain);
119
120
121 entry.getListeners().fireSessionCreated(remoteSession);
122 getIdleStatusChecker().addSession(remoteSession);
123 } catch (Throwable t) {
124 ExceptionMonitor.getInstance().exceptionCaught(t);
125 remoteSession.close();
126 }
127
128
129
130 ((VmPipeFilterChain) localSession.getFilterChain()).start();
131 ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
132
133 return future;
134 }
135
136 @Override
137 protected IoFuture dispose0() throws Exception {
138 return null;
139 }
140
141 private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();
142
143 private static int nextLocalPort = -1;
144
145 private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();
146
147 private static VmPipeAddress nextLocalAddress() throws IOException {
148 synchronized (TAKEN_LOCAL_ADDRESSES) {
149 if (nextLocalPort >= 0) {
150 nextLocalPort = -1;
151 }
152 for (int i = 0; i < Integer.MAX_VALUE; i++) {
153 VmPipeAddress answer = new VmPipeAddress(nextLocalPort--);
154 if (!TAKEN_LOCAL_ADDRESSES.contains(answer)) {
155 TAKEN_LOCAL_ADDRESSES.add(answer);
156 return answer;
157 }
158 }
159 }
160
161 throw new IOException("Can't assign a local VM pipe port.");
162 }
163
164 private static class LocalAddressReclaimer implements IoFutureListener<IoFuture> {
165 public void operationComplete(IoFuture future) {
166 synchronized (TAKEN_LOCAL_ADDRESSES) {
167 TAKEN_LOCAL_ADDRESSES.remove(future.getSession()
168 .getLocalAddress());
169 }
170 }
171 }
172 }