1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.stream;
21  
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.security.MessageDigest;
25  import java.util.LinkedList;
26  import java.util.Queue;
27  import java.util.Random;
28  import java.util.concurrent.CountDownLatch;
29  import java.util.concurrent.TimeUnit;
30  
31  import junit.framework.TestCase;
32  
33  import org.apache.mina.core.buffer.IoBuffer;
34  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
35  import org.apache.mina.core.future.IoFutureListener;
36  import org.apache.mina.core.future.WriteFuture;
37  import org.apache.mina.core.service.IoHandlerAdapter;
38  import org.apache.mina.core.session.DummySession;
39  import org.apache.mina.core.session.IdleStatus;
40  import org.apache.mina.core.session.IoSession;
41  import org.apache.mina.core.write.DefaultWriteRequest;
42  import org.apache.mina.core.write.WriteRequest;
43  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
44  import org.apache.mina.transport.socket.nio.NioSocketConnector;
45  import org.apache.mina.util.AvailablePortFinder;
46  import org.easymock.IArgumentMatcher;
47  import org.easymock.classextension.EasyMock;
48  
49  /**
50   * TODO Add documentation
51   * 
52   * @author The Apache MINA Project (dev@mina.apache.org)
53   * @version $Rev: 713957 $, $Date: 2008-11-14 10:27:16 +0100 (Fri, 14 Nov 2008) $
54   */
55  public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> extends TestCase {
56  
57      protected final IoSession session = new DummySession();
58  
59      abstract protected U createFilter();
60      
61      abstract protected M createMessage(byte[] data) throws Exception;
62      
63      public void testWriteEmptyFile() throws Exception {
64          AbstractStreamWriteFilter<M> filter = createFilter();
65          M message = createMessage(new byte[0]);
66  
67          WriteRequest writeRequest = new DefaultWriteRequest(message,
68                  new DummyWriteFuture());
69  
70          NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
71         /*
72           * Record expectations
73           */
74          nextFilter.messageSent(session, writeRequest);
75  
76          /*
77           * Replay.
78           */
79          EasyMock.replay(nextFilter);
80  
81          filter.filterWrite(nextFilter, session, writeRequest);
82  
83          /*
84           * Verify.
85           */
86          EasyMock.verify(nextFilter);
87  
88          assertTrue(writeRequest.getFuture().isWritten());
89      }
90  
91      /**
92       * Tests that the filter just passes objects which aren't FileRegion's
93       * through to the next filter.
94       *
95       * @throws Exception when something goes wrong
96       */
97      public void testWriteNonFileRegionMessage() throws Exception {
98          AbstractStreamWriteFilter<M> filter = createFilter();
99  
100         Object message = new Object();
101         WriteRequest writeRequest = new DefaultWriteRequest(message,
102                 new DummyWriteFuture());
103 
104         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
105         /*
106          * Record expectations
107          */
108         nextFilter.filterWrite(session, writeRequest);
109         nextFilter.messageSent(session, writeRequest);
110 
111         /*
112          * Replay.
113          */
114         EasyMock.replay(nextFilter);
115 
116         filter.filterWrite(nextFilter, session, writeRequest);
117         filter.messageSent(nextFilter, session, writeRequest);
118 
119         /*
120          * Verify.
121          */
122         EasyMock.verify(nextFilter);
123     }
124 
125     /**
126      * Tests when the contents of the file fits into one write buffer.
127      *
128      * @throws Exception when something goes wrong
129      */
130     public void testWriteSingleBufferFile() throws Exception {
131         byte[] data = new byte[] { 1, 2, 3, 4 };
132 
133         AbstractStreamWriteFilter<M> filter = createFilter();
134         M message = createMessage(data);
135 
136         WriteRequest writeRequest = new DefaultWriteRequest(message,
137                 new DummyWriteFuture());
138 
139         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
140         /*
141          * Record expectations
142          */
143         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer
144                 .wrap(data))));
145         nextFilter.messageSent(session, writeRequest);
146 
147         /*
148          * Replay.
149          */
150         EasyMock.replay(nextFilter);
151 
152         filter.filterWrite(nextFilter, session, writeRequest);
153         filter.messageSent(nextFilter, session, writeRequest);
154 
155         /*
156          * Verify.
157          */
158         EasyMock.verify(nextFilter);
159 
160         assertTrue(writeRequest.getFuture().isWritten());
161     }
162 
163     /**
164      * Tests when the contents of the file doesn't fit into one write buffer.
165      *
166      * @throws Exception when something goes wrong
167      */
168     public void testWriteSeveralBuffersStream() throws Exception {
169         AbstractStreamWriteFilter<M> filter = createFilter();
170         filter.setWriteBufferSize(4);
171 
172         byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
173         byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
174         byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
175         byte[] chunk3 = new byte[] { 9, 10 };
176 
177         M message = createMessage(data);
178         WriteRequest writeRequest = new DefaultWriteRequest(message,
179                 new DummyWriteFuture());
180 
181         WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer
182                 .wrap(chunk1));
183         WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer
184                 .wrap(chunk2));
185         WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer
186                 .wrap(chunk3));
187 
188         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
189         /*
190          * Record expectations
191          */
192         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
193         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
194         nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
195         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
196 
197         /*
198          * Replay.
199          */
200         EasyMock.replay(nextFilter);
201 
202         filter.filterWrite(nextFilter, session, writeRequest);
203         filter.messageSent(nextFilter, session, chunk1Request);
204         filter.messageSent(nextFilter, session, chunk2Request);
205         filter.messageSent(nextFilter, session, chunk3Request);
206 
207         /*
208          * Verify.
209          */
210         EasyMock.verify(nextFilter);
211 
212         assertTrue(writeRequest.getFuture().isWritten());
213     }
214 
215     public void testWriteWhileWriteInProgress() throws Exception {
216         AbstractStreamWriteFilter<M> filter = createFilter();
217         M message = createMessage(new byte[5]);
218 
219         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
220 
221         /*
222          * Make up the situation.
223          */
224         session.setAttribute(filter.CURRENT_STREAM, message);
225         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
226 
227         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
228         /*
229          * Replay.  (We recorded *nothing* because nothing should occur.)
230          */
231         EasyMock.replay(nextFilter);
232 
233         WriteRequest wr = new DefaultWriteRequest(new Object(),
234                 new DummyWriteFuture());
235         filter.filterWrite(nextFilter, session, wr);
236         assertEquals(1, queue.size());
237         assertSame(wr, queue.poll());
238 
239         /*
240          * Verify.
241          */
242         EasyMock.verify(nextFilter);
243 
244         session.removeAttribute(filter.CURRENT_STREAM);
245         session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
246     }
247 
248     public void testWritesWriteRequestQueueWhenFinished() throws Exception {
249         AbstractStreamWriteFilter<M> filter = createFilter();
250         M message = createMessage(new byte[0]);
251         
252         WriteRequest wrs[] = new WriteRequest[] {
253                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
254                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
255                 new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
256         Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
257         queue.add(wrs[0]);
258         queue.add(wrs[1]);
259         queue.add(wrs[2]);
260 
261         /*
262          * Make up the situation.
263          */
264         session.setAttribute(filter.CURRENT_STREAM, message);
265         session.setAttribute(filter.CURRENT_WRITE_REQUEST,
266                 new DefaultWriteRequest(message));
267         session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
268 
269         /*
270          * Record expectations
271          */
272         NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
273         nextFilter.filterWrite(session, wrs[0]);
274         nextFilter.filterWrite(session, wrs[1]);
275         nextFilter.filterWrite(session, wrs[2]);
276         nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
277 
278         /*
279          * Replay.
280          */
281         EasyMock.replay(nextFilter);
282 
283         filter.messageSent(nextFilter, session, new DefaultWriteRequest(
284                 new Object()));
285         assertEquals(0, queue.size());
286 
287         /*
288          * Verify.
289          */
290         EasyMock.verify(nextFilter);
291     }
292 
293     /**
294      * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
295      * specified size.
296      */
297     public void testSetWriteBufferSize() {
298         AbstractStreamWriteFilter<M> filter = createFilter();
299 
300         try {
301             filter.setWriteBufferSize(0);
302             fail("0 writeBuferSize specified. IllegalArgumentException expected.");
303         } catch (IllegalArgumentException iae) {
304             // Pass, exception was thrown
305         }
306 
307         try {
308             filter.setWriteBufferSize(-100);
309             fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
310         } catch (IllegalArgumentException iae) {
311             // Pass, exception was thrown
312         }
313 
314         filter.setWriteBufferSize(1);
315         assertEquals(1, filter.getWriteBufferSize());
316         filter.setWriteBufferSize(1024);
317         assertEquals(1024, filter.getWriteBufferSize());
318     }
319 
320     public void testWriteUsingSocketTransport() throws Exception {
321         NioSocketAcceptor acceptor = new NioSocketAcceptor();
322         acceptor.setReuseAddress(true);
323         SocketAddress address = new InetSocketAddress("localhost",
324                 AvailablePortFinder.getNextAvailable());
325 
326         NioSocketConnector connector = new NioSocketConnector();
327 
328         // Generate 4MB of random data
329         byte[] data = new byte[4 * 1024 * 1024];
330         new Random().nextBytes(data);
331 
332         byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
333 
334         M message = createMessage(data);
335         
336         SenderHandler sender = new SenderHandler(message);
337         ReceiverHandler receiver = new ReceiverHandler(data.length);
338 
339         acceptor.setHandler(sender);
340         connector.setHandler(receiver);
341         
342         acceptor.bind(address);
343         connector.connect(address);
344         sender.latch.await();
345         receiver.latch.await();
346 
347         acceptor.dispose();
348 
349         assertEquals(data.length, receiver.bytesRead);
350         byte[] actualMd5 = receiver.digest.digest();
351         assertEquals(expectedMd5.length, actualMd5.length);
352         for (int i = 0; i < expectedMd5.length; i++) {
353             assertEquals(expectedMd5[i], actualMd5[i]);
354         }
355     }
356 
357     private class SenderHandler extends IoHandlerAdapter {
358         final CountDownLatch latch = new CountDownLatch(1);
359 
360         private M message;
361 
362         StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
363 
364         private SenderHandler(M message) {
365             this.message = message;
366         }
367 
368         @Override
369         public void sessionCreated(IoSession session) throws Exception {
370             super.sessionCreated(session);
371             session.getFilterChain().addLast("codec", streamWriteFilter);
372         }
373 
374         @Override
375         public void sessionOpened(IoSession session) throws Exception {
376             session.write(message);
377         }
378 
379         @Override
380         public void exceptionCaught(IoSession session, Throwable cause)
381                 throws Exception {
382             latch.countDown();
383         }
384 
385         @Override
386         public void sessionClosed(IoSession session) throws Exception {
387             latch.countDown();
388         }
389 
390         @Override
391         public void sessionIdle(IoSession session, IdleStatus status)
392                 throws Exception {
393             latch.countDown();
394         }
395 
396         @Override
397         public void messageSent(IoSession session, Object message)
398                 throws Exception {
399             if (message == this.message) {
400                 latch.countDown();
401             }
402         }
403     }
404 
405     private static class ReceiverHandler extends IoHandlerAdapter {
406         final CountDownLatch latch = new CountDownLatch(1);
407 
408         long bytesRead = 0;
409 
410         long size = 0;
411 
412         MessageDigest digest;
413 
414         private ReceiverHandler(long size) throws Exception {
415             this.size = size;
416             digest = MessageDigest.getInstance("MD5");
417         }
418 
419         @Override
420         public void sessionCreated(IoSession session) throws Exception {
421             super.sessionCreated(session);
422 
423             session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
424         }
425 
426         @Override
427         public void sessionIdle(IoSession session, IdleStatus status)
428                 throws Exception {
429             session.close(true);
430         }
431 
432         @Override
433         public void exceptionCaught(IoSession session, Throwable cause)
434                 throws Exception {
435             latch.countDown();
436         }
437 
438         @Override
439         public void sessionClosed(IoSession session) throws Exception {
440             latch.countDown();
441         }
442 
443         @Override
444         public void messageReceived(IoSession session, Object message)
445                 throws Exception {
446             IoBuffer buf = (IoBuffer) message;
447             while (buf.hasRemaining()) {
448                 digest.update(buf.get());
449                 bytesRead++;
450             }
451             if (bytesRead >= size) {
452                 session.close(true);
453             }
454         }
455     }
456 
457     public static WriteRequest eqWriteRequest(WriteRequest expected) {
458         EasyMock.reportMatcher(new WriteRequestMatcher(expected));
459         return null;
460     }
461     
462     public static class WriteRequestMatcher implements IArgumentMatcher {
463         
464         private final WriteRequest expected;
465         
466         public WriteRequestMatcher(WriteRequest expected) {
467                 this.expected = expected;
468         }
469         
470         public boolean matches(Object actual) {
471             if (actual instanceof WriteRequest) {
472                 WriteRequest w2 = (WriteRequest) actual;
473 
474                 return expected.getMessage().equals(w2.getMessage())
475                         && expected.getFuture().isWritten() == w2.getFuture()
476                         .isWritten();
477             }
478             return false;
479         }
480         public void appendTo(StringBuffer buffer) {
481                 buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
482         }
483     }
484 
485     private static class DummyWriteFuture implements WriteFuture {
486         private boolean written;
487 
488         public boolean isWritten() {
489             return written;
490         }
491 
492         public void setWritten() {
493             this.written = true;
494         }
495 
496         public IoSession getSession() {
497             return null;
498         }
499 
500         public Object getLock() {
501             return this;
502         }
503 
504         public void join() {
505         }
506 
507         public boolean join(long timeoutInMillis) {
508             return true;
509         }
510 
511         public boolean isDone() {
512             return true;
513         }
514 
515         public WriteFuture addListener(IoFutureListener<?> listener) {
516             return this;
517         }
518 
519         public WriteFuture removeListener(IoFutureListener<?> listener) {
520             return this;
521         }
522 
523         public WriteFuture await() throws InterruptedException {
524             return this;
525         }
526 
527         public boolean await(long timeout, TimeUnit unit)
528                 throws InterruptedException {
529             return true;
530         }
531 
532         public boolean await(long timeoutMillis) throws InterruptedException {
533             return true;
534         }
535 
536         public WriteFuture awaitUninterruptibly() {
537             return this;
538         }
539 
540         public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
541             return true;
542         }
543 
544         public boolean awaitUninterruptibly(long timeoutMillis) {
545             return true;
546         }
547 
548         public Throwable getException() {
549             return null;
550         }
551 
552         public void setException(Throwable cause) {
553             throw new IllegalStateException();
554         }
555     }
556 
557     
558 }