1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29 import org.apache.mina.core.service.IoProcessor;
30 import org.apache.mina.core.session.AbstractIoSession;
31 import org.apache.mina.core.session.IdleStatus;
32 import org.apache.mina.core.session.IoEvent;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.write.WriteRequest;
35 import org.apache.mina.core.write.WriteRequestQueue;
36 import org.apache.mina.core.write.WriteToClosedSessionException;
37
38
39
40
41
42
43 class VmPipeFilterChain extends DefaultIoFilterChain {
44
45 private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
46 private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
47
48 private volatile boolean flushEnabled;
49 private volatile boolean sessionOpened;
50
51 VmPipeFilterChain(AbstractIoSession session) {
52 super(session);
53 }
54
55 IoProcessor<VmPipeSession> getProcessor() {
56 return processor;
57 }
58
59 public void start() {
60 flushEnabled = true;
61 flushEvents();
62 flushPendingDataQueues((VmPipeSession) getSession());
63 }
64
65 private void pushEvent(IoEvent e) {
66 pushEvent(e, flushEnabled);
67 }
68
69 private void pushEvent(IoEvent e, boolean flushNow) {
70 eventQueue.add(e);
71 if (flushNow) {
72 flushEvents();
73 }
74 }
75
76 private void flushEvents() {
77 IoEvent e;
78 while ((e = eventQueue.poll()) != null) {
79 fireEvent(e);
80 }
81 }
82
83 private void fireEvent(IoEvent e) {
84 VmPipeSession session = (VmPipeSession) getSession();
85 IoEventType type = e.getType();
86 Object data = e.getParameter();
87
88 if (type == IoEventType.MESSAGE_RECEIVED) {
89 if (sessionOpened && (! session.isReadSuspended() ) && session.getLock().tryLock()) {
90 try {
91 if (session.isReadSuspended()) {
92 session.receivedMessageQueue.add(data);
93 } else {
94 super.fireMessageReceived(data);
95 }
96 } finally {
97 session.getLock().unlock();
98 }
99 } else {
100 session.receivedMessageQueue.add(data);
101 }
102 } else if (type == IoEventType.WRITE) {
103 super.fireFilterWrite((WriteRequest) data);
104 } else if (type == IoEventType.MESSAGE_SENT) {
105 super.fireMessageSent((WriteRequest) data);
106 } else if (type == IoEventType.EXCEPTION_CAUGHT) {
107 super.fireExceptionCaught((Throwable) data);
108 } else if (type == IoEventType.SESSION_IDLE) {
109 super.fireSessionIdle((IdleStatus) data);
110 } else if (type == IoEventType.SESSION_OPENED) {
111 super.fireSessionOpened();
112 sessionOpened = true;
113 } else if (type == IoEventType.SESSION_CREATED) {
114 session.getLock().lock();
115 try {
116 super.fireSessionCreated();
117 } finally {
118 session.getLock().unlock();
119 }
120 } else if (type == IoEventType.SESSION_CLOSED) {
121 flushPendingDataQueues(session);
122 super.fireSessionClosed();
123 } else if (type == IoEventType.CLOSE) {
124 super.fireFilterClose();
125 }
126 }
127
128 private static void flushPendingDataQueues(VmPipeSession s) {
129 s.getProcessor().updateTrafficControl(s);
130 s.getRemoteSession().getProcessor().updateTrafficControl(s);
131 }
132
133 @Override
134 public void fireFilterClose() {
135 pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
136 }
137
138 @Override
139 public void fireFilterWrite(WriteRequest writeRequest) {
140 pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
141 }
142
143 @Override
144 public void fireExceptionCaught(Throwable cause) {
145 pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
146 }
147
148 @Override
149 public void fireMessageSent(WriteRequest request) {
150 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
151 }
152
153 @Override
154 public void fireSessionClosed() {
155 pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
156 }
157
158 @Override
159 public void fireSessionCreated() {
160 pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
161 }
162
163 @Override
164 public void fireSessionIdle(IdleStatus status) {
165 pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
166 }
167
168 @Override
169 public void fireSessionOpened() {
170 pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
171 }
172
173 @Override
174 public void fireMessageReceived(Object message) {
175 pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
176 }
177
178 private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
179 public void flush(VmPipeSession session) {
180 WriteRequestQueue queue = session.getWriteRequestQueue0();
181 if (!session.isClosing()) {
182 session.getLock().lock();
183 try {
184 if (queue.isEmpty(session)) {
185 return;
186 }
187 WriteRequest req;
188 long currentTime = System.currentTimeMillis();
189 while ((req = queue.poll(session)) != null) {
190 Object m = req.getMessage();
191 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
192 session.getRemoteSession().getFilterChain().fireMessageReceived(
193 getMessageCopy(m));
194 if (m instanceof IoBuffer) {
195 session.increaseWrittenBytes0(
196 ((IoBuffer) m).remaining(), currentTime);
197 }
198 }
199 } finally {
200 if (flushEnabled) {
201 flushEvents();
202 }
203 session.getLock().unlock();
204 }
205
206 flushPendingDataQueues(session);
207 } else {
208 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
209 WriteRequest req;
210 while ((req = queue.poll(session)) != null) {
211 failedRequests.add(req);
212 }
213
214 if (!failedRequests.isEmpty()) {
215 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
216 for (WriteRequest r: failedRequests) {
217 r.getFuture().setException(cause);
218 }
219 session.getFilterChain().fireExceptionCaught(cause);
220 }
221 }
222 }
223
224 private Object getMessageCopy(Object message) {
225 Object messageCopy = message;
226 if (message instanceof IoBuffer) {
227 IoBuffer rb = (IoBuffer) message;
228 rb.mark();
229 IoBuffer wb = IoBuffer.allocate(rb.remaining());
230 wb.put(rb);
231 wb.flip();
232 rb.reset();
233 messageCopy = wb;
234 }
235 return messageCopy;
236 }
237
238 public void remove(VmPipeSession session) {
239 try {
240 session.getLock().lock();
241 if (!session.getCloseFuture().isClosed()) {
242 session.getServiceListeners().fireSessionDestroyed(session);
243 session.getRemoteSession().close(true);
244 }
245 } finally {
246 session.getLock().unlock();
247 }
248 }
249
250 public void add(VmPipeSession session) {
251
252 }
253
254 public void updateTrafficControl(VmPipeSession session) {
255 if ( ! session.isReadSuspended()) {
256 List<Object> data = new ArrayList<Object>();
257 session.receivedMessageQueue.drainTo(data);
258 for (Object aData : data) {
259 VmPipeFilterChain.this.fireMessageReceived(aData);
260 }
261 }
262
263 if ( ! session.isWriteSuspended()) {
264 flush(session);
265 }
266 }
267
268 public void dispose() {
269
270 }
271
272 public boolean isDisposed() {
273 return false;
274 }
275
276 public boolean isDisposing() {
277 return false;
278 }
279 }
280 }