1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.nio.ByteBuffer;
30 import java.nio.charset.StandardCharsets;
31
32 import org.apache.hc.core5.http.HttpStreamResetException;
33 import org.apache.hc.core5.http.nio.DataStreamChannel;
34 import org.junit.jupiter.api.Assertions;
35 import org.junit.jupiter.api.Test;
36
37 import io.reactivex.rxjava3.core.Flowable;
38
39 public class TestReactiveDataProducer {
40 @Test
41 public void testStreamThatEndsNormally() throws Exception {
42 final Flowable<ByteBuffer> publisher = Flowable.just(
43 ByteBuffer.wrap(new byte[]{ '1', '2', '3' }),
44 ByteBuffer.wrap(new byte[]{ '4', '5', '6' }));
45 final ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
46
47 final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
48 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
49
50 producer.produce(streamChannel);
51
52 Assertions.assertTrue(byteChannel.isOpen());
53 Assertions.assertEquals("123456", byteChannel.dump(StandardCharsets.US_ASCII));
54
55 producer.produce(streamChannel);
56
57 Assertions.assertFalse(byteChannel.isOpen());
58 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
59 }
60
61 @Test
62 public void testStreamThatEndsWithError() throws Exception {
63 final Flowable<ByteBuffer> publisher = Flowable.concatArray(
64 Flowable.just(
65 ByteBuffer.wrap(new byte[]{ '1' }),
66 ByteBuffer.wrap(new byte[]{ '2' }),
67 ByteBuffer.wrap(new byte[]{ '3' }),
68 ByteBuffer.wrap(new byte[]{ '4' }),
69 ByteBuffer.wrap(new byte[]{ '5' }),
70 ByteBuffer.wrap(new byte[]{ '6' })),
71 Flowable.error(new RuntimeException())
72 );
73 final ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
74
75 final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
76 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
77
78 producer.produce(streamChannel);
79 Assertions.assertEquals("12345", byteChannel.dump(StandardCharsets.US_ASCII));
80
81 final HttpStreamResetException exception = Assertions.assertThrows(HttpStreamResetException.class, () ->
82 producer.produce(streamChannel));
83 Assertions.assertTrue(exception.getCause() instanceof RuntimeException, "Expected published exception to be rethrown");
84 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
85 }
86 }