View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.ByteBuffer;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.mina.core.RuntimeIoException;
35  import org.apache.mina.core.polling.AbstractPollingIoConnector;
36  import org.apache.mina.core.service.IoProcessor;
37  import org.apache.mina.core.service.TransportMetadata;
38  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39  import org.apache.mina.transport.socket.SocketConnector;
40  import org.apache.mina.transport.socket.SocketSessionConfig;
41  import org.apache.mina.util.CircularQueue;
42  import org.apache.tomcat.jni.Address;
43  import org.apache.tomcat.jni.Poll;
44  import org.apache.tomcat.jni.Pool;
45  import org.apache.tomcat.jni.Socket;
46  import org.apache.tomcat.jni.Status;
47  
48  /**
49   * {@link IoConnector} for APR based socket transport (TCP/IP).
50   * 
51   * @author The Apache MINA Project (dev@mina.apache.org)
52   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
53   */
54  public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
55  
56      private static final int POLLSET_SIZE = 1024;
57  
58      private final Map<Long, ConnectionRequest> requests =
59          new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
60  
61      private final Object wakeupLock = new Object();
62      private volatile long wakeupSocket;
63      private volatile boolean toBeWakenUp;
64  
65      private volatile long pool;
66      private volatile long pollset; // socket poller
67      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
68      private final List<Long> polledHandles = new CircularQueue<Long>(POLLSET_SIZE);
69      private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
70      private volatile ByteBuffer dummyBuffer;
71  
72      /**
73       * TODO : document superclass
74       */
75      public AprSocketConnector() {
76          super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
77          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
78      }
79  
80      /**
81       * TODO : document superclass
82       */
83      public AprSocketConnector(int processorCount) {
84          super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
85          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
86      }
87  
88      /**
89       * TODO : document superclass
90       */
91      public AprSocketConnector(IoProcessor<AprSession> processor) {
92          super(new DefaultSocketSessionConfig(), processor);
93          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
94      }
95  
96      /**
97       * TODO : document superclass
98       */
99      public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
100         super(new DefaultSocketSessionConfig(), executor, processor);
101         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
102     }
103 
104     /**
105      * {@inheritDoc}
106      */
107     @Override
108     protected void init() throws Exception {
109         // initialize a memory pool for APR functions
110         pool = Pool.create(AprLibrary.getInstance().getRootPool());
111 
112         wakeupSocket = Socket.create(
113                 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
114 
115         dummyBuffer = Pool.alloc(pool, 1);
116 
117         pollset = Poll.create(
118                         POLLSET_SIZE,
119                         pool,
120                         Poll.APR_POLLSET_THREADSAFE,
121                         Long.MAX_VALUE);
122 
123         if (pollset <= 0) {
124             pollset = Poll.create(
125                     62,
126                     pool,
127                     Poll.APR_POLLSET_THREADSAFE,
128                     Long.MAX_VALUE);
129         }
130 
131         if (pollset <= 0) {
132             if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
133                 throw new RuntimeIoException(
134                         "Thread-safe pollset is not supported in this platform.");
135             }
136         }
137     }
138 
139     /**
140      * {@inheritDoc}
141      */
142     @Override
143     protected void destroy() throws Exception {
144         if (wakeupSocket > 0) {
145             Socket.close(wakeupSocket);
146         }
147         if (pollset > 0) {
148             Poll.destroy(pollset);
149         }
150         if (pool > 0) {
151             Pool.destroy(pool);
152         }
153     }
154 
155     /**
156      * {@inheritDoc}
157      */
158     @Override
159     protected Iterator<Long> allHandles() {
160         return polledHandles.iterator();
161     }
162 
163     /**
164      * {@inheritDoc}
165      */
166     @Override
167     protected boolean connect(Long handle, SocketAddress remoteAddress)
168             throws Exception {
169         InetSocketAddress ra = (InetSocketAddress) remoteAddress;
170         long sa;
171         if (ra != null) {
172             if (ra.getAddress() == null) {
173                 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
174             } else {
175                 sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
176             }
177         } else {
178             sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
179         }
180 
181         int rv = Socket.connect(handle, sa);
182         if (rv == Status.APR_SUCCESS) {
183             return true;
184         }
185 
186         if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
187             return false;
188         }
189 
190         throwException(rv);
191         throw new InternalError(); // This sentence will never be executed.
192     }
193 
194     /**
195      * {@inheritDoc}
196      */
197     @Override
198     protected ConnectionRequest connectionRequest(Long handle) {
199         return requests.get(handle);
200     }
201 
202     /**
203      * {@inheritDoc}
204      */
205     @Override
206     protected void close(Long handle) throws Exception {
207         finishConnect(handle);
208         int rv = Socket.close(handle);
209         if (rv != Status.APR_SUCCESS) {
210             throwException(rv);
211         }
212     }
213     
214     /**
215      * {@inheritDoc}
216      */
217     @Override
218     protected boolean finishConnect(Long handle) throws Exception {
219         Poll.remove(pollset, handle);
220         requests.remove(handle);
221         if (failedHandles.remove(handle)) {
222             int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
223             throwException(rv);
224             throw new InternalError("Shouldn't reach here.");
225         }
226         return true;
227     }
228 
229     /**
230      * {@inheritDoc}
231      */
232     @Override
233     protected Long newHandle(SocketAddress localAddress) throws Exception {
234         long handle = Socket.create(
235                 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
236         boolean success = false;
237         try {
238             int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
239             if (result != Status.APR_SUCCESS) {
240                 throwException(result);
241             }
242             result = Socket.timeoutSet(handle, 0);
243             if (result != Status.APR_SUCCESS) {
244                 throwException(result);
245             }
246 
247             if (localAddress != null) {
248                 InetSocketAddress la = (InetSocketAddress) localAddress;
249                 long sa;
250 
251                 if (la.getAddress() == null) {
252                     sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
253                 } else {
254                     sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
255                 }
256 
257                 result = Socket.bind(handle, sa);
258                 if (result != Status.APR_SUCCESS) {
259                     throwException(result);
260                 }
261             }
262 
263             success = true;
264             return handle;
265         } finally {
266             if (!success) {
267                 int rv = Socket.close(handle);
268                 if (rv != Status.APR_SUCCESS) {
269                     throwException(rv);
270                 }
271             }
272         }
273     }
274 
275     /**
276      * {@inheritDoc}
277      */
278     @Override
279     protected AprSession newSession(IoProcessor<AprSession> processor,
280             Long handle) throws Exception {
281         return new AprSocketSession(this, processor, handle);
282     }
283 
284     /**
285      * {@inheritDoc}
286      */
287     @Override
288     protected void register(Long handle, ConnectionRequest request)
289             throws Exception {
290         int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
291         if (rv != Status.APR_SUCCESS) {
292             throwException(rv);
293         }
294 
295         requests.put(handle, request);
296     }
297 
298     /**
299      * {@inheritDoc}
300      */
301     @Override
302     protected boolean select(int timeout) throws Exception {
303         int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
304         if (rv <= 0) {
305             if (rv != -120001) {
306                 throwException(rv);
307             }
308 
309             rv = Poll.maintain(pollset, polledSockets, true);
310             if (rv > 0) {
311                 for (int i = 0; i < rv; i ++) {
312                     Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
313                 }
314             } else if (rv < 0) {
315                 throwException(rv);
316             }
317 
318             return false;
319         } else {
320             rv <<= 1;
321             if (!polledHandles.isEmpty()) {
322                 polledHandles.clear();
323             }
324 
325             for (int i = 0; i < rv; i ++) {
326                 long flag = polledSockets[i];
327                 long socket = polledSockets[++i];
328                 if (socket == wakeupSocket) {
329                     synchronized (wakeupLock) {
330                         Poll.remove(pollset, wakeupSocket);
331                         toBeWakenUp = false;
332                     }
333                     continue;
334                 }
335                 polledHandles.add(socket);
336                 if ((flag & Poll.APR_POLLOUT) == 0) {
337                     failedHandles.add(socket);
338                 }
339             }
340             return !polledHandles.isEmpty();
341         }
342     }
343 
344     /**
345      * {@inheritDoc}
346      */
347     @Override
348     protected Iterator<Long> selectedHandles() {
349         return polledHandles.iterator();
350     }
351     
352     /**
353      * {@inheritDoc}
354      */
355     @Override
356     protected void wakeup() {
357         if (toBeWakenUp) {
358             return;
359         }
360 
361         // Add a dummy socket to the pollset.
362         synchronized (wakeupLock) {
363             toBeWakenUp = true;
364             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
365         }
366     }
367 
368     /**
369      * {@inheritDoc}
370      */
371     public TransportMetadata getTransportMetadata() {
372         return AprSocketSession.METADATA;
373     }
374 
375     /**
376      * {@inheritDoc}
377      */
378     @Override
379     public SocketSessionConfig getSessionConfig() {
380         return (SocketSessionConfig) super.getSessionConfig();
381     }
382 
383     /**
384      * {@inheritDoc}
385      */
386     @Override
387     public InetSocketAddress getDefaultRemoteAddress() {
388         return (InetSocketAddress) super.getDefaultRemoteAddress();
389     }
390 
391     /**
392      * {@inheritDoc}
393      */
394     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
395         super.setDefaultRemoteAddress(defaultRemoteAddress);
396     }
397 
398     /**
399      * transform an APR error number in a more fancy exception
400      * @param code APR error code
401      * @throws IOException the produced exception for the given APR error number
402      */
403     private void throwException(int code) throws IOException {
404         throw new IOException(
405                 org.apache.tomcat.jni.Error.strerror(-code) +
406                 " (code: " + code + ")");
407     }
408 }