001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.stream;
021
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertSame;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.net.InetSocketAddress;
028import java.net.SocketAddress;
029import java.security.MessageDigest;
030import java.util.LinkedList;
031import java.util.Queue;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.TimeUnit;
035
036import org.apache.mina.core.buffer.IoBuffer;
037import org.apache.mina.core.filterchain.IoFilter.NextFilter;
038import org.apache.mina.core.future.IoFutureListener;
039import org.apache.mina.core.future.WriteFuture;
040import org.apache.mina.core.service.IoHandlerAdapter;
041import org.apache.mina.core.session.DummySession;
042import org.apache.mina.core.session.IdleStatus;
043import org.apache.mina.core.session.IoSession;
044import org.apache.mina.core.write.DefaultWriteRequest;
045import org.apache.mina.core.write.WriteRequest;
046import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
047import org.apache.mina.transport.socket.nio.NioSocketConnector;
048import org.apache.mina.util.AvailablePortFinder;
049import org.easymock.IArgumentMatcher;
050import org.easymock.EasyMock;
051import org.junit.Test;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * TODO Add documentation
057 * 
058 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
059 */
060public abstract class AbstractStreamWriteFilterTest<M, U extends AbstractStreamWriteFilter<M>> {
061
062    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamWriteFilterTest.class);
063
064    protected final IoSession session = new DummySession();
065
066    abstract protected U createFilter();
067
068    abstract protected M createMessage(byte[] data) throws Exception;
069
070    @Test
071    public void testWriteEmptyFile() throws Exception {
072        AbstractStreamWriteFilter<M> filter = createFilter();
073        M message = createMessage(new byte[0]);
074
075        WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
076
077        NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
078        /*
079          * Record expectations
080          */
081        nextFilter.messageSent(session, writeRequest);
082
083        /*
084         * Replay.
085         */
086        EasyMock.replay(nextFilter);
087
088        filter.filterWrite(nextFilter, session, writeRequest);
089
090        /*
091         * Verify.
092         */
093        EasyMock.verify(nextFilter);
094
095        assertTrue(writeRequest.getFuture().isWritten());
096    }
097
098    /**
099     * Tests that the filter just passes objects which aren't FileRegion's
100     * through to the next filter.
101     *
102     * @throws Exception when something goes wrong
103     */
104    @Test
105    public void testWriteNonFileRegionMessage() throws Exception {
106        AbstractStreamWriteFilter<M> filter = createFilter();
107
108        Object message = new Object();
109        WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
110
111        NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
112        /*
113         * Record expectations
114         */
115        nextFilter.filterWrite(session, writeRequest);
116        nextFilter.messageSent(session, writeRequest);
117
118        /*
119         * Replay.
120         */
121        EasyMock.replay(nextFilter);
122
123        filter.filterWrite(nextFilter, session, writeRequest);
124        filter.messageSent(nextFilter, session, writeRequest);
125
126        /*
127         * Verify.
128         */
129        EasyMock.verify(nextFilter);
130    }
131
132    /**
133     * Tests when the contents of the file fits into one write buffer.
134     *
135     * @throws Exception when something goes wrong
136     */
137    @Test
138    public void testWriteSingleBufferFile() throws Exception {
139        byte[] data = new byte[] { 1, 2, 3, 4 };
140
141        AbstractStreamWriteFilter<M> filter = createFilter();
142        M message = createMessage(data);
143
144        WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
145
146        NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
147        /*
148         * Record expectations
149         */
150        nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(IoBuffer.wrap(data))));
151        nextFilter.messageSent(session, writeRequest);
152
153        /*
154         * Replay.
155         */
156        EasyMock.replay(nextFilter);
157
158        filter.filterWrite(nextFilter, session, writeRequest);
159        filter.messageSent(nextFilter, session, writeRequest);
160
161        /*
162         * Verify.
163         */
164        EasyMock.verify(nextFilter);
165
166        assertTrue(writeRequest.getFuture().isWritten());
167    }
168
169    /**
170     * Tests when the contents of the file doesn't fit into one write buffer.
171     *
172     * @throws Exception when something goes wrong
173     */
174    @Test
175    public void testWriteSeveralBuffersStream() throws Exception {
176        AbstractStreamWriteFilter<M> filter = createFilter();
177        filter.setWriteBufferSize(4);
178
179        byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
180        byte[] chunk1 = new byte[] { 1, 2, 3, 4 };
181        byte[] chunk2 = new byte[] { 5, 6, 7, 8 };
182        byte[] chunk3 = new byte[] { 9, 10 };
183
184        M message = createMessage(data);
185        WriteRequest writeRequest = new DefaultWriteRequest(message, new DummyWriteFuture());
186
187        WriteRequest chunk1Request = new DefaultWriteRequest(IoBuffer.wrap(chunk1));
188        WriteRequest chunk2Request = new DefaultWriteRequest(IoBuffer.wrap(chunk2));
189        WriteRequest chunk3Request = new DefaultWriteRequest(IoBuffer.wrap(chunk3));
190
191        NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
192        /*
193         * Record expectations
194         */
195        nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk1Request));
196        nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk2Request));
197        nextFilter.filterWrite(EasyMock.eq(session), eqWriteRequest(chunk3Request));
198        nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(writeRequest));
199
200        /*
201         * Replay.
202         */
203        EasyMock.replay(nextFilter);
204
205        filter.filterWrite(nextFilter, session, writeRequest);
206        filter.messageSent(nextFilter, session, chunk1Request);
207        filter.messageSent(nextFilter, session, chunk2Request);
208        filter.messageSent(nextFilter, session, chunk3Request);
209
210        /*
211         * Verify.
212         */
213        EasyMock.verify(nextFilter);
214
215        assertTrue(writeRequest.getFuture().isWritten());
216    }
217
218    @Test
219    public void testWriteWhileWriteInProgress() throws Exception {
220        AbstractStreamWriteFilter<M> filter = createFilter();
221        M message = createMessage(new byte[5]);
222
223        Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
224
225        /*
226         * Make up the situation.
227         */
228        session.setAttribute(filter.CURRENT_STREAM, message);
229        session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
230
231        NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
232        /*
233         * Replay.  (We recorded *nothing* because nothing should occur.)
234         */
235        EasyMock.replay(nextFilter);
236
237        WriteRequest wr = new DefaultWriteRequest(new Object(), new DummyWriteFuture());
238        filter.filterWrite(nextFilter, session, wr);
239        assertEquals(1, queue.size());
240        assertSame(wr, queue.poll());
241
242        /*
243         * Verify.
244         */
245        EasyMock.verify(nextFilter);
246
247        session.removeAttribute(filter.CURRENT_STREAM);
248        session.removeAttribute(filter.WRITE_REQUEST_QUEUE);
249    }
250
251    @Test
252    public void testWritesWriteRequestQueueWhenFinished() throws Exception {
253        AbstractStreamWriteFilter<M> filter = createFilter();
254        M message = createMessage(new byte[0]);
255
256        WriteRequest wrs[] = new WriteRequest[] { new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
257                new DefaultWriteRequest(new Object(), new DummyWriteFuture()),
258                new DefaultWriteRequest(new Object(), new DummyWriteFuture()) };
259        Queue<WriteRequest> queue = new LinkedList<WriteRequest>();
260        queue.add(wrs[0]);
261        queue.add(wrs[1]);
262        queue.add(wrs[2]);
263
264        /*
265         * Make up the situation.
266         */
267        session.setAttribute(filter.CURRENT_STREAM, message);
268        session.setAttribute(filter.CURRENT_WRITE_REQUEST, new DefaultWriteRequest(message));
269        session.setAttribute(filter.WRITE_REQUEST_QUEUE, queue);
270
271        /*
272         * Record expectations
273         */
274        NextFilter nextFilter = EasyMock.createMock(NextFilter.class);
275        nextFilter.filterWrite(session, wrs[0]);
276        nextFilter.filterWrite(session, wrs[1]);
277        nextFilter.filterWrite(session, wrs[2]);
278        nextFilter.messageSent(EasyMock.eq(session), eqWriteRequest(new DefaultWriteRequest(message)));
279
280        /*
281         * Replay.
282         */
283        EasyMock.replay(nextFilter);
284
285        filter.messageSent(nextFilter, session, new DefaultWriteRequest(new Object()));
286        assertEquals(0, queue.size());
287
288        /*
289         * Verify.
290         */
291        EasyMock.verify(nextFilter);
292    }
293
294    /**
295     * Tests that {@link StreamWriteFilter#setWriteBufferSize(int)} checks the
296     * specified size.
297     */
298    @Test
299    public void testSetWriteBufferSize() {
300        AbstractStreamWriteFilter<M> filter = createFilter();
301
302        try {
303            filter.setWriteBufferSize(0);
304            fail("0 writeBuferSize specified. IllegalArgumentException expected.");
305        } catch (IllegalArgumentException iae) {
306            // Pass, exception was thrown
307            // Signifies a successful test execution
308            assertTrue(true);
309        }
310
311        try {
312            filter.setWriteBufferSize(-100);
313            fail("Negative writeBuferSize specified. IllegalArgumentException expected.");
314        } catch (IllegalArgumentException iae) {
315            // Pass, exception was thrown
316            // Signifies a successful test execution
317            assertTrue(true);
318        }
319
320        filter.setWriteBufferSize(1);
321        assertEquals(1, filter.getWriteBufferSize());
322        filter.setWriteBufferSize(1024);
323        assertEquals(1024, filter.getWriteBufferSize());
324    }
325
326    @Test
327    public void testWriteUsingSocketTransport() throws Exception {
328        NioSocketAcceptor acceptor = new NioSocketAcceptor();
329        acceptor.setReuseAddress(true);
330        SocketAddress address = new InetSocketAddress("localhost", AvailablePortFinder.getNextAvailable());
331
332        NioSocketConnector connector = new NioSocketConnector();
333
334        // Generate 4MB of random data
335        byte[] data = new byte[4 * 1024 * 1024];
336        new Random().nextBytes(data);
337
338        byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
339
340        M message = createMessage(data);
341
342        SenderHandler sender = new SenderHandler(message);
343        ReceiverHandler receiver = new ReceiverHandler(data.length);
344
345        acceptor.setHandler(sender);
346        connector.setHandler(receiver);
347
348        acceptor.bind(address);
349        connector.connect(address);
350        sender.latch.await();
351        receiver.latch.await();
352
353        acceptor.dispose();
354
355        assertEquals(data.length, receiver.bytesRead);
356        byte[] actualMd5 = receiver.digest.digest();
357        assertEquals(expectedMd5.length, actualMd5.length);
358        for (int i = 0; i < expectedMd5.length; i++) {
359            assertEquals(expectedMd5[i], actualMd5[i]);
360        }
361    }
362
363    private class SenderHandler extends IoHandlerAdapter {
364        final CountDownLatch latch = new CountDownLatch(1);
365
366        private M message;
367
368        StreamWriteFilter streamWriteFilter = new StreamWriteFilter();
369
370        SenderHandler(M message) {
371            this.message = message;
372        }
373
374        @Override
375        public void sessionCreated(IoSession session) throws Exception {
376            super.sessionCreated(session);
377            session.getFilterChain().addLast("codec", streamWriteFilter);
378        }
379
380        @Override
381        public void sessionOpened(IoSession session) throws Exception {
382            session.write(message);
383        }
384
385        @Override
386        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
387            LOGGER.error("SenderHandler: exceptionCaught", cause);
388            latch.countDown();
389        }
390
391        @Override
392        public void sessionClosed(IoSession session) throws Exception {
393            LOGGER.info("SenderHandler: sessionClosed");
394            latch.countDown();
395        }
396
397        @Override
398        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
399            LOGGER.info("SenderHandler: sessionIdle");
400            latch.countDown();
401        }
402
403        @Override
404        public void messageSent(IoSession session, Object message) throws Exception {
405            LOGGER.info("SenderHandler: messageSent");
406            if (message == this.message) {
407                LOGGER.info("message == this.message");
408                latch.countDown();
409            }
410        }
411    }
412
413    private static class ReceiverHandler extends IoHandlerAdapter {
414        final CountDownLatch latch = new CountDownLatch(1);
415
416        long bytesRead = 0;
417
418        long size = 0;
419
420        MessageDigest digest;
421
422        ReceiverHandler(long size) throws Exception {
423            this.size = size;
424            digest = MessageDigest.getInstance("MD5");
425        }
426
427        @Override
428        public void sessionCreated(IoSession session) throws Exception {
429            super.sessionCreated(session);
430
431            session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 5);
432        }
433
434        @Override
435        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
436            LOGGER.info("ReceiverHandler: sessionIdle");
437            session.close(true);
438        }
439
440        @Override
441        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
442            LOGGER.error("ReceiverHandler: exceptionCaught", cause);
443            latch.countDown();
444        }
445
446        @Override
447        public void sessionClosed(IoSession session) throws Exception {
448            LOGGER.info("ReceiverHandler: sessionClosed");
449            latch.countDown();
450        }
451
452        @Override
453        public void messageReceived(IoSession session, Object message) throws Exception {
454            LOGGER.info("messageReceived");
455            IoBuffer buf = (IoBuffer) message;
456            while (buf.hasRemaining()) {
457                digest.update(buf.get());
458                bytesRead++;
459            }
460            LOGGER.info("messageReceived: bytesRead = {}", bytesRead);
461            if (bytesRead >= size) {
462                session.close(true);
463            }
464        }
465    }
466
467    public static WriteRequest eqWriteRequest(WriteRequest expected) {
468        EasyMock.reportMatcher(new WriteRequestMatcher(expected));
469        return null;
470    }
471
472    private static class WriteRequestMatcher implements IArgumentMatcher {
473        private final WriteRequest expected;
474
475        public WriteRequestMatcher(WriteRequest expected) {
476            this.expected = expected;
477        }
478
479        public boolean matches(Object actual) {
480            if (actual instanceof WriteRequest) {
481                WriteRequest w2 = (WriteRequest) actual;
482
483                return expected.getMessage().equals(w2.getMessage())
484                        && expected.getFuture().isWritten() == w2.getFuture().isWritten();
485            }
486            return false;
487        }
488
489        public void appendTo(StringBuffer buffer) {
490            buffer.append("Expected a WriteRequest with the message '").append(expected.getMessage()).append("'");
491        }
492    }
493
494    private static class DummyWriteFuture implements WriteFuture {
495        private boolean written;
496
497        /**
498         * Default constructor
499         */
500        public DummyWriteFuture() {
501            super();
502        }
503
504        public boolean isWritten() {
505            return written;
506        }
507
508        public void setWritten() {
509            this.written = true;
510        }
511
512        public IoSession getSession() {
513            return null;
514        }
515
516        public void join() {
517            // Do nothing
518        }
519
520        public boolean join(long timeoutInMillis) {
521            return true;
522        }
523
524        public boolean isDone() {
525            return true;
526        }
527
528        public WriteFuture addListener(IoFutureListener<?> listener) {
529            return this;
530        }
531
532        public WriteFuture removeListener(IoFutureListener<?> listener) {
533            return this;
534        }
535
536        public WriteFuture await() throws InterruptedException {
537            return this;
538        }
539
540        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
541            return true;
542        }
543
544        public boolean await(long timeoutMillis) throws InterruptedException {
545            return true;
546        }
547
548        public WriteFuture awaitUninterruptibly() {
549            return this;
550        }
551
552        public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
553            return true;
554        }
555
556        public boolean awaitUninterruptibly(long timeoutMillis) {
557            return true;
558        }
559
560        public Throwable getException() {
561            return null;
562        }
563
564        public void setException(Throwable cause) {
565            throw new IllegalStateException();
566        }
567    }
568
569}