View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.ByteBuffer;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.mina.core.RuntimeIoException;
35  import org.apache.mina.core.polling.AbstractPollingIoConnector;
36  import org.apache.mina.core.service.IoConnector;
37  import org.apache.mina.core.service.IoProcessor;
38  import org.apache.mina.core.service.IoService;
39  import org.apache.mina.core.service.SimpleIoProcessorPool;
40  import org.apache.mina.core.service.TransportMetadata;
41  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
42  import org.apache.mina.transport.socket.SocketConnector;
43  import org.apache.mina.transport.socket.SocketSessionConfig;
44  import org.apache.mina.util.CircularQueue;
45  import org.apache.tomcat.jni.Address;
46  import org.apache.tomcat.jni.Poll;
47  import org.apache.tomcat.jni.Pool;
48  import org.apache.tomcat.jni.Socket;
49  import org.apache.tomcat.jni.Status;
50  
51  /**
52   * {@link IoConnector} for APR based socket transport (TCP/IP).
53   * 
54   * @author The Apache MINA Project (dev@mina.apache.org)
55   */
56  public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
57  
58      /** 
59       * This constant is deduced from the APR code. It is used when the timeout
60       * has expired while doing a poll() operation.
61       */ 
62      private static final int APR_TIMEUP_ERROR = -120001;
63  
64      private static final int POLLSET_SIZE = 1024;
65  
66      private final Map<Long, ConnectionRequest> requests =
67          new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
68  
69      private final Object wakeupLock = new Object();
70      private volatile long wakeupSocket;
71      private volatile boolean toBeWakenUp;
72  
73      private volatile long pool;
74      private volatile long pollset; // socket poller
75      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
76      private final List<Long> polledHandles = new CircularQueue<Long>(POLLSET_SIZE);
77      private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
78      private volatile ByteBuffer dummyBuffer;
79  
80      /**
81       * Create an {@link AprSocketConnector} with default configuration (multiple thread model).
82       */
83      public AprSocketConnector() {
84          super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
85          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
86      }
87  
88      /**
89       * Constructor for {@link AprSocketConnector} with default configuration, and 
90       * given number of {@link AprIoProcessor} for multithreading I/O operations
91       * @param processorCount the number of processor to create and place in a
92       * {@link SimpleIoProcessorPool} 
93       */
94      public AprSocketConnector(int processorCount) {
95          super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
96          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
97      }
98  
99      /**
100      *  Constructor for {@link AprSocketConnector} with default configuration but a
101      *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
102      *  {@link IoService} of the same type.
103      * @param processor the processor to use for managing I/O events
104      */
105     public AprSocketConnector(IoProcessor<AprSession> processor) {
106         super(new DefaultSocketSessionConfig(), processor);
107         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
108     }
109 
110     /**
111      *  Constructor for {@link AprSocketConnector} with a given {@link Executor} for handling 
112      *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 
113      *  the same processor and executor over multiple {@link IoService} of the same type.
114      * @param executor the executor for connection
115      * @param processor the processor for I/O operations
116      */
117     public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
118         super(new DefaultSocketSessionConfig(), executor, processor);
119         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
120     }
121 
122     /**
123      * {@inheritDoc}
124      */
125     @Override
126     protected void init() throws Exception {
127         // initialize a memory pool for APR functions
128         pool = Pool.create(AprLibrary.getInstance().getRootPool());
129 
130         wakeupSocket = Socket.create(
131                 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
132 
133         dummyBuffer = Pool.alloc(pool, 1);
134 
135         pollset = Poll.create(
136                         POLLSET_SIZE,
137                         pool,
138                         Poll.APR_POLLSET_THREADSAFE,
139                         Long.MAX_VALUE);
140 
141         if (pollset <= 0) {
142             pollset = Poll.create(
143                     62,
144                     pool,
145                     Poll.APR_POLLSET_THREADSAFE,
146                     Long.MAX_VALUE);
147         }
148 
149         if (pollset <= 0) {
150             if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
151                 throw new RuntimeIoException(
152                         "Thread-safe pollset is not supported in this platform.");
153             }
154         }
155     }
156 
157     /**
158      * {@inheritDoc}
159      */
160     @Override
161     protected void destroy() throws Exception {
162         if (wakeupSocket > 0) {
163             Socket.close(wakeupSocket);
164         }
165         if (pollset > 0) {
166             Poll.destroy(pollset);
167         }
168         if (pool > 0) {
169             Pool.destroy(pool);
170         }
171     }
172 
173     /**
174      * {@inheritDoc}
175      */
176     @Override
177     protected Iterator<Long> allHandles() {
178         return polledHandles.iterator();
179     }
180 
181     /**
182      * {@inheritDoc}
183      */
184     @Override
185     protected boolean connect(Long handle, SocketAddress remoteAddress)
186             throws Exception {
187         InetSocketAddress ra = (InetSocketAddress) remoteAddress;
188         long sa;
189         if (ra != null) {
190             if (ra.getAddress() == null) {
191                 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
192             } else {
193                 sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
194             }
195         } else {
196             sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
197         }
198 
199         int rv = Socket.connect(handle, sa);
200         if (rv == Status.APR_SUCCESS) {
201             return true;
202         }
203 
204         if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
205             return false;
206         }
207 
208         throwException(rv);
209         throw new InternalError(); // This sentence will never be executed.
210     }
211 
212     /**
213      * {@inheritDoc}
214      */
215     @Override
216     protected ConnectionRequest getConnectionRequest(Long handle) {
217         return requests.get(handle);
218     }
219 
220     /**
221      * {@inheritDoc}
222      */
223     @Override
224     protected void close(Long handle) throws Exception {
225         finishConnect(handle);
226         int rv = Socket.close(handle);
227         if (rv != Status.APR_SUCCESS) {
228             throwException(rv);
229         }
230     }
231     
232     /**
233      * {@inheritDoc}
234      */
235     @Override
236     protected boolean finishConnect(Long handle) throws Exception {
237         Poll.remove(pollset, handle);
238         requests.remove(handle);
239         if (failedHandles.remove(handle)) {
240             int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
241             throwException(rv);
242             throw new InternalError("Shouldn't reach here.");
243         }
244         return true;
245     }
246 
247     /**
248      * {@inheritDoc}
249      */
250     @Override
251     protected Long newHandle(SocketAddress localAddress) throws Exception {
252         long handle = Socket.create(
253                 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
254         boolean success = false;
255         try {
256             int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
257             if (result != Status.APR_SUCCESS) {
258                 throwException(result);
259             }
260             result = Socket.timeoutSet(handle, 0);
261             if (result != Status.APR_SUCCESS) {
262                 throwException(result);
263             }
264 
265             if (localAddress != null) {
266                 InetSocketAddress la = (InetSocketAddress) localAddress;
267                 long sa;
268 
269                 if (la.getAddress() == null) {
270                     sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
271                 } else {
272                     sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
273                 }
274 
275                 result = Socket.bind(handle, sa);
276                 if (result != Status.APR_SUCCESS) {
277                     throwException(result);
278                 }
279             }
280 
281             success = true;
282             return handle;
283         } finally {
284             if (!success) {
285                 int rv = Socket.close(handle);
286                 if (rv != Status.APR_SUCCESS) {
287                     throwException(rv);
288                 }
289             }
290         }
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     protected AprSession newSession(IoProcessor<AprSession> processor,
298             Long handle) throws Exception {
299         return new AprSocketSession(this, processor, handle);
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     protected void register(Long handle, ConnectionRequest request)
307             throws Exception {
308         int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
309         if (rv != Status.APR_SUCCESS) {
310             throwException(rv);
311         }
312 
313         requests.put(handle, request);
314     }
315 
316     /**
317      * {@inheritDoc}
318      */
319     @Override
320     protected int select(int timeout) throws Exception {
321         int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
322         if (rv <= 0) {
323             if (rv != APR_TIMEUP_ERROR) {
324                 throwException(rv);
325             }
326 
327             rv = Poll.maintain(pollset, polledSockets, true);
328             if (rv > 0) {
329                 for (int i = 0; i < rv; i ++) {
330                     Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
331                 }
332             } else if (rv < 0) {
333                 throwException(rv);
334             }
335 
336             return 0;
337         } else {
338             rv <<= 1;
339             if (!polledHandles.isEmpty()) {
340                 polledHandles.clear();
341             }
342 
343             for (int i = 0; i < rv; i ++) {
344                 long flag = polledSockets[i];
345                 long socket = polledSockets[++i];
346                 if (socket == wakeupSocket) {
347                     synchronized (wakeupLock) {
348                         Poll.remove(pollset, wakeupSocket);
349                         toBeWakenUp = false;
350                     }
351                     continue;
352                 }
353                 polledHandles.add(socket);
354                 if ((flag & Poll.APR_POLLOUT) == 0) {
355                     failedHandles.add(socket);
356                 }
357             }
358             return polledHandles.size();
359         }
360     }
361 
362     /**
363      * {@inheritDoc}
364      */
365     @Override
366     protected Iterator<Long> selectedHandles() {
367         return polledHandles.iterator();
368     }
369     
370     /**
371      * {@inheritDoc}
372      */
373     @Override
374     protected void wakeup() {
375         if (toBeWakenUp) {
376             return;
377         }
378 
379         // Add a dummy socket to the pollset.
380         synchronized (wakeupLock) {
381             toBeWakenUp = true;
382             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
383         }
384     }
385 
386     /**
387      * {@inheritDoc}
388      */
389     public TransportMetadata getTransportMetadata() {
390         return AprSocketSession.METADATA;
391     }
392 
393     /**
394      * {@inheritDoc}
395      */
396     @Override
397     public SocketSessionConfig getSessionConfig() {
398         return (SocketSessionConfig) super.getSessionConfig();
399     }
400 
401     /**
402      * {@inheritDoc}
403      */
404     @Override
405     public InetSocketAddress getDefaultRemoteAddress() {
406         return (InetSocketAddress) super.getDefaultRemoteAddress();
407     }
408 
409     /**
410      * {@inheritDoc}
411      */
412     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
413         super.setDefaultRemoteAddress(defaultRemoteAddress);
414     }
415 
416     /**
417      * transform an APR error number in a more fancy exception
418      * @param code APR error code
419      * @throws IOException the produced exception for the given APR error number
420      */
421     private void throwException(int code) throws IOException {
422         throw new IOException(
423                 org.apache.tomcat.jni.Error.strerror(-code) +
424                 " (code: " + code + ")");
425     }
426 }