View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.transport.jetty;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.Channels;
25  import java.nio.channels.ReadableByteChannel;
26  import java.nio.file.Files;
27  import java.nio.file.StandardOpenOption;
28  
29  import org.eclipse.aether.spi.connector.transport.PutTask;
30  import org.eclipse.jetty.client.util.AbstractRequestContent;
31  import org.eclipse.jetty.io.ByteBufferPool;
32  import org.eclipse.jetty.util.BufferUtil;
33  import org.eclipse.jetty.util.Callback;
34  import org.eclipse.jetty.util.IO;
35  
36  class PutTaskRequestContent extends AbstractRequestContent {
37      private final PutTask putTask;
38      private final int bufferSize;
39      private ByteBufferPool bufferPool;
40      private boolean useDirectByteBuffers = true;
41  
42      @SuppressWarnings("checkstyle:MagicNumber")
43      PutTaskRequestContent(PutTask putTask) {
44          this(putTask, 4096);
45      }
46  
47      PutTaskRequestContent(PutTask putTask, int bufferSize) {
48          super("application/octet-stream");
49          this.putTask = putTask;
50          this.bufferSize = bufferSize;
51      }
52  
53      @Override
54      public long getLength() {
55          return putTask.getDataLength();
56      }
57  
58      @Override
59      public boolean isReproducible() {
60          return true;
61      }
62  
63      public ByteBufferPool getByteBufferPool() {
64          return bufferPool;
65      }
66  
67      public void setByteBufferPool(ByteBufferPool byteBufferPool) {
68          this.bufferPool = byteBufferPool;
69      }
70  
71      public boolean isUseDirectByteBuffers() {
72          return useDirectByteBuffers;
73      }
74  
75      public void setUseDirectByteBuffers(boolean useDirectByteBuffers) {
76          this.useDirectByteBuffers = useDirectByteBuffers;
77      }
78  
79      @Override
80      protected Subscription newSubscription(Consumer consumer, boolean emitInitialContent) {
81          return new SubscriptionImpl(consumer, emitInitialContent);
82      }
83  
84      private class SubscriptionImpl extends AbstractSubscription {
85          private ReadableByteChannel channel;
86          private long readTotal;
87  
88          private SubscriptionImpl(Consumer consumer, boolean emitInitialContent) {
89              super(consumer, emitInitialContent);
90          }
91  
92          @Override
93          protected boolean produceContent(Producer producer) throws IOException {
94              ByteBuffer buffer;
95              boolean last;
96              if (channel == null) {
97                  if (putTask.getDataPath() != null) {
98                      channel = Files.newByteChannel(putTask.getDataPath(), StandardOpenOption.READ);
99                  } else {
100                     channel = Channels.newChannel(putTask.newInputStream());
101                 }
102             }
103 
104             buffer = bufferPool == null
105                     ? BufferUtil.allocate(bufferSize, isUseDirectByteBuffers())
106                     : bufferPool.acquire(bufferSize, isUseDirectByteBuffers());
107 
108             BufferUtil.clearToFill(buffer);
109             int read = channel.read(buffer);
110             BufferUtil.flipToFlush(buffer, 0);
111             if (!channel.isOpen() && read < 0) {
112                 throw new EOFException("EOF reached for " + putTask);
113             }
114             if (read > 0) {
115                 readTotal += read;
116             }
117             last = readTotal == getLength();
118             if (last) {
119                 IO.close(channel);
120             }
121             return producer.produce(buffer, last, Callback.from(() -> release(buffer)));
122         }
123 
124         private void release(ByteBuffer buffer) {
125             if (bufferPool != null) {
126                 bufferPool.release(buffer);
127             }
128         }
129 
130         @Override
131         public void fail(Throwable failure) {
132             super.fail(failure);
133             IO.close(channel);
134         }
135     }
136 }