View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.spi.SelectorProvider;
26  import java.util.Iterator;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.RuntimeIoException;
32  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
33  import org.apache.mina.core.service.IoAcceptor;
34  import org.apache.mina.core.service.IoProcessor;
35  import org.apache.mina.core.service.IoService;
36  import org.apache.mina.core.service.SimpleIoProcessorPool;
37  import org.apache.mina.core.service.TransportMetadata;
38  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39  import org.apache.tomcat.jni.Address;
40  import org.apache.tomcat.jni.Poll;
41  import org.apache.tomcat.jni.Pool;
42  import org.apache.tomcat.jni.Socket;
43  import org.apache.tomcat.jni.Status;
44  
45  /**
46   * {@link IoAcceptor} for APR based socket transport (TCP/IP).
47   *
48   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
49   */
50  public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> {
51      /** 
52       * This constant is deduced from the APR code. It is used when the timeout
53       * has expired while doing a poll() operation.
54       */
55      private static final int APR_TIMEUP_ERROR = -120001;
56  
57      private static final int POLLSET_SIZE = 1024;
58  
59      private final Object wakeupLock = new Object();
60  
61      private volatile long wakeupSocket;
62  
63      private volatile boolean toBeWakenUp;
64  
65      private volatile long pool;
66  
67      private volatile long pollset; // socket poller
68  
69      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
70  
71      private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
72  
73      /**
74       * Constructor for {@link AprSocketAcceptor} using default parameters (multiple thread model).
75       */
76      public AprSocketAcceptor() {
77          super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
78          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79      }
80  
81      /**
82       * Constructor for {@link AprSocketAcceptor} using default parameters, and 
83       * given number of {@link AprIoProcessor} for multithreading I/O operations.
84       * 
85       * @param processorCount the number of processor to create and place in a
86       * {@link SimpleIoProcessorPool} 
87       */
88      public AprSocketAcceptor(int processorCount) {
89          super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
90          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91      }
92  
93      /**
94       *  Constructor for {@link AprSocketAcceptor} with default configuration but a
95        *  specific {@link AprIoProcessor}, useful for sharing the same processor over multiple
96        *  {@link IoService} of the same type.
97        * @param processor the processor to use for managing I/O events
98        */
99      public AprSocketAcceptor(IoProcessor<AprSession> processor) {
100         super(new DefaultSocketSessionConfig(), processor);
101         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
102     }
103 
104     /**
105      *  Constructor for {@link AprSocketAcceptor} with a given {@link Executor} for handling 
106      *  connection events and a given {@link AprIoProcessor} for handling I/O events, useful for 
107      *  sharing the same processor and executor over multiple {@link IoService} of the same type.
108      * @param executor the executor for connection
109      * @param processor the processor for I/O operations
110      */
111     public AprSocketAcceptor(Executor executor, IoProcessor<AprSession> processor) {
112         super(new DefaultSocketSessionConfig(), executor, processor);
113         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
114     }
115 
116     /**
117      * {@inheritDoc}
118      */
119     @Override
120     protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
121         long s = Socket.accept(handle);
122         boolean success = false;
123         try {
124             AprSession result = new AprSocketSession(this, processor, s);
125             success = true;
126             return result;
127         } finally {
128             if (!success) {
129                 Socket.close(s);
130             }
131         }
132     }
133 
134     /**
135      * {@inheritDoc}
136      */
137     @Override
138     protected Long open(SocketAddress localAddress) throws Exception {
139         InetSocketAddress la = (InetSocketAddress) localAddress;
140         long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
141 
142         boolean success = false;
143         try {
144             int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
145             if (result != Status.APR_SUCCESS) {
146                 throwException(result);
147             }
148             result = Socket.timeoutSet(handle, 0);
149             if (result != Status.APR_SUCCESS) {
150                 throwException(result);
151             }
152 
153             // Configure the server socket,
154             result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress() ? 1 : 0);
155             if (result != Status.APR_SUCCESS) {
156                 throwException(result);
157             }
158             result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
159             if (result != Status.APR_SUCCESS) {
160                 throwException(result);
161             }
162 
163             // and bind.
164             long sa;
165             if (la != null) {
166                 if (la.getAddress() == null) {
167                     sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
168                 } else {
169                     sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
170                 }
171             } else {
172                 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
173             }
174 
175             result = Socket.bind(handle, sa);
176             if (result != Status.APR_SUCCESS) {
177                 throwException(result);
178             }
179             result = Socket.listen(handle, getBacklog());
180             if (result != Status.APR_SUCCESS) {
181                 throwException(result);
182             }
183 
184             result = Poll.add(pollset, handle, Poll.APR_POLLIN);
185             if (result != Status.APR_SUCCESS) {
186                 throwException(result);
187             }
188             success = true;
189         } finally {
190             if (!success) {
191                 close(handle);
192             }
193         }
194         return handle;
195     }
196 
197     /**
198      * {@inheritDoc}
199      */
200     @Override
201     protected void init() throws Exception {
202         // initialize a memory pool for APR functions
203         pool = Pool.create(AprLibrary.getInstance().getRootPool());
204 
205         wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
206 
207         pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
208 
209         if (pollset <= 0) {
210             pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
211         }
212 
213         if (pollset <= 0) {
214             if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
215                 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
216             }
217         }
218     }
219 
220     /**
221      * {@inheritDoc}
222      */
223     @Override
224     protected void destroy() throws Exception {
225         if (wakeupSocket > 0) {
226             Socket.close(wakeupSocket);
227         }
228         if (pollset > 0) {
229             Poll.destroy(pollset);
230         }
231         if (pool > 0) {
232             Pool.destroy(pool);
233         }
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     protected SocketAddress localAddress(Long handle) throws Exception {
241         long la = Address.get(Socket.APR_LOCAL, handle);
242         return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
243     }
244 
245     /**
246      * {@inheritDoc}
247      */
248     @Override
249     protected int select() throws Exception {
250         int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
251         if (rv <= 0) {
252             // We have had an error. It can simply be that we have reached
253             // the timeout (very unlikely, as we have set it to MAX_INTEGER)
254             if (rv != APR_TIMEUP_ERROR) {
255                 // It's not a timeout being exceeded. Throw the error
256                 throwException(rv);
257             }
258 
259             rv = Poll.maintain(pollset, polledSockets, true);
260             if (rv > 0) {
261                 for (int i = 0; i < rv; i++) {
262                     Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
263                 }
264             } else if (rv < 0) {
265                 throwException(rv);
266             }
267 
268             return 0;
269         } else {
270             rv <<= 1;
271             if (!polledHandles.isEmpty()) {
272                 polledHandles.clear();
273             }
274 
275             for (int i = 0; i < rv; i++) {
276                 long flag = polledSockets[i];
277                 long socket = polledSockets[++i];
278                 if (socket == wakeupSocket) {
279                     synchronized (wakeupLock) {
280                         Poll.remove(pollset, wakeupSocket);
281                         toBeWakenUp = false;
282                     }
283                     continue;
284                 }
285 
286                 if ((flag & Poll.APR_POLLIN) != 0) {
287                     polledHandles.add(socket);
288                 }
289             }
290             return polledHandles.size();
291         }
292     }
293 
294     /**
295      * {@inheritDoc}
296      */
297     @Override
298     protected Iterator<Long> selectedHandles() {
299         return polledHandles.iterator();
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     protected void close(Long handle) throws Exception {
307         Poll.remove(pollset, handle);
308         int result = Socket.close(handle);
309         if (result != Status.APR_SUCCESS) {
310             throwException(result);
311         }
312     }
313 
314     /**
315      * {@inheritDoc}
316      */
317     @Override
318     protected void wakeup() {
319         if (toBeWakenUp) {
320             return;
321         }
322 
323         // Add a dummy socket to the pollset.
324         synchronized (wakeupLock) {
325             toBeWakenUp = true;
326             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
327         }
328     }
329 
330     /**
331      * {@inheritDoc}
332      */
333     @Override
334     public InetSocketAddress getLocalAddress() {
335         return (InetSocketAddress) super.getLocalAddress();
336     }
337 
338     /**
339      * {@inheritDoc}
340      */
341     @Override
342     public InetSocketAddress getDefaultLocalAddress() {
343         return (InetSocketAddress) super.getDefaultLocalAddress();
344     }
345 
346     /**
347      * @see #setDefaultLocalAddress(SocketAddress)
348      * 
349      * @param localAddress The localAddress to set
350      */
351     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
352         super.setDefaultLocalAddress(localAddress);
353     }
354 
355     /**
356      * {@inheritDoc}
357      */
358     public TransportMetadata getTransportMetadata() {
359         return AprSocketSession.METADATA;
360     }
361 
362     /**
363      * Convert an APR code into an Exception with the corresponding message
364      * @param code error number
365      * @throws IOException the generated exception
366      */
367     private void throwException(int code) throws IOException {
368         throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
369     }
370 
371     @Override
372     protected void init(SelectorProvider selectorProvider) throws Exception {
373         init();
374     }
375 }