View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.Executor;
28  
29  import org.apache.mina.core.RuntimeIoException;
30  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
31  import org.apache.mina.core.service.IoProcessor;
32  import org.apache.mina.core.service.TransportMetadata;
33  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
34  import org.apache.mina.transport.socket.SocketAcceptor;
35  import org.apache.mina.transport.socket.SocketSessionConfig;
36  import org.apache.mina.util.CircularQueue;
37  import org.apache.tomcat.jni.Address;
38  import org.apache.tomcat.jni.Poll;
39  import org.apache.tomcat.jni.Pool;
40  import org.apache.tomcat.jni.Socket;
41  import org.apache.tomcat.jni.Status;
42  
43  /**
44   * TODO Add documentation
45   *
46   * @author The Apache MINA Project (dev@mina.apache.org)
47   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
48   */
49  public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> implements SocketAcceptor {
50  
51      private static final int POLLSET_SIZE = 1024;
52  
53      private final Object wakeupLock = new Object();
54      private volatile long wakeupSocket;
55      private volatile boolean toBeWakenUp;
56  
57      private int backlog = 50;
58      private boolean reuseAddress = false;
59  
60      private volatile long pool;
61      private volatile long pollset; // socket poller
62      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
63      private final List<Long> polledHandles =
64          new CircularQueue<Long>(POLLSET_SIZE);
65  
66      public AprSocketAcceptor() {
67          super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
68          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69      }
70  
71      public AprSocketAcceptor(int processorCount) {
72          super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
73          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
74      }
75  
76      public AprSocketAcceptor(IoProcessor<AprSession> processor) {
77          super(new DefaultSocketSessionConfig(), processor);
78          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79      }
80  
81      public AprSocketAcceptor(Executor executor,
82              IoProcessor<AprSession> processor) {
83          super(new DefaultSocketSessionConfig(), executor, processor);
84          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
85      }
86  
87      @Override
88      protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
89          long s = Socket.accept(handle);
90          boolean success = false;
91          try {
92              AprSession result = new AprSocketSession(this, processor, s);
93              success = true;
94              return result;
95          } finally {
96              if (!success) {
97                  Socket.close(s);
98              }
99          }
100     }
101 
102     @Override
103     protected Long open(SocketAddress localAddress) throws Exception {
104         InetSocketAddress la = (InetSocketAddress) localAddress;
105         long handle = Socket.create(
106                 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
107 
108         boolean success = false;
109         try {
110             int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
111             if (result != Status.APR_SUCCESS) {
112                 throwException(result);
113             }
114             result = Socket.timeoutSet(handle, 0);
115             if (result != Status.APR_SUCCESS) {
116                 throwException(result);
117             }
118 
119             // Configure the server socket,
120             result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress()? 1 : 0);
121             if (result != Status.APR_SUCCESS) {
122                 throwException(result);
123             }
124             result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
125             if (result != Status.APR_SUCCESS) {
126                 throwException(result);
127             }
128 
129             // and bind.
130             long sa;
131             if (la != null) {
132                 if (la.getAddress() == null) {
133                     sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
134                 } else {
135                     sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
136                 }
137             } else {
138                 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
139             }
140 
141             result = Socket.bind(handle, sa);
142             if (result != Status.APR_SUCCESS) {
143                 throwException(result);
144             }
145             result = Socket.listen(handle, getBacklog());
146             if (result != Status.APR_SUCCESS) {
147                 throwException(result);
148             }
149 
150             result = Poll.add(pollset, handle, Poll.APR_POLLIN);
151             if (result != Status.APR_SUCCESS) {
152                 throwException(result);
153             }
154             success = true;
155         } finally {
156             if (!success) {
157                 close(handle);
158             }
159         }
160         return handle;
161     }
162 
163     @Override
164     protected void init() throws Exception {
165         // initialize a memory pool for APR functions
166         pool = Pool.create(AprLibrary.getInstance().getRootPool());
167 
168         wakeupSocket = Socket.create(
169                 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
170 
171         pollset = Poll.create(
172                         POLLSET_SIZE,
173                         pool,
174                         Poll.APR_POLLSET_THREADSAFE,
175                         Long.MAX_VALUE);
176 
177         if (pollset <= 0) {
178             pollset = Poll.create(
179                     62,
180                     pool,
181                     Poll.APR_POLLSET_THREADSAFE,
182                     Long.MAX_VALUE);
183         }
184 
185         if (pollset <= 0) {
186             if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
187                 throw new RuntimeIoException(
188                         "Thread-safe pollset is not supported in this platform.");
189             }
190         }
191     }
192 
193     @Override
194     protected void destroy() throws Exception {
195         if (wakeupSocket > 0) {
196             Socket.close(wakeupSocket);
197         }
198         if (pollset > 0) {
199             Poll.destroy(pollset);
200         }
201         if (pool > 0) {
202             Pool.destroy(pool);
203         }
204     }
205 
206     @Override
207     protected SocketAddress localAddress(Long handle) throws Exception {
208         long la = Address.get(Socket.APR_LOCAL, handle);
209         return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
210     }
211 
212     @Override
213     protected boolean select() throws Exception {
214         int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
215         if (rv <= 0) {
216             if (rv != -120001) {
217                 throwException(rv);
218             }
219 
220             rv = Poll.maintain(pollset, polledSockets, true);
221             if (rv > 0) {
222                 for (int i = 0; i < rv; i ++) {
223                     Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
224                 }
225             } else if (rv < 0) {
226                 throwException(rv);
227             }
228 
229             return false;
230         } else {
231             rv <<= 1;
232             if (!polledHandles.isEmpty()) {
233                 polledHandles.clear();
234             }
235 
236             for (int i = 0; i < rv; i ++) {
237                 long flag = polledSockets[i];
238                 long socket = polledSockets[++i];
239                 if (socket == wakeupSocket) {
240                     synchronized (wakeupLock) {
241                         Poll.remove(pollset, wakeupSocket);
242                         toBeWakenUp = false;
243                     }
244                     continue;
245                 }
246 
247                 if ((flag & Poll.APR_POLLIN) != 0) {
248                     polledHandles.add(socket);
249                 }
250             }
251             return !polledHandles.isEmpty();
252         }
253     }
254 
255     @Override
256     protected Iterator<Long> selectedHandles() {
257         return polledHandles.iterator();
258     }
259 
260     @Override
261     protected void close(Long handle) throws Exception {
262         Poll.remove(pollset, handle);
263         int result = Socket.close(handle);
264         if (result != Status.APR_SUCCESS) {
265             throwException(result);
266         }
267     }
268 
269     @Override
270     protected void wakeup() {
271         if (toBeWakenUp) {
272             return;
273         }
274 
275         // Add a dummy socket to the pollset.
276         synchronized (wakeupLock) {
277             toBeWakenUp = true;
278             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
279         }
280     }
281 
282     public int getBacklog() {
283         return backlog;
284     }
285 
286     public boolean isReuseAddress() {
287         return reuseAddress;
288     }
289 
290     public void setBacklog(int backlog) {
291         synchronized (bindLock) {
292             if (isActive()) {
293                 throw new IllegalStateException(
294                         "backlog can't be set while the acceptor is bound.");
295             }
296 
297             this.backlog = backlog;
298         }
299     }
300 
301     @Override
302     public InetSocketAddress getLocalAddress() {
303         return (InetSocketAddress) super.getLocalAddress();
304     }
305 
306     @Override
307     public InetSocketAddress getDefaultLocalAddress() {
308         return (InetSocketAddress) super.getDefaultLocalAddress();
309     }
310 
311     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
312         super.setDefaultLocalAddress(localAddress);
313     }
314 
315     public void setReuseAddress(boolean reuseAddress) {
316         synchronized (bindLock) {
317             if (isActive()) {
318                 throw new IllegalStateException(
319                         "backlog can't be set while the acceptor is bound.");
320             }
321 
322             this.reuseAddress = reuseAddress;
323         }
324     }
325 
326     public TransportMetadata getTransportMetadata() {
327         return AprSocketSession.METADATA;
328     }
329 
330     @Override
331     public SocketSessionConfig getSessionConfig() {
332         return (SocketSessionConfig) super.getSessionConfig();
333     }
334 
335     private void throwException(int code) throws IOException {
336         throw new IOException(
337                 org.apache.tomcat.jni.Error.strerror(-code) +
338                 " (code: " + code + ")");
339     }
340 }