1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29 import org.apache.mina.core.service.IoProcessor;
30 import org.apache.mina.core.session.AbstractIoSession;
31 import org.apache.mina.core.session.IdleStatus;
32 import org.apache.mina.core.session.IoEvent;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.write.WriteRequest;
35 import org.apache.mina.core.write.WriteRequestQueue;
36 import org.apache.mina.core.write.WriteToClosedSessionException;
37 import org.apache.mina.filter.FilterEvent;
38
39
40
41
42
43
44 class VmPipeFilterChain extends DefaultIoFilterChain {
45
46 private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
47
48 private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
49
50 private volatile boolean flushEnabled;
51
52 private volatile boolean sessionOpened;
53
54 VmPipeFilterChain(AbstractIoSession session) {
55 super(session);
56 }
57
58 IoProcessor<VmPipeSession> getProcessor() {
59 return processor;
60 }
61
62 public void start() {
63 flushEnabled = true;
64 flushEvents();
65 flushPendingDataQueues((VmPipeSession) getSession());
66 }
67
68 private void pushEvent(IoEvent e) {
69 pushEvent(e, flushEnabled);
70 }
71
72 private void pushEvent(IoEvent e, boolean flushNow) {
73 eventQueue.add(e);
74 if (flushNow) {
75 flushEvents();
76 }
77 }
78
79 private void flushEvents() {
80 IoEvent e;
81 while ((e = eventQueue.poll()) != null) {
82 fireEvent(e);
83 }
84 }
85
86 private void fireEvent(IoEvent e) {
87 VmPipeSession/../../org/apache/mina/transport/vmpipe/VmPipeSession.html#VmPipeSession">VmPipeSession session = (VmPipeSession) getSession();
88 IoEventType type = e.getType();
89 Object data = e.getParameter();
90
91 switch (type) {
92 case EVENT:
93 super.fireEvent((FilterEvent) data);
94 break;
95
96 case EXCEPTION_CAUGHT:
97 super.fireExceptionCaught((Throwable) data);
98 break;
99
100 case CLOSE:
101 super.fireFilterClose();
102 break;
103
104 case INPUT_CLOSED:
105 super.fireInputClosed();
106 break;
107
108 case MESSAGE_SENT:
109 super.fireMessageSent((WriteRequest) data);
110 break;
111
112 case MESSAGE_RECEIVED:
113 if (sessionOpened && (!session.isReadSuspended()) && session.getLock().tryLock()) {
114 try {
115 if (session.isReadSuspended()) {
116 session.receivedMessageQueue.add(data);
117 } else {
118 super.fireMessageReceived(data);
119 }
120 } finally {
121 session.getLock().unlock();
122 }
123 } else {
124 session.receivedMessageQueue.add(data);
125 }
126
127 break;
128
129 case SESSION_CLOSED:
130 flushPendingDataQueues(session);
131 super.fireSessionClosed();
132 break;
133
134 case SESSION_CREATED:
135 session.getLock().lock();
136 try {
137 super.fireSessionCreated();
138 } finally {
139 session.getLock().unlock();
140 }
141
142 break;
143
144 case SESSION_IDLE:
145 super.fireSessionIdle((IdleStatus) data);
146 break;
147
148 case SESSION_OPENED:
149 super.fireSessionOpened();
150 sessionOpened = true;
151 break;
152
153 case WRITE:
154 super.fireFilterWrite((WriteRequest) data);
155 break;
156 }
157 }
158
159 private static void flushPendingDataQueues(VmPipeSession s) {
160 s.getProcessor().updateTrafficControl(s);
161 s.getRemoteSession().getProcessor().updateTrafficControl(s);
162 }
163
164 @Override
165 public void fireEvent(FilterEvent event) {
166 pushEvent(new IoEvent(IoEventType.EVENT, getSession(), event));
167 }
168
169 @Override
170 public void fireFilterClose() {
171 pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
172 }
173
174 @Override
175 public void fireInputClosed() {
176 pushEvent(new IoEvent(IoEventType.INPUT_CLOSED, getSession(), null));
177 }
178
179 @Override
180 public void fireFilterWrite(WriteRequest writeRequest) {
181 pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
182 }
183
184 @Override
185 public void fireExceptionCaught(Throwable cause) {
186 pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
187 }
188
189 @Override
190 public void fireMessageSent(WriteRequest request) {
191 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
192 }
193
194 @Override
195 public void fireSessionClosed() {
196 pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
197 }
198
199 @Override
200 public void fireSessionCreated() {
201 pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
202 }
203
204 @Override
205 public void fireSessionIdle(IdleStatus status) {
206 pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
207 }
208
209 @Override
210 public void fireSessionOpened() {
211 pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
212 }
213
214 @Override
215 public void fireMessageReceived(Object message) {
216 pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
217 }
218
219 private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
220 public void flush(VmPipeSession session) {
221 WriteRequestQueue queue = session.getWriteRequestQueue0();
222 if (!session.isClosing()) {
223 session.getLock().lock();
224 try {
225 if (queue.isEmpty(session)) {
226 return;
227 }
228 WriteRequest req;
229 long currentTime = System.currentTimeMillis();
230 while ((req = queue.poll(session)) != null) {
231 Object m = req.getMessage();
232 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
233 session.getRemoteSession().getFilterChain().fireMessageReceived(getMessageCopy(m));
234 if (m instanceof IoBuffer) {
235 session.increaseWrittenBytes0(((IoBuffer) m).remaining(), currentTime);
236 }
237 }
238 } finally {
239 if (flushEnabled) {
240 flushEvents();
241 }
242 session.getLock().unlock();
243 }
244
245 flushPendingDataQueues(session);
246 } else {
247 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
248 WriteRequest req;
249 while ((req = queue.poll(session)) != null) {
250 failedRequests.add(req);
251 }
252
253 if (!failedRequests.isEmpty()) {
254 WriteToClosedSessionExceptionException.html#WriteToClosedSessionException">WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
255 for (WriteRequest r : failedRequests) {
256 r.getFuture().setException(cause);
257 }
258 session.getFilterChain().fireExceptionCaught(cause);
259 }
260 }
261 }
262
263
264
265
266 public void write(VmPipeSession session, WriteRequest writeRequest) {
267 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
268
269 writeRequestQueue.offer(session, writeRequest);
270
271 if (!session.isWriteSuspended()) {
272 this.flush(session);
273 }
274 }
275
276 private Object getMessageCopy(Object message) {
277 Object messageCopy = message;
278 if (message instanceof IoBuffer) {
279 IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer rb = (IoBuffer) message;
280 rb.mark();
281 IoBuffer wb = IoBuffer.allocate(rb.remaining());
282 wb.put(rb);
283 wb.flip();
284 rb.reset();
285 messageCopy = wb;
286 }
287 return messageCopy;
288 }
289
290 public void remove(VmPipeSession session) {
291 try {
292 session.getLock().lock();
293 if (!session.getCloseFuture().isClosed()) {
294 session.getServiceListeners().fireSessionDestroyed(session);
295 session.getRemoteSession().closeNow();
296 }
297 } finally {
298 session.getLock().unlock();
299 }
300 }
301
302 public void add(VmPipeSession session) {
303
304 }
305
306 public void updateTrafficControl(VmPipeSession session) {
307 if (!session.isReadSuspended()) {
308 List<Object> data = new ArrayList<Object>();
309 session.receivedMessageQueue.drainTo(data);
310 for (Object aData : data) {
311 VmPipeFilterChain.this.fireMessageReceived(aData);
312 }
313 }
314
315 if (!session.isWriteSuspended()) {
316 flush(session);
317 }
318 }
319
320 public void dispose() {
321
322 }
323
324 public boolean isDisposed() {
325 return false;
326 }
327
328 public boolean isDisposing() {
329 return false;
330 }
331 }
332 }