View Javadoc
1   package org.eclipse.aether.connector.basic;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   * 
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   * 
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.net.URI;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.eclipse.aether.ConfigurationProperties;
38  import org.eclipse.aether.RepositorySystemSession;
39  import org.eclipse.aether.RequestTrace;
40  import org.eclipse.aether.repository.RemoteRepository;
41  import org.eclipse.aether.spi.connector.ArtifactDownload;
42  import org.eclipse.aether.spi.connector.ArtifactUpload;
43  import org.eclipse.aether.spi.connector.MetadataDownload;
44  import org.eclipse.aether.spi.connector.MetadataUpload;
45  import org.eclipse.aether.spi.connector.RepositoryConnector;
46  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
47  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
48  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
49  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
50  import org.eclipse.aether.spi.connector.transport.GetTask;
51  import org.eclipse.aether.spi.connector.transport.PeekTask;
52  import org.eclipse.aether.spi.connector.transport.PutTask;
53  import org.eclipse.aether.spi.connector.transport.Transporter;
54  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
55  import org.eclipse.aether.spi.io.FileProcessor;
56  import org.eclipse.aether.spi.log.Logger;
57  import org.eclipse.aether.transfer.ChecksumFailureException;
58  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
59  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
60  import org.eclipse.aether.transfer.NoTransporterException;
61  import org.eclipse.aether.transfer.TransferEvent;
62  import org.eclipse.aether.transfer.TransferResource;
63  import org.eclipse.aether.util.ChecksumUtils;
64  import org.eclipse.aether.util.ConfigUtils;
65  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
66  import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
67  
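/**
 * A repository connector that downloads and uploads artifacts and metadata by combining a {@link Transporter} (which
 * moves the bytes) with a {@link RepositoryLayout} (which maps artifacts/metadata to locations and checksum files).
 * Downloads may be run in parallel on a lazily created thread pool; uploads are performed sequentially on the calling
 * thread.
 */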
70  final class BasicRepositoryConnector
71      implements RepositoryConnector
72  {
73  
    /** Configuration key for the maximum number of download threads (read together with "maven.artifact.threads"). */
    private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";

    /** Configuration key enabling resumption of partial downloads; may be suffixed with ".&lt;repositoryId&gt;". */
    private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads";

    /**
     * Configuration key for the resume threshold handed to the partial file factory; may be suffixed with
     * ".&lt;repositoryId&gt;". The default is 64 * 1024 (presumably a size in bytes - confirm against PartialFile).
     */
    private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold";

    /** Configuration key controlling whether checksums reported by the transport task are passed to validation. */
    private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";

    /** Logger used for debug and warning output of the transfer tasks. */
    private final Logger logger;

    /** File helper used to create directories and move downloaded files into place. */
    private final FileProcessor fileProcessor;

    /** The remote repository this connector talks to. */
    private final RemoteRepository repository;

    /** The session supplying configuration and used to build transfer events. */
    private final RepositorySystemSession session;

    /** The transporter performing the actual peek/get/put operations; closed in {@link #close()}. */
    private final Transporter transporter;

    /** The layout used to compute remote locations and checksum locations. */
    private final RepositoryLayout layout;

    /** Factory for the checksum policy applied to each download. */
    private final ChecksumPolicyProvider checksumPolicyProvider;

    /** Factory for the temporary (partial) files downloads are written to. */
    private final PartialFile.Factory partialFileFactory;

    /** Maximum number of threads for parallel downloads; values of 1 or less disable the thread pool. */
    private final int maxThreads;

    /** Whether checksums obtained from the transport task are passed to the checksum validator. */
    private final boolean smartChecksums;

    /** Whether validated checksums are committed after a successful download. */
    private final boolean persistedChecksums;

    /** Thread pool for parallel downloads, created on first use by getExecutor(); {@code null} until then. */
    private Executor executor;

    /** Set once {@link #close()} has run; get/put reject further calls afterwards. */
    private boolean closed;
107 
108     public BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository,
109                                      TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider,
110                                      ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor,
111                                      Logger logger )
112         throws NoRepositoryConnectorException
113     {
114         try
115         {
116             layout = layoutProvider.newRepositoryLayout( session, repository );
117         }
118         catch ( NoRepositoryLayoutException e )
119         {
120             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
121         }
122         try
123         {
124             transporter = transporterProvider.newTransporter( session, repository );
125         }
126         catch ( NoTransporterException e )
127         {
128             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
129         }
130         this.checksumPolicyProvider = checksumPolicyProvider;
131 
132         this.session = session;
133         this.repository = repository;
134         this.fileProcessor = fileProcessor;
135         this.logger = logger;
136 
137         maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
138         smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
139         persistedChecksums =
140             ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
141                                     ConfigurationProperties.PERSISTED_CHECKSUMS );
142 
143         boolean resumeDownloads =
144             ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME );
145         long resumeThreshold =
146             ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(),
147                                  CONFIG_PROP_RESUME_THRESHOLD );
148         int requestTimeout =
149             ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
150                                     ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(),
151                                     ConfigurationProperties.REQUEST_TIMEOUT );
152         partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout, logger );
153     }
154 
155     private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
156     {
157         if ( maxThreads <= 1 )
158         {
159             return DirectExecutor.INSTANCE;
160         }
161         int tasks = safe( artifacts ).size() + safe( metadatas ).size();
162         if ( tasks <= 1 )
163         {
164             return DirectExecutor.INSTANCE;
165         }
166         if ( executor == null )
167         {
168             executor =
169                 new ThreadPoolExecutor( maxThreads, maxThreads, 3, TimeUnit.SECONDS,
170                                         new LinkedBlockingQueue<Runnable>(),
171                                         new WorkerThreadFactory( getClass().getSimpleName() + '-'
172                                             + repository.getHost() + '-' ) );
173         }
174         return executor;
175     }
176 
    /**
     * Safety net that closes the connector when the instance is finalized, in case {@link #close()} was never called
     * explicitly. The superclass finalizer always runs, even if closing fails.
     *
     * @throws Throwable Any error raised while closing or by the superclass finalizer.
     */
    @Override
    protected void finalize()
        throws Throwable
    {
        try
        {
            close();
        }
        finally
        {
            super.finalize();
        }
    }
190 
191     public void close()
192     {
193         if ( !closed )
194         {
195             closed = true;
196             if ( executor instanceof ExecutorService )
197             {
198                 ( (ExecutorService) executor ).shutdown();
199             }
200             transporter.close();
201         }
202     }
203 
204     public void get( Collection<? extends ArtifactDownload> artifactDownloads,
205                      Collection<? extends MetadataDownload> metadataDownloads )
206     {
207         if ( closed )
208         {
209             throw new IllegalStateException( "connector closed" );
210         }
211 
212         Executor executor = getExecutor( artifactDownloads, metadataDownloads );
213         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
214 
215         for ( MetadataDownload transfer : safe( metadataDownloads ) )
216         {
217             URI location = layout.getLocation( transfer.getMetadata(), false );
218 
219             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
220             TransferEvent.Builder builder = newEventBuilder( resource, false, false );
221             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
222 
223             ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
224             List<RepositoryLayout.Checksum> checksums = null;
225             if ( checksumPolicy != null )
226             {
227                 checksums = layout.getChecksums( transfer.getMetadata(), false, location );
228             }
229 
230             Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
231             executor.execute( errorForwarder.wrap( task ) );
232         }
233 
234         for ( ArtifactDownload transfer : safe( artifactDownloads ) )
235         {
236             URI location = layout.getLocation( transfer.getArtifact(), false );
237 
238             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
239             TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
240             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
241 
242             Runnable task;
243             if ( transfer.isExistenceCheck() )
244             {
245                 task = new PeekTaskRunner( location, listener );
246             }
247             else
248             {
249                 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
250                 List<RepositoryLayout.Checksum> checksums = null;
251                 if ( checksumPolicy != null )
252                 {
253                     checksums = layout.getChecksums( transfer.getArtifact(), false, location );
254                 }
255 
256                 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
257             }
258             executor.execute( errorForwarder.wrap( task ) );
259         }
260 
261         errorForwarder.await();
262     }
263 
264     public void put( Collection<? extends ArtifactUpload> artifactUploads,
265                      Collection<? extends MetadataUpload> metadataUploads )
266     {
267         if ( closed )
268         {
269             throw new IllegalStateException( "connector closed" );
270         }
271 
272         for ( ArtifactUpload transfer : safe( artifactUploads ) )
273         {
274             URI location = layout.getLocation( transfer.getArtifact(), true );
275 
276             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
277             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
278             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
279 
280             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location );
281 
282             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
283             task.run();
284         }
285 
286         for ( MetadataUpload transfer : safe( metadataUploads ) )
287         {
288             URI location = layout.getLocation( transfer.getMetadata(), true );
289 
290             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
291             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
292             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
293 
294             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location );
295 
296             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
297             task.run();
298         }
299     }
300 
301     private static <T> Collection<T> safe( Collection<T> items )
302     {
303         return ( items != null ) ? items : Collections.<T>emptyList();
304     }
305 
306     private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
307     {
308         return new TransferResource( repository.getUrl(), path.toString(), file, trace );
309     }
310 
311     private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
312     {
313         TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
314         if ( upload )
315         {
316             builder.setRequestType( TransferEvent.RequestType.PUT );
317         }
318         else if ( !peek )
319         {
320             builder.setRequestType( TransferEvent.RequestType.GET );
321         }
322         else
323         {
324             builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
325         }
326         return builder;
327     }
328 
329     private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
330     {
331         return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
332     }
333 
    /**
     * Returns the string form of the repository this connector serves.
     *
     * @return The repository's string representation ("null" if the repository is {@code null}).
     */
    @Override
    public String toString()
    {
        return String.valueOf( repository );
    }
339 
340     abstract class TaskRunner
341         implements Runnable
342     {
343 
344         protected final URI path;
345 
346         protected final TransferTransportListener<?> listener;
347 
348         public TaskRunner( URI path, TransferTransportListener<?> listener )
349         {
350             this.path = path;
351             this.listener = listener;
352         }
353 
354         public void run()
355         {
356             try
357             {
358                 listener.transferInitiated();
359                 runTask();
360                 listener.transferSucceeded();
361             }
362             catch ( Exception e )
363             {
364                 listener.transferFailed( e, transporter.classify( e ) );
365             }
366         }
367 
368         protected abstract void runTask()
369             throws Exception;
370 
371     }
372 
373     class PeekTaskRunner
374         extends TaskRunner
375     {
376 
377         public PeekTaskRunner( URI path, TransferTransportListener<?> listener )
378         {
379             super( path, listener );
380         }
381 
382         protected void runTask()
383             throws Exception
384         {
385             transporter.peek( new PeekTask( path ) );
386         }
387 
388     }
389 
390     class GetTaskRunner
391         extends TaskRunner
392         implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher
393     {
394 
395         private final File file;
396 
397         private final ChecksumValidator checksumValidator;
398 
399         public GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
400                               List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener )
401         {
402             super( path, listener );
403             this.file = file;
404             checksumValidator =
405                 new ChecksumValidator( logger, file, fileProcessor, this, checksumPolicy, safe( checksums ) );
406         }
407 
408         public void checkRemoteAccess()
409             throws Exception
410         {
411             transporter.peek( new PeekTask( path ) );
412         }
413 
414         public boolean fetchChecksum( URI remote, File local )
415             throws Exception
416         {
417             try
418             {
419                 transporter.get( new GetTask( remote ).setDataFile( local ) );
420             }
421             catch ( Exception e )
422             {
423                 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
424                 {
425                     return false;
426                 }
427                 throw e;
428             }
429             return true;
430         }
431 
432         protected void runTask()
433             throws Exception
434         {
435             if ( file == null )
436             {
437                 throw new IllegalArgumentException( "destination file has not been specified" );
438             }
439             fileProcessor.mkdirs( file.getParentFile() );
440 
441             PartialFile partFile = partialFileFactory.newInstance( file, this );
442             if ( partFile == null )
443             {
444                 logger.debug( "Concurrent download of " + file + " just finished, skipping download" );
445                 return;
446             }
447 
448             try
449             {
450                 File tmp = partFile.getFile();
451                 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
452                 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ )
453                 {
454                     boolean resume = partFile.isResume() && trial <= firstTrial;
455                     GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener );
456                     transporter.get( task );
457                     try
458                     {
459                         checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
460                                         : null );
461                         break;
462                     }
463                     catch ( ChecksumFailureException e )
464                     {
465                         boolean retry = trial < lastTrial && e.isRetryWorthy();
466                         if ( !retry && !checksumValidator.handle( e ) )
467                         {
468                             throw e;
469                         }
470                         listener.transferCorrupted( e );
471                         if ( retry )
472                         {
473                             checksumValidator.retry();
474                         }
475                         else
476                         {
477                             break;
478                         }
479                     }
480                 }
481                 fileProcessor.move( tmp, file );
482                 if ( persistedChecksums )
483                 {
484                     checksumValidator.commit();
485                 }
486             }
487             finally
488             {
489                 partFile.close();
490                 checksumValidator.close();
491             }
492         }
493 
494     }
495 
496     class PutTaskRunner
497         extends TaskRunner
498     {
499 
500         private final File file;
501 
502         private final Collection<RepositoryLayout.Checksum> checksums;
503 
504         public PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums,
505                               TransferTransportListener<?> listener )
506         {
507             super( path, listener );
508             this.file = file;
509             this.checksums = safe( checksums );
510         }
511 
512         protected void runTask()
513             throws Exception
514         {
515             if ( file == null )
516             {
517                 throw new IllegalArgumentException( "source file has not been specified" );
518             }
519             transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
520             uploadChecksums( file, path );
521         }
522 
523         private void uploadChecksums( File file, URI location )
524         {
525             if ( checksums.isEmpty() )
526             {
527                 return;
528             }
529             try
530             {
531                 Set<String> algos = new HashSet<String>();
532                 for ( RepositoryLayout.Checksum checksum : checksums )
533                 {
534                     algos.add( checksum.getAlgorithm() );
535                 }
536                 Map<String, Object> sumsByAlgo = ChecksumUtils.calc( file, algos );
537                 for ( RepositoryLayout.Checksum checksum : checksums )
538                 {
539                     uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) );
540                 }
541             }
542             catch ( IOException e )
543             {
544                 String msg = "Failed to upload checksums for " + file + ": " + e.getMessage();
545                 if ( logger.isDebugEnabled() )
546                 {
547                     logger.warn( msg, e );
548                 }
549                 else
550                 {
551                     logger.warn( msg );
552                 }
553             }
554         }
555 
556         private void uploadChecksum( URI location, Object checksum )
557         {
558             try
559             {
560                 if ( checksum instanceof Exception )
561                 {
562                     throw (Exception) checksum;
563                 }
564                 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
565             }
566             catch ( Exception e )
567             {
568                 String msg = "Failed to upload checksum " + location + ": " + e.getMessage();
569                 if ( logger.isDebugEnabled() )
570                 {
571                     logger.warn( msg, e );
572                 }
573                 else
574                 {
575                     logger.warn( msg );
576                 }
577             }
578         }
579 
580     }
581 
    /**
     * Executor that runs each command immediately on the calling thread. Used when a batch does not warrant the
     * thread pool.
     */
    private static class DirectExecutor
        implements Executor
    {

        /** Shared instance of this executor. */
        static final Executor INSTANCE = new DirectExecutor();

        /**
         * Runs the command synchronously; the call returns once the command has finished.
         *
         * @param command The command to run.
         */
        public void execute( Runnable command )
        {
            command.run();
        }

    }
594 
595 }