1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.io.IOException;
23 import java.net.SocketAddress;
24 import java.util.HashSet;
25 import java.util.Set;
26 import java.util.concurrent.Executor;
27
28 import org.apache.mina.core.filterchain.IoFilterChain;
29 import org.apache.mina.core.future.ConnectFuture;
30 import org.apache.mina.core.future.DefaultConnectFuture;
31 import org.apache.mina.core.future.IoFuture;
32 import org.apache.mina.core.future.IoFutureListener;
33 import org.apache.mina.core.service.AbstractIoConnector;
34 import org.apache.mina.core.service.IoHandler;
35 import org.apache.mina.core.service.TransportMetadata;
36 import org.apache.mina.core.session.IdleStatusChecker;
37 import org.apache.mina.core.session.IoSessionInitializer;
38 import org.apache.mina.util.ExceptionMonitor;
39
40
41
42
43
44
45
46 public final class VmPipeConnector extends AbstractIoConnector {
47
48
49 private IdleStatusChecker idleChecker;
50
51
52
53
54 public VmPipeConnector() {
55 this(null);
56 }
57
58
59
60
61
62
63 public VmPipeConnector(Executor executor) {
64 super(new DefaultVmPipeSessionConfig(), executor);
65 idleChecker = new IdleStatusChecker();
66
67
68 executeWorker(idleChecker.getNotifyingTask(), "idleStatusChecker");
69 }
70
71
72
73
74 public TransportMetadata getTransportMetadata() {
75 return VmPipeSession.METADATA;
76 }
77
78
79
80
81 public VmPipeSessionConfig getSessionConfig() {
82 return (VmPipeSessionConfig) sessionConfig;
83 }
84
85
86
87
88 @Override
89 protected ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
90 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
91 VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
92 if (entry == null) {
93 return DefaultConnectFuture.newFailedFuture(new IOException("Endpoint unavailable: " + remoteAddress));
94 }
95
96 DefaultConnectFutureectFuture.html#DefaultConnectFuture">DefaultConnectFuture future = new DefaultConnectFuture();
97
98
99 VmPipeAddress actualLocalAddress;
100 try {
101 actualLocalAddress = nextLocalAddress();
102 } catch (IOException e) {
103 return DefaultConnectFuture.newFailedFuture(e);
104 }
105
106 VmPipeSessioneSession.html#VmPipeSession">VmPipeSession localSession = new VmPipeSession(this, getListeners(), actualLocalAddress, getHandler(), entry);
107
108 initSession(localSession, future, sessionInitializer);
109
110
111 localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);
112
113
114 try {
115 IoFilterChain filterChain = localSession.getFilterChain();
116 this.getFilterChainBuilder().buildFilterChain(filterChain);
117
118
119 getListeners().fireSessionCreated(localSession);
120 idleChecker.addSession(localSession);
121 } catch (Exception e) {
122 future.setException(e);
123 return future;
124 }
125
126
127 VmPipeSession remoteSession = localSession.getRemoteSession();
128 ((VmPipeAcceptor) remoteSession.getService()).doFinishSessionInitialization(remoteSession, null);
129 try {
130 IoFilterChain filterChain = remoteSession.getFilterChain();
131 entry.getAcceptor().getFilterChainBuilder().buildFilterChain(filterChain);
132
133
134 entry.getListeners().fireSessionCreated(remoteSession);
135 idleChecker.addSession(remoteSession);
136 } catch (Exception e) {
137 ExceptionMonitor.getInstance().exceptionCaught(e);
138 remoteSession.closeNow();
139 }
140
141
142
143 ((VmPipeFilterChain) localSession.getFilterChain()).start();
144 ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
145
146 return future;
147 }
148
149
150
151
152 @Override
153 protected void dispose0() throws Exception {
154
155 idleChecker.getNotifyingTask().cancel();
156 }
157
158 private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();
159
160 private static int nextLocalPort = -1;
161
162 private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();
163
164 private static VmPipeAddress nextLocalAddress() throws IOException {
165 synchronized (TAKEN_LOCAL_ADDRESSES) {
166 if (nextLocalPort >= 0) {
167 nextLocalPort = -1;
168 }
169 for (int i = 0; i < Integer.MAX_VALUE; i++) {
170 VmPipeAddress/VmPipeAddress.html#VmPipeAddress">VmPipeAddress answer = new VmPipeAddress(nextLocalPort--);
171 if (!TAKEN_LOCAL_ADDRESSES.contains(answer)) {
172 TAKEN_LOCAL_ADDRESSES.add(answer);
173 return answer;
174 }
175 }
176 }
177
178 throw new IOException("Can't assign a local VM pipe port.");
179 }
180
181 private static class LocalAddressReclaimer implements IoFutureListener<IoFuture> {
182 public void operationComplete(IoFuture future) {
183 synchronized (TAKEN_LOCAL_ADDRESSES) {
184 TAKEN_LOCAL_ADDRESSES.remove(future.getSession().getLocalAddress());
185 }
186 }
187 }
188 }