1 package org.eclipse.aether.connector.basic;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import static java.util.Objects.requireNonNull;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.File;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.UncheckedIOException;
29 import java.net.URI;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.LinkedBlockingQueue;
38 import java.util.concurrent.ThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicBoolean;
41
42 import org.eclipse.aether.ConfigurationProperties;
43 import org.eclipse.aether.RepositorySystemSession;
44 import org.eclipse.aether.RequestTrace;
45 import org.eclipse.aether.repository.RemoteRepository;
46 import org.eclipse.aether.spi.connector.ArtifactDownload;
47 import org.eclipse.aether.spi.connector.ArtifactUpload;
48 import org.eclipse.aether.spi.connector.MetadataDownload;
49 import org.eclipse.aether.spi.connector.MetadataUpload;
50 import org.eclipse.aether.spi.connector.RepositoryConnector;
51 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
52 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
53 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
54 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
55 import org.eclipse.aether.spi.connector.checksum.ProvidedChecksumsSource;
56 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
57 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
58 import org.eclipse.aether.spi.connector.transport.GetTask;
59 import org.eclipse.aether.spi.connector.transport.PeekTask;
60 import org.eclipse.aether.spi.connector.transport.PutTask;
61 import org.eclipse.aether.spi.connector.transport.Transporter;
62 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
63 import org.eclipse.aether.spi.io.FileProcessor;
64 import org.eclipse.aether.transfer.ChecksumFailureException;
65 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
66 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
67 import org.eclipse.aether.transfer.NoTransporterException;
68 import org.eclipse.aether.transfer.TransferEvent;
69 import org.eclipse.aether.transfer.TransferResource;
70 import org.eclipse.aether.transform.FileTransformer;
71 import org.eclipse.aether.util.ConfigUtils;
72 import org.eclipse.aether.util.FileUtils;
73 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
74 import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77
78
79
80
81 final class BasicRepositoryConnector
82 implements RepositoryConnector
83 {
84
85 private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
86
87 private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
88
89 private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class );
90
91 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
92
93 private final FileProcessor fileProcessor;
94
95 private final RemoteRepository repository;
96
97 private final RepositorySystemSession session;
98
99 private final Transporter transporter;
100
101 private final RepositoryLayout layout;
102
103 private final ChecksumPolicyProvider checksumPolicyProvider;
104
105 private final int maxThreads;
106
107 private final boolean smartChecksums;
108
109 private final boolean persistedChecksums;
110
111 private Executor executor;
112
113 private final AtomicBoolean closed;
114
115 BasicRepositoryConnector( RepositorySystemSession session,
116 RemoteRepository repository,
117 TransporterProvider transporterProvider,
118 RepositoryLayoutProvider layoutProvider,
119 ChecksumPolicyProvider checksumPolicyProvider,
120 FileProcessor fileProcessor,
121 Map<String, ProvidedChecksumsSource> providedChecksumsSources )
122 throws NoRepositoryConnectorException
123 {
124 try
125 {
126 layout = layoutProvider.newRepositoryLayout( session, repository );
127 }
128 catch ( NoRepositoryLayoutException e )
129 {
130 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
131 }
132 try
133 {
134 transporter = transporterProvider.newTransporter( session, repository );
135 }
136 catch ( NoTransporterException e )
137 {
138 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
139 }
140 this.checksumPolicyProvider = checksumPolicyProvider;
141
142 this.session = session;
143 this.repository = repository;
144 this.fileProcessor = fileProcessor;
145 this.providedChecksumsSources = providedChecksumsSources;
146 this.closed = new AtomicBoolean( false );
147
148 maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
149 smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
150 persistedChecksums =
151 ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
152 ConfigurationProperties.PERSISTED_CHECKSUMS );
153 }
154
155 private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
156 {
157 if ( maxThreads <= 1 )
158 {
159 return DirectExecutor.INSTANCE;
160 }
161 int tasks = safe( artifacts ).size() + safe( metadatas ).size();
162 if ( tasks <= 1 )
163 {
164 return DirectExecutor.INSTANCE;
165 }
166 if ( executor == null )
167 {
168 executor =
169 new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
170 new LinkedBlockingQueue<>(),
171 new WorkerThreadFactory( getClass().getSimpleName() + '-'
172 + repository.getHost() + '-' ) );
173 }
174 return executor;
175 }
176
177 @Override
178 protected void finalize()
179 throws Throwable
180 {
181 try
182 {
183 close();
184 }
185 finally
186 {
187 super.finalize();
188 }
189 }
190
191 @Override
192 public void close()
193 {
194 if ( closed.compareAndSet( false, true ) )
195 {
196 if ( executor instanceof ExecutorService )
197 {
198 ( (ExecutorService) executor ).shutdown();
199 }
200 transporter.close();
201 }
202 }
203
204 private void failIfClosed()
205 {
206 if ( closed.get() )
207 {
208 throw new IllegalStateException( "connector already closed" );
209 }
210 }
211
212 @Override
213 public void get( Collection<? extends ArtifactDownload> artifactDownloads,
214 Collection<? extends MetadataDownload> metadataDownloads )
215 {
216 failIfClosed();
217
218 Executor executor = getExecutor( artifactDownloads, metadataDownloads );
219 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
220 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
221
222 for ( MetadataDownload transfer : safe( metadataDownloads ) )
223 {
224 URI location = layout.getLocation( transfer.getMetadata(), false );
225
226 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
227 TransferEvent.Builder builder = newEventBuilder( resource, false, false );
228 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
229
230 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
231 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
232 if ( checksumPolicy != null )
233 {
234 checksumLocations = layout.getChecksumLocations( transfer.getMetadata(), false, location );
235 }
236
237 Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy,
238 checksumAlgorithmFactories, checksumLocations, null, listener );
239 executor.execute( errorForwarder.wrap( task ) );
240 }
241
242 for ( ArtifactDownload transfer : safe( artifactDownloads ) )
243 {
244 Map<String, String> providedChecksums = Collections.emptyMap();
245 for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() )
246 {
247 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
248 session, transfer, checksumAlgorithmFactories );
249
250 if ( provided != null )
251 {
252 providedChecksums = provided;
253 break;
254 }
255 }
256
257 URI location = layout.getLocation( transfer.getArtifact(), false );
258
259 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
260 TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
261 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
262
263 Runnable task;
264 if ( transfer.isExistenceCheck() )
265 {
266 task = new PeekTaskRunner( location, listener );
267 }
268 else
269 {
270 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
271 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
272 if ( checksumPolicy != null )
273 {
274 checksumLocations = layout.getChecksumLocations( transfer.getArtifact(), false, location );
275 }
276
277 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy,
278 checksumAlgorithmFactories, checksumLocations, providedChecksums, listener );
279 }
280 executor.execute( errorForwarder.wrap( task ) );
281 }
282
283 errorForwarder.await();
284 }
285
286 @Override
287 public void put( Collection<? extends ArtifactUpload> artifactUploads,
288 Collection<? extends MetadataUpload> metadataUploads )
289 {
290 failIfClosed();
291
292 for ( ArtifactUpload transfer : safe( artifactUploads ) )
293 {
294 URI location = layout.getLocation( transfer.getArtifact(), true );
295
296 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
297 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
298 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
299
300 List<RepositoryLayout.ChecksumLocation> checksumLocations =
301 layout.getChecksumLocations( transfer.getArtifact(), true, location );
302
303 Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(),
304 checksumLocations, listener );
305 task.run();
306 }
307
308 for ( MetadataUpload transfer : safe( metadataUploads ) )
309 {
310 URI location = layout.getLocation( transfer.getMetadata(), true );
311
312 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
313 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
314 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
315
316 List<RepositoryLayout.ChecksumLocation> checksumLocations =
317 layout.getChecksumLocations( transfer.getMetadata(), true, location );
318
319 Runnable task = new PutTaskRunner( location, transfer.getFile(), checksumLocations, listener );
320 task.run();
321 }
322 }
323
324 private static <T> Collection<T> safe( Collection<T> items )
325 {
326 return ( items != null ) ? items : Collections.emptyList();
327 }
328
329 private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
330 {
331 return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
332 }
333
334 private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
335 {
336 TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
337 if ( upload )
338 {
339 builder.setRequestType( TransferEvent.RequestType.PUT );
340 }
341 else if ( !peek )
342 {
343 builder.setRequestType( TransferEvent.RequestType.GET );
344 }
345 else
346 {
347 builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
348 }
349 return builder;
350 }
351
352 private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
353 {
354 return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
355 }
356
357 @Override
358 public String toString()
359 {
360 return String.valueOf( repository );
361 }
362
363 abstract class TaskRunner
364 implements Runnable
365 {
366
367 protected final URI path;
368
369 protected final TransferTransportListener<?> listener;
370
371 TaskRunner( URI path, TransferTransportListener<?> listener )
372 {
373 this.path = path;
374 this.listener = listener;
375 }
376
377 @Override
378 public void run()
379 {
380 try
381 {
382 listener.transferInitiated();
383 runTask();
384 listener.transferSucceeded();
385 }
386 catch ( Exception e )
387 {
388 listener.transferFailed( e, transporter.classify( e ) );
389 }
390 }
391
392 protected abstract void runTask()
393 throws Exception;
394
395 }
396
397 class PeekTaskRunner
398 extends TaskRunner
399 {
400
401 PeekTaskRunner( URI path, TransferTransportListener<?> listener )
402 {
403 super( path, listener );
404 }
405
406 @Override
407 protected void runTask()
408 throws Exception
409 {
410 transporter.peek( new PeekTask( path ) );
411 }
412
413 }
414
415 class GetTaskRunner
416 extends TaskRunner
417 implements ChecksumValidator.ChecksumFetcher
418 {
419
420 private final File file;
421
422 private final ChecksumValidator checksumValidator;
423
424 GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
425 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
426 List<RepositoryLayout.ChecksumLocation> checksumLocations,
427 Map<String, String> providedChecksums,
428 TransferTransportListener<?> listener )
429 {
430 super( path, listener );
431 this.file = requireNonNull( file, "destination file cannot be null" );
432 checksumValidator = new ChecksumValidator( file, checksumAlgorithmFactories, fileProcessor, this,
433 checksumPolicy, providedChecksums, safe( checksumLocations ) );
434 }
435
436 @Override
437 public boolean fetchChecksum( URI remote, File local )
438 throws Exception
439 {
440 try
441 {
442 transporter.get( new GetTask( remote ).setDataFile( local ) );
443 }
444 catch ( Exception e )
445 {
446 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
447 {
448 return false;
449 }
450 throw e;
451 }
452 return true;
453 }
454
455 @Override
456 protected void runTask()
457 throws Exception
458 {
459 try ( FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile( file.toPath() ) )
460 {
461 final File tmp = tempFile.getPath().toFile();
462 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
463 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++ )
464 {
465 GetTask task = new GetTask( path ).setDataFile( tmp, false ).setListener( listener );
466 transporter.get( task );
467 try
468 {
469 checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
470 : null );
471 break;
472 }
473 catch ( ChecksumFailureException e )
474 {
475 boolean retry = trial < lastTrial && e.isRetryWorthy();
476 if ( !retry && !checksumValidator.handle( e ) )
477 {
478 throw e;
479 }
480 listener.transferCorrupted( e );
481 if ( retry )
482 {
483 checksumValidator.retry();
484 }
485 else
486 {
487 break;
488 }
489 }
490 }
491 tempFile.move();
492 if ( persistedChecksums )
493 {
494 checksumValidator.commit();
495 }
496 }
497 }
498
499 }
500
501 class PutTaskRunner
502 extends TaskRunner
503 {
504
505 private final File file;
506
507 private final FileTransformer fileTransformer;
508
509 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
510
511 PutTaskRunner( URI path, File file, List<RepositoryLayout.ChecksumLocation> checksumLocations,
512 TransferTransportListener<?> listener )
513 {
514 this( path, file, null, checksumLocations, listener );
515 }
516
517
518
519
520
521
522
523
524
525
526
527 PutTaskRunner( URI path, File file, FileTransformer fileTransformer,
528 List<RepositoryLayout.ChecksumLocation> checksumLocations,
529 TransferTransportListener<?> listener )
530 {
531 super( path, listener );
532 this.file = requireNonNull( file, "source file cannot be null" );
533 this.fileTransformer = fileTransformer;
534 this.checksumLocations = safe( checksumLocations );
535 }
536
537 @SuppressWarnings( "checkstyle:innerassignment" )
538 @Override
539 protected void runTask()
540 throws Exception
541 {
542 if ( fileTransformer != null )
543 {
544
545 ByteArrayOutputStream baos = new ByteArrayOutputStream();
546 byte[] buffer = new byte[1024];
547
548 try ( InputStream transformData = fileTransformer.transformData( file ) )
549 {
550 for ( int read; ( read = transformData.read( buffer, 0, buffer.length ) ) != -1; )
551 {
552 baos.write( buffer, 0, read );
553 }
554 }
555
556 byte[] bytes = baos.toByteArray();
557 transporter.put( new PutTask( path ).setDataBytes( bytes ).setListener( listener ) );
558 uploadChecksums( file, bytes );
559 }
560 else
561 {
562 transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
563 uploadChecksums( file, null );
564 }
565 }
566
567
568
569
570
571 private void uploadChecksums( File file, byte[] bytes )
572 {
573 if ( checksumLocations.isEmpty() )
574 {
575 return;
576 }
577 try
578 {
579 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
580 for ( RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations )
581 {
582 algorithms.add( checksumLocation.getChecksumAlgorithmFactory() );
583 }
584
585 Map<String, String> sumsByAlgo;
586 if ( bytes != null )
587 {
588 sumsByAlgo = ChecksumAlgorithmHelper.calculate( bytes, algorithms );
589 }
590 else
591 {
592 sumsByAlgo = ChecksumAlgorithmHelper.calculate( file, algorithms );
593 }
594
595 for ( RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations )
596 {
597 uploadChecksum( checksumLocation.getLocation(),
598 sumsByAlgo.get( checksumLocation.getChecksumAlgorithmFactory().getName() ) );
599 }
600 }
601 catch ( IOException e )
602 {
603 LOGGER.warn( "Failed to upload checksums for {}", file, e );
604 throw new UncheckedIOException( e );
605 }
606 }
607
608 private void uploadChecksum( URI location, Object checksum )
609 {
610 try
611 {
612 if ( checksum instanceof Exception )
613 {
614 throw (Exception) checksum;
615 }
616 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
617 }
618 catch ( Exception e )
619 {
620 LOGGER.warn( "Failed to upload checksum to {}", location, e );
621 }
622 }
623
624 }
625
626 private static class DirectExecutor
627 implements Executor
628 {
629
630 static final Executor INSTANCE = new DirectExecutor();
631
632 @Override
633 public void execute( Runnable command )
634 {
635 command.run();
636 }
637
638 }
639 }