1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
29 import org.apache.mina.core.service.IoProcessor;
30 import org.apache.mina.core.session.AbstractIoSession;
31 import org.apache.mina.core.session.IdleStatus;
32 import org.apache.mina.core.session.IoEvent;
33 import org.apache.mina.core.session.IoEventType;
34 import org.apache.mina.core.write.WriteRequest;
35 import org.apache.mina.core.write.WriteRequestQueue;
36 import org.apache.mina.core.write.WriteToClosedSessionException;
37
38
39
40
41
42
43
44 class VmPipeFilterChain extends DefaultIoFilterChain {
45
46 private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
47 private final IoProcessor<VmPipeSession> processor = new VmPipeIoProcessor();
48
49 private volatile boolean flushEnabled;
50 private volatile boolean sessionOpened;
51
52 VmPipeFilterChain(AbstractIoSession session) {
53 super(session);
54 }
55
56 IoProcessor<VmPipeSession> getProcessor() {
57 return processor;
58 }
59
60 public void start() {
61 flushEnabled = true;
62 flushEvents();
63 flushPendingDataQueues((VmPipeSession) getSession());
64 }
65
66 private void pushEvent(IoEvent e) {
67 pushEvent(e, flushEnabled);
68 }
69
70 private void pushEvent(IoEvent e, boolean flushNow) {
71 eventQueue.add(e);
72 if (flushNow) {
73 flushEvents();
74 }
75 }
76
77 private void flushEvents() {
78 IoEvent e;
79 while ((e = eventQueue.poll()) != null) {
80 fireEvent(e);
81 }
82 }
83
84 private void fireEvent(IoEvent e) {
85 VmPipeSession session = (VmPipeSession) getSession();
86 IoEventType type = e.getType();
87 Object data = e.getParameter();
88
89 if (type == IoEventType.MESSAGE_RECEIVED) {
90 if (sessionOpened && session.getTrafficMask().isReadable() && session.getLock().tryLock()) {
91 try {
92 if (!session.getTrafficMask().isReadable()) {
93 session.receivedMessageQueue.add(data);
94 } else {
95 super.fireMessageReceived(data);
96 }
97 } finally {
98 session.getLock().unlock();
99 }
100 } else {
101 session.receivedMessageQueue.add(data);
102 }
103 } else if (type == IoEventType.WRITE) {
104 super.fireFilterWrite((WriteRequest) data);
105 } else if (type == IoEventType.MESSAGE_SENT) {
106 super.fireMessageSent((WriteRequest) data);
107 } else if (type == IoEventType.EXCEPTION_CAUGHT) {
108 super.fireExceptionCaught((Throwable) data);
109 } else if (type == IoEventType.SESSION_IDLE) {
110 super.fireSessionIdle((IdleStatus) data);
111 } else if (type == IoEventType.SESSION_OPENED) {
112 super.fireSessionOpened();
113 sessionOpened = true;
114 } else if (type == IoEventType.SESSION_CREATED) {
115 session.getLock().lock();
116 try {
117 super.fireSessionCreated();
118 } finally {
119 session.getLock().unlock();
120 }
121 } else if (type == IoEventType.SESSION_CLOSED) {
122 flushPendingDataQueues(session);
123 super.fireSessionClosed();
124 } else if (type == IoEventType.CLOSE) {
125 super.fireFilterClose();
126 }
127 }
128
129 private static void flushPendingDataQueues(VmPipeSession s) {
130 s.getProcessor().updateTrafficMask(s);
131 s.getRemoteSession().getProcessor().updateTrafficMask(s);
132 }
133
134 @Override
135 public void fireFilterClose() {
136 pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
137 }
138
139 @Override
140 public void fireFilterWrite(WriteRequest writeRequest) {
141 pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
142 }
143
144 @Override
145 public void fireExceptionCaught(Throwable cause) {
146 pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
147 }
148
149 @Override
150 public void fireMessageSent(WriteRequest request) {
151 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
152 }
153
154 @Override
155 public void fireSessionClosed() {
156 pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
157 }
158
159 @Override
160 public void fireSessionCreated() {
161 pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
162 }
163
164 @Override
165 public void fireSessionIdle(IdleStatus status) {
166 pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
167 }
168
169 @Override
170 public void fireSessionOpened() {
171 pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
172 }
173
174 @Override
175 public void fireMessageReceived(Object message) {
176 pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
177 }
178
179 private class VmPipeIoProcessor implements IoProcessor<VmPipeSession> {
180 public void flush(VmPipeSession session) {
181 WriteRequestQueue queue = session.getWriteRequestQueue0();
182 if (!session.isClosing()) {
183 session.getLock().lock();
184 try {
185 if (queue.isEmpty(session)) {
186 return;
187 }
188 WriteRequest req;
189 long currentTime = System.currentTimeMillis();
190 while ((req = queue.poll(session)) != null) {
191 Object m = req.getMessage();
192 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, req), false);
193 session.getRemoteSession().getFilterChain().fireMessageReceived(
194 getMessageCopy(m));
195 if (m instanceof IoBuffer) {
196 session.increaseWrittenBytes0(
197 ((IoBuffer) m).remaining(), currentTime);
198 }
199 }
200 } finally {
201 if (flushEnabled) {
202 flushEvents();
203 }
204 session.getLock().unlock();
205 }
206
207 flushPendingDataQueues(session);
208 } else {
209 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
210 WriteRequest req;
211 while ((req = queue.poll(session)) != null) {
212 failedRequests.add(req);
213 }
214
215 if (!failedRequests.isEmpty()) {
216 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
217 for (WriteRequest r: failedRequests) {
218 r.getFuture().setException(cause);
219 }
220 session.getFilterChain().fireExceptionCaught(cause);
221 }
222 }
223 }
224
225 private Object getMessageCopy(Object message) {
226 Object messageCopy = message;
227 if (message instanceof IoBuffer) {
228 IoBuffer rb = (IoBuffer) message;
229 rb.mark();
230 IoBuffer wb = IoBuffer.allocate(rb.remaining());
231 wb.put(rb);
232 wb.flip();
233 rb.reset();
234 messageCopy = wb;
235 }
236 return messageCopy;
237 }
238
239 public void remove(VmPipeSession session) {
240 try {
241 session.getLock().lock();
242 if (!session.getCloseFuture().isClosed()) {
243 session.getServiceListeners().fireSessionDestroyed(session);
244 session.getRemoteSession().close();
245 }
246 } finally {
247 session.getLock().unlock();
248 }
249 }
250
251 public void add(VmPipeSession session) {
252
253 }
254
255 public void updateTrafficMask(VmPipeSession session) {
256 if (session.getTrafficMask().isReadable()) {
257 List<Object> data = new ArrayList<Object>();
258 session.receivedMessageQueue.drainTo(data);
259 for (Object aData : data) {
260 VmPipeFilterChain.this.fireMessageReceived(aData);
261 }
262 }
263
264 if (session.getTrafficMask().isWritable()) {
265 flush(session);
266 }
267 }
268
269 public void dispose() {
270
271 }
272
273 public boolean isDisposed() {
274 return false;
275 }
276
277 public boolean isDisposing() {
278 return false;
279 }
280 }
281 }