1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.io.input;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.InterruptedIOException;
23 import java.time.Duration;
24 import java.time.temporal.ChronoUnit;
25 import java.util.concurrent.TimeUnit;
26
27 import org.apache.commons.io.build.AbstractStreamBuilder;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public final class ThrottledInputStream extends CountingInputStream {
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> {
78
79
80
81
82 private long maxBytesPerSecond = Long.MAX_VALUE;
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @SuppressWarnings("resource")
104 @Override
105 public ThrottledInputStream get() throws IOException {
106 return new ThrottledInputStream(getInputStream(), maxBytesPerSecond);
107 }
108
109
110
111
112
113
114 public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
115 this.maxBytesPerSecond = maxBytesPerSecond;
116 }
117
118 }
119
120
121
122
123
124
125 public static Builder builder() {
126 return new Builder();
127 }
128
129 static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) {
130 assert elapsedMillis >= 0 : "The elapsed time should be greater or equal to zero";
131 if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
132 return 0;
133 }
134
135
136
137 final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis);
138 if (millis <= 0) {
139 return 0;
140 }
141 return millis;
142 }
143
144 private final long maxBytesPerSecond;
145 private final long startTime = System.currentTimeMillis();
146 private Duration totalSleepDuration = Duration.ZERO;
147
148 private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) {
149 super(proxy);
150 assert maxBytesPerSecond > 0 : "Bandwidth " + maxBytesPerSecond + " is invalid.";
151 this.maxBytesPerSecond = maxBytesPerSecond;
152 }
153
154 @Override
155 protected void beforeRead(final int n) throws IOException {
156 throttle();
157 }
158
159
160
161
162
163
164 private long getBytesPerSecond() {
165 final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
166 if (elapsedSeconds == 0) {
167 return getByteCount();
168 }
169 return getByteCount() / elapsedSeconds;
170 }
171
172 private long getSleepMillis() {
173 return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime);
174 }
175
176
177
178
179
180
181 Duration getTotalSleepDuration() {
182 return totalSleepDuration;
183 }
184
185 private void throttle() throws InterruptedIOException {
186 final long sleepMillis = getSleepMillis();
187 if (sleepMillis > 0) {
188 totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
189 try {
190 TimeUnit.MILLISECONDS.sleep(sleepMillis);
191 } catch (final InterruptedException e) {
192 throw new InterruptedIOException("Thread aborted");
193 }
194 }
195 }
196
197
198 @Override
199 public String toString() {
200 return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
201 + ", totalSleepDuration=" + totalSleepDuration + ']';
202 }
203 }