View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe;
21  
22  import java.io.IOException;
23  import java.net.SocketAddress;
24  import java.util.HashSet;
25  import java.util.Set;
26  import java.util.concurrent.Executor;
27  
28  import org.apache.mina.core.filterchain.IoFilterChain;
29  import org.apache.mina.core.future.ConnectFuture;
30  import org.apache.mina.core.future.DefaultConnectFuture;
31  import org.apache.mina.core.future.IoFuture;
32  import org.apache.mina.core.future.IoFutureListener;
33  import org.apache.mina.core.service.AbstractIoConnector;
34  import org.apache.mina.core.service.IoHandler;
35  import org.apache.mina.core.service.TransportMetadata;
36  import org.apache.mina.core.session.IoSessionInitializer;
37  import org.apache.mina.util.ExceptionMonitor;
38  
39  /**
40   * Connects to {@link IoHandler}s which is bound on the specified
41   * {@link VmPipeAddress}.
42   *
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   * @version $Rev: 678335 $, $Date: 2008-07-21 03:25:08 +0200 (lun, 21 jui 2008) $
45   */
46  public final class VmPipeConnector extends AbstractIoConnector {
47  
48      /**
49       * Creates a new instance.
50       */
51      public VmPipeConnector() {
52          this(null);
53      }
54      
55      /**
56       * Creates a new instance.
57       */
58      public VmPipeConnector(Executor executor) {
59          super(new DefaultVmPipeSessionConfig(), executor);
60      }
61  
62      public TransportMetadata getTransportMetadata() {
63          return VmPipeSession.METADATA;
64      }
65  
66      @Override
67      public VmPipeSessionConfig getSessionConfig() {
68          return (VmPipeSessionConfig) super.getSessionConfig();
69      }
70  
71      @Override
72      protected ConnectFuture connect0(SocketAddress remoteAddress,
73                                        SocketAddress localAddress,
74                                        IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
75          VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
76          if (entry == null) {
77              return DefaultConnectFuture.newFailedFuture(new IOException(
78                      "Endpoint unavailable: " + remoteAddress));
79          }
80  
81          DefaultConnectFuture future = new DefaultConnectFuture();
82  
83          // Assign the local address dynamically,
84          VmPipeAddress actualLocalAddress;
85          try {
86              actualLocalAddress = nextLocalAddress();
87          } catch (IOException e) {
88              return DefaultConnectFuture.newFailedFuture(e);
89          }
90  
91          VmPipeSession localSession = new VmPipeSession(this,
92                  getListeners(), actualLocalAddress, getHandler(), entry);
93  
94          finishSessionInitialization(localSession, future, sessionInitializer);
95  
96          // and reclaim the local address when the connection is closed.
97          localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);
98  
99          // initialize connector session
100         try {
101             IoFilterChain filterChain = localSession.getFilterChain();
102             this.getFilterChainBuilder().buildFilterChain(filterChain);
103 
104             // The following sentences don't throw any exceptions.
105             getListeners().fireSessionCreated(localSession);
106             getIdleStatusChecker().addSession(localSession);
107         } catch (Throwable t) {
108             future.setException(t);
109             return future;
110         }
111 
112         // initialize acceptor session
113         VmPipeSession remoteSession = localSession.getRemoteSession();
114         ((VmPipeAcceptor) remoteSession.getService()).doFinishSessionInitialization(remoteSession, null);
115         try {
116             IoFilterChain filterChain = remoteSession.getFilterChain();
117             entry.getAcceptor().getFilterChainBuilder().buildFilterChain(
118                     filterChain);
119 
120             // The following sentences don't throw any exceptions.
121             entry.getListeners().fireSessionCreated(remoteSession);
122             getIdleStatusChecker().addSession(remoteSession);
123         } catch (Throwable t) {
124             ExceptionMonitor.getInstance().exceptionCaught(t);
125             remoteSession.close();
126         }
127 
128         // Start chains, and then allow and messages read/written to be processed. This is to ensure that
129         // sessionOpened gets received before a messageReceived
130         ((VmPipeFilterChain) localSession.getFilterChain()).start();
131         ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
132 
133         return future;
134     }
135 
136     @Override
137     protected IoFuture dispose0() throws Exception {
138         return null;
139     }
140 
141     private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();
142 
143     private static int nextLocalPort = -1;
144 
145     private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();
146 
147     private static VmPipeAddress nextLocalAddress() throws IOException {
148         synchronized (TAKEN_LOCAL_ADDRESSES) {
149             if (nextLocalPort >= 0) {
150                 nextLocalPort = -1;
151             }
152             for (int i = 0; i < Integer.MAX_VALUE; i++) {
153                 VmPipeAddress answer = new VmPipeAddress(nextLocalPort--);
154                 if (!TAKEN_LOCAL_ADDRESSES.contains(answer)) {
155                     TAKEN_LOCAL_ADDRESSES.add(answer);
156                     return answer;
157                 }
158             }
159         }
160 
161         throw new IOException("Can't assign a local VM pipe port.");
162     }
163 
164     private static class LocalAddressReclaimer implements IoFutureListener<IoFuture> {
165         public void operationComplete(IoFuture future) {
166             synchronized (TAKEN_LOCAL_ADDRESSES) {
167                 TAKEN_LOCAL_ADDRESSES.remove(future.getSession()
168                         .getLocalAddress());
169             }
170         }
171     }
172 }