View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Queue;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  
27  import org.apache.mina.core.buffer.IoBuffer;
28  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29  import org.apache.mina.core.service.IoProcessor;
30  import org.apache.mina.core.session.AbstractIoSession;
31  import org.apache.mina.core.session.IdleStatus;
32  import org.apache.mina.core.session.IoEvent;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.write.WriteRequest;
35  import org.apache.mina.core.write.WriteRequestQueue;
36  import org.apache.mina.core.write.WriteToClosedSessionException;
37  
38  /**
39   * TODO Add documentation
40   * 
41   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
42   */
43  class VmPipeFilterChain extends DefaultIoFilterChain {
44  
45      private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
46  
47      private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
48  
49      private volatile boolean flushEnabled;
50  
51      private volatile boolean sessionOpened;
52  
53      VmPipeFilterChain(AbstractIoSession session) {
54          super(session);
55      }
56  
57      IoProcessor<VmPipeSession> getProcessor() {
58          return processor;
59      }
60  
61      public void start() {
62          flushEnabled = true;
63          flushEvents();
64          flushPendingDataQueues((VmPipeSession) getSession());
65      }
66  
67      private void pushEvent(IoEvent e) {
68          pushEvent(e, flushEnabled);
69      }
70  
71      private void pushEvent(IoEvent e, boolean flushNow) {
72          eventQueue.add(e);
73          if (flushNow) {
74              flushEvents();
75          }
76      }
77  
78      private void flushEvents() {
79          IoEvent e;
80          while ((e = eventQueue.poll()) != null) {
81              fireEvent(e);
82          }
83      }
84  
85      private void fireEvent(IoEvent e) {
86          VmPipeSession session = (VmPipeSession) getSession();
87          IoEventType type = e.getType();
88          Object data = e.getParameter();
89  
90          if (type == IoEventType.MESSAGE_RECEIVED) {
91              if (sessionOpened && (!session.isReadSuspended()) && session.getLock().tryLock()) {
92                  try {
93                      if (session.isReadSuspended()) {
94                          session.receivedMessageQueue.add(data);
95                      } else {
96                          super.fireMessageReceived(data);
97                      }
98                  } finally {
99                      session.getLock().unlock();
100                 }
101             } else {
102                 session.receivedMessageQueue.add(data);
103             }
104         } else if (type == IoEventType.WRITE) {
105             super.fireFilterWrite((WriteRequest) data);
106         } else if (type == IoEventType.MESSAGE_SENT) {
107             super.fireMessageSent((WriteRequest) data);
108         } else if (type == IoEventType.EXCEPTION_CAUGHT) {
109             super.fireExceptionCaught((Throwable) data);
110         } else if (type == IoEventType.SESSION_IDLE) {
111             super.fireSessionIdle((IdleStatus) data);
112         } else if (type == IoEventType.SESSION_OPENED) {
113             super.fireSessionOpened();
114             sessionOpened = true;
115         } else if (type == IoEventType.SESSION_CREATED) {
116             session.getLock().lock();
117             try {
118                 super.fireSessionCreated();
119             } finally {
120                 session.getLock().unlock();
121             }
122         } else if (type == IoEventType.SESSION_CLOSED) {
123             flushPendingDataQueues(session);
124             super.fireSessionClosed();
125         } else if (type == IoEventType.CLOSE) {
126             super.fireFilterClose();
127         }
128     }
129 
130     private static void flushPendingDataQueues(VmPipeSession s) {
131         s.getProcessor().updateTrafficControl(s);
132         s.getRemoteSession().getProcessor().updateTrafficControl(s);
133     }
134 
135     @Override
136     public void fireFilterClose() {
137         pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
138     }
139 
140     @Override
141     public void fireFilterWrite(WriteRequest writeRequest) {
142         pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
143     }
144 
145     @Override
146     public void fireExceptionCaught(Throwable cause) {
147         pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
148     }
149 
150     @Override
151     public void fireMessageSent(WriteRequest request) {
152         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
153     }
154 
155     @Override
156     public void fireSessionClosed() {
157         pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
158     }
159 
160     @Override
161     public void fireSessionCreated() {
162         pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
163     }
164 
165     @Override
166     public void fireSessionIdle(IdleStatus status) {
167         pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
168     }
169 
170     @Override
171     public void fireSessionOpened() {
172         pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
173     }
174 
175     @Override
176     public void fireMessageReceived(Object message) {
177         pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
178     }
179 
180     private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
181         public void flush(VmPipeSession session) {
182             WriteRequestQueue queue = session.getWriteRequestQueue0();
183             if (!session.isClosing()) {
184                 session.getLock().lock();
185                 try {
186                     if (queue.isEmpty(session)) {
187                         return;
188                     }
189                     WriteRequest req;
190                     long currentTime = System.currentTimeMillis();
191                     while ((req = queue.poll(session)) != null) {
192                         Object m = req.getMessage();
193                         pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
194                         session.getRemoteSession().getFilterChain().fireMessageReceived(getMessageCopy(m));
195                         if (m instanceof IoBuffer) {
196                             session.increaseWrittenBytes0(((IoBuffer) m).remaining(), currentTime);
197                         }
198                     }
199                 } finally {
200                     if (flushEnabled) {
201                         flushEvents();
202                     }
203                     session.getLock().unlock();
204                 }
205 
206                 flushPendingDataQueues(session);
207             } else {
208                 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
209                 WriteRequest req;
210                 while ((req = queue.poll(session)) != null) {
211                     failedRequests.add(req);
212                 }
213 
214                 if (!failedRequests.isEmpty()) {
215                     WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
216                     for (WriteRequest r : failedRequests) {
217                         r.getFuture().setException(cause);
218                     }
219                     session.getFilterChain().fireExceptionCaught(cause);
220                 }
221             }
222         }
223 
224         /**
225          * {@inheritDoc}
226          */
227         public void write(VmPipeSession session, WriteRequest writeRequest) {
228             WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
229 
230             writeRequestQueue.offer(session, writeRequest);
231 
232             if (!session.isWriteSuspended()) {
233                 this.flush(session);
234             }
235         }
236 
237         private Object getMessageCopy(Object message) {
238             Object messageCopy = message;
239             if (message instanceof IoBuffer) {
240                 IoBuffer rb = (IoBuffer) message;
241                 rb.mark();
242                 IoBuffer wb = IoBuffer.allocate(rb.remaining());
243                 wb.put(rb);
244                 wb.flip();
245                 rb.reset();
246                 messageCopy = wb;
247             }
248             return messageCopy;
249         }
250 
251         public void remove(VmPipeSession session) {
252             try {
253                 session.getLock().lock();
254                 if (!session.getCloseFuture().isClosed()) {
255                     session.getServiceListeners().fireSessionDestroyed(session);
256                     session.getRemoteSession().closeNow();
257                 }
258             } finally {
259                 session.getLock().unlock();
260             }
261         }
262 
263         public void add(VmPipeSession session) {
264             // Unused
265         }
266 
267         public void updateTrafficControl(VmPipeSession session) {
268             if (!session.isReadSuspended()) {
269                 List<Object> data = new ArrayList<Object>();
270                 session.receivedMessageQueue.drainTo(data);
271                 for (Object aData : data) {
272                     VmPipeFilterChain.this.fireMessageReceived(aData);
273                 }
274             }
275 
276             if (!session.isWriteSuspended()) {
277                 flush(session);
278             }
279         }
280 
281         public void dispose() {
282             // Nothing to dispose
283         }
284 
285         public boolean isDisposed() {
286             return false;
287         }
288 
289         public boolean isDisposing() {
290             return false;
291         }
292     }
293 }