View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  
27  import org.apache.mina.core.buffer.IoBuffer;
28  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29  import org.apache.mina.core.service.IoProcessor;
30  import org.apache.mina.core.session.AbstractIoSession;
31  import org.apache.mina.core.session.IdleStatus;
32  import org.apache.mina.core.session.IoEvent;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.write.WriteRequest;
35  import org.apache.mina.core.write.WriteRequestQueue;
36  import org.apache.mina.core.write.WriteToClosedSessionException;
37  
38  /**
39   * TODO Add documentation
40   * 
41   * @author The Apache MINA Project (dev@mina.apache.org)
42   */
43  class VmPipeFilterChain extends DefaultIoFilterChain {
44  
45      private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
46      private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
47  
48      private volatile boolean flushEnabled;
49      private volatile boolean sessionOpened;
50  
51      VmPipeFilterChain(AbstractIoSession session) {
52          super(session);
53      }
54  
55      IoProcessor<VmPipeSession> getProcessor() {
56          return processor;
57      }
58  
59      public void start() {
60          flushEnabled = true;
61          flushEvents();
62          flushPendingDataQueues((VmPipeSession) getSession());
63      }
64  
65      private void pushEvent(IoEvent e) {
66          pushEvent(e, flushEnabled);
67      }
68  
69      private void pushEvent(IoEvent e, boolean flushNow) {
70          eventQueue.add(e);
71          if (flushNow) {
72              flushEvents();
73          }
74      }
75  
76      private void flushEvents() {
77          IoEvent e;
78          while ((e = eventQueue.poll()) != null) {
79              fireEvent(e);
80          }
81      }
82  
83      private void fireEvent(IoEvent e) {
84          VmPipeSession session = (VmPipeSession) getSession();
85          IoEventType type = e.getType();
86          Object data = e.getParameter();
87  
88          if (type == IoEventType.MESSAGE_RECEIVED) {
89              if (sessionOpened && (! session.isReadSuspended() ) && session.getLock().tryLock()) {
90                  try {
91                      if (session.isReadSuspended()) {
92                          session.receivedMessageQueue.add(data);
93                      } else {
94                          super.fireMessageReceived(data);
95                      }
96                  } finally {
97                      session.getLock().unlock();
98                  }
99              } else {
100                 session.receivedMessageQueue.add(data);
101             }
102         } else if (type == IoEventType.WRITE) {
103             super.fireFilterWrite((WriteRequest) data);
104         } else if (type == IoEventType.MESSAGE_SENT) {
105             super.fireMessageSent((WriteRequest) data);
106         } else if (type == IoEventType.EXCEPTION_CAUGHT) {
107             super.fireExceptionCaught((Throwable) data);
108         } else if (type == IoEventType.SESSION_IDLE) {
109             super.fireSessionIdle((IdleStatus) data);
110         } else if (type == IoEventType.SESSION_OPENED) {
111             super.fireSessionOpened();
112             sessionOpened = true;
113         } else if (type == IoEventType.SESSION_CREATED) {
114             session.getLock().lock();
115             try {
116                 super.fireSessionCreated();
117             } finally {
118                 session.getLock().unlock();
119             }
120         } else if (type == IoEventType.SESSION_CLOSED) {
121             flushPendingDataQueues(session);
122             super.fireSessionClosed();
123         } else if (type == IoEventType.CLOSE) {
124             super.fireFilterClose();
125         }
126     }
127 
128     private static void flushPendingDataQueues(VmPipeSession s) {
129         s.getProcessor().updateTrafficControl(s);
130         s.getRemoteSession().getProcessor().updateTrafficControl(s);
131     }
132 
133     @Override
134     public void fireFilterClose() {
135         pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
136     }
137 
138     @Override
139     public void fireFilterWrite(WriteRequest writeRequest) {
140         pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
141     }
142 
143     @Override
144     public void fireExceptionCaught(Throwable cause) {
145         pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
146     }
147 
148     @Override
149     public void fireMessageSent(WriteRequest request) {
150         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
151     }
152 
153     @Override
154     public void fireSessionClosed() {
155         pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
156     }
157 
158     @Override
159     public void fireSessionCreated() {
160         pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
161     }
162 
163     @Override
164     public void fireSessionIdle(IdleStatus status) {
165         pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
166     }
167 
168     @Override
169     public void fireSessionOpened() {
170         pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
171     }
172 
173     @Override
174     public void fireMessageReceived(Object message) {
175         pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
176     }
177 
178     private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
179         public void flush(VmPipeSession session) {
180             WriteRequestQueue queue = session.getWriteRequestQueue0();
181             if (!session.isClosing()) {
182                 session.getLock().lock();
183                 try {
184                     if (queue.isEmpty(session)) {
185                         return;
186                     }
187                     WriteRequest req;
188                     long currentTime = System.currentTimeMillis();
189                     while ((req = queue.poll(session)) != null) {
190                         Object m = req.getMessage();
191                         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
192                         session.getRemoteSession().getFilterChain().fireMessageReceived(
193                                 getMessageCopy(m));
194                         if (m instanceof IoBuffer) {
195                             session.increaseWrittenBytes0(
196                                     ((IoBuffer) m).remaining(), currentTime);
197                         }
198                     }
199                 } finally {
200                     if (flushEnabled) {
201                         flushEvents();
202                     }
203                     session.getLock().unlock();
204                 }
205 
206                 flushPendingDataQueues(session);
207             } else {
208                 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
209                 WriteRequest req;
210                 while ((req = queue.poll(session)) != null) {
211                     failedRequests.add(req);
212                 }
213 
214                 if (!failedRequests.isEmpty()) {
215                     WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
216                     for (WriteRequest r: failedRequests) {
217                         r.getFuture().setException(cause);
218                     }
219                     session.getFilterChain().fireExceptionCaught(cause);
220                 }
221             }
222         }
223 
224         private Object getMessageCopy(Object message) {
225             Object messageCopy = message;
226             if (message instanceof IoBuffer) {
227                 IoBuffer rb = (IoBuffer) message;
228                 rb.mark();
229                 IoBuffer wb = IoBuffer.allocate(rb.remaining());
230                 wb.put(rb);
231                 wb.flip();
232                 rb.reset();
233                 messageCopy = wb;
234             }
235             return messageCopy;
236         }
237 
238         public void remove(VmPipeSession session) {
239             try {
240                 session.getLock().lock();
241                 if (!session.getCloseFuture().isClosed()) {
242                     session.getServiceListeners().fireSessionDestroyed(session);
243                     session.getRemoteSession().close(true);
244                 }
245             } finally {
246                 session.getLock().unlock();
247             }
248         }
249 
250         public void add(VmPipeSession session) {
251             // Unused
252         }
253 
254         public void updateTrafficControl(VmPipeSession session) {
255             if ( ! session.isReadSuspended()) {
256                 List<Object> data = new ArrayList<Object>();
257                 session.receivedMessageQueue.drainTo(data);
258                 for (Object aData : data) {
259                     VmPipeFilterChain.this.fireMessageReceived(aData);
260                 }
261             }
262 
263             if ( ! session.isWriteSuspended()) {
264                 flush(session);
265             }
266         }
267 
268         public void dispose() {
269             // Nothing to dispose
270         }
271 
272         public boolean isDisposed() {
273             return false;
274         }
275 
276         public boolean isDisposing() {
277             return false;
278         }
279     }
280 }