/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * . * */ package org.apache.hc.core5.http.impl.io.bootstrap; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.net.ssl.SSLSocketFactory; import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.config.SocketConfig; import org.apache.hc.core5.http.impl.PoolEntryHolder; import org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnectionFactory; import org.apache.hc.core5.http.impl.io.HttpRequestExecutor; import org.apache.hc.core5.http.io.EofSensorInputStream; import org.apache.hc.core5.http.io.EofSensorWatcher; import org.apache.hc.core5.http.io.HttpClientConnection; import org.apache.hc.core5.http.io.HttpConnectionFactory; import org.apache.hc.core5.http.io.ResponseHandler; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.HttpEntityWrapper; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.pool.ConnPoolControl; import org.apache.hc.core5.pool.ControlledConnPool; import org.apache.hc.core5.pool.PoolEntry; import org.apache.hc.core5.util.Args; /** * @since 5.0 */ public class HttpRequester implements AutoCloseable { private final HttpRequestExecutor requestExecutor; private final HttpProcessor httpProcessor; private final ControlledConnPool connPool; private final HttpConnectionFactory connectFactory; private final SSLSocketFactory sslSocketFactory; public HttpRequester( final HttpRequestExecutor requestExecutor, final HttpProcessor httpProcessor, final ControlledConnPool connPool, final HttpConnectionFactory connectFactory, final SSLSocketFactory sslSocketFactory) { this.requestExecutor = Args.notNull(requestExecutor, "Request executor"); this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); this.connPool = Args.notNull(connPool, "Connection pool"); this.connectFactory = connectFactory != null ? connectFactory : DefaultBHttpClientConnectionFactory.INSTANCE; this.sslSocketFactory = sslSocketFactory; } public ClassicHttpResponse execute( final HttpClientConnection connection, final ClassicHttpRequest request, final HttpContext context) throws HttpException, IOException { Args.notNull(connection, "HTTP connection"); Args.notNull(request, "HTTP request"); Args.notNull(context, "HTTP context"); if (!connection.isOpen()) { throw new ConnectionClosedException("Connection is closed"); } requestExecutor.preProcess(request, httpProcessor, context); final ClassicHttpResponse response = requestExecutor.execute(request, connection, context); requestExecutor.postProcess(response, httpProcessor, context); return response; } public boolean keepAlive( final HttpClientConnection connection, final ClassicHttpRequest request, final ClassicHttpResponse response, final HttpContext context) throws IOException { final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context); if (!keepAlive) { connection.close(); } return keepAlive; } public T execute( final HttpClientConnection connection, final ClassicHttpRequest request, final HttpContext context, final ResponseHandler responseHandler) throws HttpException, IOException { final ClassicHttpResponse response = execute(connection, request, context); try { final T result = responseHandler.handleResponse(response); EntityUtils.consume(response.getEntity()); final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context); if (!keepAlive) { connection.close(); } return result; } catch (HttpException | IOException | RuntimeException ex) { connection.shutdown(); throw ex; } finally { response.close(); } } private Socket createSocket(final HttpHost host) throws IOException { final String scheme = host.getSchemeName(); if ("https".equalsIgnoreCase(scheme)) { return (sslSocketFactory != null ? sslSocketFactory : SSLSocketFactory.getDefault()).createSocket(); } else { return new Socket(); } } private SocketAddress toEndpoint(final HttpHost host) { int port = host.getPort(); if (port < 0) { final String scheme = host.getSchemeName(); if ("http".equalsIgnoreCase(scheme)) { port = 80; } else if ("https".equalsIgnoreCase(scheme)) { port = 443; } } final InetAddress address = host.getAddress(); if (address != null) { return new InetSocketAddress(address, port); } else { return new InetSocketAddress(host.getHostName(), port); } } public ClassicHttpResponse execute( final HttpHost targetHost, final ClassicHttpRequest request, final SocketConfig socketConfig, final HttpContext context) throws HttpException, IOException { Args.notNull(targetHost, "HTTP host"); Args.notNull(request, "HTTP request"); Args.notNull(context, "HTTP context"); final Future> leaseFuture = connPool.lease(targetHost, null, null); final PoolEntry poolEntry; try { poolEntry = leaseFuture.get(socketConfig.getConnectTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { throw new InterruptedIOException(ex.getMessage()); } catch (ExecutionException ex) { throw new HttpException("Unexpected failure leasing connection", ex); } catch (TimeoutException ex) { throw new ConnectionRequestTimeoutException("Connection request timeout"); } final PoolEntryHolder connectionHolder = new PoolEntryHolder<>( connPool, poolEntry, new Callback() { @Override public void execute(final HttpClientConnection conn) { try { conn.shutdown(); } catch (IOException ignore) { } } }); try { HttpClientConnection connection = poolEntry.getConnection(); if (connection == null) { final Socket socket = createSocket(targetHost); connection = connectFactory.createConnection(socket); poolEntry.assignConnection(connection); socket.connect(toEndpoint(targetHost), socketConfig.getConnectTimeout()); } final ClassicHttpResponse response = execute(connection, request, context); final HttpEntity entity = response.getEntity(); if (entity != null) { response.setEntity(new HttpEntityWrapper(entity) { private void releaseConnection() throws IOException { if (connectionHolder.isReleased()) { return; } try { final HttpClientConnection localConn = poolEntry.getConnection(); if (localConn != null) { if (requestExecutor.keepAlive(request, response, localConn, context)) { if (super.isStreaming()) { final InputStream content = super.getContent(); if (content != null) { content.close(); } } connectionHolder.markReusable(); } } } finally { connectionHolder.releaseConnection(); } } private void abortConnection() { connectionHolder.releaseConnection(); } @Override public boolean isStreaming() { return true; } @Override public InputStream getContent() throws IOException { return new EofSensorInputStream(super.getContent(), new EofSensorWatcher() { @Override public boolean eofDetected(final InputStream wrapped) throws IOException { releaseConnection(); return false; } @Override public boolean streamClosed(final InputStream wrapped) throws IOException { releaseConnection(); return false; } @Override public boolean streamAbort(final InputStream wrapped) throws IOException { abortConnection(); return false; } }); } @Override public void writeTo(final OutputStream outstream) throws IOException { try { if (outstream != null) { super.writeTo(outstream); } close(); } catch (final IOException | RuntimeException ex) { abortConnection(); } } @Override public void close() throws IOException { releaseConnection(); } }); } return response; } catch (HttpException | IOException | RuntimeException ex) { connectionHolder.abortConnection(); throw ex; } } public T execute( final HttpHost targetHost, final ClassicHttpRequest request, final SocketConfig socketConfig, final HttpContext context, final ResponseHandler responseHandler) throws HttpException, IOException { final ClassicHttpResponse response = execute(targetHost, request, socketConfig, context); try { final T result = responseHandler.handleResponse(response); EntityUtils.consume(response.getEntity()); return result; } finally { response.close(); } } public ConnPoolControl getConnPoolControl() { return connPool; } public void shutdown() { connPool.shutdown(); } @Override public void close() throws Exception { connPool.close(); } }