View Javadoc
1   package org.apache.maven.surefire.api.util.internal;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import javax.annotation.Nonnegative;
23  import javax.annotation.Nonnull;
24  import java.io.BufferedInputStream;
25  import java.io.BufferedOutputStream;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.Buffer;
30  import java.nio.ByteBuffer;
31  import java.nio.channels.AsynchronousByteChannel;
32  import java.nio.channels.ClosedChannelException;
33  import java.nio.channels.ReadableByteChannel;
34  import java.nio.channels.WritableByteChannel;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.atomic.AtomicLong;
37  
38  import static java.lang.Math.max;
39  import static java.util.Objects.requireNonNull;
40  
41  /**
42   * Converts {@link OutputStream}, {@link java.io.PrintStream}, {@link InputStream} to the Java {@link
43   * java.nio.channels.Channel}.
44   * <br>
45   * We do not use the Java's utility class {@link java.nio.channels.Channels} because the utility closes the stream as
46   * soon as the particular Thread is interrupted. If the frameworks (Zookeeper, Netty) interrupts the thread, the
47   * communication channels become closed and the JVM hangs. Therefore we developed internal utility which is safe for the
48   * Surefire.
49   *
50   * @since 3.0.0-M5
51   */
52  public final class Channels
53  {
54      private static final int BUFFER_SIZE = 64 * 1024;
55  
56      private Channels()
57      {
58          throw new IllegalStateException( "no instantiable constructor" );
59      }
60  
61      public static WritableByteChannel newChannel( @Nonnull OutputStream out )
62      {
63          return newChannel( out, 0 );
64      }
65  
66      public static WritableBufferedByteChannel newBufferedChannel( @Nonnull OutputStream out )
67      {
68          return newChannel( out, BUFFER_SIZE );
69      }
70  
71      public static ReadableByteChannel newChannel( @Nonnull final InputStream is )
72      {
73          return newChannel( is, 0 );
74      }
75  
76      public static ReadableByteChannel newBufferedChannel( @Nonnull final InputStream is )
77      {
78          return newChannel( is, BUFFER_SIZE );
79      }
80  
81      public static OutputStream newOutputStream( final AsynchronousByteChannel channel )
82      {
83          return new OutputStream()
84          {
85              @Override
86              public synchronized void write( byte[] b, int off, int len ) throws IOException
87              {
88                  if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
89                  {
90                      throw new IndexOutOfBoundsException(
91                          "b.length = " + b.length + ", off = " + off + ", len = " + len );
92                  }
93                  else if ( len > 0 )
94                  {
95                      ByteBuffer bb = ByteBuffer.wrap( b, off, len );
96                      while ( bb.hasRemaining() )
97                      {
98                          try
99                          {
100                             channel.write( bb ).get();
101                         }
102                         catch ( ExecutionException e )
103                         {
104                             Throwable t = e.getCause();
105                             throw t instanceof IOException
106                                 ? (IOException) t
107                                 : new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
108                         }
109                         catch ( Exception e )
110                         {
111                             throw new IOException( e.getLocalizedMessage(), e );
112                         }
113                     }
114                 }
115             }
116 
117             @Override
118             public void write( int b ) throws IOException
119             {
120                 write( new byte[] {(byte) b} );
121             }
122 
123             @Override
124             public synchronized void close() throws IOException
125             {
126                 if ( channel.isOpen() )
127                 {
128                     try
129                     {
130                         channel.close();
131                     }
132                     catch ( ClosedChannelException e )
133                     {
134                         // closed channel anyway
135                     }
136                 }
137             }
138         };
139     }
140 
141     public static InputStream newInputStream( final AsynchronousByteChannel channel )
142     {
143         return new InputStream()
144         {
145             @Override
146             public synchronized int read( byte[] b, int off, int len ) throws IOException
147             {
148                 if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
149                 {
150                     throw new IndexOutOfBoundsException(
151                         "b.length = " + b.length + ", off = " + off + ", len = " + len );
152                 }
153                 else if ( len == 0 )
154                 {
155                     return 0;
156                 }
157                 ByteBuffer bb = ByteBuffer.wrap( b, off, len );
158                 try
159                 {
160                     return channel.read( bb ).get();
161                 }
162                 catch ( ExecutionException e )
163                 {
164                     Throwable t = e.getCause();
165                     throw t instanceof IOException
166                         ? (IOException) t
167                         : new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
168                 }
169                 catch ( Exception e )
170                 {
171                     throw new IOException( e.getLocalizedMessage(), e );
172                 }
173             }
174 
175             @Override
176             public int read() throws IOException
177             {
178                 int count;
179                 byte[] b = new byte[1];
180                 do
181                 {
182                     count = read( b, 0, 1 );
183                 }
184                 while ( count == 0 );
185 
186                 return count == -1 ? -1 : b[0];
187             }
188 
189             @Override
190             public synchronized void close() throws IOException
191             {
192                 if ( channel.isOpen() )
193                 {
194                     try
195                     {
196                         channel.close();
197                     }
198                     catch ( ClosedChannelException e )
199                     {
200                         // closed channel anyway
201                     }
202                 }
203             }
204         };
205     }
206 
207     private static ReadableByteChannel newChannel( @Nonnull InputStream is, @Nonnegative int bufferSize )
208     {
209         requireNonNull( is, "the stream should not be null" );
210         final InputStream bis = bufferSize == 0 ? is : new BufferedInputStream( is, bufferSize );
211 
212         return new AbstractNoninterruptibleReadableChannel()
213         {
214             @Override
215             protected int readImpl( ByteBuffer src ) throws IOException
216             {
217                 int count = bis.read( src.array(), src.arrayOffset() + ( (Buffer) src ).position(), src.remaining() );
218                 if ( count > 0 )
219                 {
220                     ( (Buffer) src ).position( count + ( (Buffer) src ).position() );
221                 }
222                 return count;
223             }
224 
225             @Override
226             protected void closeImpl() throws IOException
227             {
228                 bis.close();
229             }
230         };
231     }
232 
233     private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out,
234                                                            @Nonnegative final int bufferSize )
235     {
236         requireNonNull( out, "the stream should not be null" );
237         final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream( out, bufferSize );
238 
239         return new AbstractNoninterruptibleWritableChannel()
240         {
241             private final AtomicLong bytesCounter = new AtomicLong();
242 
243             @Override
244             public long countBufferOverflows()
245             {
246                 return bufferSize == 0 ? 0 : max( bytesCounter.get() - 1, 0 ) / bufferSize;
247             }
248 
249             @Override
250             protected void writeImpl( ByteBuffer src ) throws IOException
251             {
252                 int count = src.remaining();
253                 bos.write( src.array(), src.arrayOffset() + ( (Buffer) src ).position(), count );
254                 bytesCounter.getAndAdd( count );
255             }
256 
257             @Override
258             protected void closeImpl() throws IOException
259             {
260                 bos.close();
261             }
262 
263             @Override
264             protected void flushImpl() throws IOException
265             {
266                 bos.flush();
267             }
268         };
269     }
270 }