View Javadoc
1   package org.eclipse.aether.transport.wagon;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.FileOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.OutputStream;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.util.Locale;
31  import java.util.Map;
32  import java.util.Objects;
33  import java.util.Properties;
34  import java.util.Queue;
35  import java.util.UUID;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.maven.wagon.ConnectionException;
40  import org.apache.maven.wagon.ResourceDoesNotExistException;
41  import org.apache.maven.wagon.StreamingWagon;
42  import org.apache.maven.wagon.Wagon;
43  import org.apache.maven.wagon.WagonException;
44  import org.apache.maven.wagon.authentication.AuthenticationInfo;
45  import org.apache.maven.wagon.proxy.ProxyInfo;
46  import org.apache.maven.wagon.proxy.ProxyInfoProvider;
47  import org.apache.maven.wagon.repository.Repository;
48  import org.apache.maven.wagon.repository.RepositoryPermissions;
49  import org.eclipse.aether.ConfigurationProperties;
50  import org.eclipse.aether.RepositorySystemSession;
51  import org.eclipse.aether.repository.AuthenticationContext;
52  import org.eclipse.aether.repository.Proxy;
53  import org.eclipse.aether.repository.RemoteRepository;
54  import org.eclipse.aether.spi.connector.transport.GetTask;
55  import org.eclipse.aether.spi.connector.transport.PeekTask;
56  import org.eclipse.aether.spi.connector.transport.PutTask;
57  import org.eclipse.aether.spi.connector.transport.TransportTask;
58  import org.eclipse.aether.spi.connector.transport.Transporter;
59  import org.eclipse.aether.transfer.NoTransporterException;
60  import org.eclipse.aether.util.ConfigUtils;
61  import org.slf4j.Logger;
62  import org.slf4j.LoggerFactory;
63  
64  /**
65   * A transporter using Maven Wagon.
66   */
67  final class WagonTransporter
68      implements Transporter
69  {
70  
    /** Session property holding the wagon configuration object; looked up with {@code "." + repositoryId} appended. */
    private static final String CONFIG_PROP_CONFIG = "aether.connector.wagon.config";

    /** Session property for the file mode of repository permissions; looked up with {@code "." + repositoryId}. */
    private static final String CONFIG_PROP_FILE_MODE = "aether.connector.perms.fileMode";

    /** Session property for the directory mode of repository permissions; looked up per repository id. */
    private static final String CONFIG_PROP_DIR_MODE = "aether.connector.perms.dirMode";

    /** Session property for the group of repository permissions; looked up per repository id. */
    private static final String CONFIG_PROP_GROUP = "aether.connector.perms.group";

    private static final Logger LOGGER = LoggerFactory.getLogger( WagonTransporter.class );

    /** The remote repository this transporter talks to. */
    private final RemoteRepository repository;

    /** The session that supplies timeouts, headers and other configuration properties. */
    private final RepositorySystemSession session;

    /** Authentication context for the repository; closed in {@link #close()}. */
    private final AuthenticationContext repoAuthContext;

    /** Authentication context for the proxy; closed in {@link #close()}. */
    private final AuthenticationContext proxyAuthContext;

    /** Source of wagon instances; also used to release them again. */
    private final WagonProvider wagonProvider;

    /** Applies the optional per-repository configuration to a wagon; checked for {@code null} before use. */
    private final WagonConfigurator wagonConfigurator;

    /** Lower-cased protocol of the repository URL, used as lookup hint for the wagon provider. */
    private final String wagonHint;

    /** Wagon-side view of the repository (id, URL and optional permissions). */
    private final Repository wagonRepo;

    /** Wagon-side credentials backed by {@link #repoAuthContext}, {@code null} when there is no such context. */
    private final AuthenticationInfo wagonAuth;

    /** Wagon-side proxy provider, {@code null} when the repository has no proxy. */
    private final ProxyInfoProvider wagonProxy;

    /** HTTP headers (including the user agent) handed to wagons that expose {@code setHttpHeaders}. */
    private final Properties headers;

    /** Wagons that are currently not executing a task and can be reused by the next one. */
    private final Queue<Wagon> wagons = new ConcurrentLinkedQueue<>();

    /** Set once by {@link #close()}; tasks are rejected afterwards. */
    private final AtomicBoolean closed = new AtomicBoolean();
106 
107     WagonTransporter( WagonProvider wagonProvider, WagonConfigurator wagonConfigurator,
108                              RemoteRepository repository, RepositorySystemSession session )
109         throws NoTransporterException
110     {
111         this.wagonProvider = wagonProvider;
112         this.wagonConfigurator = wagonConfigurator;
113         this.repository = repository;
114         this.session = session;
115 
116         wagonRepo = new Repository( repository.getId(), repository.getUrl() );
117         wagonRepo.setPermissions( getPermissions( repository.getId(), session ) );
118 
119         wagonHint = wagonRepo.getProtocol().toLowerCase( Locale.ENGLISH );
120         if ( wagonHint.isEmpty() )
121         {
122             throw new NoTransporterException( repository );
123         }
124 
125         try
126         {
127             wagons.add( lookupWagon() );
128         }
129         catch ( Exception e )
130         {
131             LOGGER.debug( "No transport {}", e );
132             throw new NoTransporterException( repository, e );
133         }
134 
135         repoAuthContext = AuthenticationContext.forRepository( session, repository );
136         proxyAuthContext = AuthenticationContext.forProxy( session, repository );
137 
138         wagonAuth = getAuthenticationInfo( repoAuthContext );
139         wagonProxy = getProxy( repository, proxyAuthContext );
140 
141         headers = new Properties();
142         headers.put( "User-Agent", ConfigUtils.getString( session, ConfigurationProperties.DEFAULT_USER_AGENT,
143                                                           ConfigurationProperties.USER_AGENT ) );
144         Map<?, ?> headers =
145             ConfigUtils.getMap( session, null, ConfigurationProperties.HTTP_HEADERS + "." + repository.getId(),
146                                 ConfigurationProperties.HTTP_HEADERS );
147         if ( headers != null )
148         {
149             this.headers.putAll( headers );
150         }
151     }
152 
153     private static RepositoryPermissions getPermissions( String repoId, RepositorySystemSession session )
154     {
155         RepositoryPermissions result = null;
156 
157         RepositoryPermissions perms = new RepositoryPermissions();
158 
159         String suffix = '.' + repoId;
160 
161         String fileMode = ConfigUtils.getString( session, null, CONFIG_PROP_FILE_MODE + suffix );
162         if ( fileMode != null )
163         {
164             perms.setFileMode( fileMode );
165             result = perms;
166         }
167 
168         String dirMode = ConfigUtils.getString( session, null, CONFIG_PROP_DIR_MODE + suffix );
169         if ( dirMode != null )
170         {
171             perms.setDirectoryMode( dirMode );
172             result = perms;
173         }
174 
175         String group = ConfigUtils.getString( session, null, CONFIG_PROP_GROUP + suffix );
176         if ( group != null )
177         {
178             perms.setGroup( group );
179             result = perms;
180         }
181 
182         return result;
183     }
184 
185     private AuthenticationInfo getAuthenticationInfo( final AuthenticationContext authContext )
186     {
187         AuthenticationInfo auth = null;
188 
189         if ( authContext != null )
190         {
191             auth = new AuthenticationInfo()
192             {
193                 @Override
194                 public String getUserName()
195                 {
196                     return authContext.get( AuthenticationContext.USERNAME );
197                 }
198 
199                 @Override
200                 public String getPassword()
201                 {
202                     return authContext.get( AuthenticationContext.PASSWORD );
203                 }
204 
205                 @Override
206                 public String getPrivateKey()
207                 {
208                     return authContext.get( AuthenticationContext.PRIVATE_KEY_PATH );
209                 }
210 
211                 @Override
212                 public String getPassphrase()
213                 {
214                     return authContext.get( AuthenticationContext.PRIVATE_KEY_PASSPHRASE );
215                 }
216             };
217         }
218 
219         return auth;
220     }
221 
222     private ProxyInfoProvider getProxy( RemoteRepository repository, final AuthenticationContext authContext )
223     {
224         ProxyInfoProvider proxy = null;
225 
226         Proxy p = repository.getProxy();
227         if ( p != null )
228         {
229             final ProxyInfo prox;
230             if ( authContext != null )
231             {
232                 prox = new ProxyInfo()
233                 {
234                     @Override
235                     public String getUserName()
236                     {
237                         return authContext.get( AuthenticationContext.USERNAME );
238                     }
239 
240                     @Override
241                     public String getPassword()
242                     {
243                         return authContext.get( AuthenticationContext.PASSWORD );
244                     }
245 
246                     @Override
247                     public String getNtlmDomain()
248                     {
249                         return authContext.get( AuthenticationContext.NTLM_DOMAIN );
250                     }
251 
252                     @Override
253                     public String getNtlmHost()
254                     {
255                         return authContext.get( AuthenticationContext.NTLM_WORKSTATION );
256                     }
257                 };
258             }
259             else
260             {
261                 prox = new ProxyInfo();
262             }
263             prox.setType( p.getType() );
264             prox.setHost( p.getHost() );
265             prox.setPort( p.getPort() );
266 
267             proxy = protocol -> prox;
268         }
269 
270         return proxy;
271     }
272 
273     private Wagon lookupWagon()
274         throws Exception
275     {
276         return wagonProvider.lookup( wagonHint );
277     }
278 
279     private void releaseWagon( Wagon wagon )
280     {
281         wagonProvider.release( wagon );
282     }
283 
284     private void connectWagon( Wagon wagon )
285         throws WagonException
286     {
287         if ( !headers.isEmpty() )
288         {
289             try
290             {
291                 Method setHttpHeaders = wagon.getClass().getMethod( "setHttpHeaders", Properties.class );
292                 setHttpHeaders.invoke( wagon, headers );
293             }
294             catch ( NoSuchMethodException e )
295             {
296                 // normal for non-http wagons
297             }
298             catch ( InvocationTargetException | IllegalAccessException | RuntimeException e )
299             {
300                 LOGGER.debug( "Could not set user agent for Wagon {}", wagon.getClass().getName(), e );
301             }
302         }
303 
304         int connectTimeout =
305             ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_CONNECT_TIMEOUT,
306                                     ConfigurationProperties.CONNECT_TIMEOUT );
307         int requestTimeout =
308             ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
309                                     ConfigurationProperties.REQUEST_TIMEOUT );
310 
311         wagon.setTimeout( Math.max( Math.max( connectTimeout, requestTimeout ), 0 ) );
312 
313         wagon.setInteractive( ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_INTERACTIVE,
314                                                       ConfigurationProperties.INTERACTIVE ) );
315 
316         Object configuration = ConfigUtils.getObject( session, null, CONFIG_PROP_CONFIG + "." + repository.getId() );
317         if ( configuration != null && wagonConfigurator != null )
318         {
319             try
320             {
321                 wagonConfigurator.configure( wagon, configuration );
322             }
323             catch ( Exception e )
324             {
325                 LOGGER.warn( "Could not apply configuration for {} to Wagon {}",
326                         repository.getId(), wagon.getClass().getName(), e );
327             }
328         }
329 
330         wagon.connect( wagonRepo, wagonAuth, wagonProxy );
331     }
332 
333     private void disconnectWagon( Wagon wagon )
334     {
335         try
336         {
337             if ( wagon != null )
338             {
339                 wagon.disconnect();
340             }
341         }
342         catch ( ConnectionException e )
343         {
344             LOGGER.debug( "Could not disconnect Wagon {}", wagon, e );
345         }
346     }
347 
348     private Wagon pollWagon()
349         throws Exception
350     {
351         Wagon wagon = wagons.poll();
352 
353         if ( wagon == null )
354         {
355             try
356             {
357                 wagon = lookupWagon();
358                 connectWagon( wagon );
359             }
360             catch ( Exception e )
361             {
362                 releaseWagon( wagon );
363                 throw e;
364             }
365         }
366         else if ( wagon.getRepository() == null )
367         {
368             try
369             {
370                 connectWagon( wagon );
371             }
372             catch ( Exception e )
373             {
374                 wagons.add( wagon );
375                 throw e;
376             }
377         }
378 
379         return wagon;
380     }
381 
382     public int classify( Throwable error )
383     {
384         if ( error instanceof ResourceDoesNotExistException )
385         {
386             return ERROR_NOT_FOUND;
387         }
388         return ERROR_OTHER;
389     }
390 
391     public void peek( PeekTask task )
392         throws Exception
393     {
394         execute( task, new PeekTaskRunner( task ) );
395     }
396 
397     public void get( GetTask task )
398         throws Exception
399     {
400         execute( task, new GetTaskRunner( task ) );
401     }
402 
403     public void put( PutTask task )
404         throws Exception
405     {
406         execute( task, new PutTaskRunner( task ) );
407     }
408 
409     private void execute( TransportTask task, TaskRunner runner )
410         throws Exception
411     {
412         Objects.requireNonNull( "task", "task cannot be null" );
413 
414         if ( closed.get() )
415         {
416             throw new IllegalStateException( "transporter closed, cannot execute task " + task );
417         }
418         try
419         {
420             WagonTransferListener listener = new WagonTransferListener( task.getListener() );
421             Wagon wagon = pollWagon();
422             try
423             {
424                 wagon.addTransferListener( listener );
425                 runner.run( wagon );
426             }
427             finally
428             {
429                 wagon.removeTransferListener( listener );
430                 wagons.add( wagon );
431             }
432         }
433         catch ( RuntimeException e )
434         {
435             throw WagonCancelledException.unwrap( e );
436         }
437     }
438 
439     private static File newTempFile()
440         throws IOException
441     {
442         return File.createTempFile( "wagon-" + UUID.randomUUID().toString().replace( "-", "" ), ".tmp" );
443     }
444 
445     private void delTempFile( File path )
446     {
447         if ( path != null && !path.delete() && path.exists() )
448         {
449             LOGGER.debug( "Could not delete temporary file {}", path );
450             path.deleteOnExit();
451         }
452     }
453 
454     private static void copy( OutputStream os, InputStream is )
455         throws IOException
456     {
457         byte[] buffer = new byte[1024 * 32];
458         for ( int read = is.read( buffer ); read >= 0; read = is.read( buffer ) )
459         {
460             os.write( buffer, 0, read );
461         }
462     }
463 
464     public void close()
465     {
466         if ( closed.compareAndSet( false, true ) )
467         {
468             AuthenticationContext.close( repoAuthContext );
469             AuthenticationContext.close( proxyAuthContext );
470 
471             for ( Wagon wagon = wagons.poll(); wagon != null; wagon = wagons.poll() )
472             {
473                 disconnectWagon( wagon );
474                 releaseWagon( wagon );
475             }
476         }
477     }
478 
479     private interface TaskRunner
480     {
481 
482         void run( Wagon wagon )
483             throws IOException, WagonException;
484 
485     }
486 
487     private static class PeekTaskRunner
488         implements TaskRunner
489     {
490 
491         private final PeekTask task;
492 
493         PeekTaskRunner( PeekTask task )
494         {
495             this.task = task;
496         }
497 
498         @Override
499         public void run( Wagon wagon )
500             throws WagonException
501         {
502             String src = task.getLocation().toString();
503             if ( !wagon.resourceExists( src ) )
504             {
505                 throw new ResourceDoesNotExistException( "Could not find " + src + " in "
506                     + wagon.getRepository().getUrl() );
507             }
508         }
509 
510     }
511 
512     private class GetTaskRunner
513         implements TaskRunner
514     {
515 
516         private final GetTask task;
517 
518         GetTaskRunner( GetTask task )
519         {
520             this.task = task;
521         }
522 
523         @Override
524         public void run( Wagon wagon )
525             throws IOException, WagonException
526         {
527             String src = task.getLocation().toString();
528             File file = task.getDataFile();
529             if ( file == null && wagon instanceof StreamingWagon )
530             {
531                 try ( OutputStream dst = task.newOutputStream() )
532                 {
533                     ( (StreamingWagon) wagon ).getToStream( src, dst );
534                 }
535             }
536             else
537             {
538                 File dst = ( file != null ) ? file : newTempFile();
539                 try
540                 {
541                     wagon.get( src, dst );
542                     /*
543                      * NOTE: Wagon (1.0-beta-6) doesn't create the destination file when transferring a 0-byte
544                      * resource. So if the resource we asked for didn't cause any exception but doesn't show up in
545                      * the dst file either, Wagon tells us in its weird way the file is empty.
546                      */
547                     if ( !dst.exists() && !dst.createNewFile() )
548                     {
549                         throw new IOException( String.format( "Failure creating file '%s'.", dst.getAbsolutePath() ) );
550                     }
551                     if ( file == null )
552                     {
553                         readTempFile( dst );
554                     }
555                 }
556                 finally
557                 {
558                     if ( file == null )
559                     {
560                         delTempFile( dst );
561                     }
562                 }
563             }
564         }
565 
566         private void readTempFile( File dst )
567             throws IOException
568         {
569             try ( FileInputStream in = new FileInputStream( dst );
570                     OutputStream out = task.newOutputStream() )
571             {
572                 copy( out, in );
573             }
574         }
575 
576     }
577 
578     private class PutTaskRunner
579         implements TaskRunner
580     {
581 
582         private final PutTask task;
583 
584         PutTaskRunner( PutTask task )
585         {
586             this.task = task;
587         }
588 
589         @Override
590         public void run( Wagon wagon )
591             throws WagonException, IOException
592         {
593             String dst = task.getLocation().toString();
594             File file = task.getDataFile();
595             if ( file == null && wagon instanceof StreamingWagon )
596             {
597                 try ( InputStream src = task.newInputStream() )
598                 {
599                     // StreamingWagon uses an internal buffer on src input stream.
600                     ( (StreamingWagon) wagon ).putFromStream( src, dst, task.getDataLength(), -1 );
601                 }
602             }
603             else
604             {
605                 File src = ( file != null ) ? file : createTempFile();
606                 try
607                 {
608                     wagon.put( src, dst );
609                 }
610                 finally
611                 {
612                     if ( file == null )
613                     {
614                         delTempFile( src );
615                     }
616                 }
617             }
618         }
619 
620         private File createTempFile()
621             throws IOException
622         {
623             File tmp = newTempFile();
624 
625             try ( InputStream in = task.newInputStream();
626                     OutputStream out = new FileOutputStream( tmp ) )
627             {
628                 copy( out, in );
629             }
630             catch ( IOException e )
631             {
632                 delTempFile( tmp );
633                 throw e;
634             }
635 
636             return tmp;
637         }
638 
639     }
640 
641 }