View Javadoc
1   package org.eclipse.aether.connector.basic;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *  http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import static java.util.Objects.requireNonNull;
23  
24  import java.io.ByteArrayOutputStream;
25  import java.io.File;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.UncheckedIOException;
29  import java.net.URI;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.ThreadPoolExecutor;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  
42  import org.eclipse.aether.ConfigurationProperties;
43  import org.eclipse.aether.RepositorySystemSession;
44  import org.eclipse.aether.RequestTrace;
45  import org.eclipse.aether.repository.RemoteRepository;
46  import org.eclipse.aether.spi.connector.ArtifactDownload;
47  import org.eclipse.aether.spi.connector.ArtifactUpload;
48  import org.eclipse.aether.spi.connector.MetadataDownload;
49  import org.eclipse.aether.spi.connector.MetadataUpload;
50  import org.eclipse.aether.spi.connector.RepositoryConnector;
51  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
52  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
53  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
54  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
55  import org.eclipse.aether.spi.connector.checksum.ProvidedChecksumsSource;
56  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
57  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
58  import org.eclipse.aether.spi.connector.transport.GetTask;
59  import org.eclipse.aether.spi.connector.transport.PeekTask;
60  import org.eclipse.aether.spi.connector.transport.PutTask;
61  import org.eclipse.aether.spi.connector.transport.Transporter;
62  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
63  import org.eclipse.aether.spi.io.FileProcessor;
64  import org.eclipse.aether.transfer.ChecksumFailureException;
65  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
66  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
67  import org.eclipse.aether.transfer.NoTransporterException;
68  import org.eclipse.aether.transfer.TransferEvent;
69  import org.eclipse.aether.transfer.TransferResource;
70  import org.eclipse.aether.transform.FileTransformer;
71  import org.eclipse.aether.util.ConfigUtils;
72  import org.eclipse.aether.util.FileUtils;
73  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
74  import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
75  import org.slf4j.Logger;
76  import org.slf4j.LoggerFactory;
77  
78  /**
79   *
80   */
81  final class BasicRepositoryConnector
82          implements RepositoryConnector
83  {
84  
85      private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
86  
87      private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
88  
89      private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class );
90  
91      private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
92  
93      private final FileProcessor fileProcessor;
94  
95      private final RemoteRepository repository;
96  
97      private final RepositorySystemSession session;
98  
99      private final Transporter transporter;
100 
101     private final RepositoryLayout layout;
102 
103     private final ChecksumPolicyProvider checksumPolicyProvider;
104 
105     private final int maxThreads;
106 
107     private final boolean smartChecksums;
108 
109     private final boolean persistedChecksums;
110 
111     private Executor executor;
112 
113     private final AtomicBoolean closed;
114 
115     BasicRepositoryConnector( RepositorySystemSession session,
116                               RemoteRepository repository,
117                               TransporterProvider transporterProvider,
118                               RepositoryLayoutProvider layoutProvider,
119                               ChecksumPolicyProvider checksumPolicyProvider,
120                               FileProcessor fileProcessor,
121                               Map<String, ProvidedChecksumsSource> providedChecksumsSources )
122             throws NoRepositoryConnectorException
123     {
124         try
125         {
126             layout = layoutProvider.newRepositoryLayout( session, repository );
127         }
128         catch ( NoRepositoryLayoutException e )
129         {
130             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
131         }
132         try
133         {
134             transporter = transporterProvider.newTransporter( session, repository );
135         }
136         catch ( NoTransporterException e )
137         {
138             throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
139         }
140         this.checksumPolicyProvider = checksumPolicyProvider;
141 
142         this.session = session;
143         this.repository = repository;
144         this.fileProcessor = fileProcessor;
145         this.providedChecksumsSources = providedChecksumsSources;
146         this.closed = new AtomicBoolean( false );
147 
148         maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
149         smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
150         persistedChecksums =
151                 ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
152                         ConfigurationProperties.PERSISTED_CHECKSUMS );
153     }
154 
155     private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
156     {
157         if ( maxThreads <= 1 )
158         {
159             return DirectExecutor.INSTANCE;
160         }
161         int tasks = safe( artifacts ).size() + safe( metadatas ).size();
162         if ( tasks <= 1 )
163         {
164             return DirectExecutor.INSTANCE;
165         }
166         if ( executor == null )
167         {
168             executor =
169                     new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
170                             new LinkedBlockingQueue<>(),
171                             new WorkerThreadFactory( getClass().getSimpleName() + '-'
172                                     + repository.getHost() + '-' ) );
173         }
174         return executor;
175     }
176 
177     @Override
178     protected void finalize()
179             throws Throwable
180     {
181         try
182         {
183             close();
184         }
185         finally
186         {
187             super.finalize();
188         }
189     }
190 
191     @Override
192     public void close()
193     {
194         if ( closed.compareAndSet( false, true ) )
195         {
196             if ( executor instanceof ExecutorService )
197             {
198                 ( (ExecutorService) executor ).shutdown();
199             }
200             transporter.close();
201         }
202     }
203 
204     private void failIfClosed()
205     {
206         if ( closed.get() )
207         {
208             throw new IllegalStateException( "connector already closed" );
209         }
210     }
211 
212     @Override
213     public void get( Collection<? extends ArtifactDownload> artifactDownloads,
214                      Collection<? extends MetadataDownload> metadataDownloads )
215     {
216         failIfClosed();
217 
218         Executor executor = getExecutor( artifactDownloads, metadataDownloads );
219         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
220         List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
221 
222         for ( MetadataDownload transfer : safe( metadataDownloads ) )
223         {
224             URI location = layout.getLocation( transfer.getMetadata(), false );
225 
226             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
227             TransferEvent.Builder builder = newEventBuilder( resource, false, false );
228             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
229 
230             ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
231             List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
232             if ( checksumPolicy != null )
233             {
234                 checksumLocations = layout.getChecksumLocations( transfer.getMetadata(), false, location );
235             }
236 
237             Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy,
238                     checksumAlgorithmFactories, checksumLocations, null, listener );
239             executor.execute( errorForwarder.wrap( task ) );
240         }
241 
242         for ( ArtifactDownload transfer : safe( artifactDownloads ) )
243         {
244             Map<String, String> providedChecksums = Collections.emptyMap();
245             for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() )
246             {
247                 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
248                     session, transfer, checksumAlgorithmFactories );
249 
250                 if ( provided != null )
251                 {
252                     providedChecksums = provided;
253                     break;
254                 }
255             }
256 
257             URI location = layout.getLocation( transfer.getArtifact(), false );
258 
259             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
260             TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
261             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
262 
263             Runnable task;
264             if ( transfer.isExistenceCheck() )
265             {
266                 task = new PeekTaskRunner( location, listener );
267             }
268             else
269             {
270                 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
271                 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
272                 if ( checksumPolicy != null )
273                 {
274                     checksumLocations = layout.getChecksumLocations( transfer.getArtifact(), false, location );
275                 }
276 
277                 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy,
278                         checksumAlgorithmFactories, checksumLocations, providedChecksums, listener );
279             }
280             executor.execute( errorForwarder.wrap( task ) );
281         }
282 
283         errorForwarder.await();
284     }
285 
286     @Override
287     public void put( Collection<? extends ArtifactUpload> artifactUploads,
288                      Collection<? extends MetadataUpload> metadataUploads )
289     {
290         failIfClosed();
291 
292         for ( ArtifactUpload transfer : safe( artifactUploads ) )
293         {
294             URI location = layout.getLocation( transfer.getArtifact(), true );
295 
296             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
297             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
298             ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
299 
300             List<RepositoryLayout.ChecksumLocation> checksumLocations =
301                     layout.getChecksumLocations( transfer.getArtifact(), true, location );
302 
303             Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(),
304                     checksumLocations, listener );
305             task.run();
306         }
307 
308         for ( MetadataUpload transfer : safe( metadataUploads ) )
309         {
310             URI location = layout.getLocation( transfer.getMetadata(), true );
311 
312             TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
313             TransferEvent.Builder builder = newEventBuilder( resource, true, false );
314             MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
315 
316             List<RepositoryLayout.ChecksumLocation> checksumLocations =
317                     layout.getChecksumLocations( transfer.getMetadata(), true, location );
318 
319             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksumLocations, listener );
320             task.run();
321         }
322     }
323 
324     private static <T> Collection<T> safe( Collection<T> items )
325     {
326         return ( items != null ) ? items : Collections.emptyList();
327     }
328 
329     private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
330     {
331         return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
332     }
333 
334     private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
335     {
336         TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
337         if ( upload )
338         {
339             builder.setRequestType( TransferEvent.RequestType.PUT );
340         }
341         else if ( !peek )
342         {
343             builder.setRequestType( TransferEvent.RequestType.GET );
344         }
345         else
346         {
347             builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
348         }
349         return builder;
350     }
351 
352     private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
353     {
354         return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
355     }
356 
357     @Override
358     public String toString()
359     {
360         return String.valueOf( repository );
361     }
362 
363     abstract class TaskRunner
364             implements Runnable
365     {
366 
367         protected final URI path;
368 
369         protected final TransferTransportListener<?> listener;
370 
371         TaskRunner( URI path, TransferTransportListener<?> listener )
372         {
373             this.path = path;
374             this.listener = listener;
375         }
376 
377         @Override
378         public void run()
379         {
380             try
381             {
382                 listener.transferInitiated();
383                 runTask();
384                 listener.transferSucceeded();
385             }
386             catch ( Exception e )
387             {
388                 listener.transferFailed( e, transporter.classify( e ) );
389             }
390         }
391 
392         protected abstract void runTask()
393                 throws Exception;
394 
395     }
396 
397     class PeekTaskRunner
398             extends TaskRunner
399     {
400 
401         PeekTaskRunner( URI path, TransferTransportListener<?> listener )
402         {
403             super( path, listener );
404         }
405 
406         @Override
407         protected void runTask()
408                 throws Exception
409         {
410             transporter.peek( new PeekTask( path ) );
411         }
412 
413     }
414 
415     class GetTaskRunner
416             extends TaskRunner
417             implements ChecksumValidator.ChecksumFetcher
418     {
419 
420         private final File file;
421 
422         private final ChecksumValidator checksumValidator;
423 
424         GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
425                        List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
426                        List<RepositoryLayout.ChecksumLocation> checksumLocations,
427                        Map<String, String> providedChecksums,
428                        TransferTransportListener<?> listener )
429         {
430             super( path, listener );
431             this.file = requireNonNull( file, "destination file cannot be null" );
432             checksumValidator = new ChecksumValidator( file, checksumAlgorithmFactories, fileProcessor, this,
433                     checksumPolicy, providedChecksums, safe( checksumLocations ) );
434         }
435 
436         @Override
437         public boolean fetchChecksum( URI remote, File local )
438                 throws Exception
439         {
440             try
441             {
442                 transporter.get( new GetTask( remote ).setDataFile( local ) );
443             }
444             catch ( Exception e )
445             {
446                 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
447                 {
448                     return false;
449                 }
450                 throw e;
451             }
452             return true;
453         }
454 
455         @Override
456         protected void runTask()
457                 throws Exception
458         {
459             try ( FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile( file.toPath() ) )
460             {
461                 final File tmp = tempFile.getPath().toFile();
462                 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
463                 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++ )
464                 {
465                     GetTask task = new GetTask( path ).setDataFile( tmp, false ).setListener( listener );
466                     transporter.get( task );
467                     try
468                     {
469                         checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
470                                 : null );
471                         break;
472                     }
473                     catch ( ChecksumFailureException e )
474                     {
475                         boolean retry = trial < lastTrial && e.isRetryWorthy();
476                         if ( !retry && !checksumValidator.handle( e ) )
477                         {
478                             throw e;
479                         }
480                         listener.transferCorrupted( e );
481                         if ( retry )
482                         {
483                             checksumValidator.retry();
484                         }
485                         else
486                         {
487                             break;
488                         }
489                     }
490                 }
491                 tempFile.move();
492                 if ( persistedChecksums )
493                 {
494                     checksumValidator.commit();
495                 }
496             }
497         }
498 
499     }
500 
501     class PutTaskRunner
502             extends TaskRunner
503     {
504 
505         private final File file;
506 
507         private final FileTransformer fileTransformer;
508 
509         private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
510 
511         PutTaskRunner( URI path, File file, List<RepositoryLayout.ChecksumLocation> checksumLocations,
512                        TransferTransportListener<?> listener )
513         {
514             this( path, file, null, checksumLocations, listener );
515         }
516 
517         /**
518          * <strong>IMPORTANT</strong> When using a fileTransformer, the content of the file is stored in memory to
519          * ensure that file content and checksums stay in sync!
520          *
521          * @param path
522          * @param file
523          * @param fileTransformer
524          * @param checksumLocations
525          * @param listener
526          */
527         PutTaskRunner( URI path, File file, FileTransformer fileTransformer,
528                        List<RepositoryLayout.ChecksumLocation> checksumLocations,
529                        TransferTransportListener<?> listener )
530         {
531             super( path, listener );
532             this.file = requireNonNull( file, "source file cannot be null" );
533             this.fileTransformer = fileTransformer;
534             this.checksumLocations = safe( checksumLocations );
535         }
536 
537         @SuppressWarnings( "checkstyle:innerassignment" )
538         @Override
539         protected void runTask()
540                 throws Exception
541         {
542             if ( fileTransformer != null )
543             {
544                 // transform data once to byte array, ensure constant data for checksum
545                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
546                 byte[] buffer = new byte[1024];
547 
548                 try ( InputStream transformData = fileTransformer.transformData( file ) )
549                 {
550                     for ( int read; ( read = transformData.read( buffer, 0, buffer.length ) ) != -1; )
551                     {
552                         baos.write( buffer, 0, read );
553                     }
554                 }
555 
556                 byte[] bytes = baos.toByteArray();
557                 transporter.put( new PutTask( path ).setDataBytes( bytes ).setListener( listener ) );
558                 uploadChecksums( file, bytes );
559             }
560             else
561             {
562                 transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
563                 uploadChecksums( file, null );
564             }
565         }
566 
567         /**
568          * @param file  source
569          * @param bytes transformed data from file or {@code null}
570          */
571         private void uploadChecksums( File file, byte[] bytes )
572         {
573             if ( checksumLocations.isEmpty() )
574             {
575                 return;
576             }
577             try
578             {
579                 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
580                 for ( RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations )
581                 {
582                     algorithms.add( checksumLocation.getChecksumAlgorithmFactory() );
583                 }
584 
585                 Map<String, String> sumsByAlgo;
586                 if ( bytes != null )
587                 {
588                     sumsByAlgo = ChecksumAlgorithmHelper.calculate( bytes, algorithms );
589                 }
590                 else
591                 {
592                     sumsByAlgo = ChecksumAlgorithmHelper.calculate( file, algorithms );
593                 }
594 
595                 for ( RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations )
596                 {
597                     uploadChecksum( checksumLocation.getLocation(),
598                             sumsByAlgo.get( checksumLocation.getChecksumAlgorithmFactory().getName() ) );
599                 }
600             }
601             catch ( IOException e )
602             {
603                 LOGGER.warn( "Failed to upload checksums for {}", file, e );
604                 throw new UncheckedIOException( e );
605             }
606         }
607 
608         private void uploadChecksum( URI location, Object checksum )
609         {
610             try
611             {
612                 if ( checksum instanceof Exception )
613                 {
614                     throw (Exception) checksum;
615                 }
616                 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
617             }
618             catch ( Exception e )
619             {
620                 LOGGER.warn( "Failed to upload checksum to {}", location, e );
621             }
622         }
623 
624     }
625 
626     private static class DirectExecutor
627             implements Executor
628     {
629 
630         static final Executor INSTANCE = new DirectExecutor();
631 
632         @Override
633         public void execute( Runnable command )
634         {
635             command.run();
636         }
637 
638     }
639 }