View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.apache.hc.core5.concurrent.FutureCallback;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpRequest;
39  import org.apache.hc.core5.http.HttpResponse;
40  import org.apache.hc.core5.http.HttpStatus;
41  import org.apache.hc.core5.http.nio.AsyncPushProducer;
42  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
43  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
44  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
45  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
46  import org.apache.hc.core5.http.nio.CapacityChannel;
47  import org.apache.hc.core5.http.nio.DataStreamChannel;
48  import org.apache.hc.core5.http.nio.ResponseChannel;
49  import org.apache.hc.core5.http.protocol.HttpContext;
50  import org.apache.hc.core5.util.Asserts;
51  
52  /**
53   * Abstract server side message exchange handler.
54   *
55   * @since 5.0
56   */
57  public abstract class AbstractServerExchangeHandler<T> implements AsyncServerExchangeHandler {
58  
59      private final AtomicReference<AsyncRequestConsumer<T>> requestConsumerRef;
60      private final AtomicReference<AsyncResponseProducer> responseProducerRef;
61  
62      public AbstractServerExchangeHandler() {
63          this.requestConsumerRef = new AtomicReference<>(null);
64          this.responseProducerRef = new AtomicReference<>(null);
65      }
66  
67      /**
68       * Triggered to supply a request consumer to process the incoming request.
69       * @param request the request message.
70       * @param entityDetails the request entity details.
71       * @param context the actual execution context.
72       * @return the request consumer.
73       */
74      protected abstract AsyncRequestConsumer<T> supplyConsumer(
75              HttpRequest request,
76              EntityDetails entityDetails,
77              HttpContext context) throws HttpException;
78  
79      /**
80       * Triggered to handles the request object produced by the {@link AsyncRequestConsumer} returned
81       * from the {@link #supplyConsumer(HttpRequest, EntityDetails, HttpContext)} method. The handler
82       * can choose to send response messages immediately inside the call or asynchronously
83       * at some later point.
84       *
85       * @param requestMessage the request message.
86       * @param responseTrigger the response trigger.
87       * @param context the actual execution context.
88       */
89      protected abstract void handle(
90              T requestMessage,
91              AsyncServerRequestHandler.ResponseTrigger responseTrigger,
92              HttpContext context) throws HttpException, IOException;
93  
94      @Override
95      public final void handleRequest(
96              final HttpRequest request,
97              final EntityDetails entityDetails,
98              final ResponseChannel responseChannel,
99              final HttpContext context) throws HttpException, IOException {
100 
101         final AsyncRequestConsumer<T> requestConsumer = supplyConsumer(request, entityDetails, context);
102         if (requestConsumer == null) {
103             throw new HttpException("Unable to handle request");
104         }
105         requestConsumerRef.set(requestConsumer);
106         final AsyncServerRequestHandler.ResponseTrigger responseTrigger = new AsyncServerRequestHandler.ResponseTrigger() {
107 
108             @Override
109             public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
110                 responseChannel.sendInformation(response, httpContext);
111             }
112 
113             @Override
114             public void submitResponse(
115                     final AsyncResponseProducer producer, final HttpContext httpContext) throws HttpException, IOException {
116                 if (responseProducerRef.compareAndSet(null, producer)) {
117                     producer.sendResponse(responseChannel, httpContext);
118                 }
119             }
120 
121             @Override
122             public void pushPromise(
123                     final HttpRequest promise, final HttpContext httpContext, final AsyncPushProducer pushProducer) throws HttpException, IOException {
124                 responseChannel.pushPromise(promise, pushProducer, httpContext);
125             }
126 
127             @Override
128             public String toString() {
129                 return "Response trigger: " + responseChannel;
130             }
131 
132         };
133         requestConsumer.consumeRequest(request, entityDetails, context, new FutureCallback<T>() {
134 
135             @Override
136             public void completed(final T result) {
137                 try {
138                     handle(result, responseTrigger, context);
139                 } catch (final HttpException ex) {
140                     try {
141                         responseTrigger.submitResponse(
142                                 AsyncResponseBuilder.create(HttpStatus.SC_INTERNAL_SERVER_ERROR)
143                                         .setEntity(ex.getMessage())
144                                         .build(),
145                                 context);
146                     } catch (final HttpException | IOException ex2) {
147                         failed(ex2);
148                     }
149                 } catch (final IOException ex) {
150                     failed(ex);
151                 }
152             }
153 
154             @Override
155             public void failed(final Exception ex) {
156                 AbstractServerExchangeHandler.this.failed(ex);
157             }
158 
159             @Override
160             public void cancelled() {
161                 releaseResources();
162             }
163 
164         });
165 
166     }
167 
168     @Override
169     public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
170         final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
171         Asserts.notNull(requestConsumer, "Data consumer");
172         requestConsumer.updateCapacity(capacityChannel);
173     }
174 
175     @Override
176     public final void consume(final ByteBuffer src) throws IOException {
177         final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
178         Asserts.notNull(requestConsumer, "Data consumer");
179         requestConsumer.consume(src);
180     }
181 
182     @Override
183     public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
184         final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
185         Asserts.notNull(requestConsumer, "Data consumer");
186         requestConsumer.streamEnd(trailers);
187     }
188 
189     @Override
190     public final int available() {
191         final AsyncResponseProducer dataProducer = responseProducerRef.get();
192         return dataProducer != null ? dataProducer.available() : 0;
193     }
194 
195     @Override
196     public final void produce(final DataStreamChannel channel) throws IOException {
197         final AsyncResponseProducer dataProducer = responseProducerRef.get();
198         Asserts.notNull(dataProducer, "Data producer");
199         dataProducer.produce(channel);
200     }
201 
202     @Override
203     public final void failed(final Exception cause) {
204         try {
205             final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
206             if (requestConsumer != null) {
207                 requestConsumer.failed(cause);
208             }
209             final AsyncResponseProducer dataProducer = responseProducerRef.get();
210             if (dataProducer != null) {
211                 dataProducer.failed(cause);
212             }
213         } finally {
214             releaseResources();
215         }
216     }
217 
218     @Override
219     public final void releaseResources() {
220         final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.getAndSet(null);
221         if (requestConsumer != null) {
222             requestConsumer.releaseResources();
223         }
224         final AsyncResponseProducer dataProducer = responseProducerRef.getAndSet(null);
225         if (dataProducer != null) {
226             dataProducer.releaseResources();
227         }
228     }
229 
230 }