001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.commons.io.input; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InterruptedIOException; 023import java.time.Duration; 024import java.time.temporal.ChronoUnit; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.commons.io.build.AbstractStreamBuilder; 028 029/** 030 * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by 031 * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found 032 * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the 033 * specified maximum, overall.) 034 * <p> 035 * To build an instance, see {@link Builder} 036 * </p> 037 * <p> 038 * Inspired by Apache HBase's class of the same name. 039 * </p> 040 * 041 * @see Builder 042 * @since 2.16.0 043 */ 044public final class ThrottledInputStream extends CountingInputStream { 045 046 // @formatter:off 047 /** 048 * Builds a new {@link ThrottledInputStream}. 049 * 050 * <h2>Using NIO</h2> 051 * <pre>{@code 052 * ThrottledInputStream in = ThrottledInputStream.builder() 053 * .setPath(Paths.get("MyFile.xml")) 054 * .setMaxBytesPerSecond(100_000) 055 * .get(); 056 * } 057 * </pre> 058 * <h2>Using IO</h2> 059 * <pre>{@code 060 * ThrottledInputStream in = ThrottledInputStream.builder() 061 * .setFile(new File("MyFile.xml")) 062 * .setMaxBytesPerSecond(100_000) 063 * .get(); 064 * } 065 * </pre> 066 * <pre>{@code 067 * ThrottledInputStream in = ThrottledInputStream.builder() 068 * .setInputStream(inputStream) 069 * .setMaxBytesPerSecond(100_000) 070 * .get(); 071 * } 072 * </pre> 073 * 074 * @see #get() 075 */ 076 // @formatter:on 077 public static class Builder extends AbstractStreamBuilder<ThrottledInputStream, Builder> { 078 079 /** 080 * Effectively not throttled. 081 */ 082 private long maxBytesPerSecond = Long.MAX_VALUE; 083 084 /** 085 * Builds a new {@link ThrottledInputStream}. 086 * <p> 087 * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception. 088 * </p> 089 * <p> 090 * This builder use the following aspects: 091 * </p> 092 * <ul> 093 * <li>{@link #getInputStream()}</li> 094 * <li>maxBytesPerSecond</li> 095 * </ul> 096 * 097 * @return a new instance. 098 * @throws IllegalStateException if the {@code origin} is {@code null}. 099 * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. 100 * @throws IOException if an I/O error occurs. 101 * @see #getInputStream() 102 */ 103 @SuppressWarnings("resource") 104 @Override 105 public ThrottledInputStream get() throws IOException { 106 return new ThrottledInputStream(getInputStream(), maxBytesPerSecond); 107 } 108 109 /** 110 * Sets the maximum bytes per second. 111 * 112 * @param maxBytesPerSecond the maximum bytes per second. 113 */ 114 public void setMaxBytesPerSecond(final long maxBytesPerSecond) { 115 this.maxBytesPerSecond = maxBytesPerSecond; 116 } 117 118 } 119 120 /** 121 * Constructs a new {@link Builder}. 122 * 123 * @return a new {@link Builder}. 124 */ 125 public static Builder builder() { 126 return new Builder(); 127 } 128 129 static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) { 130 assert elapsedMillis >= 0 : "The elapsed time should be greater or equal to zero"; 131 if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) { 132 return 0; 133 } 134 // We use this class to load the single source file, so the bytesRead 135 // and maxBytesPerSec aren't greater than Double.MAX_VALUE. 136 // We can get the precise sleep time by using the double value. 137 final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis); 138 if (millis <= 0) { 139 return 0; 140 } 141 return millis; 142 } 143 144 private final long maxBytesPerSecond; 145 private final long startTime = System.currentTimeMillis(); 146 private Duration totalSleepDuration = Duration.ZERO; 147 148 private ThrottledInputStream(final InputStream proxy, final long maxBytesPerSecond) { 149 super(proxy); 150 assert maxBytesPerSecond > 0 : "Bandwidth " + maxBytesPerSecond + " is invalid."; 151 this.maxBytesPerSecond = maxBytesPerSecond; 152 } 153 154 @Override 155 protected void beforeRead(final int n) throws IOException { 156 throttle(); 157 } 158 159 /** 160 * Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart. 161 * 162 * @return Read rate, in bytes/sec. 163 */ 164 private long getBytesPerSecond() { 165 final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000; 166 if (elapsedSeconds == 0) { 167 return getByteCount(); 168 } 169 return getByteCount() / elapsedSeconds; 170 } 171 172 private long getSleepMillis() { 173 return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime); 174 } 175 176 /** 177 * Gets the total duration spent in sleep. 178 * 179 * @return Duration spent in sleep. 180 */ 181 Duration getTotalSleepDuration() { 182 return totalSleepDuration; 183 } 184 185 private void throttle() throws InterruptedIOException { 186 final long sleepMillis = getSleepMillis(); 187 if (sleepMillis > 0) { 188 totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS); 189 try { 190 TimeUnit.MILLISECONDS.sleep(sleepMillis); 191 } catch (final InterruptedException e) { 192 throw new InterruptedIOException("Thread aborted"); 193 } 194 } 195 } 196 197 /** {@inheritDoc} */ 198 @Override 199 public String toString() { 200 return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond() 201 + ", totalSleepDuration=" + totalSleepDuration + ']'; 202 } 203}