View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  
27  import org.apache.mina.core.buffer.IoBuffer;
28  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29  import org.apache.mina.core.service.IoProcessor;
30  import org.apache.mina.core.session.AbstractIoSession;
31  import org.apache.mina.core.session.IdleStatus;
32  import org.apache.mina.core.session.IoEvent;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.write.WriteRequest;
35  import org.apache.mina.core.write.WriteRequestQueue;
36  import org.apache.mina.core.write.WriteToClosedSessionException;
37  import org.apache.mina.filter.FilterEvent;
38  
39  /**
40   * TODO Add documentation
41   * 
42   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
43   */
44  class VmPipeFilterChain extends DefaultIoFilterChain {
45  
46      private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
47  
48      private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
49  
50      private volatile boolean flushEnabled;
51  
52      private volatile boolean sessionOpened;
53  
54      VmPipeFilterChain(AbstractIoSession session) {
55          super(session);
56      }
57  
58      IoProcessor<VmPipeSession> getProcessor() {
59          return processor;
60      }
61  
62      public void start() {
63          flushEnabled = true;
64          flushEvents();
65          flushPendingDataQueues((VmPipeSession) getSession());
66      }
67  
68      private void pushEvent(IoEvent e) {
69          pushEvent(e, flushEnabled);
70      }
71  
72      private void pushEvent(IoEvent e, boolean flushNow) {
73          eventQueue.add(e);
74          if (flushNow) {
75              flushEvents();
76          }
77      }
78  
79      private void flushEvents() {
80          IoEvent e;
81          while ((e = eventQueue.poll()) != null) {
82              fireEvent(e);
83          }
84      }
85  
86      private void fireEvent(IoEvent e) {
87          VmPipeSession/../../org/apache/mina/transport/vmpipe/VmPipeSession.html#VmPipeSession">VmPipeSession session = (VmPipeSession) getSession();
88          IoEventType type = e.getType();
89          Object data = e.getParameter();
90  
91          switch (type) {
92              case EVENT:
93                  super.fireEvent((FilterEvent) data);
94                  break;
95                  
96              case EXCEPTION_CAUGHT:
97                  super.fireExceptionCaught((Throwable) data);
98                  break;
99                  
100             case CLOSE:
101                 super.fireFilterClose();
102                 break;
103                 
104             case INPUT_CLOSED:
105                 super.fireInputClosed();
106                 break;
107 
108             case MESSAGE_SENT:
109                 super.fireMessageSent((WriteRequest) data);
110                 break;
111                 
112             case MESSAGE_RECEIVED:
113                 if (sessionOpened && (!session.isReadSuspended()) && session.getLock().tryLock()) {
114                     try {
115                         if (session.isReadSuspended()) {
116                             session.receivedMessageQueue.add(data);
117                         } else {
118                             super.fireMessageReceived(data);
119                         }
120                     } finally {
121                         session.getLock().unlock();
122                     }
123                 } else {
124                     session.receivedMessageQueue.add(data);
125                 }
126                 
127                 break;
128                 
129             case SESSION_CLOSED:
130                 flushPendingDataQueues(session);
131                 super.fireSessionClosed();
132                 break;
133                 
134             case SESSION_CREATED:
135                 session.getLock().lock();
136                 try {
137                     super.fireSessionCreated();
138                 } finally {
139                     session.getLock().unlock();
140                 }
141                 
142                 break;
143                 
144             case SESSION_IDLE:
145                 super.fireSessionIdle((IdleStatus) data);
146                 break;
147                 
148             case SESSION_OPENED:
149                 super.fireSessionOpened();
150                 sessionOpened = true;
151                 break;
152                 
153             case WRITE:
154                 super.fireFilterWrite((WriteRequest) data);
155                 break;
156         }
157     }
158 
159     private static void flushPendingDataQueues(VmPipeSession s) {
160         s.getProcessor().updateTrafficControl(s);
161         s.getRemoteSession().getProcessor().updateTrafficControl(s);
162     }
163 
164     @Override
165     public void fireEvent(FilterEvent event) {
166         pushEvent(new IoEvent(IoEventType.EVENT, getSession(), event));
167     }
168 
169     @Override
170     public void fireFilterClose() {
171         pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
172     }
173 
174     @Override
175     public void fireInputClosed() {
176         pushEvent(new IoEvent(IoEventType.INPUT_CLOSED, getSession(), null));
177     }
178 
179     @Override
180     public void fireFilterWrite(WriteRequest writeRequest) {
181         pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
182     }
183 
184     @Override
185     public void fireExceptionCaught(Throwable cause) {
186         pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
187     }
188 
189     @Override
190     public void fireMessageSent(WriteRequest request) {
191         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
192     }
193 
194     @Override
195     public void fireSessionClosed() {
196         pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
197     }
198 
199     @Override
200     public void fireSessionCreated() {
201         pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
202     }
203 
204     @Override
205     public void fireSessionIdle(IdleStatus status) {
206         pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
207     }
208 
209     @Override
210     public void fireSessionOpened() {
211         pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
212     }
213 
214     @Override
215     public void fireMessageReceived(Object message) {
216         pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
217     }
218 
219     private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
220         public void flush(VmPipeSession session) {
221             WriteRequestQueue queue = session.getWriteRequestQueue0();
222             if (!session.isClosing()) {
223                 session.getLock().lock();
224                 try {
225                     if (queue.isEmpty(session)) {
226                         return;
227                     }
228                     WriteRequest req;
229                     long currentTime = System.currentTimeMillis();
230                     while ((req = queue.poll(session)) != null) {
231                         Object m = req.getMessage();
232                         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
233                         session.getRemoteSession().getFilterChain().fireMessageReceived(getMessageCopy(m));
234                         if (m instanceof IoBuffer) {
235                             session.increaseWrittenBytes0(((IoBuffer) m).remaining(), currentTime);
236                         }
237                     }
238                 } finally {
239                     if (flushEnabled) {
240                         flushEvents();
241                     }
242                     session.getLock().unlock();
243                 }
244 
245                 flushPendingDataQueues(session);
246             } else {
247                 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
248                 WriteRequest req;
249                 while ((req = queue.poll(session)) != null) {
250                     failedRequests.add(req);
251                 }
252 
253                 if (!failedRequests.isEmpty()) {
254                     WriteToClosedSessionExceptionException.html#WriteToClosedSessionException">WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
255                     for (WriteRequest r : failedRequests) {
256                         r.getFuture().setException(cause);
257                     }
258                     session.getFilterChain().fireExceptionCaught(cause);
259                 }
260             }
261         }
262 
263         /**
264          * {@inheritDoc}
265          */
266         public void write(VmPipeSession session, WriteRequest writeRequest) {
267             WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
268 
269             writeRequestQueue.offer(session, writeRequest);
270 
271             if (!session.isWriteSuspended()) {
272                 this.flush(session);
273             }
274         }
275 
276         private Object getMessageCopy(Object message) {
277             Object messageCopy = message;
278             if (message instanceof IoBuffer) {
279                 IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer rb = (IoBuffer) message;
280                 rb.mark();
281                 IoBuffer wb = IoBuffer.allocate(rb.remaining());
282                 wb.put(rb);
283                 wb.flip();
284                 rb.reset();
285                 messageCopy = wb;
286             }
287             return messageCopy;
288         }
289 
290         public void remove(VmPipeSession session) {
291             try {
292                 session.getLock().lock();
293                 if (!session.getCloseFuture().isClosed()) {
294                     session.getServiceListeners().fireSessionDestroyed(session);
295                     session.getRemoteSession().closeNow();
296                 }
297             } finally {
298                 session.getLock().unlock();
299             }
300         }
301 
302         public void add(VmPipeSession session) {
303             // Unused
304         }
305 
306         public void updateTrafficControl(VmPipeSession session) {
307             if (!session.isReadSuspended()) {
308                 List<Object> data = new ArrayList<Object>();
309                 session.receivedMessageQueue.drainTo(data);
310                 for (Object aData : data) {
311                     VmPipeFilterChain.this.fireMessageReceived(aData);
312                 }
313             }
314 
315             if (!session.isWriteSuspended()) {
316                 flush(session);
317             }
318         }
319 
320         public void dispose() {
321             // Nothing to dispose
322         }
323 
324         public boolean isDisposed() {
325             return false;
326         }
327 
328         public boolean isDisposing() {
329             return false;
330         }
331     }
332 }