View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.transport.wagon;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.lang.reflect.InvocationTargetException;
26  import java.lang.reflect.Method;
27  import java.nio.file.Files;
28  import java.nio.file.StandardCopyOption;
29  import java.util.Locale;
30  import java.util.Map;
31  import java.util.Objects;
32  import java.util.Properties;
33  import java.util.Queue;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  import org.apache.maven.wagon.ConnectionException;
38  import org.apache.maven.wagon.ResourceDoesNotExistException;
39  import org.apache.maven.wagon.StreamingWagon;
40  import org.apache.maven.wagon.Wagon;
41  import org.apache.maven.wagon.WagonException;
42  import org.apache.maven.wagon.authentication.AuthenticationInfo;
43  import org.apache.maven.wagon.proxy.ProxyInfo;
44  import org.apache.maven.wagon.proxy.ProxyInfoProvider;
45  import org.apache.maven.wagon.repository.Repository;
46  import org.apache.maven.wagon.repository.RepositoryPermissions;
47  import org.eclipse.aether.ConfigurationProperties;
48  import org.eclipse.aether.RepositorySystemSession;
49  import org.eclipse.aether.repository.AuthenticationContext;
50  import org.eclipse.aether.repository.Proxy;
51  import org.eclipse.aether.repository.RemoteRepository;
52  import org.eclipse.aether.spi.connector.transport.GetTask;
53  import org.eclipse.aether.spi.connector.transport.PeekTask;
54  import org.eclipse.aether.spi.connector.transport.PutTask;
55  import org.eclipse.aether.spi.connector.transport.TransportTask;
56  import org.eclipse.aether.spi.connector.transport.Transporter;
57  import org.eclipse.aether.transfer.NoTransporterException;
58  import org.eclipse.aether.util.ConfigUtils;
59  import org.eclipse.aether.util.FileUtils;
60  import org.slf4j.Logger;
61  import org.slf4j.LoggerFactory;
62  
63  /**
64   * A transporter using Maven Wagon.
65   */
66  final class WagonTransporter implements Transporter {
67  
68      private static final String CONFIG_PROP_CONFIG = "aether.connector.wagon.config";
69  
70      private static final String CONFIG_PROP_FILE_MODE = "aether.connector.perms.fileMode";
71  
72      private static final String CONFIG_PROP_DIR_MODE = "aether.connector.perms.dirMode";
73  
74      private static final String CONFIG_PROP_GROUP = "aether.connector.perms.group";
75  
76      private static final Logger LOGGER = LoggerFactory.getLogger(WagonTransporter.class);
77  
78      private final RemoteRepository repository;
79  
80      private final RepositorySystemSession session;
81  
82      private final AuthenticationContext repoAuthContext;
83  
84      private final AuthenticationContext proxyAuthContext;
85  
86      private final WagonProvider wagonProvider;
87  
88      private final WagonConfigurator wagonConfigurator;
89  
90      private final String wagonHint;
91  
92      private final Repository wagonRepo;
93  
94      private final AuthenticationInfo wagonAuth;
95  
96      private final ProxyInfoProvider wagonProxy;
97  
98      private final Properties headers;
99  
100     private final Queue<Wagon> wagons = new ConcurrentLinkedQueue<>();
101 
102     private final AtomicBoolean closed = new AtomicBoolean();
103 
104     WagonTransporter(
105             WagonProvider wagonProvider,
106             WagonConfigurator wagonConfigurator,
107             RemoteRepository repository,
108             RepositorySystemSession session)
109             throws NoTransporterException {
110         this.wagonProvider = wagonProvider;
111         this.wagonConfigurator = wagonConfigurator;
112         this.repository = repository;
113         this.session = session;
114 
115         wagonRepo = new Repository(repository.getId(), repository.getUrl());
116         wagonRepo.setPermissions(getPermissions(repository.getId(), session));
117 
118         wagonHint = wagonRepo.getProtocol().toLowerCase(Locale.ENGLISH);
119         if (wagonHint.isEmpty()) {
120             throw new NoTransporterException(repository);
121         }
122 
123         try {
124             wagons.add(lookupWagon());
125         } catch (Exception e) {
126             LOGGER.debug("No transport", e);
127             throw new NoTransporterException(repository, e);
128         }
129 
130         repoAuthContext = AuthenticationContext.forRepository(session, repository);
131         proxyAuthContext = AuthenticationContext.forProxy(session, repository);
132 
133         wagonAuth = getAuthenticationInfo(repoAuthContext);
134         wagonProxy = getProxy(repository, proxyAuthContext);
135 
136         headers = new Properties();
137         headers.put(
138                 "User-Agent",
139                 ConfigUtils.getString(
140                         session, ConfigurationProperties.DEFAULT_USER_AGENT, ConfigurationProperties.USER_AGENT));
141         Map<?, ?> headers = ConfigUtils.getMap(
142                 session,
143                 null,
144                 ConfigurationProperties.HTTP_HEADERS + "." + repository.getId(),
145                 ConfigurationProperties.HTTP_HEADERS);
146         if (headers != null) {
147             this.headers.putAll(headers);
148         }
149     }
150 
151     private static RepositoryPermissions getPermissions(String repoId, RepositorySystemSession session) {
152         RepositoryPermissions result = null;
153 
154         RepositoryPermissions perms = new RepositoryPermissions();
155 
156         String suffix = '.' + repoId;
157 
158         String fileMode = ConfigUtils.getString(session, null, CONFIG_PROP_FILE_MODE + suffix);
159         if (fileMode != null) {
160             perms.setFileMode(fileMode);
161             result = perms;
162         }
163 
164         String dirMode = ConfigUtils.getString(session, null, CONFIG_PROP_DIR_MODE + suffix);
165         if (dirMode != null) {
166             perms.setDirectoryMode(dirMode);
167             result = perms;
168         }
169 
170         String group = ConfigUtils.getString(session, null, CONFIG_PROP_GROUP + suffix);
171         if (group != null) {
172             perms.setGroup(group);
173             result = perms;
174         }
175 
176         return result;
177     }
178 
179     private AuthenticationInfo getAuthenticationInfo(final AuthenticationContext authContext) {
180         AuthenticationInfo auth = null;
181 
182         if (authContext != null) {
183             auth = new AuthenticationInfo() {
184                 @Override
185                 public String getUserName() {
186                     return authContext.get(AuthenticationContext.USERNAME);
187                 }
188 
189                 @Override
190                 public String getPassword() {
191                     return authContext.get(AuthenticationContext.PASSWORD);
192                 }
193 
194                 @Override
195                 public String getPrivateKey() {
196                     return authContext.get(AuthenticationContext.PRIVATE_KEY_PATH);
197                 }
198 
199                 @Override
200                 public String getPassphrase() {
201                     return authContext.get(AuthenticationContext.PRIVATE_KEY_PASSPHRASE);
202                 }
203             };
204         }
205 
206         return auth;
207     }
208 
209     private ProxyInfoProvider getProxy(RemoteRepository repository, final AuthenticationContext authContext) {
210         ProxyInfoProvider proxy = null;
211 
212         Proxy p = repository.getProxy();
213         if (p != null) {
214             final ProxyInfo prox;
215             if (authContext != null) {
216                 prox = new ProxyInfo() {
217                     @Override
218                     public String getUserName() {
219                         return authContext.get(AuthenticationContext.USERNAME);
220                     }
221 
222                     @Override
223                     public String getPassword() {
224                         return authContext.get(AuthenticationContext.PASSWORD);
225                     }
226 
227                     @Override
228                     public String getNtlmDomain() {
229                         return authContext.get(AuthenticationContext.NTLM_DOMAIN);
230                     }
231 
232                     @Override
233                     public String getNtlmHost() {
234                         return authContext.get(AuthenticationContext.NTLM_WORKSTATION);
235                     }
236                 };
237             } else {
238                 prox = new ProxyInfo();
239             }
240             prox.setType(p.getType());
241             prox.setHost(p.getHost());
242             prox.setPort(p.getPort());
243 
244             proxy = protocol -> prox;
245         }
246 
247         return proxy;
248     }
249 
250     private Wagon lookupWagon() throws Exception {
251         return wagonProvider.lookup(wagonHint);
252     }
253 
254     private void releaseWagon(Wagon wagon) {
255         wagonProvider.release(wagon);
256     }
257 
258     private void connectWagon(Wagon wagon) throws WagonException {
259         if (!headers.isEmpty()) {
260             try {
261                 Method setHttpHeaders = wagon.getClass().getMethod("setHttpHeaders", Properties.class);
262                 setHttpHeaders.invoke(wagon, headers);
263             } catch (NoSuchMethodException e) {
264                 // normal for non-http wagons
265             } catch (InvocationTargetException | IllegalAccessException | RuntimeException e) {
266                 LOGGER.debug(
267                         "Could not set user agent for Wagon {}",
268                         wagon.getClass().getName(),
269                         e);
270             }
271         }
272 
273         int connectTimeout = ConfigUtils.getInteger(
274                 session, ConfigurationProperties.DEFAULT_CONNECT_TIMEOUT, ConfigurationProperties.CONNECT_TIMEOUT);
275         int requestTimeout = ConfigUtils.getInteger(
276                 session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT, ConfigurationProperties.REQUEST_TIMEOUT);
277 
278         wagon.setTimeout(Math.max(Math.max(connectTimeout, requestTimeout), 0));
279 
280         wagon.setInteractive(ConfigUtils.getBoolean(
281                 session, ConfigurationProperties.DEFAULT_INTERACTIVE, ConfigurationProperties.INTERACTIVE));
282 
283         Object configuration = ConfigUtils.getObject(session, null, CONFIG_PROP_CONFIG + "." + repository.getId());
284         if (configuration != null && wagonConfigurator != null) {
285             try {
286                 wagonConfigurator.configure(wagon, configuration);
287             } catch (Exception e) {
288                 LOGGER.warn(
289                         "Could not apply configuration for {} to Wagon {}",
290                         repository.getId(),
291                         wagon.getClass().getName(),
292                         e);
293             }
294         }
295 
296         wagon.connect(wagonRepo, wagonAuth, wagonProxy);
297     }
298 
299     private void disconnectWagon(Wagon wagon) {
300         try {
301             if (wagon != null) {
302                 wagon.disconnect();
303             }
304         } catch (ConnectionException e) {
305             LOGGER.debug("Could not disconnect Wagon {}", wagon, e);
306         }
307     }
308 
309     private Wagon pollWagon() throws Exception {
310         Wagon wagon = wagons.poll();
311 
312         if (wagon == null) {
313             try {
314                 wagon = lookupWagon();
315                 connectWagon(wagon);
316             } catch (Exception e) {
317                 releaseWagon(wagon);
318                 throw e;
319             }
320         } else if (wagon.getRepository() == null) {
321             try {
322                 connectWagon(wagon);
323             } catch (Exception e) {
324                 wagons.add(wagon);
325                 throw e;
326             }
327         }
328 
329         return wagon;
330     }
331 
332     @Override
333     public int classify(Throwable error) {
334         if (error instanceof ResourceDoesNotExistException) {
335             return ERROR_NOT_FOUND;
336         }
337         return ERROR_OTHER;
338     }
339 
340     @Override
341     public void peek(PeekTask task) throws Exception {
342         execute(task, new PeekTaskRunner(task));
343     }
344 
345     @Override
346     public void get(GetTask task) throws Exception {
347         execute(task, new GetTaskRunner(task));
348     }
349 
350     @Override
351     public void put(PutTask task) throws Exception {
352         execute(task, new PutTaskRunner(task));
353     }
354 
355     private void execute(TransportTask task, TaskRunner runner) throws Exception {
356         Objects.requireNonNull(task, "task cannot be null");
357 
358         if (closed.get()) {
359             throw new IllegalStateException("transporter closed, cannot execute task " + task);
360         }
361         try {
362             WagonTransferListener listener = new WagonTransferListener(task.getListener());
363             Wagon wagon = pollWagon();
364             try {
365                 wagon.addTransferListener(listener);
366                 runner.run(wagon);
367             } finally {
368                 wagon.removeTransferListener(listener);
369                 wagons.add(wagon);
370             }
371         } catch (RuntimeException e) {
372             throw WagonCancelledException.unwrap(e);
373         }
374     }
375 
376     @Override
377     public void close() {
378         if (closed.compareAndSet(false, true)) {
379             AuthenticationContext.close(repoAuthContext);
380             AuthenticationContext.close(proxyAuthContext);
381 
382             for (Wagon wagon = wagons.poll(); wagon != null; wagon = wagons.poll()) {
383                 disconnectWagon(wagon);
384                 releaseWagon(wagon);
385             }
386         }
387     }
388 
389     private interface TaskRunner {
390 
391         void run(Wagon wagon) throws IOException, WagonException;
392     }
393 
394     private static class PeekTaskRunner implements TaskRunner {
395 
396         private final PeekTask task;
397 
398         PeekTaskRunner(PeekTask task) {
399             this.task = task;
400         }
401 
402         @Override
403         public void run(Wagon wagon) throws WagonException {
404             String src = task.getLocation().toString();
405             if (!wagon.resourceExists(src)) {
406                 throw new ResourceDoesNotExistException(
407                         "Could not find " + src + " in " + wagon.getRepository().getUrl());
408             }
409         }
410     }
411 
412     private static class GetTaskRunner implements TaskRunner {
413 
414         private final GetTask task;
415 
416         GetTaskRunner(GetTask task) {
417             this.task = task;
418         }
419 
420         @Override
421         public void run(Wagon wagon) throws IOException, WagonException {
422             final String src = task.getLocation().toString();
423             final File file = task.getDataFile();
424             if (file == null && wagon instanceof StreamingWagon) {
425                 try (OutputStream dst = task.newOutputStream()) {
426                     ((StreamingWagon) wagon).getToStream(src, dst);
427                 }
428             } else {
429                 // if file == null -> $TMP used, otherwise we place tmp file next to file
430                 try (FileUtils.TempFile tempFile =
431                         file == null ? FileUtils.newTempFile() : FileUtils.newTempFile(file.toPath())) {
432                     File dst = tempFile.getPath().toFile();
433                     wagon.get(src, dst);
434                     /*
435                      * NOTE: Wagon (1.0-beta-6) doesn't create the destination file when transferring a 0-byte
436                      * resource. So if the resource we asked for didn't cause any exception but doesn't show up in
437                      * the dst file either, Wagon tells us in its weird way the file is empty.
438                      */
439                     if (!dst.exists() && !dst.createNewFile()) {
440                         throw new IOException(String.format("Failure creating file '%s'.", dst.getAbsolutePath()));
441                     }
442 
443                     if (file != null) {
444                         ((FileUtils.CollocatedTempFile) tempFile).move();
445                     } else {
446                         try (OutputStream outputStream = task.newOutputStream()) {
447                             Files.copy(dst.toPath(), outputStream);
448                         }
449                     }
450                 }
451             }
452         }
453     }
454 
455     private static class PutTaskRunner implements TaskRunner {
456 
457         private final PutTask task;
458 
459         PutTaskRunner(PutTask task) {
460             this.task = task;
461         }
462 
463         @Override
464         public void run(Wagon wagon) throws WagonException, IOException {
465             final String dst = task.getLocation().toString();
466             final File file = task.getDataFile();
467             if (file == null && wagon instanceof StreamingWagon) {
468                 try (InputStream src = task.newInputStream()) {
469                     // StreamingWagon uses an internal buffer on src input stream.
470                     ((StreamingWagon) wagon).putFromStream(src, dst, task.getDataLength(), -1);
471                 }
472             } else if (file == null) {
473                 try (FileUtils.TempFile tempFile = FileUtils.newTempFile()) {
474                     try (InputStream inputStream = task.newInputStream()) {
475                         Files.copy(inputStream, tempFile.getPath(), StandardCopyOption.REPLACE_EXISTING);
476                     }
477                     wagon.put(tempFile.getPath().toFile(), dst);
478                 }
479             } else {
480                 wagon.put(file, dst);
481             }
482         }
483     }
484 }