View Javadoc
1   package org.eclipse.aether.connector.basic;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   * 
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   * 
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.net.URI;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Map;
30  import static java.util.Objects.requireNonNull;
31  import java.util.Set;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.LinkedBlockingQueue;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.eclipse.aether.ConfigurationProperties;
39  import org.eclipse.aether.RepositorySystemSession;
40  import org.eclipse.aether.RequestTrace;
41  import org.eclipse.aether.repository.RemoteRepository;
42  import org.eclipse.aether.spi.connector.ArtifactDownload;
43  import org.eclipse.aether.spi.connector.ArtifactUpload;
44  import org.eclipse.aether.spi.connector.MetadataDownload;
45  import org.eclipse.aether.spi.connector.MetadataUpload;
46  import org.eclipse.aether.spi.connector.RepositoryConnector;
47  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
48  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
49  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
50  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
51  import org.eclipse.aether.spi.connector.transport.GetTask;
52  import org.eclipse.aether.spi.connector.transport.PeekTask;
53  import org.eclipse.aether.spi.connector.transport.PutTask;
54  import org.eclipse.aether.spi.connector.transport.Transporter;
55  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
56  import org.eclipse.aether.spi.io.FileProcessor;
57  import org.eclipse.aether.spi.log.Logger;
58  import org.eclipse.aether.transfer.ChecksumFailureException;
59  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61  import org.eclipse.aether.transfer.NoTransporterException;
62  import org.eclipse.aether.transfer.TransferEvent;
63  import org.eclipse.aether.transfer.TransferResource;
64  import org.eclipse.aether.util.ChecksumUtils;
65  import org.eclipse.aether.util.ConfigUtils;
66  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
67  import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
68  
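/**
 * A repository connector that downloads and uploads artifacts and metadata through a {@link Transporter}, using a
 * {@link RepositoryLayout} to compute the remote locations of the resources and of their checksum files.
 */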
71  final class BasicRepositoryConnector
72      implements RepositoryConnector
73  {
74  
75      private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
76  
77      private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads";
78  
79      private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold";
80  
81      private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
82  
83      private final Logger logger;
84  
85      private final FileProcessor fileProcessor;
86  
87      private final RemoteRepository repository;
88  
89      private final RepositorySystemSession session;
90  
91      private final Transporter transporter;
92  
93      private final RepositoryLayout layout;
94  
95      private final ChecksumPolicyProvider checksumPolicyProvider;
96  
97      private final PartialFile.Factory partialFileFactory;
98  
99      private final int maxThreads;
100 
101     private final boolean smartChecksums;
102 
103     private final boolean persistedChecksums;
104 
105     private Executor executor;
106 
107     private boolean closed;
108 
109     public BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository,
110                                      TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider,
111                                      ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor,
112                                      Logger logger )
113         throws NoRepositoryConnectorException
114     {
115         try
116         {
117             layout = layoutProvider.newRepositoryLayout( session, repository );
118         }
119         catch ( NoRepositoryLayoutException e )
120         {
121             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
122         }
123         try
124         {
125             transporter = transporterProvider.newTransporter( session, repository );
126         }
127         catch ( NoTransporterException e )
128         {
129             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
130         }
131         this.checksumPolicyProvider = checksumPolicyProvider;
132 
133         this.session = session;
134         this.repository = repository;
135         this.fileProcessor = fileProcessor;
136         this.logger = logger;
137 
138         maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
139         smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
140         persistedChecksums =
141             ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
142                                     ConfigurationProperties.PERSISTED_CHECKSUMS );
143 
144         boolean resumeDownloads =
145             ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME );
146         long resumeThreshold =
147             ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(),
148                                  CONFIG_PROP_RESUME_THRESHOLD );
149         int requestTimeout =
150             ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
151                                     ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(),
152                                     ConfigurationProperties.REQUEST_TIMEOUT );
153         partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout, logger );
154     }
155 
156     private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
157     {
158         if ( maxThreads <= 1 )
159         {
160             return DirectExecutor.INSTANCE;
161         }
162         int tasks = safe( artifacts ).size() + safe( metadatas ).size();
163         if ( tasks <= 1 )
164         {
165             return DirectExecutor.INSTANCE;
166         }
167         if ( executor == null )
168         {
169             executor =
170                 new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
171                                         new LinkedBlockingQueue<Runnable>(),
172                                         new WorkerThreadFactory( getClass().getSimpleName() + '-'
173                                             + repository.getHost() + '-' ) );
174         }
175         return executor;
176     }
177 
178     @Override
179     protected void finalize()
180         throws Throwable
181     {
182         try
183         {
184             close();
185         }
186         finally
187         {
188             super.finalize();
189         }
190     }
191 
192     public void close()
193     {
194         if ( !closed )
195         {
196             closed = true;
197             if ( executor instanceof ExecutorService )
198             {
199                 ( (ExecutorService) executor ).shutdown();
200             }
201             transporter.close();
202         }
203     }
204 
205     public void get( Collection<? extends ArtifactDownload> artifactDownloads,
206                      Collection<? extends MetadataDownload> metadataDownloads )
207     {
208         if ( closed )
209         {
210             throw new IllegalStateException( "connector closed" );
211         }
212 
213         Executor executor = getExecutor( artifactDownloads, metadataDownloads );
214         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
215 
216         for ( MetadataDownload transfer : safe( metadataDownloads ) )
217         {
218             URI location = layout.getLocation( transfer.getMetadata(), false );
219 
220             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
221             TransferEvent.Builder builder = newEventBuilder( resource, false, false );
222             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
223 
224             ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
225             List<RepositoryLayout.Checksum> checksums = null;
226             if ( checksumPolicy != null )
227             {
228                 checksums = layout.getChecksums( transfer.getMetadata(), false, location );
229             }
230 
231             Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
232             executor.execute( errorForwarder.wrap( task ) );
233         }
234 
235         for ( ArtifactDownload transfer : safe( artifactDownloads ) )
236         {
237             URI location = layout.getLocation( transfer.getArtifact(), false );
238 
239             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
240             TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
241             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
242 
243             Runnable task;
244             if ( transfer.isExistenceCheck() )
245             {
246                 task = new PeekTaskRunner( location, listener );
247             }
248             else
249             {
250                 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
251                 List<RepositoryLayout.Checksum> checksums = null;
252                 if ( checksumPolicy != null )
253                 {
254                     checksums = layout.getChecksums( transfer.getArtifact(), false, location );
255                 }
256 
257                 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
258             }
259             executor.execute( errorForwarder.wrap( task ) );
260         }
261 
262         errorForwarder.await();
263     }
264 
265     public void put( Collection<? extends ArtifactUpload> artifactUploads,
266                      Collection<? extends MetadataUpload> metadataUploads )
267     {
268         if ( closed )
269         {
270             throw new IllegalStateException( "connector closed" );
271         }
272 
273         for ( ArtifactUpload transfer : safe( artifactUploads ) )
274         {
275             URI location = layout.getLocation( transfer.getArtifact(), true );
276 
277             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
278             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
279             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
280 
281             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location );
282 
283             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
284             task.run();
285         }
286 
287         for ( MetadataUpload transfer : safe( metadataUploads ) )
288         {
289             URI location = layout.getLocation( transfer.getMetadata(), true );
290 
291             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
292             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
293             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
294 
295             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location );
296 
297             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
298             task.run();
299         }
300     }
301 
302     private static <T> Collection<T> safe( Collection<T> items )
303     {
304         return ( items != null ) ? items : Collections.<T>emptyList();
305     }
306 
307     private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
308     {
309         return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
310     }
311 
312     private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
313     {
314         TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
315         if ( upload )
316         {
317             builder.setRequestType( TransferEvent.RequestType.PUT );
318         }
319         else if ( !peek )
320         {
321             builder.setRequestType( TransferEvent.RequestType.GET );
322         }
323         else
324         {
325             builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
326         }
327         return builder;
328     }
329 
330     private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
331     {
332         return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
333     }
334 
335     @Override
336     public String toString()
337     {
338         return String.valueOf( repository );
339     }
340 
341     abstract class TaskRunner
342         implements Runnable
343     {
344 
345         protected final URI path;
346 
347         protected final TransferTransportListener<?> listener;
348 
349         public TaskRunner( URI path, TransferTransportListener<?> listener )
350         {
351             this.path = path;
352             this.listener = listener;
353         }
354 
355         public void run()
356         {
357             try
358             {
359                 listener.transferInitiated();
360                 runTask();
361                 listener.transferSucceeded();
362             }
363             catch ( Exception e )
364             {
365                 listener.transferFailed( e, transporter.classify( e ) );
366             }
367         }
368 
369         protected abstract void runTask()
370             throws Exception;
371 
372     }
373 
374     class PeekTaskRunner
375         extends TaskRunner
376     {
377 
378         public PeekTaskRunner( URI path, TransferTransportListener<?> listener )
379         {
380             super( path, listener );
381         }
382 
383         protected void runTask()
384             throws Exception
385         {
386             transporter.peek( new PeekTask( path ) );
387         }
388 
389     }
390 
391     class GetTaskRunner
392         extends TaskRunner
393         implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher
394     {
395 
396         private final File file;
397 
398         private final ChecksumValidator checksumValidator;
399 
400         public GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
401                               List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener )
402         {
403             super( path, listener );
404             this.file = requireNonNull( file, "destination file cannot be null" );
405             checksumValidator =
406                 new ChecksumValidator( logger, file, fileProcessor, this, checksumPolicy, safe( checksums ) );
407         }
408 
409         public void checkRemoteAccess()
410             throws Exception
411         {
412             transporter.peek( new PeekTask( path ) );
413         }
414 
415         public boolean fetchChecksum( URI remote, File local )
416             throws Exception
417         {
418             try
419             {
420                 transporter.get( new GetTask( remote ).setDataFile( local ) );
421             }
422             catch ( Exception e )
423             {
424                 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
425                 {
426                     return false;
427                 }
428                 throw e;
429             }
430             return true;
431         }
432 
433         protected void runTask()
434             throws Exception
435         {
436             fileProcessor.mkdirs( file.getParentFile() );
437 
438             PartialFile partFile = partialFileFactory.newInstance( file, this );
439             if ( partFile == null )
440             {
441                 logger.debug( "Concurrent download of " + file + " just finished, skipping download" );
442                 return;
443             }
444 
445             try
446             {
447                 File tmp = partFile.getFile();
448                 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
449                 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ )
450                 {
451                     boolean resume = partFile.isResume() && trial <= firstTrial;
452                     GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener );
453                     transporter.get( task );
454                     try
455                     {
456                         checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
457                                         : null );
458                         break;
459                     }
460                     catch ( ChecksumFailureException e )
461                     {
462                         boolean retry = trial < lastTrial && e.isRetryWorthy();
463                         if ( !retry && !checksumValidator.handle( e ) )
464                         {
465                             throw e;
466                         }
467                         listener.transferCorrupted( e );
468                         if ( retry )
469                         {
470                             checksumValidator.retry();
471                         }
472                         else
473                         {
474                             break;
475                         }
476                     }
477                 }
478                 fileProcessor.move( tmp, file );
479                 if ( persistedChecksums )
480                 {
481                     checksumValidator.commit();
482                 }
483             }
484             finally
485             {
486                 partFile.close();
487                 checksumValidator.close();
488             }
489         }
490 
491     }
492 
493     class PutTaskRunner
494         extends TaskRunner
495     {
496 
497         private final File file;
498 
499         private final Collection<RepositoryLayout.Checksum> checksums;
500 
501         public PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums,
502                               TransferTransportListener<?> listener )
503         {
504             super( path, listener );
505             this.file = requireNonNull( file, "source file cannot be null" );
506             this.checksums = safe( checksums );
507         }
508 
509         protected void runTask()
510             throws Exception
511         {
512             transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
513             uploadChecksums( file, path );
514         }
515 
516         private void uploadChecksums( File file, URI location )
517         {
518             if ( checksums.isEmpty() )
519             {
520                 return;
521             }
522             try
523             {
524                 Set<String> algos = new HashSet<String>();
525                 for ( RepositoryLayout.Checksum checksum : checksums )
526                 {
527                     algos.add( checksum.getAlgorithm() );
528                 }
529                 Map<String, Object> sumsByAlgo = ChecksumUtils.calc( file, algos );
530                 for ( RepositoryLayout.Checksum checksum : checksums )
531                 {
532                     uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) );
533                 }
534             }
535             catch ( IOException e )
536             {
537                 String msg = "Failed to upload checksums for " + file + ": " + e.getMessage();
538                 if ( logger.isDebugEnabled() )
539                 {
540                     logger.warn( msg, e );
541                 }
542                 else
543                 {
544                     logger.warn( msg );
545                 }
546             }
547         }
548 
549         private void uploadChecksum( URI location, Object checksum )
550         {
551             try
552             {
553                 if ( checksum instanceof Exception )
554                 {
555                     throw (Exception) checksum;
556                 }
557                 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
558             }
559             catch ( Exception e )
560             {
561                 String msg = "Failed to upload checksum " + location + ": " + e.getMessage();
562                 if ( logger.isDebugEnabled() )
563                 {
564                     logger.warn( msg, e );
565                 }
566                 else
567                 {
568                     logger.warn( msg );
569                 }
570             }
571         }
572 
573     }
574 
575     private static class DirectExecutor
576         implements Executor
577     {
578 
579         static final Executor INSTANCE = new DirectExecutor();
580 
581         public void execute( Runnable command )
582         {
583             command.run();
584         }
585 
586     }
587 
588 }