View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.stream;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertSame;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.net.InetSocketAddress;
28  import java.net.SocketAddress;
29  import java.security.MessageDigest;
30  import java.util.LinkedList;
31  import java.util.Queue;
32  import java.util.Random;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.mina.core.buffer.IoBuffer;
37  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
38  import org.apache.mina.core.future.IoFutureListener;
39  import org.apache.mina.core.future.WriteFuture;
40  import org.apache.mina.core.service.IoHandlerAdapter;
41  import org.apache.mina.core.session.DummySession;
42  import org.apache.mina.core.session.IdleStatus;
43  import org.apache.mina.core.session.IoSession;
44  import org.apache.mina.core.write.DefaultWriteRequest;
45  import org.apache.mina.core.write.WriteRequest;
46  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
47  import org.apache.mina.transport.socket.nio.NioSocketConnector;
48  import org.apache.mina.util.AvailablePortFinder;
49  import org.easymock.IArgumentMatcher;
50  import org.easymock.classextension.EasyMock;
51  import org.junit.Test;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
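/**
 * Base class for tests of {@link AbstractStreamWriteFilter} implementations.
 * Concrete subclasses supply the filter under test through
 * {@code createFilter()} and the message to be written through
 * {@code createMessage(byte[])}.
 *
 * @param <M> the type of message handled by the filter under test
 * @param <U> the type of the filter under test
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */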
60  public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> {
61  
62      private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamWriteFilterTest.class);
63  
64      protected final IoSession session = new DummySession();
65  
66      abstract protected U createFilter();
67      
68      abstract protected M createMessage(byte[] data) throws Exception;
69      
70      @Test
71      public void testWriteEmptyFile() throws Exception {
72          AbstractStreamWriteFilter<M> filter = createFilter();
73          M message = createMessage(new byte[0]);
74  
75          WriteRequest writeRequest = new DefaultWriteRequest(message,
76                  new DummyWriteFuture());
77  
78          NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
79         /*
80           * Record expectations
81           */
82          nextFilter.messageSent(session, writeRequest);
83  
84          /*
85           * Replay.
86           */
87          EasyMock.replay(nextFilter);
88  
89          filter.filterWrite(nextFilter, session, writeRequest);
90  
91          /*
92           * Verify.
93           */
94          EasyMock.verify(nextFilter);
95  
96          assertTrue(writeRequest.getFuture().isWritten());
97      }
98  
99      /**
100      * Tests that the filter just passes objects which aren't FileRegion's
101      * through to the next filter.
102      *
103      * @throws Exception when something goes wrong
104      */
105     @Test
106     public void testWriteNonFileRegionMessage() throws Exception {
107         AbstractStreamWriteFilter<M> filter = createFilter();
108 
109         Object message = new Object();
110         WriteRequest writeRequest = new DefaultWriteRequest(message,
111                 new DummyWriteFuture());
112 
113         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
114         /*
115          * Record expectations
116          */
117         nextFilter.filterWrite(session, writeRequest);
118         nextFilter.messageSent(session, writeRequest);
119 
120         /*
121          * Replay.
122          */
123         EasyMock.replay(nextFilter);
124 
125         filter.filterWrite(nextFilter, session, writeRequest);
126         filter.messageSent(nextFilter, session, writeRequest);
127 
128         /*
129          * Verify.
130          */
131         EasyMock.verify(nextFilter);
132     }
133 
134     /**
135      * Tests when the contents of the file fits into one write buffer.
136      *
137      * @throws Exception when something goes wrong
138      */
139     @Test
140     public void testWriteSingleBufferFile() throws Exception {
141         byte[] data = new byte[] { 1, 2, 3, 4 };
142 
143         AbstractStreamWriteFilter<M> filter = createFilter();
144         M message = createMessage(data);
145 
146         WriteRequest writeRequest = new DefaultWriteRequest(message,
147                 new DummyWriteFuture());
148 
149         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
150         /*
151          * Record expectations
152          */
153         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer
154                 .wrap(data))));
155         nextFilter.messageSent(session, writeRequest);
156 
157         /*
158          * Replay.
159          */
160         EasyMock.replay(nextFilter);
161 
162         filter.filterWrite(nextFilter, session, writeRequest);
163         filter.messageSent(nextFilter, session, writeRequest);
164 
165         /*
166          * Verify.
167          */
168         EasyMock.verify(nextFilter);
169 
170         assertTrue(writeRequest.getFuture().isWritten());
171     }
172 
173     /**
174      * Tests when the contents of the file doesn't fit into one write buffer.
175      *
176      * @throws Exception when something goes wrong
177      */
178     @Test
179     public void testWriteSeveralBuffersStream() throws Exception {
180         AbstractStreamWriteFilter<M> filter = createFilter();
181         filter.setWriteBufferSize(4);
182 
183         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
184         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
185         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
186         byte[] chunk3 = new byte[] { 9, 10 };
187 
188         M message = createMessage(data);
189         WriteRequest writeRequest = new DefaultWriteRequest(message,
190                 new DummyWriteFuture());
191 
192         WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer
193                 .wrap(chunk1));
194         WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer
195                 .wrap(chunk2));
196         WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer
197                 .wrap(chunk3));
198 
199         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
200         /*
201          * Record expectations
202          */
203         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
204         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
205         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
206         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
207 
208         /*
209          * Replay.
210          */
211         EasyMock.replay(nextFilter);
212 
213         filter.filterWrite(nextFilter, session, writeRequest);
214         filter.messageSent(nextFilter, session, chunk1Request);
215         filter.messageSent(nextFilter, session, chunk2Request);
216         filter.messageSent(nextFilter, session, chunk3Request);
217 
218         /*
219          * Verify.
220          */
221         EasyMock.verify(nextFilter);
222 
223         assertTrue(writeRequest.getFuture().isWritten());
224     }
225 
226     @Test
227     public void testWriteWhileWriteInProgress() throws Exception {
228         AbstractStreamWriteFilter<M> filter = createFilter();
229         M message = createMessage(new byte[5]);
230 
231         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
232 
233         /*
234          * Make up the situation.
235          */
236         session.setAttribute(filter.CURRENT_STREAM, message);
237         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
238 
239         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
240         /*
241          * Replay.  (We recorded *nothing* because nothing should occur.)
242          */
243         EasyMock.replay(nextFilter);
244 
245         WriteRequest wr = new DefaultWriteRequest(new Object(),
246                 new DummyWriteFuture());
247         filter.filterWrite(nextFilter, session, wr);
248         assertEquals(1, queue.size());
249         assertSame(wr, queue.poll());
250 
251         /*
252          * Verify.
253          */
254         EasyMock.verify(nextFilter);
255 
256         session.removeAttribute(filter.CURRENT_STREAM);
257         session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
258     }
259 
260     @Test
261     public void testWritesWriteRequestQueueWhenFinished() throws Exception {
262         AbstractStreamWriteFilter<M> filter = createFilter();
263         M message = createMessage(new byte[0]);
264         
265         WriteRequest wrs[] = new WriteRequest[] {
266                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
267                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
268                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
269         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
270         queue.add(wrs[0]);
271         queue.add(wrs[1]);
272         queue.add(wrs[2]);
273 
274         /*
275          * Make up the situation.
276          */
277         session.setAttribute(filter.CURRENT_STREAM, message);
278         session.setAttribute(filter.CURRENT_WRITE_REQUEST,
279                 new DefaultWriteRequest(message));
280         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
281 
282         /*
283          * Record expectations
284          */
285         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
286         nextFilter.filterWrite(session, wrs[0]);
287         nextFilter.filterWrite(session, wrs[1]);
288         nextFilter.filterWrite(session, wrs[2]);
289         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
290 
291         /*
292          * Replay.
293          */
294         EasyMock.replay(nextFilter);
295 
296         filter.messageSent(nextFilter, session, new DefaultWriteRequest(
297                 new Object()));
298         assertEquals(0, queue.size());
299 
300         /*
301          * Verify.
302          */
303         EasyMock.verify(nextFilter);
304     }
305 
306     /**
307      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
308      * specified size.
309      */
310     @Test
311     public void testSetWriteBufferSize() {
312         AbstractStreamWriteFilter<M> filter = createFilter();
313 
314         try {
315             filter.setWriteBufferSize(0);
316             fail("0 writeBuferSize specified. IllegalArgumentException expected.");
317         } catch (IllegalArgumentException iae) {
318             // Pass, exception was thrown
319             // Signifies a successful test execution
320             assertTrue(true);
321         }
322 
323         try {
324             filter.setWriteBufferSize(-100);
325             fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
326         } catch (IllegalArgumentException iae) {
327             // Pass, exception was thrown
328             // Signifies a successful test execution
329             assertTrue(true);
330         }
331 
332         filter.setWriteBufferSize(1);
333         assertEquals(1, filter.getWriteBufferSize());
334         filter.setWriteBufferSize(1024);
335         assertEquals(1024, filter.getWriteBufferSize());
336     }
337 
338     @Test
339     public void testWriteUsingSocketTransport() throws Exception {
340         NioSocketAcceptor acceptor = new NioSocketAcceptor();
341         acceptor.setReuseAddress(true);
342         SocketAddress address = new InetSocketAddress("localhost",
343                 AvailablePortFinder.getNextAvailable());
344 
345         NioSocketConnector connector = new NioSocketConnector();
346 
347         // Generate 4MB of random data
348         byte[] data = new byte[4 * 1024 * 1024];
349         new Random().nextBytes(data);
350 
351         byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
352 
353         M message = createMessage(data);
354         
355         SenderHandler sender = new SenderHandler(message);
356         ReceiverHandler receiver = new ReceiverHandler(data.length);
357 
358         acceptor.setHandler(sender);
359         connector.setHandler(receiver);
360         
361         acceptor.bind(address);
362         connector.connect(address);
363         sender.latch.await();
364         receiver.latch.await();
365 
366         acceptor.dispose();
367 
368         assertEquals(data.length, receiver.bytesRead);
369         byte[] actualMd5 = receiver.digest.digest();
370         assertEquals(expectedMd5.length, actualMd5.length);
371         for (int i = 0; i < expectedMd5.length; i++) {
372             assertEquals(expectedMd5[i], actualMd5[i]);
373         }
374     }
375 
376     private class SenderHandler extends IoHandlerAdapter {
377         final CountDownLatch latch = new CountDownLatch(1);
378 
379         private M message;
380 
381         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
382 
383         SenderHandler(M message) {
384             this.message = message;
385         }
386 
387         @Override
388         public void sessionCreated(IoSession session) throws Exception {
389             super.sessionCreated(session);
390             session.getFilterChain().addLast("codec", streamWriteFilter);
391         }
392 
393         @Override
394         public void sessionOpened(IoSession session) throws Exception {
395             session.write(message);
396         }
397 
398         @Override
399         public void exceptionCaught(IoSession session, Throwable cause)
400                 throws Exception {
401             LOGGER.error("SenderHandler: exceptionCaught", cause);
402             latch.countDown();
403         }
404 
405         @Override
406         public void sessionClosed(IoSession session) throws Exception {
407             LOGGER.info("SenderHandler: sessionClosed");
408             latch.countDown();
409         }
410 
411         @Override
412         public void sessionIdle(IoSession session, IdleStatus status)
413                 throws Exception {
414             LOGGER.info("SenderHandler: sessionIdle");
415             latch.countDown();
416         }
417 
418         @Override
419         public void messageSent(IoSession session, Object message)
420                 throws Exception {
421             LOGGER.info("SenderHandler: messageSent");
422             if (message == this.message) {
423                 LOGGER.info("message == this.message");
424                 latch.countDown();
425             }
426         }
427     }
428 
429     private static class ReceiverHandler extends IoHandlerAdapter {
430         final CountDownLatch latch = new CountDownLatch(1);
431 
432         long bytesRead = 0;
433 
434         long size = 0;
435 
436         MessageDigest digest;
437 
438         ReceiverHandler(long size) throws Exception {
439             this.size = size;
440             digest = MessageDigest.getInstance("MD5");
441         }
442 
443         @Override
444         public void sessionCreated(IoSession session) throws Exception {
445             super.sessionCreated(session);
446 
447             session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
448         }
449 
450         @Override
451         public void sessionIdle(IoSession session, IdleStatus status)
452                 throws Exception {
453             LOGGER.info("ReceiverHandler: sessionIdle");
454             session.close(true);
455         }
456 
457         @Override
458         public void exceptionCaught(IoSession session, Throwable cause)
459                 throws Exception {
460             LOGGER.error("ReceiverHandler: exceptionCaught", cause);
461             latch.countDown();
462         }
463 
464         @Override
465         public void sessionClosed(IoSession session) throws Exception {
466             LOGGER.info("ReceiverHandler: sessionClosed");
467             latch.countDown();
468         }
469 
470         @Override
471         public void messageReceived(IoSession session, Object message)
472                 throws Exception {
473             LOGGER.info("messageReceived");
474             IoBuffer buf = (IoBuffer) message;
475             while (buf.hasRemaining()) {
476                 digest.update(buf.get());
477                 bytesRead++;
478             }
479             LOGGER.info("messageReceived: bytesRead = {}", bytesRead);
480             if (bytesRead >= size) {
481                 session.close(true);
482             }
483         }
484     }
485 
486     public static WriteRequest eqWriteRequest(WriteRequest expected) {
487         EasyMock.reportMatcher(new WriteRequestMatcher(expected));
488         return null;
489     }
490     
491     private static class WriteRequestMatcher implements IArgumentMatcher {
492         private final WriteRequest expected;
493         
494         public WriteRequestMatcher(WriteRequest expected) {
495                 this.expected = expected;
496         }
497         
498         public boolean matches(Object actual) {
499             if (actual instanceof WriteRequest) {
500                 WriteRequest w2 = (WriteRequest) actual;
501 
502                 return expected.getMessage().equals(w2.getMessage())
503                         && expected.getFuture().isWritten() == w2.getFuture()
504                         .isWritten();
505             }
506             return false;
507         }
508         public void appendTo(StringBuffer buffer) {
509                 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
510         }
511     }
512 
513     private static class DummyWriteFuture implements WriteFuture {
514         private boolean written;
515 
516         /**
517          * Default constructor
518          */
519         public DummyWriteFuture() {
520             super();
521         }
522         
523         public boolean isWritten() {
524             return written;
525         }
526 
527         public void setWritten() {
528             this.written = true;
529         }
530 
531         public IoSession getSession() {
532             return null;
533         }
534 
535         public Object getLock() {
536             return this;
537         }
538 
539         public void join() {
540             // Do nothing
541         }
542 
543         public boolean join(long timeoutInMillis) {
544             return true;
545         }
546 
547         public boolean isDone() {
548             return true;
549         }
550 
551         public WriteFuture addListener(IoFutureListener<?> listener) {
552             return this;
553         }
554 
555         public WriteFuture removeListener(IoFutureListener<?> listener) {
556             return this;
557         }
558 
559         public WriteFuture await() throws InterruptedException {
560             return this;
561         }
562 
563         public boolean await(long timeout, TimeUnit unit)
564                 throws InterruptedException {
565             return true;
566         }
567 
568         public boolean await(long timeoutMillis) throws InterruptedException {
569             return true;
570         }
571 
572         public WriteFuture awaitUninterruptibly() {
573             return this;
574         }
575 
576         public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
577             return true;
578         }
579 
580         public boolean awaitUninterruptibly(long timeoutMillis) {
581             return true;
582         }
583 
584         public Throwable getException() {
585             return null;
586         }
587 
588         public void setException(Throwable cause) {
589             throw new IllegalStateException();
590         }
591     }
592 
593     
594 }