View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.ByteBuffer;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.Executor;
34  
35  import org.apache.mina.core.RuntimeIoException;
36  import org.apache.mina.core.polling.AbstractPollingIoConnector;
37  import org.apache.mina.core.service.IoConnector;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.IoService;
40  import org.apache.mina.core.service.SimpleIoProcessorPool;
41  import org.apache.mina.core.service.TransportMetadata;
42  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
43  import org.apache.mina.transport.socket.SocketConnector;
44  import org.apache.mina.transport.socket.SocketSessionConfig;
45  import org.apache.tomcat.jni.Address;
46  import org.apache.tomcat.jni.Poll;
47  import org.apache.tomcat.jni.Pool;
48  import org.apache.tomcat.jni.Socket;
49  import org.apache.tomcat.jni.Status;
50  
51  /**
52   * {@link IoConnector} for APR based socket transport (TCP/IP).
53   * 
54   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
55   */
56  public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
57  
58      /** 
59       * This constant is deduced from the APR code. It is used when the timeout
60       * has expired while doing a poll() operation.
61       */
62      private static final int APR_TIMEUP_ERROR = -120001;
63  
64      private static final int POLLSET_SIZE = 1024;
65  
66      private final Map<Long, ConnectionRequest> requests = new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
67  
68      private final Object wakeupLock = new Object();
69  
70      private volatile long wakeupSocket;
71  
72      private volatile boolean toBeWakenUp;
73  
74      private volatile long pool;
75  
76      private volatile long pollset; // socket poller
77  
78      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
79  
80      private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
81  
82      private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
83  
84      private volatile ByteBuffer dummyBuffer;
85  
86      /**
87       * Create an {@link AprSocketConnector} with default configuration (multiple thread model).
88       */
89      public AprSocketConnector() {
90          super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
91          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92      }
93  
94      /**
95       * Constructor for {@link AprSocketConnector} with default configuration, and 
96       * given number of {@link AprIoProcessor} for multithreading I/O operations
97       * @param processorCount the number of processor to create and place in a
98       * {@link SimpleIoProcessorPool} 
99       */
100     public AprSocketConnector(int processorCount) {
101         super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
102         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
103     }
104 
105     /**
106      *  Constructor for {@link AprSocketConnector} with default configuration but a
107      *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
108      *  {@link IoService} of the same type.
109      * @param processor the processor to use for managing I/O events
110      */
111     public AprSocketConnector(IoProcessor<AprSession> processor) {
112         super(new DefaultSocketSessionConfig(), processor);
113         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
114     }
115 
116     /**
117      *  Constructor for {@link AprSocketConnector} with a given {@link Executor} for handling 
118      *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 
119      *  the same processor and executor over multiple {@link IoService} of the same type.
120      * @param executor the executor for connection
121      * @param processor the processor for I/O operations
122      */
123     public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
124         super(new DefaultSocketSessionConfig(), executor, processor);
125         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
126     }
127 
128     /**
129      * {@inheritDoc}
130      */
131     @Override
132     protected void init() throws Exception {
133         // initialize a memory pool for APR functions
134         pool = Pool.create(AprLibrary.getInstance().getRootPool());
135 
136         wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
137 
138         dummyBuffer = Pool.alloc(pool, 1);
139 
140         pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
141 
142         if (pollset <= 0) {
143             pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
144         }
145 
146         if (pollset <= 0) {
147             if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
148                 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
149             }
150         }
151     }
152 
153     /**
154      * {@inheritDoc}
155      */
156     @Override
157     protected void destroy() throws Exception {
158         if (wakeupSocket > 0) {
159             Socket.close(wakeupSocket);
160         }
161         if (pollset > 0) {
162             Poll.destroy(pollset);
163         }
164         if (pool > 0) {
165             Pool.destroy(pool);
166         }
167     }
168 
169     /**
170      * {@inheritDoc}
171      */
172     @Override
173     protected Iterator<Long> allHandles() {
174         return polledHandles.iterator();
175     }
176 
177     /**
178      * {@inheritDoc}
179      */
180     @Override
181     protected boolean connect(Long handle, SocketAddress remoteAddress) throws Exception {
182         InetSocketAddress ra = (InetSocketAddress) remoteAddress;
183         long sa;
184         if (ra != null) {
185             if (ra.getAddress() == null) {
186                 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
187             } else {
188                 sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
189             }
190         } else {
191             sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
192         }
193 
194         int rv = Socket.connect(handle, sa);
195         if (rv == Status.APR_SUCCESS) {
196             return true;
197         }
198 
199         if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
200             return false;
201         }
202 
203         throwException(rv);
204         throw new InternalError(); // This sentence will never be executed.
205     }
206 
207     /**
208      * {@inheritDoc}
209      */
210     @Override
211     protected ConnectionRequest getConnectionRequest(Long handle) {
212         return requests.get(handle);
213     }
214 
215     /**
216      * {@inheritDoc}
217      */
218     @Override
219     protected void close(Long handle) throws Exception {
220         finishConnect(handle);
221         int rv = Socket.close(handle);
222         if (rv != Status.APR_SUCCESS) {
223             throwException(rv);
224         }
225     }
226 
227     /**
228      * {@inheritDoc}
229      */
230     @Override
231     protected boolean finishConnect(Long handle) throws Exception {
232         Poll.remove(pollset, handle);
233         requests.remove(handle);
234         if (failedHandles.remove(handle)) {
235             int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
236             throwException(rv);
237             throw new InternalError("Shouldn't reach here.");
238         }
239         return true;
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     @Override
246     protected Long newHandle(SocketAddress localAddress) throws Exception {
247         long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
248         boolean success = false;
249         try {
250             int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
251             if (result != Status.APR_SUCCESS) {
252                 throwException(result);
253             }
254             result = Socket.timeoutSet(handle, 0);
255             if (result != Status.APR_SUCCESS) {
256                 throwException(result);
257             }
258 
259             if (localAddress != null) {
260                 InetSocketAddress la = (InetSocketAddress) localAddress;
261                 long sa;
262 
263                 if (la.getAddress() == null) {
264                     sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
265                 } else {
266                     sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
267                 }
268 
269                 result = Socket.bind(handle, sa);
270                 if (result != Status.APR_SUCCESS) {
271                     throwException(result);
272                 }
273             }
274 
275             success = true;
276             return handle;
277         } finally {
278             if (!success) {
279                 int rv = Socket.close(handle);
280                 if (rv != Status.APR_SUCCESS) {
281                     throwException(rv);
282                 }
283             }
284         }
285     }
286 
287     /**
288      * {@inheritDoc}
289      */
290     @Override
291     protected AprSession newSession(IoProcessor<AprSession> processor, Long handle) throws Exception {
292         return new AprSocketSession(this, processor, handle);
293     }
294 
295     /**
296      * {@inheritDoc}
297      */
298     @Override
299     protected void register(Long handle, ConnectionRequest request) throws Exception {
300         int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
301         if (rv != Status.APR_SUCCESS) {
302             throwException(rv);
303         }
304 
305         requests.put(handle, request);
306     }
307 
308     /**
309      * {@inheritDoc}
310      */
311     @Override
312     protected int select(int timeout) throws Exception {
313         int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
314         if (rv <= 0) {
315             if (rv != APR_TIMEUP_ERROR) {
316                 throwException(rv);
317             }
318 
319             rv = Poll.maintain(pollset, polledSockets, true);
320             if (rv > 0) {
321                 for (int i = 0; i < rv; i++) {
322                     Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
323                 }
324             } else if (rv < 0) {
325                 throwException(rv);
326             }
327 
328             return 0;
329         } else {
330             rv <<= 1;
331             if (!polledHandles.isEmpty()) {
332                 polledHandles.clear();
333             }
334 
335             for (int i = 0; i < rv; i++) {
336                 long flag = polledSockets[i];
337                 long socket = polledSockets[++i];
338                 if (socket == wakeupSocket) {
339                     synchronized (wakeupLock) {
340                         Poll.remove(pollset, wakeupSocket);
341                         toBeWakenUp = false;
342                     }
343                     continue;
344                 }
345                 polledHandles.add(socket);
346                 if ((flag & Poll.APR_POLLOUT) == 0) {
347                     failedHandles.add(socket);
348                 }
349             }
350             return polledHandles.size();
351         }
352     }
353 
354     /**
355      * {@inheritDoc}
356      */
357     @Override
358     protected Iterator<Long> selectedHandles() {
359         return polledHandles.iterator();
360     }
361 
362     /**
363      * {@inheritDoc}
364      */
365     @Override
366     protected void wakeup() {
367         if (toBeWakenUp) {
368             return;
369         }
370 
371         // Add a dummy socket to the pollset.
372         synchronized (wakeupLock) {
373             toBeWakenUp = true;
374             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
375         }
376     }
377 
378     /**
379      * {@inheritDoc}
380      */
381     public TransportMetadata getTransportMetadata() {
382         return AprSocketSession.METADATA;
383     }
384 
385     /**
386      * {@inheritDoc}
387      */
388     public SocketSessionConfig getSessionConfig() {
389         return (SocketSessionConfig) sessionConfig;
390     }
391 
392     /**
393      * {@inheritDoc}
394      */
395     @Override
396     public InetSocketAddress getDefaultRemoteAddress() {
397         return (InetSocketAddress) super.getDefaultRemoteAddress();
398     }
399 
400     /**
401      * {@inheritDoc}
402      */
403     public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
404         super.setDefaultRemoteAddress(defaultRemoteAddress);
405     }
406 
407     /**
408      * transform an APR error number in a more fancy exception
409      * @param code APR error code
410      * @throws IOException the produced exception for the given APR error number
411      */
412     private void throwException(int code) throws IOException {
413         throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
414     }
415 }