View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support.classic;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.OutputStream;
32  import java.nio.ByteBuffer;
33  import java.util.List;
34  import java.util.Locale;
35  import java.util.Set;
36  import java.util.concurrent.Executor;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.hc.core5.http.EntityDetails;
41  import org.apache.hc.core5.http.Header;
42  import org.apache.hc.core5.http.HttpException;
43  import org.apache.hc.core5.http.HttpHeaders;
44  import org.apache.hc.core5.http.HttpRequest;
45  import org.apache.hc.core5.http.HttpResponse;
46  import org.apache.hc.core5.http.HttpStatus;
47  import org.apache.hc.core5.http.ProtocolVersion;
48  import org.apache.hc.core5.http.message.BasicHttpResponse;
49  import org.apache.hc.core5.http.message.HttpResponseWrapper;
50  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51  import org.apache.hc.core5.http.nio.CapacityChannel;
52  import org.apache.hc.core5.http.nio.DataStreamChannel;
53  import org.apache.hc.core5.http.nio.ResponseChannel;
54  import org.apache.hc.core5.http.protocol.HttpContext;
55  import org.apache.hc.core5.io.Closer;
56  import org.apache.hc.core5.util.Args;
57  import org.apache.hc.core5.util.Asserts;
58  
59  /**
60   * {@link AsyncServerExchangeHandler} implementation that acts as a compatibility
61   * layer for classic {@link InputStream} / {@link OutputStream} based interfaces.
62   * Blocking input / output processing is executed through an {@link Executor}.
63   *
64   * @since 5.0
65   */
66  public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
67  
68      private enum State { IDLE, ACTIVE, COMPLETED }
69  
70      private final int initialBufferSize;
71      private final Executor executor;
72      private final AtomicReference<State> state;
73      private final AtomicReference<Exception> exception;
74  
75      private volatile SharedInputBuffer inputBuffer;
76      private volatile SharedOutputBuffer outputBuffer;
77  
78      public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
79          this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
80          this.executor = Args.notNull(executor, "Executor");
81          this.exception = new AtomicReference<>(null);
82          this.state = new AtomicReference<>(State.IDLE);
83      }
84  
85      /**
86       * Handles an incoming request optionally reading its entity content form the given input stream
87       * and generates a response optionally writing out its entity content into the given output stream.
88       *
89       * @param request the incoming request
90       * @param requestStream the request stream if the request encloses an entity,
91       *                      {@code null} otherwise.
92       * @param response the outgoing response.
93       * @param responseStream the response entity output stream.
94       * @param context the actual execution context.
95       */
96      protected abstract void handle(
97              HttpRequest request, InputStream requestStream,
98              HttpResponse response, OutputStream responseStream,
99              HttpContext context) throws IOException, HttpException;
100 
101     public Exception getException() {
102         return exception.get();
103     }
104 
105     @Override
106     public final void handleRequest(
107             final HttpRequest request,
108             final EntityDetails entityDetails,
109             final ResponseChannel responseChannel,
110             final HttpContext context) throws HttpException, IOException {
111         final AtomicBoolean responseCommitted = new AtomicBoolean(false);
112 
113         final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
114         final HttpResponse responseWrapper = new HttpResponseWrapper(response){
115 
116             private void ensureNotCommitted() {
117                 Asserts.check(!responseCommitted.get(), "Response already committed");
118             }
119 
120             @Override
121             public void addHeader(final String name, final Object value) {
122                 ensureNotCommitted();
123                 super.addHeader(name, value);
124             }
125 
126             @Override
127             public void setHeader(final String name, final Object value) {
128                 ensureNotCommitted();
129                 super.setHeader(name, value);
130             }
131 
132             @Override
133             public void setVersion(final ProtocolVersion version) {
134                 ensureNotCommitted();
135                 super.setVersion(version);
136             }
137 
138             @Override
139             public void setCode(final int code) {
140                 ensureNotCommitted();
141                 super.setCode(code);
142             }
143 
144             @Override
145             public void setReasonPhrase(final String reason) {
146                 ensureNotCommitted();
147                 super.setReasonPhrase(reason);
148             }
149 
150             @Override
151             public void setLocale(final Locale locale) {
152                 ensureNotCommitted();
153                 super.setLocale(locale);
154             }
155 
156         };
157 
158         final InputStream inputStream;
159         if (entityDetails != null) {
160             inputBuffer = new SharedInputBuffer(initialBufferSize);
161             inputStream = new ContentInputStream(inputBuffer);
162         } else {
163             inputStream = null;
164         }
165         outputBuffer = new SharedOutputBuffer(initialBufferSize);
166 
167         final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
168 
169             private void triggerResponse() throws IOException {
170                 try {
171                     if (responseCommitted.compareAndSet(false, true)) {
172                         responseChannel.sendResponse(response, new EntityDetails() {
173 
174                             @Override
175                             public long getContentLength() {
176                                 return -1;
177                             }
178 
179                             @Override
180                             public String getContentType() {
181                                 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
182                                 return h != null ? h.getValue() : null;
183                             }
184 
185                             @Override
186                             public String getContentEncoding() {
187                                 final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
188                                 return h != null ? h.getValue() : null;
189                             }
190 
191                             @Override
192                             public boolean isChunked() {
193                                 return false;
194                             }
195 
196                             @Override
197                             public Set<String> getTrailerNames() {
198                                 return null;
199                             }
200 
201                         }, context);
202                     }
203                 } catch (final HttpException ex) {
204                     throw new IOException(ex.getMessage(), ex);
205                 }
206             }
207 
208             @Override
209             public void close() throws IOException {
210                 triggerResponse();
211                 super.close();
212             }
213 
214             @Override
215             public void write(final byte[] b, final int off, final int len) throws IOException {
216                 triggerResponse();
217                 super.write(b, off, len);
218             }
219 
220             @Override
221             public void write(final byte[] b) throws IOException {
222                 triggerResponse();
223                 super.write(b);
224             }
225 
226             @Override
227             public void write(final int b) throws IOException {
228                 triggerResponse();
229                 super.write(b);
230             }
231 
232         };
233 
234         if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
235             executor.execute(new Runnable() {
236 
237                 @Override
238                 public void run() {
239                     try {
240                         handle(request, inputStream, responseWrapper, outputStream, context);
241                         Closer.close(inputStream);
242                         outputStream.close();
243                     } catch (final Exception ex) {
244                         exception.compareAndSet(null, ex);
245                         if (inputBuffer != null) {
246                             inputBuffer.abort();
247                         }
248                         outputBuffer.abort();
249                     } finally {
250                         state.set(State.COMPLETED);
251                     }
252                 }
253 
254             });
255         }
256     }
257 
258     @Override
259     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
260         if (inputBuffer != null) {
261             inputBuffer.updateCapacity(capacityChannel);
262         }
263     }
264 
265     @Override
266     public final void consume(final ByteBuffer src) throws IOException {
267         Asserts.notNull(inputBuffer, "Input buffer");
268         inputBuffer.fill(src);
269     }
270 
271     @Override
272     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
273         Asserts.notNull(inputBuffer, "Input buffer");
274         inputBuffer.markEndStream();
275     }
276 
277     @Override
278     public final int available() {
279         Asserts.notNull(outputBuffer, "Output buffer");
280         return outputBuffer.length();
281     }
282 
283     @Override
284     public final void produce(final DataStreamChannel channel) throws IOException {
285         Asserts.notNull(outputBuffer, "Output buffer");
286         outputBuffer.flush(channel);
287     }
288 
289     @Override
290     public final void failed(final Exception cause) {
291         exception.compareAndSet(null, cause);
292         releaseResources();
293     }
294 
295     @Override
296     public void releaseResources() {
297     }
298 
299 }