1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.io.IOException;
23 import java.net.SocketAddress;
24 import java.util.HashSet;
25 import java.util.Set;
26 import java.util.concurrent.Executor;
27
28 import org.apache.mina.core.filterchain.IoFilterChain;
29 import org.apache.mina.core.future.ConnectFuture;
30 import org.apache.mina.core.future.DefaultConnectFuture;
31 import org.apache.mina.core.future.IoFuture;
32 import org.apache.mina.core.future.IoFutureListener;
33 import org.apache.mina.core.service.AbstractIoConnector;
34 import org.apache.mina.core.service.IoHandler;
35 import org.apache.mina.core.service.TransportMetadata;
36 import org.apache.mina.core.session.IdleStatusChecker;
37 import org.apache.mina.core.session.IoSessionInitializer;
38 import org.apache.mina.util.ExceptionMonitor;
39
40
41
42
43
44
45
46 public final class VmPipeConnector extends AbstractIoConnector {
47
48
49 private IdleStatusChecker idleChecker;
50
51
52
53
54 public VmPipeConnector() {
55 this(null);
56 }
57
58
59
60
61 public VmPipeConnector(Executor executor) {
62 super(new DefaultVmPipeSessionConfig(), executor);
63 idleChecker = new IdleStatusChecker();
64
65
66 executeWorker(idleChecker.getNotifyingTask(), "idleStatusChecker");
67 }
68
69 public TransportMetadata getTransportMetadata() {
70 return VmPipeSession.METADATA;
71 }
72
73 @Override
74 public VmPipeSessionConfig getSessionConfig() {
75 return (VmPipeSessionConfig) super.getSessionConfig();
76 }
77
78 @Override
79 protected ConnectFuture connect0(SocketAddress remoteAddress,
80 SocketAddress localAddress,
81 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
82 VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
83 if (entry == null) {
84 return DefaultConnectFuture.newFailedFuture(new IOException(
85 "Endpoint unavailable: " + remoteAddress));
86 }
87
88 DefaultConnectFuture future = new DefaultConnectFuture();
89
90
91 VmPipeAddress actualLocalAddress;
92 try {
93 actualLocalAddress = nextLocalAddress();
94 } catch (IOException e) {
95 return DefaultConnectFuture.newFailedFuture(e);
96 }
97
98 VmPipeSession localSession = new VmPipeSession(this,
99 getListeners(), actualLocalAddress, getHandler(), entry);
100
101 initSession(localSession, future, sessionInitializer);
102
103
104 localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);
105
106
107 try {
108 IoFilterChain filterChain = localSession.getFilterChain();
109 this.getFilterChainBuilder().buildFilterChain(filterChain);
110
111
112 getListeners().fireSessionCreated(localSession);
113 idleChecker.addSession(localSession);
114 } catch (Throwable t) {
115 future.setException(t);
116 return future;
117 }
118
119
120 VmPipeSession remoteSession = localSession.getRemoteSession();
121 ((VmPipeAcceptor) remoteSession.getService()).doFinishSessionInitialization(remoteSession, null);
122 try {
123 IoFilterChain filterChain = remoteSession.getFilterChain();
124 entry.getAcceptor().getFilterChainBuilder().buildFilterChain(
125 filterChain);
126
127
128 entry.getListeners().fireSessionCreated(remoteSession);
129 idleChecker.addSession(remoteSession);
130 } catch (Throwable t) {
131 ExceptionMonitor.getInstance().exceptionCaught(t);
132 remoteSession.close(true);
133 }
134
135
136
137 ((VmPipeFilterChain) localSession.getFilterChain()).start();
138 ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
139
140 return future;
141 }
142
143 @Override
144 protected IoFuture dispose0() throws Exception {
145
146 idleChecker.getNotifyingTask().cancel();
147 return null;
148 }
149
150 private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();
151
152 private static int nextLocalPort = -1;
153
154 private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();
155
156 private static VmPipeAddress nextLocalAddress() throws IOException {
157 synchronized (TAKEN_LOCAL_ADDRESSES) {
158 if (nextLocalPort >= 0) {
159 nextLocalPort = -1;
160 }
161 for (int i = 0; i < Integer.MAX_VALUE; i++) {
162 VmPipeAddress answer = new VmPipeAddress(nextLocalPort--);
163 if (!TAKEN_LOCAL_ADDRESSES.contains(answer)) {
164 TAKEN_LOCAL_ADDRESSES.add(answer);
165 return answer;
166 }
167 }
168 }
169
170 throw new IOException("Can't assign a local VM pipe port.");
171 }
172
173 private static class LocalAddressReclaimer implements IoFutureListener<IoFuture> {
174 public void operationComplete(IoFuture future) {
175 synchronized (TAKEN_LOCAL_ADDRESSES) {
176 TAKEN_LOCAL_ADDRESSES.remove(future.getSession()
177 .getLocalAddress());
178 }
179 }
180 }
181 }