View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.WritableByteChannel;
33  import java.util.List;
34  
35  import org.apache.hc.core5.http.Header;
36  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
37  import org.apache.hc.core5.http.nio.ContentEncoder;
38  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
39  import org.apache.hc.core5.util.Args;
40  import org.apache.hc.core5.util.Asserts;
41  
42  /**
43   * Abstract {@link ContentEncoder} that serves as a base for all content
44   * encoder implementations.
45   *
46   * @since 4.0
47   */
48  public abstract class AbstractContentEncoder implements ContentEncoder {
49  
50      final WritableByteChannel channel;
51      final SessionOutputBuffer buffer;
52      final BasicHttpTransportMetrics metrics;
53  
54      boolean completed;
55  
56      /**
57       * Creates an instance of this class.
58       *
59       * @param channel the destination channel.
60       * @param buffer the session output buffer that can be used to store
61       *    session data for intermediate processing.
62       * @param metrics Transport metrics of the underlying HTTP transport.
63       */
64      public AbstractContentEncoder(
65              final WritableByteChannel channel,
66              final SessionOutputBuffer buffer,
67              final BasicHttpTransportMetrics metrics) {
68          super();
69          Args.notNull(channel, "Channel");
70          Args.notNull(buffer, "Session input buffer");
71          Args.notNull(metrics, "Transport metrics");
72          this.buffer = buffer;
73          this.channel = channel;
74          this.metrics = metrics;
75      }
76  
77      protected WritableByteChannel channel() {
78          return this.channel;
79      }
80  
81      protected SessionOutputBuffer buffer() {
82          return this.buffer;
83      }
84  
85      protected BasicHttpTransportMetrics metrics() {
86          return this.metrics;
87      }
88  
89      @Override
90      public boolean isCompleted() {
91          return this.completed;
92      }
93  
94      @Override
95      public void complete(final List<? extends Header> trailers) throws IOException {
96          this.completed = true;
97      }
98  
99      public final void complete() throws IOException {
100         complete(null);
101     }
102 
103     protected void assertNotCompleted() {
104         Asserts.check(!this.completed, "Encoding process already completed");
105     }
106 
107     /**
108      * Flushes content of the session buffer to the channel and updates transport metrics.
109      *
110      * @return number of bytes written to the channel.
111      *
112      * @since 4.3
113      */
114     protected int flushToChannel() throws IOException {
115         if (!this.buffer.hasData()) {
116             return 0;
117         }
118         final int bytesWritten = this.buffer.flush(this.channel);
119         if (bytesWritten > 0) {
120             this.metrics.incrementBytesTransferred(bytesWritten);
121         }
122         return bytesWritten;
123     }
124 
125     /**
126      * Flushes content of the given buffer to the channel and updates transport metrics.
127      *
128      * @return number of bytes written to the channel.
129      *
130      * @since 4.3
131      */
132     protected int writeToChannel(final ByteBuffer src) throws IOException {
133         if (!src.hasRemaining()) {
134             return 0;
135         }
136         final int bytesWritten = this.channel.write(src);
137         if (bytesWritten > 0) {
138             this.metrics.incrementBytesTransferred(bytesWritten);
139         }
140         return bytesWritten;
141     }
142 
143     /**
144      * Transfers content of the source to the channel and updates transport metrics.
145      *
146      * @param src source.
147      * @param limit max number of bytes to transfer.
148      * @return number of bytes transferred.
149      *
150      * @since 4.3
151      */
152     protected int writeToChannel(final ByteBuffer src, final int limit) throws IOException {
153         return doWriteChunk(src, limit, true);
154     }
155 
156     /**
157      * Transfers content of the source to the buffer and updates transport metrics.
158      *
159      * @param src source.
160      * @param limit max number of bytes to transfer.
161      * @return number of bytes transferred.
162      *
163      * @since 4.3
164      */
165     protected int writeToBuffer(final ByteBuffer src, final int limit) throws IOException {
166         return doWriteChunk(src, limit, false);
167     }
168 
169     private int doWriteChunk(
170         final ByteBuffer src, final int chunk, final boolean direct) throws IOException {
171         final int bytesWritten;
172         if (src.remaining() > chunk) {
173             final int oldLimit = src.limit();
174             final int newLimit = oldLimit - (src.remaining() - chunk);
175             src.limit(newLimit);
176             bytesWritten = doWriteChunk(src, direct);
177             src.limit(oldLimit);
178         } else {
179             bytesWritten = doWriteChunk(src, direct);
180         }
181         return bytesWritten;
182     }
183 
184     private int doWriteChunk(final ByteBuffer src, final boolean direct) throws IOException {
185         if (direct) {
186             final int bytesWritten = this.channel.write(src);
187             if (bytesWritten > 0) {
188                 this.metrics.incrementBytesTransferred(bytesWritten);
189             }
190             return bytesWritten;
191         }
192         final int chunk = src.remaining();
193         this.buffer.write(src);
194         return chunk;
195     }
196 
197 }