1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.stream;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertSame;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.net.InetSocketAddress;
28 import java.net.SocketAddress;
29 import java.security.MessageDigest;
30 import java.util.LinkedList;
31 import java.util.Queue;
32 import java.util.Random;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.filterchain.IoFilter.NextFilter;
38 import org.apache.mina.core.future.IoFutureListener;
39 import org.apache.mina.core.future.WriteFuture;
40 import org.apache.mina.core.service.IoHandlerAdapter;
41 import org.apache.mina.core.session.DummySession;
42 import org.apache.mina.core.session.IdleStatus;
43 import org.apache.mina.core.session.IoSession;
44 import org.apache.mina.core.write.DefaultWriteRequest;
45 import org.apache.mina.core.write.WriteRequest;
46 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
47 import org.apache.mina.transport.socket.nio.NioSocketConnector;
48 import org.apache.mina.util.AvailablePortFinder;
49 import org.easymock.IArgumentMatcher;
50 import org.easymock.classextension.EasyMock;
51 import org.junit.Test;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55
56
57
58
59
60 public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> {
61
62 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamWriteFilterTest.class);
63
64 protected final IoSession session = new DummySession();
65
66 abstract protected U createFilter();
67
68 abstract protected M createMessage(byte[] data) throws Exception;
69
70 @Test
71 public void testWriteEmptyFile() throws Exception {
72 AbstractStreamWriteFilter<M> filter = createFilter();
73 M message = createMessage(new byte[0]);
74
75 WriteRequest writeRequest = new DefaultWriteRequest(message,
76 new DummyWriteFuture());
77
78 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
79
80
81
82 nextFilter.messageSent(session, writeRequest);
83
84
85
86
87 EasyMock.replay(nextFilter);
88
89 filter.filterWrite(nextFilter, session, writeRequest);
90
91
92
93
94 EasyMock.verify(nextFilter);
95
96 assertTrue(writeRequest.getFuture().isWritten());
97 }
98
99
100
101
102
103
104
105 @Test
106 public void testWriteNonFileRegionMessage() throws Exception {
107 AbstractStreamWriteFilter<M> filter = createFilter();
108
109 Object message = new Object();
110 WriteRequest writeRequest = new DefaultWriteRequest(message,
111 new DummyWriteFuture());
112
113 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
114
115
116
117 nextFilter.filterWrite(session, writeRequest);
118 nextFilter.messageSent(session, writeRequest);
119
120
121
122
123 EasyMock.replay(nextFilter);
124
125 filter.filterWrite(nextFilter, session, writeRequest);
126 filter.messageSent(nextFilter, session, writeRequest);
127
128
129
130
131 EasyMock.verify(nextFilter);
132 }
133
134
135
136
137
138
139 @Test
140 public void testWriteSingleBufferFile() throws Exception {
141 byte[] data = new byte[] { 1, 2, 3, 4 };
142
143 AbstractStreamWriteFilter<M> filter = createFilter();
144 M message = createMessage(data);
145
146 WriteRequest writeRequest = new DefaultWriteRequest(message,
147 new DummyWriteFuture());
148
149 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
150
151
152
153 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer
154 .wrap(data))));
155 nextFilter.messageSent(session, writeRequest);
156
157
158
159
160 EasyMock.replay(nextFilter);
161
162 filter.filterWrite(nextFilter, session, writeRequest);
163 filter.messageSent(nextFilter, session, writeRequest);
164
165
166
167
168 EasyMock.verify(nextFilter);
169
170 assertTrue(writeRequest.getFuture().isWritten());
171 }
172
173
174
175
176
177
178 @Test
179 public void testWriteSeveralBuffersStream() throws Exception {
180 AbstractStreamWriteFilter<M> filter = createFilter();
181 filter.setWriteBufferSize(4);
182
183 byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
184 byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
185 byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
186 byte[] chunk3 = new byte[] { 9, 10 };
187
188 M message = createMessage(data);
189 WriteRequest writeRequest = new DefaultWriteRequest(message,
190 new DummyWriteFuture());
191
192 WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer
193 .wrap(chunk1));
194 WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer
195 .wrap(chunk2));
196 WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer
197 .wrap(chunk3));
198
199 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
200
201
202
203 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
204 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
205 nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
206 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
207
208
209
210
211 EasyMock.replay(nextFilter);
212
213 filter.filterWrite(nextFilter, session, writeRequest);
214 filter.messageSent(nextFilter, session, chunk1Request);
215 filter.messageSent(nextFilter, session, chunk2Request);
216 filter.messageSent(nextFilter, session, chunk3Request);
217
218
219
220
221 EasyMock.verify(nextFilter);
222
223 assertTrue(writeRequest.getFuture().isWritten());
224 }
225
226 @Test
227 public void testWriteWhileWriteInProgress() throws Exception {
228 AbstractStreamWriteFilter<M> filter = createFilter();
229 M message = createMessage(new byte[5]);
230
231 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
232
233
234
235
236 session.setAttribute(filter.CURRENT_STREAM, message);
237 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
238
239 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
240
241
242
243 EasyMock.replay(nextFilter);
244
245 WriteRequest wr = new DefaultWriteRequest(new Object(),
246 new DummyWriteFuture());
247 filter.filterWrite(nextFilter, session, wr);
248 assertEquals(1, queue.size());
249 assertSame(wr, queue.poll());
250
251
252
253
254 EasyMock.verify(nextFilter);
255
256 session.removeAttribute(filter.CURRENT_STREAM);
257 session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
258 }
259
260 @Test
261 public void testWritesWriteRequestQueueWhenFinished() throws Exception {
262 AbstractStreamWriteFilter<M> filter = createFilter();
263 M message = createMessage(new byte[0]);
264
265 WriteRequest wrs[] = new WriteRequest[] {
266 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
267 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
268 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
269 Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
270 queue.add(wrs[0]);
271 queue.add(wrs[1]);
272 queue.add(wrs[2]);
273
274
275
276
277 session.setAttribute(filter.CURRENT_STREAM, message);
278 session.setAttribute(filter.CURRENT_WRITE_REQUEST,
279 new DefaultWriteRequest(message));
280 session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
281
282
283
284
285 NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
286 nextFilter.filterWrite(session, wrs[0]);
287 nextFilter.filterWrite(session, wrs[1]);
288 nextFilter.filterWrite(session, wrs[2]);
289 nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
290
291
292
293
294 EasyMock.replay(nextFilter);
295
296 filter.messageSent(nextFilter, session, new DefaultWriteRequest(
297 new Object()));
298 assertEquals(0, queue.size());
299
300
301
302
303 EasyMock.verify(nextFilter);
304 }
305
306
307
308
309
310 @Test
311 public void testSetWriteBufferSize() {
312 AbstractStreamWriteFilter<M> filter = createFilter();
313
314 try {
315 filter.setWriteBufferSize(0);
316 fail("0 writeBuferSize specified. IllegalArgumentException expected.");
317 } catch (IllegalArgumentException iae) {
318
319
320 assertTrue(true);
321 }
322
323 try {
324 filter.setWriteBufferSize(-100);
325 fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
326 } catch (IllegalArgumentException iae) {
327
328
329 assertTrue(true);
330 }
331
332 filter.setWriteBufferSize(1);
333 assertEquals(1, filter.getWriteBufferSize());
334 filter.setWriteBufferSize(1024);
335 assertEquals(1024, filter.getWriteBufferSize());
336 }
337
338 @Test
339 public void testWriteUsingSocketTransport() throws Exception {
340 NioSocketAcceptor acceptor = new NioSocketAcceptor();
341 acceptor.setReuseAddress(true);
342 SocketAddress address = new InetSocketAddress("localhost",
343 AvailablePortFinder.getNextAvailable());
344
345 NioSocketConnector connector = new NioSocketConnector();
346
347
348 byte[] data = new byte[4 * 1024 * 1024];
349 new Random().nextBytes(data);
350
351 byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
352
353 M message = createMessage(data);
354
355 SenderHandler sender = new SenderHandler(message);
356 ReceiverHandler receiver = new ReceiverHandler(data.length);
357
358 acceptor.setHandler(sender);
359 connector.setHandler(receiver);
360
361 acceptor.bind(address);
362 connector.connect(address);
363 sender.latch.await();
364 receiver.latch.await();
365
366 acceptor.dispose();
367
368 assertEquals(data.length, receiver.bytesRead);
369 byte[] actualMd5 = receiver.digest.digest();
370 assertEquals(expectedMd5.length, actualMd5.length);
371 for (int i = 0; i < expectedMd5.length; i++) {
372 assertEquals(expectedMd5[i], actualMd5[i]);
373 }
374 }
375
376 private class SenderHandler extends IoHandlerAdapter {
377 final CountDownLatch latch = new CountDownLatch(1);
378
379 private M message;
380
381 StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
382
383 SenderHandler(M message) {
384 this.message = message;
385 }
386
387 @Override
388 public void sessionCreated(IoSession session) throws Exception {
389 super.sessionCreated(session);
390 session.getFilterChain().addLast("codec", streamWriteFilter);
391 }
392
393 @Override
394 public void sessionOpened(IoSession session) throws Exception {
395 session.write(message);
396 }
397
398 @Override
399 public void exceptionCaught(IoSession session, Throwable cause)
400 throws Exception {
401 LOGGER.error("SenderHandler: exceptionCaught", cause);
402 latch.countDown();
403 }
404
405 @Override
406 public void sessionClosed(IoSession session) throws Exception {
407 LOGGER.info("SenderHandler: sessionClosed");
408 latch.countDown();
409 }
410
411 @Override
412 public void sessionIdle(IoSession session, IdleStatus status)
413 throws Exception {
414 LOGGER.info("SenderHandler: sessionIdle");
415 latch.countDown();
416 }
417
418 @Override
419 public void messageSent(IoSession session, Object message)
420 throws Exception {
421 LOGGER.info("SenderHandler: messageSent");
422 if (message == this.message) {
423 LOGGER.info("message == this.message");
424 latch.countDown();
425 }
426 }
427 }
428
429 private static class ReceiverHandler extends IoHandlerAdapter {
430 final CountDownLatch latch = new CountDownLatch(1);
431
432 long bytesRead = 0;
433
434 long size = 0;
435
436 MessageDigest digest;
437
438 ReceiverHandler(long size) throws Exception {
439 this.size = size;
440 digest = MessageDigest.getInstance("MD5");
441 }
442
443 @Override
444 public void sessionCreated(IoSession session) throws Exception {
445 super.sessionCreated(session);
446
447 session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
448 }
449
450 @Override
451 public void sessionIdle(IoSession session, IdleStatus status)
452 throws Exception {
453 LOGGER.info("ReceiverHandler: sessionIdle");
454 session.close(true);
455 }
456
457 @Override
458 public void exceptionCaught(IoSession session, Throwable cause)
459 throws Exception {
460 LOGGER.error("ReceiverHandler: exceptionCaught", cause);
461 latch.countDown();
462 }
463
464 @Override
465 public void sessionClosed(IoSession session) throws Exception {
466 LOGGER.info("ReceiverHandler: sessionClosed");
467 latch.countDown();
468 }
469
470 @Override
471 public void messageReceived(IoSession session, Object message)
472 throws Exception {
473 LOGGER.info("messageReceived");
474 IoBuffer buf = (IoBuffer) message;
475 while (buf.hasRemaining()) {
476 digest.update(buf.get());
477 bytesRead++;
478 }
479 LOGGER.info("messageReceived: bytesRead = {}", bytesRead);
480 if (bytesRead >= size) {
481 session.close(true);
482 }
483 }
484 }
485
486 public static WriteRequest eqWriteRequest(WriteRequest expected) {
487 EasyMock.reportMatcher(new WriteRequestMatcher(expected));
488 return null;
489 }
490
491 private static class WriteRequestMatcher implements IArgumentMatcher {
492 private final WriteRequest expected;
493
494 public WriteRequestMatcher(WriteRequest expected) {
495 this.expected = expected;
496 }
497
498 public boolean matches(Object actual) {
499 if (actual instanceof WriteRequest) {
500 WriteRequest w2 = (WriteRequest) actual;
501
502 return expected.getMessage().equals(w2.getMessage())
503 && expected.getFuture().isWritten() == w2.getFuture()
504 .isWritten();
505 }
506 return false;
507 }
508 public void appendTo(StringBuffer buffer) {
509 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
510 }
511 }
512
513 private static class DummyWriteFuture implements WriteFuture {
514 private boolean written;
515
516
517
518
519 public DummyWriteFuture() {
520 super();
521 }
522
523 public boolean isWritten() {
524 return written;
525 }
526
527 public void setWritten() {
528 this.written = true;
529 }
530
531 public IoSession getSession() {
532 return null;
533 }
534
535 public Object getLock() {
536 return this;
537 }
538
539 public void join() {
540
541 }
542
543 public boolean join(long timeoutInMillis) {
544 return true;
545 }
546
547 public boolean isDone() {
548 return true;
549 }
550
551 public WriteFuture addListener(IoFutureListener<?> listener) {
552 return this;
553 }
554
555 public WriteFuture removeListener(IoFutureListener<?> listener) {
556 return this;
557 }
558
559 public WriteFuture await() throws InterruptedException {
560 return this;
561 }
562
563 public boolean await(long timeout, TimeUnit unit)
564 throws InterruptedException {
565 return true;
566 }
567
568 public boolean await(long timeoutMillis) throws InterruptedException {
569 return true;
570 }
571
572 public WriteFuture awaitUninterruptibly() {
573 return this;
574 }
575
576 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
577 return true;
578 }
579
580 public boolean awaitUninterruptibly(long timeoutMillis) {
581 return true;
582 }
583
584 public Throwable getException() {
585 return null;
586 }
587
588 public void setException(Throwable cause) {
589 throw new IllegalStateException();
590 }
591 }
592
593
594 }