1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29 import org.apache.mina.core.service.IoProcessor;
30 import org.apache.mina.core.session.AbstractIoSession;
31 import org.apache.mina.core.session.IdleStatus;
32 import org.apache.mina.core.session.IoEvent;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.write.WriteRequest;
35 import org.apache.mina.core.write.WriteRequestQueue;
36 import org.apache.mina.core.write.WriteToClosedSessionException;
37
38
39
40
41
42
43 class VmPipeFilterChain extends DefaultIoFilterChain {
44
45 private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
46
47 private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
48
49 private volatile boolean flushEnabled;
50
51 private volatile boolean sessionOpened;
52
53 VmPipeFilterChain(AbstractIoSession session) {
54 super(session);
55 }
56
57 IoProcessor<VmPipeSession> getProcessor() {
58 return processor;
59 }
60
61 public void start() {
62 flushEnabled = true;
63 flushEvents();
64 flushPendingDataQueues((VmPipeSession) getSession());
65 }
66
67 private void pushEvent(IoEvent e) {
68 pushEvent(e, flushEnabled);
69 }
70
71 private void pushEvent(IoEvent e, boolean flushNow) {
72 eventQueue.add(e);
73 if (flushNow) {
74 flushEvents();
75 }
76 }
77
78 private void flushEvents() {
79 IoEvent e;
80 while ((e = eventQueue.poll()) != null) {
81 fireEvent(e);
82 }
83 }
84
85 private void fireEvent(IoEvent e) {
86 VmPipeSession session = (VmPipeSession) getSession();
87 IoEventType type = e.getType();
88 Object data = e.getParameter();
89
90 if (type == IoEventType.MESSAGE_RECEIVED) {
91 if (sessionOpened && (!session.isReadSuspended()) && session.getLock().tryLock()) {
92 try {
93 if (session.isReadSuspended()) {
94 session.receivedMessageQueue.add(data);
95 } else {
96 super.fireMessageReceived(data);
97 }
98 } finally {
99 session.getLock().unlock();
100 }
101 } else {
102 session.receivedMessageQueue.add(data);
103 }
104 } else if (type == IoEventType.WRITE) {
105 super.fireFilterWrite((WriteRequest) data);
106 } else if (type == IoEventType.MESSAGE_SENT) {
107 super.fireMessageSent((WriteRequest) data);
108 } else if (type == IoEventType.EXCEPTION_CAUGHT) {
109 super.fireExceptionCaught((Throwable) data);
110 } else if (type == IoEventType.SESSION_IDLE) {
111 super.fireSessionIdle((IdleStatus) data);
112 } else if (type == IoEventType.SESSION_OPENED) {
113 super.fireSessionOpened();
114 sessionOpened = true;
115 } else if (type == IoEventType.SESSION_CREATED) {
116 session.getLock().lock();
117 try {
118 super.fireSessionCreated();
119 } finally {
120 session.getLock().unlock();
121 }
122 } else if (type == IoEventType.SESSION_CLOSED) {
123 flushPendingDataQueues(session);
124 super.fireSessionClosed();
125 } else if (type == IoEventType.CLOSE) {
126 super.fireFilterClose();
127 }
128 }
129
130 private static void flushPendingDataQueues(VmPipeSession s) {
131 s.getProcessor().updateTrafficControl(s);
132 s.getRemoteSession().getProcessor().updateTrafficControl(s);
133 }
134
135 @Override
136 public void fireFilterClose() {
137 pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
138 }
139
140 @Override
141 public void fireFilterWrite(WriteRequest writeRequest) {
142 pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
143 }
144
145 @Override
146 public void fireExceptionCaught(Throwable cause) {
147 pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
148 }
149
150 @Override
151 public void fireMessageSent(WriteRequest request) {
152 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
153 }
154
155 @Override
156 public void fireSessionClosed() {
157 pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
158 }
159
160 @Override
161 public void fireSessionCreated() {
162 pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
163 }
164
165 @Override
166 public void fireSessionIdle(IdleStatus status) {
167 pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
168 }
169
170 @Override
171 public void fireSessionOpened() {
172 pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
173 }
174
175 @Override
176 public void fireMessageReceived(Object message) {
177 pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
178 }
179
180 private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
181 public void flush(VmPipeSession session) {
182 WriteRequestQueue queue = session.getWriteRequestQueue0();
183 if (!session.isClosing()) {
184 session.getLock().lock();
185 try {
186 if (queue.isEmpty(session)) {
187 return;
188 }
189 WriteRequest req;
190 long currentTime = System.currentTimeMillis();
191 while ((req = queue.poll(session)) != null) {
192 Object m = req.getMessage();
193 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
194 session.getRemoteSession().getFilterChain().fireMessageReceived(getMessageCopy(m));
195 if (m instanceof IoBuffer) {
196 session.increaseWrittenBytes0(((IoBuffer) m).remaining(), currentTime);
197 }
198 }
199 } finally {
200 if (flushEnabled) {
201 flushEvents();
202 }
203 session.getLock().unlock();
204 }
205
206 flushPendingDataQueues(session);
207 } else {
208 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
209 WriteRequest req;
210 while ((req = queue.poll(session)) != null) {
211 failedRequests.add(req);
212 }
213
214 if (!failedRequests.isEmpty()) {
215 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
216 for (WriteRequest r : failedRequests) {
217 r.getFuture().setException(cause);
218 }
219 session.getFilterChain().fireExceptionCaught(cause);
220 }
221 }
222 }
223
224
225
226
227 public void write(VmPipeSession session, WriteRequest writeRequest) {
228 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
229
230 writeRequestQueue.offer(session, writeRequest);
231
232 if (!session.isWriteSuspended()) {
233 this.flush(session);
234 }
235 }
236
237 private Object getMessageCopy(Object message) {
238 Object messageCopy = message;
239 if (message instanceof IoBuffer) {
240 IoBuffer rb = (IoBuffer) message;
241 rb.mark();
242 IoBuffer wb = IoBuffer.allocate(rb.remaining());
243 wb.put(rb);
244 wb.flip();
245 rb.reset();
246 messageCopy = wb;
247 }
248 return messageCopy;
249 }
250
251 public void remove(VmPipeSession session) {
252 try {
253 session.getLock().lock();
254 if (!session.getCloseFuture().isClosed()) {
255 session.getServiceListeners().fireSessionDestroyed(session);
256 session.getRemoteSession().close(true);
257 }
258 } finally {
259 session.getLock().unlock();
260 }
261 }
262
263 public void add(VmPipeSession session) {
264
265 }
266
267 public void updateTrafficControl(VmPipeSession session) {
268 if (!session.isReadSuspended()) {
269 List<Object> data = new ArrayList<Object>();
270 session.receivedMessageQueue.drainTo(data);
271 for (Object aData : data) {
272 VmPipeFilterChain.this.fireMessageReceived(aData);
273 }
274 }
275
276 if (!session.isWriteSuspended()) {
277 flush(session);
278 }
279 }
280
281 public void dispose() {
282
283 }
284
285 public boolean isDisposed() {
286 return false;
287 }
288
289 public boolean isDisposing() {
290 return false;
291 }
292 }
293 }