1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactive;
28
29 import java.nio.ByteBuffer;
30 import java.nio.charset.StandardCharsets;
31 import java.util.Collections;
32
33 import org.apache.hc.core5.http.ContentType;
34 import org.apache.hc.core5.http.HttpStreamResetException;
35 import org.apache.hc.core5.http.nio.DataStreamChannel;
36 import org.junit.jupiter.api.Assertions;
37 import org.junit.jupiter.api.Test;
38
39 import io.reactivex.rxjava3.core.Flowable;
40
41 public class TestReactiveEntityProducer {
42
43 private static final long CONTENT_LENGTH = 1;
44 private static final ContentType CONTENT_TYPE = ContentType.APPLICATION_JSON;
45 private static final String GZIP_CONTENT_ENCODING = "gzip";
46
47 @Test
48 public void testStreamThatEndsNormally() throws Exception {
49 final Flowable<ByteBuffer> publisher = Flowable.just(
50 ByteBuffer.wrap(new byte[]{'1', '2', '3'}),
51 ByteBuffer.wrap(new byte[]{'4', '5', '6'}));
52 final ReactiveEntityProducer entityProducer = new ReactiveEntityProducer(publisher, CONTENT_LENGTH, CONTENT_TYPE, GZIP_CONTENT_ENCODING);
53
54 final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
55 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
56
57 entityProducer.produce(streamChannel);
58
59 Assertions.assertTrue(byteChannel.isOpen(), "Should be open");
60 Assertions.assertEquals("123456", byteChannel.dump(StandardCharsets.US_ASCII));
61
62 entityProducer.produce(streamChannel);
63
64 Assertions.assertFalse(byteChannel.isOpen(), "Should be closed");
65 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
66 Assertions.assertFalse(entityProducer.isChunked());
67 Assertions.assertEquals(GZIP_CONTENT_ENCODING, entityProducer.getContentEncoding());
68 Assertions.assertEquals(Collections.emptySet(), entityProducer.getTrailerNames());
69 Assertions.assertEquals(CONTENT_LENGTH, entityProducer.getContentLength());
70 Assertions.assertEquals(CONTENT_TYPE.toString(), entityProducer.getContentType());
71 Assertions.assertFalse(entityProducer.isRepeatable());
72 Assertions.assertEquals(1, entityProducer.available());
73
74 entityProducer.releaseResources();
75 }
76
77 @Test
78
79 public void testStreamThatEndsWithError() throws Exception {
80 final Flowable<ByteBuffer> publisher = Flowable.concatArray(
81 Flowable.just(
82 ByteBuffer.wrap(new byte[]{'1'}),
83 ByteBuffer.wrap(new byte[]{'2'}),
84 ByteBuffer.wrap(new byte[]{'3'}),
85 ByteBuffer.wrap(new byte[]{'4'}),
86 ByteBuffer.wrap(new byte[]{'5'}),
87 ByteBuffer.wrap(new byte[]{'6'})),
88 Flowable.error(new RuntimeException())
89 );
90 final ReactiveEntityProducer entityProducer = new ReactiveEntityProducer(publisher, CONTENT_LENGTH, CONTENT_TYPE, GZIP_CONTENT_ENCODING);
91
92 final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
93 final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
94
95 entityProducer.produce(streamChannel);
96 Assertions.assertEquals("12345", byteChannel.dump(StandardCharsets.US_ASCII));
97
98 final HttpStreamResetException exception = Assertions.assertThrows(HttpStreamResetException.class, () ->
99 entityProducer.produce(streamChannel));
100 Assertions.assertTrue(exception.getCause() instanceof RuntimeException, "Expected published exception to be rethrown");
101 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
102 entityProducer.failed(exception);
103 Assertions.assertEquals(1, entityProducer.available());
104
105 Assertions.assertTrue(byteChannel.isOpen());
106 Assertions.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
107 Assertions.assertFalse(entityProducer.isChunked());
108 Assertions.assertEquals(GZIP_CONTENT_ENCODING, entityProducer.getContentEncoding());
109 Assertions.assertEquals(Collections.emptySet(), entityProducer.getTrailerNames());
110 Assertions.assertEquals(CONTENT_LENGTH, entityProducer.getContentLength());
111 Assertions.assertEquals(CONTENT_TYPE.toString(), entityProducer.getContentType());
112 Assertions.assertFalse(entityProducer.isRepeatable());
113 Assertions.assertEquals(1, entityProducer.available());
114
115 entityProducer.releaseResources();
116 }
117
118 }