1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.stream;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.security.MessageDigest;
25 import java.util.LinkedList;
26 import java.util.Queue;
27 import java.util.Random;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30
31 import junit.framework.TestCase;
32
33 import org.apache.mina.core.buffer.IoBuffer;
34 import org.apache.mina.core.filterchain.IoFilter.NextFilter;
35 import org.apache.mina.core.future.IoFutureListener;
36 import org.apache.mina.core.future.WriteFuture;
37 import org.apache.mina.core.service.IoHandlerAdapter;
38 import org.apache.mina.core.session.DummySession;
39 import org.apache.mina.core.session.IdleStatus;
40 import org.apache.mina.core.session.IoSession;
41 import org.apache.mina.core.write.DefaultWriteRequest;
42 import org.apache.mina.core.write.WriteRequest;
43 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
44 import org.apache.mina.transport.socket.nio.NioSocketConnector;
45 import org.apache.mina.util.AvailablePortFinder;
46 import org.easymock.IArgumentMatcher;
47 import org.easymock.classextension.EasyMock;
48
49
50
51
52
53
54
55 public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> extends TestCase {
56
57 protected final IoSession session = new DummySession();
58
59 abstract protected U createFilter();
60
61 abstract protected M createMessage(byte[] data) throws Exception;
62
63 public void testWriteEmptyFile() throws Exception {
64 AbstractStreamWriteFilter<M> filter = createFilter();
65 M message = createMessage(new byte[0]);
66
67 WriteRequest writeRequest = new DefaultWriteRequest(message,
68 new DummyWriteFuture());
69
70 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
71
72
73
74 nextFilter.messageSent(session, writeRequest);
75
76
77
78
79 EasyMock.replay(nextFilter);
80
81 filter.filterWrite(nextFilter, session, writeRequest);
82
83
84
85
86 EasyMock.verify(nextFilter);
87
88 assertTrue(writeRequest.getFuture().isWritten());
89 }
90
91
92
93
94
95
96
97 public void testWriteNonFileRegionMessage() throws Exception {
98 AbstractStreamWriteFilter<M> filter = createFilter();
99
100 Object message = new Object();
101 WriteRequest writeRequest = new DefaultWriteRequest(message,
102 new DummyWriteFuture());
103
104 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
105
106
107
108 nextFilter.filterWrite(session, writeRequest);
109 nextFilter.messageSent(session, writeRequest);
110
111
112
113
114 EasyMock.replay(nextFilter);
115
116 filter.filterWrite(nextFilter, session, writeRequest);
117 filter.messageSent(nextFilter, session, writeRequest);
118
119
120
121
122 EasyMock.verify(nextFilter);
123 }
124
125
126
127
128
129
130 public void testWriteSingleBufferFile() throws Exception {
131 byte[] data = new byte[] { 1, 2, 3, 4 };
132
133 AbstractStreamWriteFilter<M> filter = createFilter();
134 M message = createMessage(data);
135
136 WriteRequest writeRequest = new DefaultWriteRequest(message,
137 new DummyWriteFuture());
138
139 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
140
141
142
143 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer
144 .wrap(data))));
145 nextFilter.messageSent(session, writeRequest);
146
147
148
149
150 EasyMock.replay(nextFilter);
151
152 filter.filterWrite(nextFilter, session, writeRequest);
153 filter.messageSent(nextFilter, session, writeRequest);
154
155
156
157
158 EasyMock.verify(nextFilter);
159
160 assertTrue(writeRequest.getFuture().isWritten());
161 }
162
163
164
165
166
167
168 public void testWriteSeveralBuffersStream() throws Exception {
169 AbstractStreamWriteFilter<M> filter = createFilter();
170 filter.setWriteBufferSize(4);
171
172 byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
173 byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
174 byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
175 byte[] chunk3 = new byte[] { 9, 10 };
176
177 M message = createMessage(data);
178 WriteRequest writeRequest = new DefaultWriteRequest(message,
179 new DummyWriteFuture());
180
181 WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer
182 .wrap(chunk1));
183 WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer
184 .wrap(chunk2));
185 WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer
186 .wrap(chunk3));
187
188 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
189
190
191
192 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
193 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
194 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
195 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
196
197
198
199
200 EasyMock.replay(nextFilter);
201
202 filter.filterWrite(nextFilter, session, writeRequest);
203 filter.messageSent(nextFilter, session, chunk1Request);
204 filter.messageSent(nextFilter, session, chunk2Request);
205 filter.messageSent(nextFilter, session, chunk3Request);
206
207
208
209
210 EasyMock.verify(nextFilter);
211
212 assertTrue(writeRequest.getFuture().isWritten());
213 }
214
215 public void testWriteWhileWriteInProgress() throws Exception {
216 AbstractStreamWriteFilter<M> filter = createFilter();
217 M message = createMessage(new byte[5]);
218
219 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
220
221
222
223
224 session.setAttribute(filter.CURRENT_STREAM, message);
225 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
226
227 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
228
229
230
231 EasyMock.replay(nextFilter);
232
233 WriteRequest wr = new DefaultWriteRequest(new Object(),
234 new DummyWriteFuture());
235 filter.filterWrite(nextFilter, session, wr);
236 assertEquals(1, queue.size());
237 assertSame(wr, queue.poll());
238
239
240
241
242 EasyMock.verify(nextFilter);
243
244 session.removeAttribute(filter.CURRENT_STREAM);
245 session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
246 }
247
248 public void testWritesWriteRequestQueueWhenFinished() throws Exception {
249 AbstractStreamWriteFilter<M> filter = createFilter();
250 M message = createMessage(new byte[0]);
251
252 WriteRequest wrs[] = new WriteRequest[] {
253 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
254 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
255 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
256 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
257 queue.add(wrs[0]);
258 queue.add(wrs[1]);
259 queue.add(wrs[2]);
260
261
262
263
264 session.setAttribute(filter.CURRENT_STREAM, message);
265 session.setAttribute(filter.CURRENT_WRITE_REQUEST,
266 new DefaultWriteRequest(message));
267 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
268
269
270
271
272 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
273 nextFilter.filterWrite(session, wrs[0]);
274 nextFilter.filterWrite(session, wrs[1]);
275 nextFilter.filterWrite(session, wrs[2]);
276 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
277
278
279
280
281 EasyMock.replay(nextFilter);
282
283 filter.messageSent(nextFilter, session, new DefaultWriteRequest(
284 new Object()));
285 assertEquals(0, queue.size());
286
287
288
289
290 EasyMock.verify(nextFilter);
291 }
292
293
294
295
296
297 public void testSetWriteBufferSize() {
298 AbstractStreamWriteFilter<M> filter = createFilter();
299
300 try {
301 filter.setWriteBufferSize(0);
302 fail("0 writeBuferSize specified. IllegalArgumentException expected.");
303 } catch (IllegalArgumentException iae) {
304
305 }
306
307 try {
308 filter.setWriteBufferSize(-100);
309 fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
310 } catch (IllegalArgumentException iae) {
311
312 }
313
314 filter.setWriteBufferSize(1);
315 assertEquals(1, filter.getWriteBufferSize());
316 filter.setWriteBufferSize(1024);
317 assertEquals(1024, filter.getWriteBufferSize());
318 }
319
320 public void testWriteUsingSocketTransport() throws Exception {
321 NioSocketAcceptor acceptor = new NioSocketAcceptor();
322 acceptor.setReuseAddress(true);
323 SocketAddress address = new InetSocketAddress("localhost",
324 AvailablePortFinder.getNextAvailable());
325
326 NioSocketConnector connector = new NioSocketConnector();
327
328
329 byte[] data = new byte[4 * 1024 * 1024];
330 new Random().nextBytes(data);
331
332 byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
333
334 M message = createMessage(data);
335
336 SenderHandler sender = new SenderHandler(message);
337 ReceiverHandler receiver = new ReceiverHandler(data.length);
338
339 acceptor.setHandler(sender);
340 connector.setHandler(receiver);
341
342 acceptor.bind(address);
343 connector.connect(address);
344 sender.latch.await();
345 receiver.latch.await();
346
347 acceptor.dispose();
348
349 assertEquals(data.length, receiver.bytesRead);
350 byte[] actualMd5 = receiver.digest.digest();
351 assertEquals(expectedMd5.length, actualMd5.length);
352 for (int i = 0; i < expectedMd5.length; i++) {
353 assertEquals(expectedMd5[i], actualMd5[i]);
354 }
355 }
356
357 private class SenderHandler extends IoHandlerAdapter {
358 final CountDownLatch latch = new CountDownLatch(1);
359
360 private M message;
361
362 StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
363
364 private SenderHandler(M message) {
365 this.message = message;
366 }
367
368 @Override
369 public void sessionCreated(IoSession session) throws Exception {
370 super.sessionCreated(session);
371 session.getFilterChain().addLast("codec", streamWriteFilter);
372 }
373
374 @Override
375 public void sessionOpened(IoSession session) throws Exception {
376 session.write(message);
377 }
378
379 @Override
380 public void exceptionCaught(IoSession session, Throwable cause)
381 throws Exception {
382 latch.countDown();
383 }
384
385 @Override
386 public void sessionClosed(IoSession session) throws Exception {
387 latch.countDown();
388 }
389
390 @Override
391 public void sessionIdle(IoSession session, IdleStatus status)
392 throws Exception {
393 latch.countDown();
394 }
395
396 @Override
397 public void messageSent(IoSession session, Object message)
398 throws Exception {
399 if (message == this.message) {
400 latch.countDown();
401 }
402 }
403 }
404
405 private static class ReceiverHandler extends IoHandlerAdapter {
406 final CountDownLatch latch = new CountDownLatch(1);
407
408 long bytesRead = 0;
409
410 long size = 0;
411
412 MessageDigest digest;
413
414 private ReceiverHandler(long size) throws Exception {
415 this.size = size;
416 digest = MessageDigest.getInstance("MD5");
417 }
418
419 @Override
420 public void sessionCreated(IoSession session) throws Exception {
421 super.sessionCreated(session);
422
423 session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
424 }
425
426 @Override
427 public void sessionIdle(IoSession session, IdleStatus status)
428 throws Exception {
429 session.close(true);
430 }
431
432 @Override
433 public void exceptionCaught(IoSession session, Throwable cause)
434 throws Exception {
435 latch.countDown();
436 }
437
438 @Override
439 public void sessionClosed(IoSession session) throws Exception {
440 latch.countDown();
441 }
442
443 @Override
444 public void messageReceived(IoSession session, Object message)
445 throws Exception {
446 IoBuffer buf = (IoBuffer) message;
447 while (buf.hasRemaining()) {
448 digest.update(buf.get());
449 bytesRead++;
450 }
451 if (bytesRead >= size) {
452 session.close(true);
453 }
454 }
455 }
456
457 public static WriteRequest eqWriteRequest(WriteRequest expected) {
458 EasyMock.reportMatcher(new WriteRequestMatcher(expected));
459 return null;
460 }
461
462 public static class WriteRequestMatcher implements IArgumentMatcher {
463
464 private final WriteRequest expected;
465
466 public WriteRequestMatcher(WriteRequest expected) {
467 this.expected = expected;
468 }
469
470 public boolean matches(Object actual) {
471 if (actual instanceof WriteRequest) {
472 WriteRequest w2 = (WriteRequest) actual;
473
474 return expected.getMessage().equals(w2.getMessage())
475 && expected.getFuture().isWritten() == w2.getFuture()
476 .isWritten();
477 }
478 return false;
479 }
480 public void appendTo(StringBuffer buffer) {
481 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
482 }
483 }
484
485 private static class DummyWriteFuture implements WriteFuture {
486 private boolean written;
487
488 public boolean isWritten() {
489 return written;
490 }
491
492 public void setWritten() {
493 this.written = true;
494 }
495
496 public IoSession getSession() {
497 return null;
498 }
499
500 public Object getLock() {
501 return this;
502 }
503
504 public void join() {
505 }
506
507 public boolean join(long timeoutInMillis) {
508 return true;
509 }
510
511 public boolean isDone() {
512 return true;
513 }
514
515 public WriteFuture addListener(IoFutureListener<?> listener) {
516 return this;
517 }
518
519 public WriteFuture removeListener(IoFutureListener<?> listener) {
520 return this;
521 }
522
523 public WriteFuture await() throws InterruptedException {
524 return this;
525 }
526
527 public boolean await(long timeout, TimeUnit unit)
528 throws InterruptedException {
529 return true;
530 }
531
532 public boolean await(long timeoutMillis) throws InterruptedException {
533 return true;
534 }
535
536 public WriteFuture awaitUninterruptibly() {
537 return this;
538 }
539
540 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
541 return true;
542 }
543
544 public boolean awaitUninterruptibly(long timeoutMillis) {
545 return true;
546 }
547
548 public Throwable getException() {
549 return null;
550 }
551
552 public void setException(Throwable cause) {
553 throw new IllegalStateException();
554 }
555 }
556
557
558 }