001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.apr;
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.net.SocketAddress;
025import java.nio.ByteBuffer;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.Map;
030import java.util.Queue;
031import java.util.Set;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.concurrent.Executor;
034
035import org.apache.mina.core.RuntimeIoException;
036import org.apache.mina.core.polling.AbstractPollingIoConnector;
037import org.apache.mina.core.service.IoConnector;
038import org.apache.mina.core.service.IoProcessor;
039import org.apache.mina.core.service.IoService;
040import org.apache.mina.core.service.SimpleIoProcessorPool;
041import org.apache.mina.core.service.TransportMetadata;
042import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
043import org.apache.mina.transport.socket.SocketConnector;
044import org.apache.mina.transport.socket.SocketSessionConfig;
045import org.apache.tomcat.jni.Address;
046import org.apache.tomcat.jni.Poll;
047import org.apache.tomcat.jni.Pool;
048import org.apache.tomcat.jni.Socket;
049import org.apache.tomcat.jni.Status;
050
051/**
052 * {@link IoConnector} for APR based socket transport (TCP/IP).
053 * 
054 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
055 */
056public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
057
058    /** 
059     * This constant is deduced from the APR code. It is used when the timeout
060     * has expired while doing a poll() operation.
061     */
062    private static final int APR_TIMEUP_ERROR = -120001;
063
064    private static final int POLLSET_SIZE = 1024;
065
066    private final Map<Long, ConnectionRequest> requests = new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
067
068    private final Object wakeupLock = new Object();
069
070    private volatile long wakeupSocket;
071
072    private volatile boolean toBeWakenUp;
073
074    private volatile long pool;
075
076    private volatile long pollset; // socket poller
077
078    private final long[] polledSockets = new long[POLLSET_SIZE << 1];
079
080    private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
081
082    private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
083
084    private volatile ByteBuffer dummyBuffer;
085
086    /**
087     * Create an {@link AprSocketConnector} with default configuration (multiple thread model).
088     */
089    public AprSocketConnector() {
090        super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
091        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
092    }
093
094    /**
095     * Constructor for {@link AprSocketConnector} with default configuration, and 
096     * given number of {@link AprIoProcessor} for multithreading I/O operations
097     * @param processorCount the number of processor to create and place in a
098     * {@link SimpleIoProcessorPool} 
099     */
100    public AprSocketConnector(int processorCount) {
101        super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
102        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
103    }
104
105    /**
106     *  Constructor for {@link AprSocketConnector} with default configuration but a
107     *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
108     *  {@link IoService} of the same type.
109     * @param processor the processor to use for managing I/O events
110     */
111    public AprSocketConnector(IoProcessor<AprSession> processor) {
112        super(new DefaultSocketSessionConfig(), processor);
113        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
114    }
115
116    /**
117     *  Constructor for {@link AprSocketConnector} with a given {@link Executor} for handling 
118     *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing 
119     *  the same processor and executor over multiple {@link IoService} of the same type.
120     * @param executor the executor for connection
121     * @param processor the processor for I/O operations
122     */
123    public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
124        super(new DefaultSocketSessionConfig(), executor, processor);
125        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
126    }
127
128    /**
129     * {@inheritDoc}
130     */
131    @Override
132    protected void init() throws Exception {
133        // initialize a memory pool for APR functions
134        pool = Pool.create(AprLibrary.getInstance().getRootPool());
135
136        wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
137
138        dummyBuffer = Pool.alloc(pool, 1);
139
140        pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
141
142        if (pollset <= 0) {
143            pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
144        }
145
146        if (pollset <= 0) {
147            if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
148                throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
149            }
150        }
151    }
152
153    /**
154     * {@inheritDoc}
155     */
156    @Override
157    protected void destroy() throws Exception {
158        if (wakeupSocket > 0) {
159            Socket.close(wakeupSocket);
160        }
161        if (pollset > 0) {
162            Poll.destroy(pollset);
163        }
164        if (pool > 0) {
165            Pool.destroy(pool);
166        }
167    }
168
169    /**
170     * {@inheritDoc}
171     */
172    @Override
173    protected Iterator<Long> allHandles() {
174        return polledHandles.iterator();
175    }
176
177    /**
178     * {@inheritDoc}
179     */
180    @Override
181    protected boolean connect(Long handle, SocketAddress remoteAddress) throws Exception {
182        InetSocketAddress ra = (InetSocketAddress) remoteAddress;
183        long sa;
184        if (ra != null) {
185            if (ra.getAddress() == null) {
186                sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
187            } else {
188                sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
189            }
190        } else {
191            sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
192        }
193
194        int rv = Socket.connect(handle, sa);
195        if (rv == Status.APR_SUCCESS) {
196            return true;
197        }
198
199        if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
200            return false;
201        }
202
203        throwException(rv);
204        throw new InternalError(); // This sentence will never be executed.
205    }
206
207    /**
208     * {@inheritDoc}
209     */
210    @Override
211    protected ConnectionRequest getConnectionRequest(Long handle) {
212        return requests.get(handle);
213    }
214
215    /**
216     * {@inheritDoc}
217     */
218    @Override
219    protected void close(Long handle) throws Exception {
220        finishConnect(handle);
221        int rv = Socket.close(handle);
222        if (rv != Status.APR_SUCCESS) {
223            throwException(rv);
224        }
225    }
226
227    /**
228     * {@inheritDoc}
229     */
230    @Override
231    protected boolean finishConnect(Long handle) throws Exception {
232        Poll.remove(pollset, handle);
233        requests.remove(handle);
234        if (failedHandles.remove(handle)) {
235            int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
236            throwException(rv);
237            throw new InternalError("Shouldn't reach here.");
238        }
239        return true;
240    }
241
242    /**
243     * {@inheritDoc}
244     */
245    @Override
246    protected Long newHandle(SocketAddress localAddress) throws Exception {
247        long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
248        boolean success = false;
249        try {
250            int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
251            if (result != Status.APR_SUCCESS) {
252                throwException(result);
253            }
254            result = Socket.timeoutSet(handle, 0);
255            if (result != Status.APR_SUCCESS) {
256                throwException(result);
257            }
258
259            if (localAddress != null) {
260                InetSocketAddress la = (InetSocketAddress) localAddress;
261                long sa;
262
263                if (la.getAddress() == null) {
264                    sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
265                } else {
266                    sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
267                }
268
269                result = Socket.bind(handle, sa);
270                if (result != Status.APR_SUCCESS) {
271                    throwException(result);
272                }
273            }
274
275            success = true;
276            return handle;
277        } finally {
278            if (!success) {
279                int rv = Socket.close(handle);
280                if (rv != Status.APR_SUCCESS) {
281                    throwException(rv);
282                }
283            }
284        }
285    }
286
287    /**
288     * {@inheritDoc}
289     */
290    @Override
291    protected AprSession newSession(IoProcessor<AprSession> processor, Long handle) throws Exception {
292        return new AprSocketSession(this, processor, handle);
293    }
294
295    /**
296     * {@inheritDoc}
297     */
298    @Override
299    protected void register(Long handle, ConnectionRequest request) throws Exception {
300        int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
301        if (rv != Status.APR_SUCCESS) {
302            throwException(rv);
303        }
304
305        requests.put(handle, request);
306    }
307
308    /**
309     * {@inheritDoc}
310     */
311    @Override
312    protected int select(int timeout) throws Exception {
313        int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
314        if (rv <= 0) {
315            if (rv != APR_TIMEUP_ERROR) {
316                throwException(rv);
317            }
318
319            rv = Poll.maintain(pollset, polledSockets, true);
320            if (rv > 0) {
321                for (int i = 0; i < rv; i++) {
322                    Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
323                }
324            } else if (rv < 0) {
325                throwException(rv);
326            }
327
328            return 0;
329        } else {
330            rv <<= 1;
331            if (!polledHandles.isEmpty()) {
332                polledHandles.clear();
333            }
334
335            for (int i = 0; i < rv; i++) {
336                long flag = polledSockets[i];
337                long socket = polledSockets[++i];
338                if (socket == wakeupSocket) {
339                    synchronized (wakeupLock) {
340                        Poll.remove(pollset, wakeupSocket);
341                        toBeWakenUp = false;
342                    }
343                    continue;
344                }
345                polledHandles.add(socket);
346                if ((flag & Poll.APR_POLLOUT) == 0) {
347                    failedHandles.add(socket);
348                }
349            }
350            return polledHandles.size();
351        }
352    }
353
354    /**
355     * {@inheritDoc}
356     */
357    @Override
358    protected Iterator<Long> selectedHandles() {
359        return polledHandles.iterator();
360    }
361
362    /**
363     * {@inheritDoc}
364     */
365    @Override
366    protected void wakeup() {
367        if (toBeWakenUp) {
368            return;
369        }
370
371        // Add a dummy socket to the pollset.
372        synchronized (wakeupLock) {
373            toBeWakenUp = true;
374            Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
375        }
376    }
377
378    /**
379     * {@inheritDoc}
380     */
381    public TransportMetadata getTransportMetadata() {
382        return AprSocketSession.METADATA;
383    }
384
385    /**
386     * {@inheritDoc}
387     */
388    public SocketSessionConfig getSessionConfig() {
389        return (SocketSessionConfig) sessionConfig;
390    }
391
392    /**
393     * {@inheritDoc}
394     */
395    @Override
396    public InetSocketAddress getDefaultRemoteAddress() {
397        return (InetSocketAddress) super.getDefaultRemoteAddress();
398    }
399
400    /**
401     * {@inheritDoc}
402     */
403    public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
404        super.setDefaultRemoteAddress(defaultRemoteAddress);
405    }
406
407    /**
408     * transform an APR error number in a more fancy exception
409     * @param code APR error code
410     * @throws IOException the produced exception for the given APR error number
411     */
412    private void throwException(int code) throws IOException {
413        throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
414    }
415}