1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.nio.ByteBuffer;
30 import java.nio.charset.StandardCharsets;
31
32 import org.apache.hc.core5.http.HttpStreamResetException;
33 import org.apache.hc.core5.http.nio.DataStreamChannel;
34 import org.junit.Assert;
35 import org.junit.Test;
36
37 import io.reactivex.Flowable;
38
39 public class TestReactiveDataProducer {
40 @Test
41 public void testStreamThatEndsNormally() throws Exception {
42 final Flowable<ByteBuffer> publisher = Flowable.just(
43 ByteBuffer.wrap(new byte[]{ '1', '2', '3' }),
44 ByteBuffer.wrap(new byte[]{ '4', '5', '6' }));
45 final ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
46
47 final WritableByteChannelMocklMock.html#WritableByteChannelMock">WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
48 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
49
50 producer.produce(streamChannel);
51
52 Assert.assertTrue(byteChannel.isOpen());
53 Assert.assertEquals("123456", byteChannel.dump(StandardCharsets.US_ASCII));
54
55 producer.produce(streamChannel);
56
57 Assert.assertFalse(byteChannel.isOpen());
58 Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
59 }
60
61 @Test
62 public void testStreamThatEndsWithError() throws Exception {
63 final Flowable<ByteBuffer> publisher = Flowable.concatArray(
64 Flowable.just(
65 ByteBuffer.wrap(new byte[]{ '1' }),
66 ByteBuffer.wrap(new byte[]{ '2' }),
67 ByteBuffer.wrap(new byte[]{ '3' }),
68 ByteBuffer.wrap(new byte[]{ '4' }),
69 ByteBuffer.wrap(new byte[]{ '5' }),
70 ByteBuffer.wrap(new byte[]{ '6' })),
71 Flowable.<ByteBuffer>error(new RuntimeException())
72 );
73 final ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
74
75 final WritableByteChannelMocklMock.html#WritableByteChannelMock">WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
76 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
77
78 producer.produce(streamChannel);
79 Assert.assertEquals("12345", byteChannel.dump(StandardCharsets.US_ASCII));
80
81 try {
82 producer.produce(streamChannel);
83 Assert.fail("Expected ProtocolException");
84 } catch (final HttpStreamResetException ex) {
85 Assert.assertTrue("Expected published exception to be rethrown", ex.getCause() instanceof RuntimeException);
86 Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
87 }
88 }
89 }