1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.nio.entity;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.apache.hc.core5.concurrent.FutureCallback;
35 import org.apache.hc.core5.http.ContentType;
36 import org.apache.hc.core5.http.HttpException;
37 import org.apache.hc.core5.http.impl.BasicEntityDetails;
38 import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
39 import org.apache.hc.core5.util.ByteArrayBuffer;
40 import org.junit.Assert;
41 import org.junit.Test;
42
43 public class TestAbstractBinAsyncEntityConsumer {
44
45 static private class ByteArrayAsyncEntityConsumer extends AbstractBinAsyncEntityConsumer<byte[]> {
46
47 private final ByteArrayBuffer buffer;
48
49 public ByteArrayAsyncEntityConsumer() {
50 super();
51 this.buffer = new ByteArrayBuffer(1024);
52 }
53
54 @Override
55 protected void streamStart(final ContentType contentType) throws HttpException, IOException {
56 }
57
58 @Override
59 protected int capacityIncrement() {
60 return Integer.MAX_VALUE;
61 }
62
63 @Override
64 protected void data(final ByteBuffer src, final boolean endOfStream) throws IOException {
65 if (src == null) {
66 return;
67 }
68 if (src.hasArray()) {
69 buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
70 } else {
71 while (src.hasRemaining()) {
72 buffer.append(src.get());
73 }
74 }
75 }
76
77 @Override
78 protected byte[] generateContent() throws IOException {
79 return buffer.toByteArray();
80 }
81
82 @Override
83 public void releaseResources() {
84 }
85
86 }
87
88 @Test
89 public void testConsumeData() throws Exception {
90
91 final AsyncEntityConsumer<byte[]> consumer = new ByteArrayAsyncEntityConsumer();
92
93 final AtomicLong count = new AtomicLong(0);
94 consumer.streamStart(new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM), new FutureCallback<byte[]>() {
95
96 @Override
97 public void completed(final byte[] result) {
98 count.incrementAndGet();
99 }
100
101 @Override
102 public void failed(final Exception ex) {
103 count.incrementAndGet();
104 }
105
106 @Override
107 public void cancelled() {
108 count.incrementAndGet();
109 }
110
111 });
112
113 consumer.consume(ByteBuffer.wrap(new byte[]{'1', '2', '3'}));
114 consumer.consume(ByteBuffer.wrap(new byte[]{'4', '5'}));
115 consumer.consume(ByteBuffer.wrap(new byte[]{}));
116
117 Assert.assertEquals(null, consumer.getContent());
118 consumer.streamEnd(null);
119
120 Assert.assertArrayEquals(new byte[] {'1', '2', '3', '4', '5'}, consumer.getContent());
121 Assert.assertEquals(1L, count.longValue());
122 }
123
124 }