View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.Executor;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.buffer.IoBuffer;
32  import org.apache.mina.core.file.FileRegion;
33  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
34  import org.apache.mina.util.CircularQueue;
35  import org.apache.tomcat.jni.Poll;
36  import org.apache.tomcat.jni.Pool;
37  import org.apache.tomcat.jni.Socket;
38  import org.apache.tomcat.jni.Status;
39  
40  /**
41   * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
42   *
43   * @author The Apache MINA Project (dev@mina.apache.org)
44   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
45   */
46  public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
47      private static final int POLLSET_SIZE = 1024;
48  
49      private final Map<Long, AprSession> allSessions =
50          new HashMap<Long, AprSession>(POLLSET_SIZE);
51  
52      private final Object wakeupLock = new Object();
53      private final long wakeupSocket;
54      private volatile boolean toBeWakenUp;
55  
56      private final long pool;
57      private final long bufferPool; // memory pool
58      private final long pollset; // socket poller
59      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
60      private final List<AprSession> polledSessions =
61          new CircularQueue<AprSession>(POLLSET_SIZE);
62  
63      public AprIoProcessor(Executor executor) {
64          super(executor);
65  
66          // initialize a memory pool for APR functions
67          pool = Pool.create(AprLibrary.getInstance().getRootPool());
68          bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
69  
70          try {
71              wakeupSocket = Socket.create(
72                      Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
73          } catch (RuntimeException e) {
74              throw e;
75          } catch (Error e) {
76              throw e;
77          } catch (Exception e) {
78              throw new RuntimeIoException("Failed to create a wakeup socket.", e);
79          }
80  
81          boolean success = false;
82          long newPollset;
83          try {
84              newPollset = Poll.create(
85                      POLLSET_SIZE,
86                      pool,
87                      Poll.APR_POLLSET_THREADSAFE,
88                      Long.MAX_VALUE);
89  
90              if (newPollset == 0) {
91                  newPollset = Poll.create(
92                          62,
93                          pool,
94                          Poll.APR_POLLSET_THREADSAFE,
95                          Long.MAX_VALUE);
96              }
97  
98              pollset = newPollset;
99              if (pollset < 0) {
100                 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
101                     throw new RuntimeIoException(
102                             "Thread-safe pollset is not supported in this platform.");
103                 }
104             }
105             success = true;
106         } catch (RuntimeException e) {
107             throw e;
108         } catch (Error e) {
109             throw e;
110         } catch (Exception e) {
111             throw new RuntimeIoException("Failed to create a pollset.", e);
112         } finally {
113             if (!success) {
114                 dispose();
115             }
116         }
117     }
118 
119     @Override
120     protected void dispose0() {
121         Poll.destroy(pollset);
122         Socket.close(wakeupSocket);
123         Pool.destroy(bufferPool);
124         Pool.destroy(pool);
125     }
126 
127     @Override
128     protected boolean select(int timeout) throws Exception {
129         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
130         if (rv <= 0) {
131             if (rv != -120001) {
132                 throwException(rv);
133             }
134 
135             rv = Poll.maintain(pollset, polledSockets, true);
136             if (rv > 0) {
137                 for (int i = 0; i < rv; i ++) {
138                     long socket = polledSockets[i];
139                     AprSession session = allSessions.get(socket);
140                     if (session == null) {
141                         continue;
142                     }
143 
144                     int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
145                                (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
146 
147                     Poll.add(pollset, socket, flag);
148                 }
149             } else if (rv < 0) {
150                 throwException(rv);
151             }
152 
153             return false;
154         } else {
155             rv <<= 1;
156             if (!polledSessions.isEmpty()) {
157                 polledSessions.clear();
158             }
159             for (int i = 0; i < rv; i ++) {
160                 long flag = polledSockets[i];
161                 long socket = polledSockets[++i];
162                 if (socket == wakeupSocket) {
163                     synchronized (wakeupLock) {
164                         Poll.remove(pollset, wakeupSocket);
165                         toBeWakenUp = false;
166                     }
167                     continue;
168                 }
169                 AprSession session = allSessions.get(socket);
170                 if (session == null) {
171                     continue;
172                 }
173 
174                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
175                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
176 
177                 polledSessions.add(session);
178             }
179 
180             return !polledSessions.isEmpty();
181         }
182     }
183 
184     @Override
185     protected boolean isSelectorEmpty() {
186         return allSessions.isEmpty();
187     }
188 
189     @Override
190     protected void wakeup() {
191         if (toBeWakenUp) {
192             return;
193         }
194 
195         // Add a dummy socket to the pollset.
196         synchronized (wakeupLock) {
197             toBeWakenUp = true;
198             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
199         }
200     }
201 
202     @Override
203     protected Iterator<AprSession> allSessions() {
204         return allSessions.values().iterator();
205     }
206 
207     @Override
208     protected Iterator<AprSession> selectedSessions() {
209         return polledSessions.iterator();
210     }
211 
212     @Override
213     protected void init(AprSession session) throws Exception {
214         long s = session.getDescriptor();
215         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
216         Socket.timeoutSet(s, 0);
217 
218         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
219         if (rv != Status.APR_SUCCESS) {
220             throwException(rv);
221         }
222 
223         session.setInterestedInRead(true);
224         allSessions.put(s, session);
225     }
226 
227     @Override
228     protected void destroy(AprSession session) throws Exception {
229         if (allSessions.remove(session.getDescriptor()) == null) {
230             // Already destroyed.
231             return;
232         }
233 
234         int ret = Poll.remove(pollset, session.getDescriptor());
235         try {
236             if (ret != Status.APR_SUCCESS) {
237                 throwException(ret);
238             }
239         } finally {
240             ret = Socket.close(session.getDescriptor());
241             
242         	// destroying the session because it won't be reused 
243             // after this point
244             Socket.destroy(session.getDescriptor());
245             session.setDescriptor(0);
246             
247             if (ret != Status.APR_SUCCESS) {
248                 throwException(ret);
249             }
250         }
251     }
252 
253     @Override
254     protected SessionState state(AprSession session) {
255         long socket = session.getDescriptor();
256         if (socket != 0) {
257             return SessionState.OPEN;
258         } else if (allSessions.get(socket) != null) {
259             return SessionState.PREPARING; // will occur ?
260         } else {
261             return SessionState.CLOSED;
262         }
263     }
264 
265     @Override
266     protected boolean isReadable(AprSession session) {
267         return session.isReadable();
268     }
269 
270     @Override
271     protected boolean isWritable(AprSession session) {
272         return session.isWritable();
273     }
274 
275     @Override
276     protected boolean isInterestedInRead(AprSession session) {
277         return session.isInterestedInRead();
278     }
279 
280     @Override
281     protected boolean isInterestedInWrite(AprSession session) {
282         return session.isInterestedInWrite();
283     }
284 
285     @Override
286     protected void setInterestedInRead(AprSession session, boolean value) throws Exception {
287         if (session.isInterestedInRead() == value) {
288             return;
289         }
290 
291         int rv = Poll.remove(pollset, session.getDescriptor());
292         if (rv != Status.APR_SUCCESS) {
293             throwException(rv);
294         }
295 
296         int flags = (value ? Poll.APR_POLLIN : 0)
297                 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
298 
299         rv = Poll.add(pollset, session.getDescriptor(), flags);
300         if (rv == Status.APR_SUCCESS) {
301             session.setInterestedInRead(value);
302         } else {
303             throwException(rv);
304         }
305     }
306 
307     @Override
308     protected void setInterestedInWrite(AprSession session, boolean value) throws Exception {
309         if (session.isInterestedInWrite() == value) {
310             return;
311         }
312 
313         int rv = Poll.remove(pollset, session.getDescriptor());
314         if (rv != Status.APR_SUCCESS) {
315             throwException(rv);
316         }
317 
318         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
319                 | (value ? Poll.APR_POLLOUT : 0);
320 
321         rv = Poll.add(pollset, session.getDescriptor(), flags);
322         if (rv == Status.APR_SUCCESS) {
323             session.setInterestedInWrite(value);
324         } else {
325             throwException(rv);
326         }
327     }
328 
329     @Override
330     protected int read(AprSession session, IoBuffer buffer) throws Exception {
331         int bytes;
332         int capacity = buffer.remaining();
333         // Using Socket.recv() directly causes memory leak. :-(
334         ByteBuffer b = Pool.alloc(bufferPool, capacity);
335         try {
336             bytes = Socket.recvb(
337                     session.getDescriptor(), b, 0, capacity);
338             if (bytes > 0) {
339                 b.position(0);
340                 b.limit(bytes);
341                 buffer.put(b);
342             } else if (bytes < 0) {
343                 if (Status.APR_STATUS_IS_EOF(-bytes)) {
344                     bytes = -1;
345                 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
346                     bytes = 0;
347                 } else {
348                     throwException(bytes);
349                 }
350             }
351         } finally {
352             Pool.clear(bufferPool);
353         }
354         return bytes;
355     }
356 
357     @Override
358     protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
359         int writtenBytes;
360         if (buf.isDirect()) {
361             writtenBytes = Socket.sendb(
362                     session.getDescriptor(), buf.buf(), buf.position(), length);
363         } else {
364             writtenBytes = Socket.send(
365                     session.getDescriptor(), buf.array(), buf.position(), length);
366             if (writtenBytes > 0) {
367                 buf.skip(writtenBytes);
368             }
369         }
370 
371         if (writtenBytes < 0) {
372             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
373                 writtenBytes = 0;
374             } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
375                 writtenBytes = 0;
376             } else {
377                 throwException(writtenBytes);
378             }
379         }
380         return writtenBytes;
381     }
382 
383     @Override
384     protected int transferFile(AprSession session, FileRegion region, int length)
385             throws Exception {
386         throw new UnsupportedOperationException();
387     }
388 
389     private void throwException(int code) throws IOException {
390         throw new IOException(
391                 org.apache.tomcat.jni.Error.strerror(-code) +
392                 " (code: " + code + ")");
393     }
394 }