View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.RuntimeIoException;
32  import org.apache.mina.core.buffer.IoBuffer;
33  import org.apache.mina.core.file.FileRegion;
34  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35  import org.apache.mina.core.session.SessionState;
36  import org.apache.tomcat.jni.File;
37  import org.apache.tomcat.jni.Poll;
38  import org.apache.tomcat.jni.Pool;
39  import org.apache.tomcat.jni.Socket;
40  import org.apache.tomcat.jni.Status;
41  
42  /**
43   * The class in charge of processing socket level IO events for the
44   * {@link AprSocketConnector}
45   * 
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   */
48  public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
49      private static final int POLLSET_SIZE = 1024;
50  
51      private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);
52  
53      private final Object wakeupLock = new Object();
54      private final long wakeupSocket;
55      private volatile boolean toBeWakenUp;
56  
57      private final long pool;
58      private final long bufferPool; // memory pool
59      private final long pollset; // socket poller
60      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
61      private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>();
62  
63      /**
64       * Create a new instance of {@link AprIoProcessor} with a given Exector for
65       * handling I/Os events.
66       * 
67       * @param executor
68       *            the {@link Executor} for handling I/O events
69       */
70      public AprIoProcessor(Executor executor) {
71          super(executor);
72  
73          // initialize a memory pool for APR functions
74          pool = Pool.create(AprLibrary.getInstance().getRootPool());
75          bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
76  
77          try {
78              wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
79          } catch (RuntimeException e) {
80              throw e;
81          } catch (Error e) {
82              throw e;
83          } catch (Exception e) {
84              throw new RuntimeIoException("Failed to create a wakeup socket.", e);
85          }
86  
87          boolean success = false;
88          long newPollset;
89          try {
90              newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
91  
92              if (newPollset == 0) {
93                  newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
94              }
95  
96              pollset = newPollset;
97              if (pollset < 0) {
98                  if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
99                      throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
100                 }
101             }
102             success = true;
103         } catch (RuntimeException e) {
104             throw e;
105         } catch (Error e) {
106             throw e;
107         } catch (Exception e) {
108             throw new RuntimeIoException("Failed to create a pollset.", e);
109         } finally {
110             if (!success) {
111                 dispose();
112             }
113         }
114     }
115 
116     /**
117      * {@inheritDoc}
118      */
119     @Override
120     protected void dispose0() {
121         Poll.destroy(pollset);
122         Socket.close(wakeupSocket);
123         Pool.destroy(bufferPool);
124         Pool.destroy(pool);
125     }
126 
127     /**
128      * {@inheritDoc}
129      */
130     @Override
131     protected int select() throws Exception {
132         return select(Integer.MAX_VALUE);
133     }
134 
135     /**
136      * {@inheritDoc}
137      */
138     @Override
139     protected int select(long timeout) throws Exception {
140         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
141         if (rv <= 0) {
142             if (rv != -120001) {
143                 throwException(rv);
144             }
145 
146             rv = Poll.maintain(pollset, polledSockets, true);
147             if (rv > 0) {
148                 for (int i = 0; i < rv; i++) {
149                     long socket = polledSockets[i];
150                     AprSession session = allSessions.get(socket);
151                     if (session == null) {
152                         continue;
153                     }
154 
155                     int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
156                             | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
157 
158                     Poll.add(pollset, socket, flag);
159                 }
160             } else if (rv < 0) {
161                 throwException(rv);
162             }
163 
164             return 0;
165         } else {
166             rv <<= 1;
167             if (!polledSessions.isEmpty()) {
168                 polledSessions.clear();
169             }
170             for (int i = 0; i < rv; i++) {
171                 long flag = polledSockets[i];
172                 long socket = polledSockets[++i];
173                 if (socket == wakeupSocket) {
174                     synchronized (wakeupLock) {
175                         Poll.remove(pollset, wakeupSocket);
176                         toBeWakenUp = false;
177                     }
178                     continue;
179                 }
180                 AprSession session = allSessions.get(socket);
181                 if (session == null) {
182                     continue;
183                 }
184 
185                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
186                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
187 
188                 polledSessions.add(session);
189             }
190 
191             return polledSessions.size();
192         }
193     }
194 
195     /**
196      * {@inheritDoc}
197      */
198     @Override
199     protected boolean isSelectorEmpty() {
200         return allSessions.isEmpty();
201     }
202 
203     /**
204      * {@inheritDoc}
205      */
206     @Override
207     protected void wakeup() {
208         if (toBeWakenUp) {
209             return;
210         }
211 
212         // Add a dummy socket to the pollset.
213         synchronized (wakeupLock) {
214             toBeWakenUp = true;
215             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
216         }
217     }
218 
219     /**
220      * {@inheritDoc}
221      */
222     @Override
223     protected Iterator<AprSession> allSessions() {
224         return allSessions.values().iterator();
225     }
226 
227     /**
228      * {@inheritDoc}
229      */
230     @Override
231     protected Iterator<AprSession> selectedSessions() {
232         return polledSessions.iterator();
233     }
234 
235     @Override
236     protected void init(AprSession session) throws Exception {
237         long s = session.getDescriptor();
238         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
239         Socket.timeoutSet(s, 0);
240 
241         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
242         if (rv != Status.APR_SUCCESS) {
243             throwException(rv);
244         }
245 
246         session.setInterestedInRead(true);
247         allSessions.put(s, session);
248     }
249 
250     /**
251      * {@inheritDoc}
252      */
253     @Override
254     protected void destroy(AprSession session) throws Exception {
255         if (allSessions.remove(session.getDescriptor()) == null) {
256             // Already destroyed.
257             return;
258         }
259 
260         int ret = Poll.remove(pollset, session.getDescriptor());
261         try {
262             if (ret != Status.APR_SUCCESS) {
263                 throwException(ret);
264             }
265         } finally {
266             ret = Socket.close(session.getDescriptor());
267 
268             // destroying the session because it won't be reused
269             // after this point
270             Socket.destroy(session.getDescriptor());
271             session.setDescriptor(0);
272 
273             if (ret != Status.APR_SUCCESS) {
274                 throwException(ret);
275             }
276         }
277     }
278 
279     /**
280      * {@inheritDoc}
281      */
282     @Override
283     protected SessionState getState(AprSession session) {
284         long socket = session.getDescriptor();
285 
286         if (socket != 0) {
287             return SessionState.OPENED;
288         } else if (allSessions.get(socket) != null) {
289             return SessionState.OPENING; // will occur ?
290         } else {
291             return SessionState.CLOSING;
292         }
293     }
294 
295     /**
296      * {@inheritDoc}
297      */
298     @Override
299     protected boolean isReadable(AprSession session) {
300         return session.isReadable();
301     }
302 
303     /**
304      * {@inheritDoc}
305      */
306     @Override
307     protected boolean isWritable(AprSession session) {
308         return session.isWritable();
309     }
310 
311     /**
312      * {@inheritDoc}
313      */
314     @Override
315     protected boolean isInterestedInRead(AprSession session) {
316         return session.isInterestedInRead();
317     }
318 
319     /**
320      * {@inheritDoc}
321      */
322     @Override
323     protected boolean isInterestedInWrite(AprSession session) {
324         return session.isInterestedInWrite();
325     }
326 
327     /**
328      * {@inheritDoc}
329      */
330     @Override
331     protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
332         if (session.isInterestedInRead() == isInterested) {
333             return;
334         }
335 
336         int rv = Poll.remove(pollset, session.getDescriptor());
337 
338         if (rv != Status.APR_SUCCESS) {
339             throwException(rv);
340         }
341 
342         int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
343 
344         rv = Poll.add(pollset, session.getDescriptor(), flags);
345 
346         if (rv == Status.APR_SUCCESS) {
347             session.setInterestedInRead(isInterested);
348         } else {
349             throwException(rv);
350         }
351     }
352 
353     /**
354      * {@inheritDoc}
355      */
356     @Override
357     protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
358         if (session.isInterestedInWrite() == isInterested) {
359             return;
360         }
361 
362         int rv = Poll.remove(pollset, session.getDescriptor());
363 
364         if (rv != Status.APR_SUCCESS) {
365             throwException(rv);
366         }
367 
368         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
369 
370         rv = Poll.add(pollset, session.getDescriptor(), flags);
371 
372         if (rv == Status.APR_SUCCESS) {
373             session.setInterestedInWrite(isInterested);
374         } else {
375             throwException(rv);
376         }
377     }
378 
379     /**
380      * {@inheritDoc}
381      */
382     @Override
383     protected int read(AprSession session, IoBuffer buffer) throws Exception {
384         int bytes;
385         int capacity = buffer.remaining();
386         // Using Socket.recv() directly causes memory leak. :-(
387         ByteBuffer b = Pool.alloc(bufferPool, capacity);
388 
389         try {
390             bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
391 
392             if (bytes > 0) {
393                 b.position(0);
394                 b.limit(bytes);
395                 buffer.put(b);
396             } else if (bytes < 0) {
397                 if (Status.APR_STATUS_IS_EOF(-bytes)) {
398                     bytes = -1;
399                 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
400                     bytes = 0;
401                 } else {
402                     throwException(bytes);
403                 }
404             }
405         } finally {
406             Pool.clear(bufferPool);
407         }
408 
409         return bytes;
410     }
411 
412     /**
413      * {@inheritDoc}
414      */
415     @Override
416     protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
417         int writtenBytes;
418         if (buf.isDirect()) {
419             writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
420         } else {
421             writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
422             if (writtenBytes > 0) {
423                 buf.skip(writtenBytes);
424             }
425         }
426 
427         if (writtenBytes < 0) {
428             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
429                 writtenBytes = 0;
430             } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
431                 writtenBytes = 0;
432             } else {
433                 throwException(writtenBytes);
434             }
435         }
436         return writtenBytes;
437     }
438 
439     /**
440      * {@inheritDoc}
441      */
442     @Override
443     protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
444         if (region.getFilename() == null) {
445             throw new UnsupportedOperationException();
446         }
447 
448         long fd = File.open(region.getFilename(),
449                             File.APR_FOPEN_READ
450                                 | File.APR_FOPEN_SENDFILE_ENABLED
451                                 | File.APR_FOPEN_BINARY,
452                             0,
453                             Socket.pool(session.getDescriptor()));
454         long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0);
455         File.close(fd);
456 
457         if (numWritten < 0) {
458             if (numWritten == -Status.EAGAIN) {
459                 return 0;
460             }
461             throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten + ")");
462         }
463         return (int) numWritten;
464     }
465 
466     private void throwException(int code) throws IOException {
467         throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
468     }
469 
470     /**
471      * {@inheritDoc}
472      */
473     protected void registerNewSelector() {
474         // Do nothing
475     }
476 
477     /**
478      * {@inheritDoc}
479      */
480     protected boolean isBrokenConnection() throws IOException {
481         // Here, we assume that this is the case.
482         return true;
483     }
484 }