1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.io;
19
20 import static org.junit.jupiter.api.Assertions.assertEquals;
21 import static org.junit.jupiter.api.Assertions.fail;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.EOFException;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.util.Random;
29 import java.util.concurrent.ExecutorCompletionService;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.function.Supplier;
34 import java.util.zip.Inflater;
35 import java.util.zip.InflaterInputStream;
36
37 import org.junit.jupiter.api.BeforeEach;
38 import org.junit.jupiter.api.Test;
39
40
41
42
43 public class IOUtilsMultithreadedSkipTest {
44
45 private static final String FIXTURE = "TIKA-4065.bin";
46 long seed = 1;
47 private final ThreadLocal<byte[]> threadLocal = ThreadLocal.withInitial(() -> new byte[4096]);
48
49 private int[] generateExpected(final InputStream is, final int[] skips) throws IOException {
50 final int[] testBytes = new int[skips.length];
51 for (int i = 0; i < skips.length; i++) {
52 try {
53 IOUtils.skipFully(is, skips[i]);
54 testBytes[i] = is.read();
55 } catch (final EOFException e) {
56 testBytes[i] = -1;
57 }
58 }
59 return testBytes;
60 }
61
62 private int[] generateSkips(final byte[] bytes, final int numSkips, final Random random) {
63 final int[] skips = new int[numSkips];
64 for (int i = 0; i < skips.length; i++) {
65 skips[i] = random.nextInt(bytes.length / numSkips) + bytes.length / 10;
66 }
67 return skips;
68 }
69
70 private InputStream inflate(final byte[] deflated) throws IOException {
71 final ByteArrayOutputStream bos = new ByteArrayOutputStream();
72 IOUtils.copy(new InflaterInputStream(new ByteArrayInputStream(deflated), new Inflater(true)), bos);
73 return new ByteArrayInputStream(bos.toByteArray());
74 }
75
76 @BeforeEach
77 public void setUp() {
78
79 seed = new Random().nextLong();
80 }
81
82 private void testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier) throws Exception {
83 final long thisSeed = seed;
84
85 final Random random = new Random(thisSeed);
86 final byte[] bytes;
87 try (final InputStream inputStream = getClass().getResourceAsStream(FIXTURE)) {
88 bytes = IOUtils.toByteArray(inputStream);
89 }
90 final int numSkips = random.nextInt(bytes.length) / 100 + 1;
91
92 final int[] skips = generateSkips(bytes, numSkips, random);
93 final int[] expected;
94 try (final InputStream inflate = inflate(bytes)) {
95 expected = generateExpected(inflate, skips);
96 }
97
98 final int numThreads = 2;
99 final int iterations = 100;
100 final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
101 final ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
102
103 for (int i = 0; i < numThreads; i++) {
104 executorCompletionService.submit(() -> {
105 for (int iteration = 0; iteration < iterations; iteration++) {
106 try (InputStream is = new InflaterInputStream(new ByteArrayInputStream(bytes), new Inflater(true))) {
107 for (int skipIndex = 0; skipIndex < skips.length; skipIndex++) {
108 try {
109 IOUtils.skipFully(is, skips[skipIndex], baSupplier);
110 final int c = is.read();
111 assertEquals(expected[skipIndex], c, "failed on seed=" + seed + " iteration=" + iteration);
112 } catch (final EOFException e) {
113 assertEquals(expected[skipIndex], is.read(), "failed on " + "seed=" + seed + " iteration=" + iteration);
114 }
115 }
116 }
117 }
118 return 1;
119 });
120 }
121
122 int finished = 0;
123 while (finished < numThreads) {
124
125 final Future<Integer> future = executorCompletionService.take();
126 try {
127 future.get();
128 } catch (final Exception e) {
129
130 e.printStackTrace();
131 fail("failed on seed=" + seed);
132 }
133 finished++;
134 }
135 }
136
137 @Test
138 public void testSkipFullyOnInflaterInputStream_New_bytes() throws Exception {
139 testSkipFullyOnInflaterInputStream(() -> new byte[4096]);
140 }
141
142 @Test
143 public void testSkipFullyOnInflaterInputStream_ThreadLocal() throws Exception {
144 testSkipFullyOnInflaterInputStream(threadLocal::get);
145 }
146
147 }