View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.RuntimeIoException;
32  import org.apache.mina.core.buffer.IoBuffer;
33  import org.apache.mina.core.file.FileRegion;
34  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35  import org.apache.mina.core.session.SessionState;
36  import org.apache.tomcat.jni.File;
37  import org.apache.tomcat.jni.Poll;
38  import org.apache.tomcat.jni.Pool;
39  import org.apache.tomcat.jni.Socket;
40  import org.apache.tomcat.jni.Status;
41  
42  /**
43   * The class in charge of processing socket level IO events for the
44   * {@link AprSocketConnector}
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   */
48  public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
49      private static final int POLLSET_SIZE = 1024;
50  
51      private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);
52  
53      private final Object wakeupLock = new Object();
54  
55      private final long wakeupSocket;
56  
57      private volatile boolean toBeWakenUp;
58  
59      private final long pool;
60  
61      private final long bufferPool; // memory pool
62  
63      private final long pollset; // socket poller
64  
65      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
66  
67      private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>();
68  
69      /**
70       * Create a new instance of {@link AprIoProcessor} with a given Exector for
71       * handling I/Os events.
72       *
73       * @param executor
74       *            the {@link Executor} for handling I/O events
75       */
76      public AprIoProcessor(Executor executor) {
77          super(executor);
78  
79          // initialize a memory pool for APR functions
80          pool = Pool.create(AprLibrary.getInstance().getRootPool());
81          bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
82  
83          try {
84              wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
85          } catch (RuntimeException e) {
86              throw e;
87          } catch (Error e) {
88              throw e;
89          } catch (Exception e) {
90              throw new RuntimeIoException("Failed to create a wakeup socket.", e);
91          }
92  
93          boolean success = false;
94          long newPollset;
95          try {
96              newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
97  
98              if (newPollset == 0) {
99                  newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
100             }
101 
102             pollset = newPollset;
103             if (pollset < 0) {
104                 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
105                     throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
106                 }
107             }
108             success = true;
109         } catch (RuntimeException e) {
110             throw e;
111         } catch (Error e) {
112             throw e;
113         } catch (Exception e) {
114             throw new RuntimeIoException("Failed to create a pollset.", e);
115         } finally {
116             if (!success) {
117                 dispose();
118             }
119         }
120     }
121 
122     /**
123      * {@inheritDoc}
124      */
125     @Override
126     protected void doDispose() {
127         Poll.destroy(pollset);
128         Socket.close(wakeupSocket);
129         Pool.destroy(bufferPool);
130         Pool.destroy(pool);
131     }
132 
133     /**
134      * {@inheritDoc}
135      */
136     @Override
137     protected int select() throws Exception {
138         return select(Integer.MAX_VALUE);
139     }
140 
141     /**
142      * {@inheritDoc}
143      */
144     @Override
145     protected int select(long timeout) throws Exception {
146         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
147         if (rv <= 0) {
148             if (rv != -120001) {
149                 throwException(rv);
150             }
151 
152             rv = Poll.maintain(pollset, polledSockets, true);
153             if (rv > 0) {
154                 for (int i = 0; i < rv; i++) {
155                     long socket = polledSockets[i];
156                     AprSession session = allSessions.get(socket);
157                     if (session == null) {
158                         continue;
159                     }
160 
161                     int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
162                             | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
163 
164                     Poll.add(pollset, socket, flag);
165                 }
166             } else if (rv < 0) {
167                 throwException(rv);
168             }
169 
170             return 0;
171         } else {
172             rv <<= 1;
173             if (!polledSessions.isEmpty()) {
174                 polledSessions.clear();
175             }
176             for (int i = 0; i < rv; i++) {
177                 long flag = polledSockets[i];
178                 long socket = polledSockets[++i];
179                 if (socket == wakeupSocket) {
180                     synchronized (wakeupLock) {
181                         Poll.remove(pollset, wakeupSocket);
182                         toBeWakenUp = false;
183                         wakeupCalled.set(true);
184                     }
185                     continue;
186                 }
187                 AprSession session = allSessions.get(socket);
188                 if (session == null) {
189                     continue;
190                 }
191 
192                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
193                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
194 
195                 polledSessions.add(session);
196             }
197 
198             return polledSessions.size();
199         }
200     }
201 
202     /**
203      * {@inheritDoc}
204      */
205     @Override
206     protected boolean isSelectorEmpty() {
207         return allSessions.isEmpty();
208     }
209 
210     /**
211      * {@inheritDoc}
212      */
213     @Override
214     protected void wakeup() {
215         if (toBeWakenUp) {
216             return;
217         }
218 
219         // Add a dummy socket to the pollset.
220         synchronized (wakeupLock) {
221             toBeWakenUp = true;
222             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
223         }
224     }
225 
226     /**
227      * {@inheritDoc}
228      */
229     @Override
230     protected Iterator<AprSession> allSessions() {
231         return allSessions.values().iterator();
232     }
233     
234     /**
235      * {@inheritDoc}
236      */
237     @Override
238     protected int allSessionsCount() {
239     	return allSessions.size();
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     @Override
246     protected Iterator<AprSession> selectedSessions() {
247         return polledSessions.iterator();
248     }
249 
250     @Override
251     protected void init(AprSession session) throws Exception {
252         long s = session.getDescriptor();
253         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
254         Socket.timeoutSet(s, 0);
255 
256         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
257         if (rv != Status.APR_SUCCESS) {
258             throwException(rv);
259         }
260 
261         session.setInterestedInRead(true);
262         allSessions.put(s, session);
263     }
264 
265     /**
266      * {@inheritDoc}
267      */
268     @Override
269     protected void destroy(AprSession session) throws Exception {
270         if (allSessions.remove(session.getDescriptor()) == null) {
271             // Already destroyed.
272             return;
273         }
274 
275         int ret = Poll.remove(pollset, session.getDescriptor());
276         try {
277             if (ret != Status.APR_SUCCESS) {
278                 throwException(ret);
279             }
280         } finally {
281             ret = Socket.close(session.getDescriptor());
282 
283             // destroying the session because it won't be reused
284             // after this point
285             Socket.destroy(session.getDescriptor());
286             session.setDescriptor(0);
287 
288             if (ret != Status.APR_SUCCESS) {
289                 throwException(ret);
290             }
291         }
292     }
293 
294     /**
295      * {@inheritDoc}
296      */
297     @Override
298     protected SessionState getState(AprSession session) {
299         long socket = session.getDescriptor();
300 
301         if (socket != 0) {
302             return SessionState.OPENED;
303         } else if (allSessions.get(socket) != null) {
304             return SessionState.OPENING; // will occur ?
305         } else {
306             return SessionState.CLOSING;
307         }
308     }
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     protected boolean isReadable(AprSession session) {
315         return session.isReadable();
316     }
317 
318     /**
319      * {@inheritDoc}
320      */
321     @Override
322     protected boolean isWritable(AprSession session) {
323         return session.isWritable();
324     }
325 
326     /**
327      * {@inheritDoc}
328      */
329     @Override
330     protected boolean isInterestedInRead(AprSession session) {
331         return session.isInterestedInRead();
332     }
333 
334     /**
335      * {@inheritDoc}
336      */
337     @Override
338     protected boolean isInterestedInWrite(AprSession session) {
339         return session.isInterestedInWrite();
340     }
341 
342     /**
343      * {@inheritDoc}
344      */
345     @Override
346     protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
347         if (session.isInterestedInRead() == isInterested) {
348             return;
349         }
350 
351         int rv = Poll.remove(pollset, session.getDescriptor());
352 
353         if (rv != Status.APR_SUCCESS) {
354             throwException(rv);
355         }
356 
357         int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
358 
359         rv = Poll.add(pollset, session.getDescriptor(), flags);
360 
361         if (rv == Status.APR_SUCCESS) {
362             session.setInterestedInRead(isInterested);
363         } else {
364             throwException(rv);
365         }
366     }
367 
368     /**
369      * {@inheritDoc}
370      */
371     @Override
372     protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
373         if (session.isInterestedInWrite() == isInterested) {
374             return;
375         }
376 
377         int rv = Poll.remove(pollset, session.getDescriptor());
378 
379         if (rv != Status.APR_SUCCESS) {
380             throwException(rv);
381         }
382 
383         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
384 
385         rv = Poll.add(pollset, session.getDescriptor(), flags);
386 
387         if (rv == Status.APR_SUCCESS) {
388             session.setInterestedInWrite(isInterested);
389         } else {
390             throwException(rv);
391         }
392     }
393 
394     /**
395      * {@inheritDoc}
396      */
397     @Override
398     protected int read(AprSession session, IoBuffer buffer) throws Exception {
399         int bytes;
400         int capacity = buffer.remaining();
401         // Using Socket.recv() directly causes memory leak. :-(
402         ByteBuffer b = Pool.alloc(bufferPool, capacity);
403 
404         try {
405             bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
406 
407             if (bytes > 0) {
408                 b.position(0);
409                 b.limit(bytes);
410                 buffer.put(b);
411             } else if (bytes < 0) {
412                 if (Status.APR_STATUS_IS_EOF(-bytes)) {
413                     bytes = -1;
414                 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
415                     bytes = 0;
416                 } else {
417                     throwException(bytes);
418                 }
419             }
420         } finally {
421             Pool.clear(bufferPool);
422         }
423 
424         return bytes;
425     }
426 
427     /**
428      * {@inheritDoc}
429      */
430     @Override
431     protected int write(AprSession session, IoBuffer buf, int length) throws IOException {
432         int writtenBytes;
433         if (buf.isDirect()) {
434             writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
435         } else {
436             writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
437             if (writtenBytes > 0) {
438                 buf.skip(writtenBytes);
439             }
440         }
441 
442         if (writtenBytes < 0) {
443             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
444                 writtenBytes = 0;
445             } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
446                 writtenBytes = 0;
447             } else {
448                 throwException(writtenBytes);
449             }
450         }
451         return writtenBytes;
452     }
453 
454     /**
455      * {@inheritDoc}
456      */
457     @Override
458     protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
459         if (region.getFilename() == null) {
460             throw new UnsupportedOperationException();
461         }
462 
463         long fd = File.open(region.getFilename(), File.APR_FOPEN_READ | File.APR_FOPEN_SENDFILE_ENABLED
464                 | File.APR_FOPEN_BINARY, 0, Socket.pool(session.getDescriptor()));
465         long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0);
466         File.close(fd);
467 
468         if (numWritten < 0) {
469             if (numWritten == -Status.EAGAIN) {
470                 return 0;
471             }
472             throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten
473                     + ")");
474         }
475         return (int) numWritten;
476     }
477 
478     private void throwException(int code) throws IOException {
479         throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
480     }
481 
482     /**
483      * {@inheritDoc}
484      */
485     @Override
486     protected void registerNewSelector() {
487         // Do nothing
488     }
489 
490     /**
491      * {@inheritDoc}
492      */
493     @Override
494     protected boolean isBrokenConnection() throws IOException {
495         // Here, we assume that this is the case.
496         return true;
497     }
498 }