View Javadoc
1   package org.eclipse.aether.connector.basic;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import static java.util.Objects.requireNonNull;
23  
24  import java.io.ByteArrayOutputStream;
25  import java.io.File;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.net.URI;
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.ThreadPoolExecutor;
39  import java.util.concurrent.TimeUnit;
40  
41  import org.eclipse.aether.ConfigurationProperties;
42  import org.eclipse.aether.RepositorySystemSession;
43  import org.eclipse.aether.RequestTrace;
44  import org.eclipse.aether.repository.RemoteRepository;
45  import org.eclipse.aether.spi.connector.ArtifactDownload;
46  import org.eclipse.aether.spi.connector.ArtifactUpload;
47  import org.eclipse.aether.spi.connector.MetadataDownload;
48  import org.eclipse.aether.spi.connector.MetadataUpload;
49  import org.eclipse.aether.spi.connector.RepositoryConnector;
50  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
51  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
52  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
53  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
54  import org.eclipse.aether.spi.connector.transport.GetTask;
55  import org.eclipse.aether.spi.connector.transport.PeekTask;
56  import org.eclipse.aether.spi.connector.transport.PutTask;
57  import org.eclipse.aether.spi.connector.transport.Transporter;
58  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
59  import org.eclipse.aether.spi.io.FileProcessor;
60  import org.eclipse.aether.transfer.ChecksumFailureException;
61  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
62  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
63  import org.eclipse.aether.transfer.NoTransporterException;
64  import org.eclipse.aether.transfer.TransferEvent;
65  import org.eclipse.aether.transfer.TransferResource;
66  import org.eclipse.aether.transform.FileTransformer;
67  import org.eclipse.aether.util.ChecksumUtils;
68  import org.eclipse.aether.util.ConfigUtils;
69  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
70  import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
71  import org.slf4j.Logger;
72  import org.slf4j.LoggerFactory;
73  
74  /**
75   */
76  final class BasicRepositoryConnector
77      implements RepositoryConnector
78  {
79  
80      private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
81  
82      private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads";
83  
84      private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold";
85  
86      private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
87  
88      private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class );
89  
90      private final FileProcessor fileProcessor;
91  
92      private final RemoteRepository repository;
93  
94      private final RepositorySystemSession session;
95  
96      private final Transporter transporter;
97  
98      private final RepositoryLayout layout;
99  
100     private final ChecksumPolicyProvider checksumPolicyProvider;
101 
102     private final PartialFile.Factory partialFileFactory;
103 
104     private final int maxThreads;
105 
106     private final boolean smartChecksums;
107 
108     private final boolean persistedChecksums;
109 
110     private Executor executor;
111 
112     private boolean closed;
113 
114     BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository,
115                                      TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider,
116                                      ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor )
117         throws NoRepositoryConnectorException
118     {
119         try
120         {
121             layout = layoutProvider.newRepositoryLayout( session, repository );
122         }
123         catch ( NoRepositoryLayoutException e )
124         {
125             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
126         }
127         try
128         {
129             transporter = transporterProvider.newTransporter( session, repository );
130         }
131         catch ( NoTransporterException e )
132         {
133             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
134         }
135         this.checksumPolicyProvider = checksumPolicyProvider;
136 
137         this.session = session;
138         this.repository = repository;
139         this.fileProcessor = fileProcessor;
140 
141         maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
142         smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
143         persistedChecksums =
144             ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
145                                     ConfigurationProperties.PERSISTED_CHECKSUMS );
146 
147         boolean resumeDownloads =
148             ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME );
149         long resumeThreshold =
150             ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(),
151                                  CONFIG_PROP_RESUME_THRESHOLD );
152         int requestTimeout =
153             ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
154                                     ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(),
155                                     ConfigurationProperties.REQUEST_TIMEOUT );
156         partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout );
157     }
158 
159     private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
160     {
161         if ( maxThreads <= 1 )
162         {
163             return DirectExecutor.INSTANCE;
164         }
165         int tasks = safe( artifacts ).size() + safe( metadatas ).size();
166         if ( tasks <= 1 )
167         {
168             return DirectExecutor.INSTANCE;
169         }
170         if ( executor == null )
171         {
172             executor =
173                 new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
174                                         new LinkedBlockingQueue<Runnable>(),
175                                         new WorkerThreadFactory( getClass().getSimpleName() + '-'
176                                             + repository.getHost() + '-' ) );
177         }
178         return executor;
179     }
180 
181     @Override
182     protected void finalize()
183         throws Throwable
184     {
185         try
186         {
187             close();
188         }
189         finally
190         {
191             super.finalize();
192         }
193     }
194 
195     public void close()
196     {
197         if ( !closed )
198         {
199             closed = true;
200             if ( executor instanceof ExecutorService )
201             {
202                 ( (ExecutorService) executor ).shutdown();
203             }
204             transporter.close();
205         }
206     }
207 
208     public void get( Collection<? extends ArtifactDownload> artifactDownloads,
209                      Collection<? extends MetadataDownload> metadataDownloads )
210     {
211         if ( closed )
212         {
213             throw new IllegalStateException( "connector closed" );
214         }
215 
216         Executor executor = getExecutor( artifactDownloads, metadataDownloads );
217         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
218 
219         for ( MetadataDownload transfer : safe( metadataDownloads ) )
220         {
221             URI location = layout.getLocation( transfer.getMetadata(), false );
222 
223             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
224             TransferEvent.Builder builder = newEventBuilder( resource, false, false );
225             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
226 
227             ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
228             List<RepositoryLayout.Checksum> checksums = null;
229             if ( checksumPolicy != null )
230             {
231                 checksums = layout.getChecksums( transfer.getMetadata(), false, location );
232             }
233 
234             Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
235             executor.execute( errorForwarder.wrap( task ) );
236         }
237 
238         for ( ArtifactDownload transfer : safe( artifactDownloads ) )
239         {
240             URI location = layout.getLocation( transfer.getArtifact(), false );
241 
242             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
243             TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
244             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
245 
246             Runnable task;
247             if ( transfer.isExistenceCheck() )
248             {
249                 task = new PeekTaskRunner( location, listener );
250             }
251             else
252             {
253                 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
254                 List<RepositoryLayout.Checksum> checksums = null;
255                 if ( checksumPolicy != null )
256                 {
257                     checksums = layout.getChecksums( transfer.getArtifact(), false, location );
258                 }
259 
260                 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
261             }
262             executor.execute( errorForwarder.wrap( task ) );
263         }
264 
265         errorForwarder.await();
266     }
267 
268     public void put( Collection<? extends ArtifactUpload> artifactUploads,
269                      Collection<? extends MetadataUpload> metadataUploads )
270     {
271         if ( closed )
272         {
273             throw new IllegalStateException( "connector closed" );
274         }
275 
276         for ( ArtifactUpload transfer : safe( artifactUploads ) )
277         {
278             URI location = layout.getLocation( transfer.getArtifact(), true );
279 
280             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
281             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
282             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
283 
284             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location );
285 
286             Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(), checksums,
287                     listener );
288             task.run();
289         }
290 
291         for ( MetadataUpload transfer : safe( metadataUploads ) )
292         {
293             URI location = layout.getLocation( transfer.getMetadata(), true );
294 
295             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
296             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
297             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
298 
299             List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location );
300 
301             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
302             task.run();
303         }
304     }
305 
306     private static <T> Collection<T> safe( Collection<T> items )
307     {
308         return ( items != null ) ? items : Collections.<T>emptyList();
309     }
310 
311     private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
312     {
313         return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
314     }
315 
316     private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
317     {
318         TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
319         if ( upload )
320         {
321             builder.setRequestType( TransferEvent.RequestType.PUT );
322         }
323         else if ( !peek )
324         {
325             builder.setRequestType( TransferEvent.RequestType.GET );
326         }
327         else
328         {
329             builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
330         }
331         return builder;
332     }
333 
334     private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
335     {
336         return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
337     }
338 
339     @Override
340     public String toString()
341     {
342         return String.valueOf( repository );
343     }
344 
345     abstract class TaskRunner
346         implements Runnable
347     {
348 
349         protected final URI path;
350 
351         protected final TransferTransportListener<?> listener;
352 
353         TaskRunner( URI path, TransferTransportListener<?> listener )
354         {
355             this.path = path;
356             this.listener = listener;
357         }
358 
359         public void run()
360         {
361             try
362             {
363                 listener.transferInitiated();
364                 runTask();
365                 listener.transferSucceeded();
366             }
367             catch ( Exception e )
368             {
369                 listener.transferFailed( e, transporter.classify( e ) );
370             }
371         }
372 
373         protected abstract void runTask()
374             throws Exception;
375 
376     }
377 
378     class PeekTaskRunner
379         extends TaskRunner
380     {
381 
382         PeekTaskRunner( URI path, TransferTransportListener<?> listener )
383         {
384             super( path, listener );
385         }
386 
387         protected void runTask()
388             throws Exception
389         {
390             transporter.peek( new PeekTask( path ) );
391         }
392 
393     }
394 
395     class GetTaskRunner
396         extends TaskRunner
397         implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher
398     {
399 
400         private final File file;
401 
402         private final ChecksumValidator checksumValidator;
403 
404         GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
405                               List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener )
406         {
407             super( path, listener );
408             this.file = requireNonNull( file, "destination file cannot be null" );
409             checksumValidator =
410                 new ChecksumValidator( file, fileProcessor, this, checksumPolicy, safe( checksums ) );
411         }
412 
413         public void checkRemoteAccess()
414             throws Exception
415         {
416             transporter.peek( new PeekTask( path ) );
417         }
418 
419         public boolean fetchChecksum( URI remote, File local )
420             throws Exception
421         {
422             try
423             {
424                 transporter.get( new GetTask( remote ).setDataFile( local ) );
425             }
426             catch ( Exception e )
427             {
428                 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
429                 {
430                     return false;
431                 }
432                 throw e;
433             }
434             return true;
435         }
436 
437         protected void runTask()
438             throws Exception
439         {
440             fileProcessor.mkdirs( file.getParentFile() );
441 
442             PartialFile partFile = partialFileFactory.newInstance( file, this );
443             if ( partFile == null )
444             {
445                 LOGGER.debug( "Concurrent download of {} just finished, skipping download", file );
446                 return;
447             }
448 
449             try
450             {
451                 File tmp = partFile.getFile();
452                 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
453                 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ )
454                 {
455                     boolean resume = partFile.isResume() && trial <= firstTrial;
456                     GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener );
457                     transporter.get( task );
458                     try
459                     {
460                         checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
461                                         : null );
462                         break;
463                     }
464                     catch ( ChecksumFailureException e )
465                     {
466                         boolean retry = trial < lastTrial && e.isRetryWorthy();
467                         if ( !retry && !checksumValidator.handle( e ) )
468                         {
469                             throw e;
470                         }
471                         listener.transferCorrupted( e );
472                         if ( retry )
473                         {
474                             checksumValidator.retry();
475                         }
476                         else
477                         {
478                             break;
479                         }
480                     }
481                 }
482                 fileProcessor.move( tmp, file );
483                 if ( persistedChecksums )
484                 {
485                     checksumValidator.commit();
486                 }
487             }
488             finally
489             {
490                 partFile.close();
491                 checksumValidator.close();
492             }
493         }
494 
495     }
496 
497     class PutTaskRunner
498         extends TaskRunner
499     {
500 
501         private final File file;
502 
503         private final FileTransformer fileTransformer;
504 
505         private final Collection<RepositoryLayout.Checksum> checksums;
506 
507         PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums,
508                        TransferTransportListener<?> listener )
509         {
510             this( path, file, null, checksums, listener );
511         }
512 
513         /**
514          * <strong>IMPORTANT</strong> When using a fileTransformer, the content of the file is stored in memory to
515          * ensure that file content and checksums stay in sync!
516          *
517          * @param path
518          * @param file
519          * @param fileTransformer
520          * @param checksums
521          * @param listener
522          */
523         PutTaskRunner( URI path, File file, FileTransformer fileTransformer, List<RepositoryLayout.Checksum> checksums,
524                               TransferTransportListener<?> listener )
525         {
526             super( path, listener );
527             this.file = requireNonNull( file, "source file cannot be null" );
528             this.fileTransformer = fileTransformer;
529             this.checksums = safe( checksums );
530         }
531 
532         @SuppressWarnings( "checkstyle:innerassignment" )
533         protected void runTask()
534             throws Exception
535         {
536             if ( fileTransformer != null )
537             {
538                 // transform data once to byte array, ensure constant data for checksum
539                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
540                 byte[] buffer = new byte[1024];
541 
542                 try ( InputStream transformData = fileTransformer.transformData( file ) )
543                 {
544                     for ( int read; ( read = transformData.read( buffer, 0, buffer.length ) ) != -1; )
545                     {
546                         baos.write( buffer, 0, read );
547                     }
548                 }
549 
550                 byte[] bytes = baos.toByteArray();
551                 transporter.put( new PutTask( path ).setDataBytes( bytes ).setListener( listener ) );
552                 uploadChecksums( file, bytes );
553             }
554             else
555             {
556                 transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
557                 uploadChecksums( file, null );
558             }
559         }
560 
561         /**
562          *
563          * @param file source
564          * @param bytes transformed data from file or {@code null}
565          */
566         private void uploadChecksums( File file, byte[] bytes )
567         {
568             if ( checksums.isEmpty() )
569             {
570                 return;
571             }
572             try
573             {
574                 Set<String> algos = new HashSet<>();
575                 for ( RepositoryLayout.Checksum checksum : checksums )
576                 {
577                     algos.add( checksum.getAlgorithm() );
578                 }
579 
580                 Map<String, Object> sumsByAlgo;
581                 if ( bytes != null )
582                 {
583                     sumsByAlgo = ChecksumUtils.calc( bytes, algos );
584                 }
585                 else
586                 {
587                     sumsByAlgo = ChecksumUtils.calc( file, algos );
588                 }
589 
590                 for ( RepositoryLayout.Checksum checksum : checksums )
591                 {
592                     uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) );
593                 }
594             }
595             catch ( IOException e )
596             {
597                 LOGGER.warn( "Failed to upload checksums for {}", file, e );
598             }
599         }
600 
601         private void uploadChecksum( URI location, Object checksum )
602         {
603             try
604             {
605                 if ( checksum instanceof Exception )
606                 {
607                     throw (Exception) checksum;
608                 }
609                 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
610             }
611             catch ( Exception e )
612             {
613                 LOGGER.warn( "Failed to upload checksum to {}", location, e );
614             }
615         }
616 
617     }
618 
619     private static class DirectExecutor
620         implements Executor
621     {
622 
623         static final Executor INSTANCE = new DirectExecutor();
624 
625         public void execute( Runnable command )
626         {
627             command.run();
628         }
629 
630     }
631 
632 }