View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  
27  import org.apache.mina.core.buffer.IoBuffer;
28  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29  import org.apache.mina.core.service.IoProcessor;
30  import org.apache.mina.core.session.AbstractIoSession;
31  import org.apache.mina.core.session.IdleStatus;
32  import org.apache.mina.core.session.IoEvent;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.write.WriteRequest;
35  import org.apache.mina.core.write.WriteRequestQueue;
36  import org.apache.mina.core.write.WriteToClosedSessionException;
37  
38  /**
39   * TODO Add documentation
40   * 
41   * @author The Apache MINA Project (dev@mina.apache.org)
42   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
43   */
44  class VmPipeFilterChain extends DefaultIoFilterChain {
45  
46      private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
47      private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
48  
49      private volatile boolean flushEnabled;
50      private volatile boolean sessionOpened;
51  
52      VmPipeFilterChain(AbstractIoSession session) {
53          super(session);
54      }
55  
56      IoProcessor<VmPipeSession> getProcessor() {
57          return processor;
58      }
59  
60      public void start() {
61          flushEnabled = true;
62          flushEvents();
63          flushPendingDataQueues((VmPipeSession) getSession());
64      }
65  
66      private void pushEvent(IoEvent e) {
67          pushEvent(e, flushEnabled);
68      }
69  
70      private void pushEvent(IoEvent e, boolean flushNow) {
71          eventQueue.add(e);
72          if (flushNow) {
73              flushEvents();
74          }
75      }
76  
77      private void flushEvents() {
78          IoEvent e;
79          while ((e = eventQueue.poll()) != null) {
80              fireEvent(e);
81          }
82      }
83  
84      private void fireEvent(IoEvent e) {
85          VmPipeSession session = (VmPipeSession) getSession();
86          IoEventType type = e.getType();
87          Object data = e.getParameter();
88  
89          if (type == IoEventType.MESSAGE_RECEIVED) {
90              if (sessionOpened && session.getTrafficMask().isReadable() && session.getLock().tryLock()) {
91                  try {
92                      if (!session.getTrafficMask().isReadable()) {
93                          session.receivedMessageQueue.add(data);
94                      } else {
95                          super.fireMessageReceived(data);
96                      }
97                  } finally {
98                      session.getLock().unlock();
99                  }
100             } else {
101                 session.receivedMessageQueue.add(data);
102             }
103         } else if (type == IoEventType.WRITE) {
104             super.fireFilterWrite((WriteRequest) data);
105         } else if (type == IoEventType.MESSAGE_SENT) {
106             super.fireMessageSent((WriteRequest) data);
107         } else if (type == IoEventType.EXCEPTION_CAUGHT) {
108             super.fireExceptionCaught((Throwable) data);
109         } else if (type == IoEventType.SESSION_IDLE) {
110             super.fireSessionIdle((IdleStatus) data);
111         } else if (type == IoEventType.SESSION_OPENED) {
112             super.fireSessionOpened();
113             sessionOpened = true;
114         } else if (type == IoEventType.SESSION_CREATED) {
115             session.getLock().lock();
116             try {
117                 super.fireSessionCreated();
118             } finally {
119                 session.getLock().unlock();
120             }
121         } else if (type == IoEventType.SESSION_CLOSED) {
122             flushPendingDataQueues(session);
123             super.fireSessionClosed();
124         } else if (type == IoEventType.CLOSE) {
125             super.fireFilterClose();
126         }
127     }
128 
129     private static void flushPendingDataQueues(VmPipeSession s) {
130         s.getProcessor().updateTrafficMask(s);
131         s.getRemoteSession().getProcessor().updateTrafficMask(s);
132     }
133 
134     @Override
135     public void fireFilterClose() {
136         pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
137     }
138 
139     @Override
140     public void fireFilterWrite(WriteRequest writeRequest) {
141         pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
142     }
143 
144     @Override
145     public void fireExceptionCaught(Throwable cause) {
146         pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
147     }
148 
149     @Override
150     public void fireMessageSent(WriteRequest request) {
151         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
152     }
153 
154     @Override
155     public void fireSessionClosed() {
156         pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
157     }
158 
159     @Override
160     public void fireSessionCreated() {
161         pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
162     }
163 
164     @Override
165     public void fireSessionIdle(IdleStatus status) {
166         pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
167     }
168 
169     @Override
170     public void fireSessionOpened() {
171         pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
172     }
173 
174     @Override
175     public void fireMessageReceived(Object message) {
176         pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
177     }
178 
179     private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
180         public void flush(VmPipeSession session) {
181             WriteRequestQueue queue = session.getWriteRequestQueue0();
182             if (!session.isClosing()) {
183                 session.getLock().lock();
184                 try {
185                     if (queue.isEmpty(session)) {
186                         return;
187                     }
188                     WriteRequest req;
189                     long currentTime = System.currentTimeMillis();
190                     while ((req = queue.poll(session)) != null) {
191                         Object m = req.getMessage();
192                         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
193                         session.getRemoteSession().getFilterChain().fireMessageReceived(
194                                 getMessageCopy(m));
195                         if (m instanceof IoBuffer) {
196                             session.increaseWrittenBytes0(
197                                     ((IoBuffer) m).remaining(), currentTime);
198                         }
199                     }
200                 } finally {
201                     if (flushEnabled) {
202                         flushEvents();
203                     }
204                     session.getLock().unlock();
205                 }
206 
207                 flushPendingDataQueues(session);
208             } else {
209                 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
210                 WriteRequest req;
211                 while ((req = queue.poll(session)) != null) {
212                     failedRequests.add(req);
213                 }
214 
215                 if (!failedRequests.isEmpty()) {
216                     WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
217                     for (WriteRequest r: failedRequests) {
218                         r.getFuture().setException(cause);
219                     }
220                     session.getFilterChain().fireExceptionCaught(cause);
221                 }
222             }
223         }
224 
225         private Object getMessageCopy(Object message) {
226             Object messageCopy = message;
227             if (message instanceof IoBuffer) {
228                 IoBuffer rb = (IoBuffer) message;
229                 rb.mark();
230                 IoBuffer wb = IoBuffer.allocate(rb.remaining());
231                 wb.put(rb);
232                 wb.flip();
233                 rb.reset();
234                 messageCopy = wb;
235             }
236             return messageCopy;
237         }
238 
239         public void remove(VmPipeSession session) {
240             try {
241                 session.getLock().lock();
242                 if (!session.getCloseFuture().isClosed()) {
243                     session.getServiceListeners().fireSessionDestroyed(session);
244                     session.getRemoteSession().close();
245                 }
246             } finally {
247                 session.getLock().unlock();
248             }
249         }
250 
251         public void add(VmPipeSession session) {
252             // Unused
253         }
254 
255         public void updateTrafficMask(VmPipeSession session) {
256             if (session.getTrafficMask().isReadable()) {
257                 List<Object> data = new ArrayList<Object>();
258                 session.receivedMessageQueue.drainTo(data);
259                 for (Object aData : data) {
260                     VmPipeFilterChain.this.fireMessageReceived(aData);
261                 }
262             }
263 
264             if (session.getTrafficMask().isWritable()) {
265                 flush(session);
266             }
267         }
268 
269         public void dispose() {
270             // Nothing to dispose
271         }
272 
273         public boolean isDisposed() {
274             return false;
275         }
276 
277         public boolean isDisposing() {
278             return false;
279         }
280     }
281 }