View Javadoc
1   package org.eclipse.aether.connector.basic;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   * 
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   * 
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import static java.util.Objects.requireNonNull;
23  
24  import java.io.ByteArrayOutputStream;
25  import java.io.File;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.net.URI;
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.ThreadPoolExecutor;
39  import java.util.concurrent.TimeUnit;
40  
41  import org.eclipse.aether.ConfigurationProperties;
42  import org.eclipse.aether.RepositorySystemSession;
43  import org.eclipse.aether.RequestTrace;
44  import org.eclipse.aether.repository.RemoteRepository;
45  import org.eclipse.aether.spi.connector.ArtifactDownload;
46  import org.eclipse.aether.spi.connector.ArtifactUpload;
47  import org.eclipse.aether.spi.connector.MetadataDownload;
48  import org.eclipse.aether.spi.connector.MetadataUpload;
49  import org.eclipse.aether.spi.connector.RepositoryConnector;
50  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
51  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
52  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
53  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
54  import org.eclipse.aether.spi.connector.transport.GetTask;
55  import org.eclipse.aether.spi.connector.transport.PeekTask;
56  import org.eclipse.aether.spi.connector.transport.PutTask;
57  import org.eclipse.aether.spi.connector.transport.Transporter;
58  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
59  import org.eclipse.aether.spi.io.FileProcessor;
60  import org.eclipse.aether.transfer.ChecksumFailureException;
61  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
62  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
63  import org.eclipse.aether.transfer.NoTransporterException;
64  import org.eclipse.aether.transfer.TransferEvent;
65  import org.eclipse.aether.transfer.TransferResource;
66  import org.eclipse.aether.transform.FileTransformer;
67  import org.eclipse.aether.util.ChecksumUtils;
68  import org.eclipse.aether.util.ConfigUtils;
69  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
70  import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
71  import org.slf4j.Logger;
72  import org.slf4j.LoggerFactory;
73  
74  /**
75   */
76  final class BasicRepositoryConnector
77      implements RepositoryConnector
78  {
79  
80      private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
81  
82      private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads";
83  
84      private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold";
85  
86      private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
87  
88      private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class );
89  
90      private final FileProcessor fileProcessor;
91  
92      private final RemoteRepository repository;
93  
94      private final RepositorySystemSession session;
95  
96      private final Transporter transporter;
97  
98      private final RepositoryLayout layout;
99  
100     private final ChecksumPolicyProvider checksumPolicyProvider;
101 
102     private final PartialFile.Factory partialFileFactory;
103 
104     private final int maxThreads;
105 
106     private final boolean smartChecksums;
107 
108     private final boolean persistedChecksums;
109 
110     private Executor executor;
111 
112     private boolean closed;
113 
114     BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository,
115                                      TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider,
116                                      ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor )
117         throws NoRepositoryConnectorException
118     {
119         try
120         {
121             layout = layoutProvider.newRepositoryLayout( session, repository );
122         }
123         catch ( NoRepositoryLayoutException e )
124         {
125             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
126         }
127         try
128         {
129             transporter = transporterProvider.newTransporter( session, repository );
130         }
131         catch ( NoTransporterException e )
132         {
133             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
134         }
135         this.checksumPolicyProvider = checksumPolicyProvider;
136 
137         this.session = session;
138         this.repository = repository;
139         this.fileProcessor = fileProcessor;
140 
141         maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
142         smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
143         persistedChecksums =
144             ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
145                                     ConfigurationProperties.PERSISTED_CHECKSUMS );
146 
147         boolean resumeDownloads =
148             ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME );
149         long resumeThreshold =
150             ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(),
151                                  CONFIG_PROP_RESUME_THRESHOLD );
152         int requestTimeout =
153             ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
154                                     ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(),
155                                     ConfigurationProperties.REQUEST_TIMEOUT );
156         partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout );
157     }
158 
159     private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
160     {
161         if ( maxThreads <= 1 )
162         {
163             return DirectExecutor.INSTANCE;
164         }
165         int tasks = safe( artifacts ).size() + safe( metadatas ).size();
166         if ( tasks <= 1 )
167         {
168             return DirectExecutor.INSTANCE;
169         }
170         if ( executor == null )
171         {
172             executor =
173                 new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
174                                         new LinkedBlockingQueue<Runnable>(),
175                                         new WorkerThreadFactory( getClass().getSimpleName() + '-'
176                                             + repository.getHost() + '-' ) );
177         }
178         return executor;
179     }
180 
181     @Override
182     protected void finalize()
183         throws Throwable
184     {
185         try
186         {
187             close();
188         }
189         finally
190         {
191             super.finalize();
192         }
193     }
194 
195     public void close()
196     {
197         if ( !closed )
198         {
199             closed = true;
200             if ( executor instanceof ExecutorService )
201             {
202                 ( (ExecutorService) executor ).shutdown();
203             }
204             transporter.close();
205         }
206     }
207 
208     public void get( Collection<? extends ArtifactDownload> artifactDownloads,
209                      Collection<? extends MetadataDownload> metadataDownloads )
210     {
211         if ( closed )
212         {
213             throw new IllegalStateException( "connector closed" );
214         }
215 
216         Executor executor = getExecutor( artifactDownloads, metadataDownloads );
217         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
218 
219         for ( MetadataDownload transfer : safe( metadataDownloads ) )
220         {
221             URI location = layout.getLocation( transfer.getMetadata(), false );
222 
223             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
224             TransferEvent.Builder builder = newEventBuilder( resource, false, false );
225             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
226 
227             ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
228             List<RepositoryLayout.Checksum> checksums = null;
229             if ( checksumPolicy != null )
230             {
231                 checksums = layout.getChecksums( transfer.getMetadata(), false, location );
232             }
233 
234             Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
235             executor.execute( errorForwarder.wrap( task ) );
236         }
237 
238         for ( ArtifactDownload transfer : safe( artifactDownloads ) )
239         {
240             URI location = layout.getLocation( transfer.getArtifact(), false );
241 
242             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
243             TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
244             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
245 
246             Runnable task;
247             if ( transfer.isExistenceCheck() )
248             {
249                 task = new PeekTaskRunner( location, listener );
250             }
251             else
252             {
253                 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
254                 List<RepositoryLayout.Checksum> checksums = null;
255                 if ( checksumPolicy != null )
256                 {
257                     checksums = layout.getChecksums( transfer.getArtifact(), false, location );
258                 }
259 
260                 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
261             }
262             executor.execute( errorForwarder.wrap( task ) );
263         }
264 
265         errorForwarder.await();
266     }
267 
268     public void put( Collection<? extends ArtifactUpload> artifactUploads,
269                      Collection<? extends MetadataUpload> metadataUploads )
270     {
271         if ( closed )
272         {
273             throw new IllegalStateException( "connector closed" );
274         }
275 
276         for ( ArtifactUpload transfer : safe( artifactUploads ) )
277         {
278             URI location = layout.getLocation( transfer.getArtifact(), true );
279 
280             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
281             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
282             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
283 
284             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location );
285 
286             Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(), checksums, listener );
287             task.run();
288         }
289 
290         for ( MetadataUpload transfer : safe( metadataUploads ) )
291         {
292             URI location = layout.getLocation( transfer.getMetadata(), true );
293 
294             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
295             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
296             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
297 
298             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location );
299 
300             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
301             task.run();
302         }
303     }
304 
305     private static <T> Collection<T> safe( Collection<T> items )
306     {
307         return ( items != null ) ? items : Collections.<T>emptyList();
308     }
309 
310     private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
311     {
312         return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
313     }
314 
315     private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
316     {
317         TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
318         if ( upload )
319         {
320             builder.setRequestType( TransferEvent.RequestType.PUT );
321         }
322         else if ( !peek )
323         {
324             builder.setRequestType( TransferEvent.RequestType.GET );
325         }
326         else
327         {
328             builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
329         }
330         return builder;
331     }
332 
333     private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
334     {
335         return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
336     }
337 
338     @Override
339     public String toString()
340     {
341         return String.valueOf( repository );
342     }
343 
344     abstract class TaskRunner
345         implements Runnable
346     {
347 
348         protected final URI path;
349 
350         protected final TransferTransportListener<?> listener;
351 
352         TaskRunner( URI path, TransferTransportListener<?> listener )
353         {
354             this.path = path;
355             this.listener = listener;
356         }
357 
358         public void run()
359         {
360             try
361             {
362                 listener.transferInitiated();
363                 runTask();
364                 listener.transferSucceeded();
365             }
366             catch ( Exception e )
367             {
368                 listener.transferFailed( e, transporter.classify( e ) );
369             }
370         }
371 
372         protected abstract void runTask()
373             throws Exception;
374 
375     }
376 
377     class PeekTaskRunner
378         extends TaskRunner
379     {
380 
381         PeekTaskRunner( URI path, TransferTransportListener<?> listener )
382         {
383             super( path, listener );
384         }
385 
386         protected void runTask()
387             throws Exception
388         {
389             transporter.peek( new PeekTask( path ) );
390         }
391 
392     }
393 
394     class GetTaskRunner
395         extends TaskRunner
396         implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher
397     {
398 
399         private final File file;
400 
401         private final ChecksumValidator checksumValidator;
402 
403         GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
404                               List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener )
405         {
406             super( path, listener );
407             this.file = requireNonNull( file, "destination file cannot be null" );
408             checksumValidator =
409                 new ChecksumValidator( file, fileProcessor, this, checksumPolicy, safe( checksums ) );
410         }
411 
412         public void checkRemoteAccess()
413             throws Exception
414         {
415             transporter.peek( new PeekTask( path ) );
416         }
417 
418         public boolean fetchChecksum( URI remote, File local )
419             throws Exception
420         {
421             try
422             {
423                 transporter.get( new GetTask( remote ).setDataFile( local ) );
424             }
425             catch ( Exception e )
426             {
427                 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
428                 {
429                     return false;
430                 }
431                 throw e;
432             }
433             return true;
434         }
435 
436         protected void runTask()
437             throws Exception
438         {
439             fileProcessor.mkdirs( file.getParentFile() );
440 
441             PartialFile partFile = partialFileFactory.newInstance( file, this );
442             if ( partFile == null )
443             {
444                 LOGGER.debug( "Concurrent download of {} just finished, skipping download", file );
445                 return;
446             }
447 
448             try
449             {
450                 File tmp = partFile.getFile();
451                 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
452                 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ )
453                 {
454                     boolean resume = partFile.isResume() && trial <= firstTrial;
455                     GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener );
456                     transporter.get( task );
457                     try
458                     {
459                         checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
460                                         : null );
461                         break;
462                     }
463                     catch ( ChecksumFailureException e )
464                     {
465                         boolean retry = trial < lastTrial && e.isRetryWorthy();
466                         if ( !retry && !checksumValidator.handle( e ) )
467                         {
468                             throw e;
469                         }
470                         listener.transferCorrupted( e );
471                         if ( retry )
472                         {
473                             checksumValidator.retry();
474                         }
475                         else
476                         {
477                             break;
478                         }
479                     }
480                 }
481                 fileProcessor.move( tmp, file );
482                 if ( persistedChecksums )
483                 {
484                     checksumValidator.commit();
485                 }
486             }
487             finally
488             {
489                 partFile.close();
490                 checksumValidator.close();
491             }
492         }
493 
494     }
495 
496     class PutTaskRunner
497         extends TaskRunner
498     {
499 
500         private final File file;
501 
502         private final FileTransformer fileTransformer; 
503 
504         private final Collection<RepositoryLayout.Checksum> checksums;
505 
506         PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums,
507                        TransferTransportListener<?> listener )
508         {
509             this( path, file, null, checksums, listener );
510         }
511 
512         /**
513          * <strong>IMPORTANT</strong> When using a fileTransformer, the content of the file is stored in memory to 
514          * ensure that file content and checksums stay in sync!
515          * 
516          * @param path
517          * @param file
518          * @param fileTransformer
519          * @param checksums
520          * @param listener
521          */
522         PutTaskRunner( URI path, File file, FileTransformer fileTransformer, List<RepositoryLayout.Checksum> checksums,
523                               TransferTransportListener<?> listener )
524         {
525             super( path, listener );
526             this.file = requireNonNull( file, "source file cannot be null" );
527             this.fileTransformer = fileTransformer;
528             this.checksums = safe( checksums );
529         }
530 
531         protected void runTask()
532             throws Exception
533         {
534             if ( fileTransformer != null )
535             {
536                 // transform data once to byte array, ensure constant data for checksum
537                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
538                 byte[] buffer = new byte[1024];
539                 
540                 try ( InputStream transformData = fileTransformer.transformData( file ) )
541                 {
542                     for ( int read; ( read = transformData.read( buffer, 0, buffer.length ) ) != -1; )
543                     {
544                         baos.write( buffer, 0, read );
545                     }
546                 }
547 
548                 byte[] bytes = baos.toByteArray();
549                 transporter.put( new PutTask( path ).setDataBytes( bytes ).setListener( listener ) );
550                 uploadChecksums( file, bytes, path );
551             }
552             else
553             {
554                 transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
555                 uploadChecksums( file, null , path );
556             }
557         }
558 
559         /**
560          * 
561          * @param file source
562          * @param bytes transformed data from file or {@code null}
563          * @param location target
564          */
565         private void uploadChecksums( File file, byte[] bytes, URI location )
566         {
567             if ( checksums.isEmpty() )
568             {
569                 return;
570             }
571             try
572             {
573                 Set<String> algos = new HashSet<>();
574                 for ( RepositoryLayout.Checksum checksum : checksums )
575                 {
576                     algos.add( checksum.getAlgorithm() );
577                 }
578                 
579                 Map<String, Object> sumsByAlgo;
580                 if ( bytes != null )
581                 {
582                     sumsByAlgo = ChecksumUtils.calc( bytes, algos );
583                 }
584                 else
585                 {
586                     sumsByAlgo = ChecksumUtils.calc( file, algos );
587                 }
588 
589                 for ( RepositoryLayout.Checksum checksum : checksums )
590                 {
591                     uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) );
592                 }
593             }
594             catch ( IOException e )
595             {
596                 String msg = "Failed to upload checksums for " + file + ": " + e.getMessage();
597                 if ( LOGGER.isDebugEnabled() )
598                 {
599                     LOGGER.warn( msg, e );
600                 }
601                 else
602                 {
603                     LOGGER.warn( msg );
604                 }
605             }
606         }
607 
608         private void uploadChecksum( URI location, Object checksum )
609         {
610             try
611             {
612                 if ( checksum instanceof Exception )
613                 {
614                     throw (Exception) checksum;
615                 }
616                 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
617             }
618             catch ( Exception e )
619             {
620                 String msg = "Failed to upload checksum " + location + ": " + e.getMessage();
621                 if ( LOGGER.isDebugEnabled() )
622                 {
623                     LOGGER.warn( msg, e );
624                 }
625                 else
626                 {
627                     LOGGER.warn( msg );
628                 }
629             }
630         }
631 
632     }
633 
634     private static class DirectExecutor
635         implements Executor
636     {
637 
638         static final Executor INSTANCE = new DirectExecutor();
639 
640         public void execute( Runnable command )
641         {
642             command.run();
643         }
644 
645     }
646 
647 }