001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.apr;
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.net.SocketAddress;
025import java.nio.channels.spi.SelectorProvider;
026import java.util.Iterator;
027import java.util.Queue;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.Executor;
030
031import org.apache.mina.core.RuntimeIoException;
032import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
033import org.apache.mina.core.service.IoAcceptor;
034import org.apache.mina.core.service.IoProcessor;
035import org.apache.mina.core.service.IoService;
036import org.apache.mina.core.service.SimpleIoProcessorPool;
037import org.apache.mina.core.service.TransportMetadata;
038import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
039import org.apache.tomcat.jni.Address;
040import org.apache.tomcat.jni.Poll;
041import org.apache.tomcat.jni.Pool;
042import org.apache.tomcat.jni.Socket;
043import org.apache.tomcat.jni.Status;
044
045/**
046 * {@link IoAcceptor} for APR based socket transport (TCP/IP).
047 *
048 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
049 */
050public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> {
051    /** 
052     * This constant is deduced from the APR code. It is used when the timeout
053     * has expired while doing a poll() operation.
054     */
055    private static final int APR_TIMEUP_ERROR = -120001;
056
057    private static final int POLLSET_SIZE = 1024;
058
059    private final Object wakeupLock = new Object();
060
061    private volatile long wakeupSocket;
062
063    private volatile boolean toBeWakenUp;
064
065    private volatile long pool;
066
067    private volatile long pollset; // socket poller
068
069    private final long[] polledSockets = new long[POLLSET_SIZE << 1];
070
071    private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
072
073    /**
074     * Constructor for {@link AprSocketAcceptor} using default parameters (multiple thread model).
075     */
076    public AprSocketAcceptor() {
077        super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
078        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
079    }
080
081    /**
082     * Constructor for {@link AprSocketAcceptor} using default parameters, and 
083     * given number of {@link AprIoProcessor} for multithreading I/O operations.
084     * 
085     * @param processorCount the number of processor to create and place in a
086     * {@link SimpleIoProcessorPool} 
087     */
088    public AprSocketAcceptor(int processorCount) {
089        super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
090        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
091    }
092
093    /**
094     *  Constructor for {@link AprSocketAcceptor} with default configuration but a
095      *  specific {@link AprIoProcessor}, useful for sharing the same processor over multiple
096      *  {@link IoService} of the same type.
097      * @param processor the processor to use for managing I/O events
098      */
099    public AprSocketAcceptor(IoProcessor<AprSession> processor) {
100        super(new DefaultSocketSessionConfig(), processor);
101        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
102    }
103
104    /**
105     *  Constructor for {@link AprSocketAcceptor} with a given {@link Executor} for handling 
106     *  connection events and a given {@link AprIoProcessor} for handling I/O events, useful for 
107     *  sharing the same processor and executor over multiple {@link IoService} of the same type.
108     * @param executor the executor for connection
109     * @param processor the processor for I/O operations
110     */
111    public AprSocketAcceptor(Executor executor, IoProcessor<AprSession> processor) {
112        super(new DefaultSocketSessionConfig(), executor, processor);
113        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
114    }
115
116    /**
117     * {@inheritDoc}
118     */
119    @Override
120    protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
121        long s = Socket.accept(handle);
122        boolean success = false;
123        try {
124            AprSession result = new AprSocketSession(this, processor, s);
125            success = true;
126            return result;
127        } finally {
128            if (!success) {
129                Socket.close(s);
130            }
131        }
132    }
133
134    /**
135     * {@inheritDoc}
136     */
137    @Override
138    protected Long open(SocketAddress localAddress) throws Exception {
139        InetSocketAddress la = (InetSocketAddress) localAddress;
140        long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
141
142        boolean success = false;
143        try {
144            int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
145            if (result != Status.APR_SUCCESS) {
146                throwException(result);
147            }
148            result = Socket.timeoutSet(handle, 0);
149            if (result != Status.APR_SUCCESS) {
150                throwException(result);
151            }
152
153            // Configure the server socket,
154            result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress() ? 1 : 0);
155            if (result != Status.APR_SUCCESS) {
156                throwException(result);
157            }
158            result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
159            if (result != Status.APR_SUCCESS) {
160                throwException(result);
161            }
162
163            // and bind.
164            long sa;
165            if (la != null) {
166                if (la.getAddress() == null) {
167                    sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
168                } else {
169                    sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
170                }
171            } else {
172                sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
173            }
174
175            result = Socket.bind(handle, sa);
176            if (result != Status.APR_SUCCESS) {
177                throwException(result);
178            }
179            result = Socket.listen(handle, getBacklog());
180            if (result != Status.APR_SUCCESS) {
181                throwException(result);
182            }
183
184            result = Poll.add(pollset, handle, Poll.APR_POLLIN);
185            if (result != Status.APR_SUCCESS) {
186                throwException(result);
187            }
188            success = true;
189        } finally {
190            if (!success) {
191                close(handle);
192            }
193        }
194        return handle;
195    }
196
197    /**
198     * {@inheritDoc}
199     */
200    @Override
201    protected void init() throws Exception {
202        // initialize a memory pool for APR functions
203        pool = Pool.create(AprLibrary.getInstance().getRootPool());
204
205        wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
206
207        pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
208
209        if (pollset <= 0) {
210            pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
211        }
212
213        if (pollset <= 0) {
214            if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
215                throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
216            }
217        }
218    }
219
220    /**
221     * {@inheritDoc}
222     */
223    @Override
224    protected void destroy() throws Exception {
225        if (wakeupSocket > 0) {
226            Socket.close(wakeupSocket);
227        }
228        if (pollset > 0) {
229            Poll.destroy(pollset);
230        }
231        if (pool > 0) {
232            Pool.destroy(pool);
233        }
234    }
235
236    /**
237     * {@inheritDoc}
238     */
239    @Override
240    protected SocketAddress localAddress(Long handle) throws Exception {
241        long la = Address.get(Socket.APR_LOCAL, handle);
242        return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
243    }
244
245    /**
246     * {@inheritDoc}
247     */
248    @Override
249    protected int select() throws Exception {
250        int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
251        if (rv <= 0) {
252            // We have had an error. It can simply be that we have reached
253            // the timeout (very unlikely, as we have set it to MAX_INTEGER)
254            if (rv != APR_TIMEUP_ERROR) {
255                // It's not a timeout being exceeded. Throw the error
256                throwException(rv);
257            }
258
259            rv = Poll.maintain(pollset, polledSockets, true);
260            if (rv > 0) {
261                for (int i = 0; i < rv; i++) {
262                    Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
263                }
264            } else if (rv < 0) {
265                throwException(rv);
266            }
267
268            return 0;
269        } else {
270            rv <<= 1;
271            if (!polledHandles.isEmpty()) {
272                polledHandles.clear();
273            }
274
275            for (int i = 0; i < rv; i++) {
276                long flag = polledSockets[i];
277                long socket = polledSockets[++i];
278                if (socket == wakeupSocket) {
279                    synchronized (wakeupLock) {
280                        Poll.remove(pollset, wakeupSocket);
281                        toBeWakenUp = false;
282                    }
283                    continue;
284                }
285
286                if ((flag & Poll.APR_POLLIN) != 0) {
287                    polledHandles.add(socket);
288                }
289            }
290            return polledHandles.size();
291        }
292    }
293
294    /**
295     * {@inheritDoc}
296     */
297    @Override
298    protected Iterator<Long> selectedHandles() {
299        return polledHandles.iterator();
300    }
301
302    /**
303     * {@inheritDoc}
304     */
305    @Override
306    protected void close(Long handle) throws Exception {
307        Poll.remove(pollset, handle);
308        int result = Socket.close(handle);
309        if (result != Status.APR_SUCCESS) {
310            throwException(result);
311        }
312    }
313
314    /**
315     * {@inheritDoc}
316     */
317    @Override
318    protected void wakeup() {
319        if (toBeWakenUp) {
320            return;
321        }
322
323        // Add a dummy socket to the pollset.
324        synchronized (wakeupLock) {
325            toBeWakenUp = true;
326            Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
327        }
328    }
329
330    /**
331     * {@inheritDoc}
332     */
333    @Override
334    public InetSocketAddress getLocalAddress() {
335        return (InetSocketAddress) super.getLocalAddress();
336    }
337
338    /**
339     * {@inheritDoc}
340     */
341    @Override
342    public InetSocketAddress getDefaultLocalAddress() {
343        return (InetSocketAddress) super.getDefaultLocalAddress();
344    }
345
346    /**
347     * @see #setDefaultLocalAddress(SocketAddress)
348     * 
349     * @param localAddress The localAddress to set
350     */
351    public void setDefaultLocalAddress(InetSocketAddress localAddress) {
352        super.setDefaultLocalAddress(localAddress);
353    }
354
355    /**
356     * {@inheritDoc}
357     */
358    public TransportMetadata getTransportMetadata() {
359        return AprSocketSession.METADATA;
360    }
361
362    /**
363     * Convert an APR code into an Exception with the corresponding message
364     * @param code error number
365     * @throws IOException the generated exception
366     */
367    private void throwException(int code) throws IOException {
368        throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
369    }
370
371    @Override
372    protected void init(SelectorProvider selectorProvider) throws Exception {
373        init();
374    }
375}