001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.apr;
021
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Queue;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.Executor;
030
031import org.apache.mina.core.RuntimeIoException;
032import org.apache.mina.core.buffer.IoBuffer;
033import org.apache.mina.core.file.FileRegion;
034import org.apache.mina.core.polling.AbstractPollingIoProcessor;
035import org.apache.mina.core.session.SessionState;
036import org.apache.tomcat.jni.File;
037import org.apache.tomcat.jni.Poll;
038import org.apache.tomcat.jni.Pool;
039import org.apache.tomcat.jni.Socket;
040import org.apache.tomcat.jni.Status;
041
042/**
043 * The class in charge of processing socket level IO events for the
044 * {@link AprSocketConnector}
045 *
046 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
047 */
048public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
049    private static final int POLLSET_SIZE = 1024;
050
051    private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);
052
053    private final Object wakeupLock = new Object();
054
055    private final long wakeupSocket;
056
057    private volatile boolean toBeWakenUp;
058
059    private final long pool;
060
061    private final long bufferPool; // memory pool
062
063    private final long pollset; // socket poller
064
065    private final long[] polledSockets = new long[POLLSET_SIZE << 1];
066
067    private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>();
068
069    /**
070     * Create a new instance of {@link AprIoProcessor} with a given Exector for
071     * handling I/Os events.
072     *
073     * @param executor
074     *            the {@link Executor} for handling I/O events
075     */
076    public AprIoProcessor(Executor executor) {
077        super(executor);
078
079        // initialize a memory pool for APR functions
080        pool = Pool.create(AprLibrary.getInstance().getRootPool());
081        bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
082
083        try {
084            wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
085        } catch (RuntimeException e) {
086            throw e;
087        } catch (Error e) {
088            throw e;
089        } catch (Exception e) {
090            throw new RuntimeIoException("Failed to create a wakeup socket.", e);
091        }
092
093        boolean success = false;
094        long newPollset;
095        try {
096            newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
097
098            if (newPollset == 0) {
099                newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
100            }
101
102            pollset = newPollset;
103            if (pollset < 0) {
104                if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
105                    throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
106                }
107            }
108            success = true;
109        } catch (RuntimeException e) {
110            throw e;
111        } catch (Error e) {
112            throw e;
113        } catch (Exception e) {
114            throw new RuntimeIoException("Failed to create a pollset.", e);
115        } finally {
116            if (!success) {
117                dispose();
118            }
119        }
120    }
121
122    /**
123     * {@inheritDoc}
124     */
125    @Override
126    protected void doDispose() {
127        Poll.destroy(pollset);
128        Socket.close(wakeupSocket);
129        Pool.destroy(bufferPool);
130        Pool.destroy(pool);
131    }
132
133    /**
134     * {@inheritDoc}
135     */
136    @Override
137    protected int select() throws Exception {
138        return select(Integer.MAX_VALUE);
139    }
140
141    /**
142     * {@inheritDoc}
143     */
144    @Override
145    protected int select(long timeout) throws Exception {
146        int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
147        if (rv <= 0) {
148            if (rv != -120001) {
149                throwException(rv);
150            }
151
152            rv = Poll.maintain(pollset, polledSockets, true);
153            if (rv > 0) {
154                for (int i = 0; i < rv; i++) {
155                    long socket = polledSockets[i];
156                    AprSession session = allSessions.get(socket);
157                    if (session == null) {
158                        continue;
159                    }
160
161                    int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
162                            | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
163
164                    Poll.add(pollset, socket, flag);
165                }
166            } else if (rv < 0) {
167                throwException(rv);
168            }
169
170            return 0;
171        } else {
172            rv <<= 1;
173            if (!polledSessions.isEmpty()) {
174                polledSessions.clear();
175            }
176            for (int i = 0; i < rv; i++) {
177                long flag = polledSockets[i];
178                long socket = polledSockets[++i];
179                if (socket == wakeupSocket) {
180                    synchronized (wakeupLock) {
181                        Poll.remove(pollset, wakeupSocket);
182                        toBeWakenUp = false;
183                        wakeupCalled.set(true);
184                    }
185                    continue;
186                }
187                AprSession session = allSessions.get(socket);
188                if (session == null) {
189                    continue;
190                }
191
192                session.setReadable((flag & Poll.APR_POLLIN) != 0);
193                session.setWritable((flag & Poll.APR_POLLOUT) != 0);
194
195                polledSessions.add(session);
196            }
197
198            return polledSessions.size();
199        }
200    }
201
202    /**
203     * {@inheritDoc}
204     */
205    @Override
206    protected boolean isSelectorEmpty() {
207        return allSessions.isEmpty();
208    }
209
210    /**
211     * {@inheritDoc}
212     */
213    @Override
214    protected void wakeup() {
215        if (toBeWakenUp) {
216            return;
217        }
218
219        // Add a dummy socket to the pollset.
220        synchronized (wakeupLock) {
221            toBeWakenUp = true;
222            Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
223        }
224    }
225
226    /**
227     * {@inheritDoc}
228     */
229    @Override
230    protected Iterator<AprSession> allSessions() {
231        return allSessions.values().iterator();
232    }
233
234    /**
235     * {@inheritDoc}
236     */
237    @Override
238    protected Iterator<AprSession> selectedSessions() {
239        return polledSessions.iterator();
240    }
241
242    @Override
243    protected void init(AprSession session) throws Exception {
244        long s = session.getDescriptor();
245        Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
246        Socket.timeoutSet(s, 0);
247
248        int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
249        if (rv != Status.APR_SUCCESS) {
250            throwException(rv);
251        }
252
253        session.setInterestedInRead(true);
254        allSessions.put(s, session);
255    }
256
257    /**
258     * {@inheritDoc}
259     */
260    @Override
261    protected void destroy(AprSession session) throws Exception {
262        if (allSessions.remove(session.getDescriptor()) == null) {
263            // Already destroyed.
264            return;
265        }
266
267        int ret = Poll.remove(pollset, session.getDescriptor());
268        try {
269            if (ret != Status.APR_SUCCESS) {
270                throwException(ret);
271            }
272        } finally {
273            ret = Socket.close(session.getDescriptor());
274
275            // destroying the session because it won't be reused
276            // after this point
277            Socket.destroy(session.getDescriptor());
278            session.setDescriptor(0);
279
280            if (ret != Status.APR_SUCCESS) {
281                throwException(ret);
282            }
283        }
284    }
285
286    /**
287     * {@inheritDoc}
288     */
289    @Override
290    protected SessionState getState(AprSession session) {
291        long socket = session.getDescriptor();
292
293        if (socket != 0) {
294            return SessionState.OPENED;
295        } else if (allSessions.get(socket) != null) {
296            return SessionState.OPENING; // will occur ?
297        } else {
298            return SessionState.CLOSING;
299        }
300    }
301
302    /**
303     * {@inheritDoc}
304     */
305    @Override
306    protected boolean isReadable(AprSession session) {
307        return session.isReadable();
308    }
309
310    /**
311     * {@inheritDoc}
312     */
313    @Override
314    protected boolean isWritable(AprSession session) {
315        return session.isWritable();
316    }
317
318    /**
319     * {@inheritDoc}
320     */
321    @Override
322    protected boolean isInterestedInRead(AprSession session) {
323        return session.isInterestedInRead();
324    }
325
326    /**
327     * {@inheritDoc}
328     */
329    @Override
330    protected boolean isInterestedInWrite(AprSession session) {
331        return session.isInterestedInWrite();
332    }
333
334    /**
335     * {@inheritDoc}
336     */
337    @Override
338    protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
339        if (session.isInterestedInRead() == isInterested) {
340            return;
341        }
342
343        int rv = Poll.remove(pollset, session.getDescriptor());
344
345        if (rv != Status.APR_SUCCESS) {
346            throwException(rv);
347        }
348
349        int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
350
351        rv = Poll.add(pollset, session.getDescriptor(), flags);
352
353        if (rv == Status.APR_SUCCESS) {
354            session.setInterestedInRead(isInterested);
355        } else {
356            throwException(rv);
357        }
358    }
359
360    /**
361     * {@inheritDoc}
362     */
363    @Override
364    protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
365        if (session.isInterestedInWrite() == isInterested) {
366            return;
367        }
368
369        int rv = Poll.remove(pollset, session.getDescriptor());
370
371        if (rv != Status.APR_SUCCESS) {
372            throwException(rv);
373        }
374
375        int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
376
377        rv = Poll.add(pollset, session.getDescriptor(), flags);
378
379        if (rv == Status.APR_SUCCESS) {
380            session.setInterestedInWrite(isInterested);
381        } else {
382            throwException(rv);
383        }
384    }
385
386    /**
387     * {@inheritDoc}
388     */
389    @Override
390    protected int read(AprSession session, IoBuffer buffer) throws Exception {
391        int bytes;
392        int capacity = buffer.remaining();
393        // Using Socket.recv() directly causes memory leak. :-(
394        ByteBuffer b = Pool.alloc(bufferPool, capacity);
395
396        try {
397            bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
398
399            if (bytes > 0) {
400                b.position(0);
401                b.limit(bytes);
402                buffer.put(b);
403            } else if (bytes < 0) {
404                if (Status.APR_STATUS_IS_EOF(-bytes)) {
405                    bytes = -1;
406                } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
407                    bytes = 0;
408                } else {
409                    throwException(bytes);
410                }
411            }
412        } finally {
413            Pool.clear(bufferPool);
414        }
415
416        return bytes;
417    }
418
419    /**
420     * {@inheritDoc}
421     */
422    @Override
423    protected int write(AprSession session, IoBuffer buf, int length) throws IOException {
424        int writtenBytes;
425        if (buf.isDirect()) {
426            writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
427        } else {
428            writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
429            if (writtenBytes > 0) {
430                buf.skip(writtenBytes);
431            }
432        }
433
434        if (writtenBytes < 0) {
435            if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
436                writtenBytes = 0;
437            } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
438                writtenBytes = 0;
439            } else {
440                throwException(writtenBytes);
441            }
442        }
443        return writtenBytes;
444    }
445
446    /**
447     * {@inheritDoc}
448     */
449    @Override
450    protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
451        if (region.getFilename() == null) {
452            throw new UnsupportedOperationException();
453        }
454
455        long fd = File.open(region.getFilename(), File.APR_FOPEN_READ | File.APR_FOPEN_SENDFILE_ENABLED
456                | File.APR_FOPEN_BINARY, 0, Socket.pool(session.getDescriptor()));
457        long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0);
458        File.close(fd);
459
460        if (numWritten < 0) {
461            if (numWritten == -Status.EAGAIN) {
462                return 0;
463            }
464            throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten
465                    + ")");
466        }
467        return (int) numWritten;
468    }
469
470    private void throwException(int code) throws IOException {
471        throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
472    }
473
474    /**
475     * {@inheritDoc}
476     */
477    @Override
478    protected void registerNewSelector() {
479        // Do nothing
480    }
481
482    /**
483     * {@inheritDoc}
484     */
485    @Override
486    protected boolean isBrokenConnection() throws IOException {
487        // Here, we assume that this is the case.
488        return true;
489    }
490}