View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.testing.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.hc.core5.http.EntityDetails;
35  import org.apache.hc.core5.http.Header;
36  import org.apache.hc.core5.http.HttpException;
37  import org.apache.hc.core5.http.HttpRequest;
38  import org.apache.hc.core5.http.HttpResponse;
39  import org.apache.hc.core5.http.HttpStatus;
40  import org.apache.hc.core5.http.message.BasicHttpResponse;
41  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
42  import org.apache.hc.core5.http.nio.CapacityChannel;
43  import org.apache.hc.core5.http.nio.DataStreamChannel;
44  import org.apache.hc.core5.http.nio.ResponseChannel;
45  import org.apache.hc.core5.http.protocol.HttpContext;
46  
47  public class EchoHandler implements AsyncServerExchangeHandler {
48  
49      private final static int MIN_CHUNK = 4096;
50  
51      private final int initBufferSize;
52      private final AtomicInteger capacityIncrement;
53      private volatile ByteBuffer buffer;
54      private volatile CapacityChannel inputCapacityChannel;
55      private volatile DataStreamChannel outputDataChannel;
56      private volatile boolean endStream;
57  
58      public EchoHandler(final int bufferSize) {
59          this.initBufferSize = bufferSize;
60          this.capacityIncrement = new AtomicInteger();
61          this.buffer = ByteBuffer.allocate(bufferSize);
62      }
63  
64      private void ensureCapacity(final int chunk) {
65          if (buffer.remaining() < chunk) {
66              final ByteBuffer oldBuffer = buffer;
67              oldBuffer.flip();
68              final int newCapacity = oldBuffer.remaining() + Math.max(chunk, MIN_CHUNK);
69              buffer = ByteBuffer.allocate(newCapacity);
70              buffer.put(oldBuffer);
71          }
72      }
73  
74      private void signalCapacity() throws IOException {
75          if (inputCapacityChannel != null) {
76              if (capacityIncrement.get() > MIN_CHUNK) {
77                  final int n = capacityIncrement.getAndSet(0);
78                  if (n > 0) {
79                      inputCapacityChannel.update(n);
80                  }
81              }
82          }
83      }
84  
85      @Override
86      public void handleRequest(
87              final HttpRequest request,
88              final EntityDetails entityDetails,
89              final ResponseChannel responseChannel,
90              final HttpContext context) throws HttpException, IOException {
91          final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
92          responseChannel.sendResponse(response, entityDetails, context);
93      }
94  
95      @Override
96      public void consume(final ByteBuffer src) throws IOException {
97          if (buffer.position() == 0) {
98              if (outputDataChannel != null) {
99                  final int bytesWritten = outputDataChannel.write(src);
100                 if (bytesWritten > 0) {
101                     capacityIncrement.addAndGet(bytesWritten);
102                 }
103             }
104         }
105         if (src.hasRemaining()) {
106             ensureCapacity(src.remaining());
107             buffer.put(src);
108             if (outputDataChannel != null) {
109                 outputDataChannel.requestOutput();
110             }
111         }
112         signalCapacity();
113     }
114 
115     @Override
116     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
117         if (buffer.hasRemaining()) {
118             final int n = Math.min(initBufferSize, buffer.remaining()) + capacityIncrement.getAndSet(0);
119             capacityChannel.update(n);
120         }
121         inputCapacityChannel = capacityChannel;
122     }
123 
124     @Override
125     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
126         endStream = true;
127         inputCapacityChannel = null;
128         if (buffer.position() == 0) {
129             if (outputDataChannel != null) {
130                 outputDataChannel.endStream();
131             }
132         } else {
133             if (outputDataChannel != null) {
134                 outputDataChannel.requestOutput();
135             }
136         }
137     }
138 
139     @Override
140     public int available() {
141         return buffer.position();
142     }
143 
144     @Override
145     public void produce(final DataStreamChannel channel) throws IOException {
146         outputDataChannel = channel;
147         buffer.flip();
148         if (buffer.hasRemaining()) {
149             final int bytesWritten = channel.write(buffer);
150             if (bytesWritten > 0) {
151                 capacityIncrement.addAndGet(bytesWritten);
152             }
153         }
154         buffer.compact();
155         if (buffer.position() == 0 && endStream) {
156             channel.endStream();
157         }
158         signalCapacity();
159     }
160 
161     @Override
162     public void failed(final Exception cause) {
163     }
164 
165     @Override
166     public void releaseResources() {
167     }
168 
169 }