1 package org.eclipse.aether.connector.basic;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import static java.util.Objects.requireNonNull;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.File;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.net.URI;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.LinkedBlockingQueue;
38 import java.util.concurrent.ThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40
41 import org.eclipse.aether.ConfigurationProperties;
42 import org.eclipse.aether.RepositorySystemSession;
43 import org.eclipse.aether.RequestTrace;
44 import org.eclipse.aether.repository.RemoteRepository;
45 import org.eclipse.aether.spi.connector.ArtifactDownload;
46 import org.eclipse.aether.spi.connector.ArtifactUpload;
47 import org.eclipse.aether.spi.connector.MetadataDownload;
48 import org.eclipse.aether.spi.connector.MetadataUpload;
49 import org.eclipse.aether.spi.connector.RepositoryConnector;
50 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
51 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
52 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
53 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
54 import org.eclipse.aether.spi.connector.transport.GetTask;
55 import org.eclipse.aether.spi.connector.transport.PeekTask;
56 import org.eclipse.aether.spi.connector.transport.PutTask;
57 import org.eclipse.aether.spi.connector.transport.Transporter;
58 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
59 import org.eclipse.aether.spi.io.FileProcessor;
60 import org.eclipse.aether.transfer.ChecksumFailureException;
61 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
62 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
63 import org.eclipse.aether.transfer.NoTransporterException;
64 import org.eclipse.aether.transfer.TransferEvent;
65 import org.eclipse.aether.transfer.TransferResource;
66 import org.eclipse.aether.transform.FileTransformer;
67 import org.eclipse.aether.util.ChecksumUtils;
68 import org.eclipse.aether.util.ConfigUtils;
69 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
70 import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74
75
76 final class BasicRepositoryConnector
77 implements RepositoryConnector
78 {
79
80 private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
81
82 private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads";
83
84 private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold";
85
86 private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
87
88 private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class );
89
90 private final FileProcessor fileProcessor;
91
92 private final RemoteRepository repository;
93
94 private final RepositorySystemSession session;
95
96 private final Transporter transporter;
97
98 private final RepositoryLayout layout;
99
100 private final ChecksumPolicyProvider checksumPolicyProvider;
101
102 private final PartialFile.Factory partialFileFactory;
103
104 private final int maxThreads;
105
106 private final boolean smartChecksums;
107
108 private final boolean persistedChecksums;
109
110 private Executor executor;
111
112 private boolean closed;
113
114 BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository,
115 TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider,
116 ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor )
117 throws NoRepositoryConnectorException
118 {
119 try
120 {
121 layout = layoutProvider.newRepositoryLayout( session, repository );
122 }
123 catch ( NoRepositoryLayoutException e )
124 {
125 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
126 }
127 try
128 {
129 transporter = transporterProvider.newTransporter( session, repository );
130 }
131 catch ( NoTransporterException e )
132 {
133 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
134 }
135 this.checksumPolicyProvider = checksumPolicyProvider;
136
137 this.session = session;
138 this.repository = repository;
139 this.fileProcessor = fileProcessor;
140
141 maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
142 smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
143 persistedChecksums =
144 ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
145 ConfigurationProperties.PERSISTED_CHECKSUMS );
146
147 boolean resumeDownloads =
148 ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME );
149 long resumeThreshold =
150 ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(),
151 CONFIG_PROP_RESUME_THRESHOLD );
152 int requestTimeout =
153 ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
154 ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(),
155 ConfigurationProperties.REQUEST_TIMEOUT );
156 partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout );
157 }
158
159 private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
160 {
161 if ( maxThreads <= 1 )
162 {
163 return DirectExecutor.INSTANCE;
164 }
165 int tasks = safe( artifacts ).size() + safe( metadatas ).size();
166 if ( tasks <= 1 )
167 {
168 return DirectExecutor.INSTANCE;
169 }
170 if ( executor == null )
171 {
172 executor =
173 new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
174 new LinkedBlockingQueue<Runnable>(),
175 new WorkerThreadFactory( getClass().getSimpleName() + '-'
176 + repository.getHost() + '-' ) );
177 }
178 return executor;
179 }
180
181 @Override
182 protected void finalize()
183 throws Throwable
184 {
185 try
186 {
187 close();
188 }
189 finally
190 {
191 super.finalize();
192 }
193 }
194
195 public void close()
196 {
197 if ( !closed )
198 {
199 closed = true;
200 if ( executor instanceof ExecutorService )
201 {
202 ( (ExecutorService) executor ).shutdown();
203 }
204 transporter.close();
205 }
206 }
207
208 public void get( Collection<? extends ArtifactDownload> artifactDownloads,
209 Collection<? extends MetadataDownload> metadataDownloads )
210 {
211 if ( closed )
212 {
213 throw new IllegalStateException( "connector closed" );
214 }
215
216 Executor executor = getExecutor( artifactDownloads, metadataDownloads );
217 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
218
219 for ( MetadataDownload transfer : safe( metadataDownloads ) )
220 {
221 URI location = layout.getLocation( transfer.getMetadata(), false );
222
223 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
224 TransferEvent.Builder builder = newEventBuilder( resource, false, false );
225 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
226
227 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
228 List<RepositoryLayout.Checksum> checksums = null;
229 if ( checksumPolicy != null )
230 {
231 checksums = layout.getChecksums( transfer.getMetadata(), false, location );
232 }
233
234 Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
235 executor.execute( errorForwarder.wrap( task ) );
236 }
237
238 for ( ArtifactDownload transfer : safe( artifactDownloads ) )
239 {
240 URI location = layout.getLocation( transfer.getArtifact(), false );
241
242 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
243 TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
244 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
245
246 Runnable task;
247 if ( transfer.isExistenceCheck() )
248 {
249 task = new PeekTaskRunner( location, listener );
250 }
251 else
252 {
253 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
254 List<RepositoryLayout.Checksum> checksums = null;
255 if ( checksumPolicy != null )
256 {
257 checksums = layout.getChecksums( transfer.getArtifact(), false, location );
258 }
259
260 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
261 }
262 executor.execute( errorForwarder.wrap( task ) );
263 }
264
265 errorForwarder.await();
266 }
267
268 public void put( Collection<? extends ArtifactUpload> artifactUploads,
269 Collection<? extends MetadataUpload> metadataUploads )
270 {
271 if ( closed )
272 {
273 throw new IllegalStateException( "connector closed" );
274 }
275
276 for ( ArtifactUpload transfer : safe( artifactUploads ) )
277 {
278 URI location = layout.getLocation( transfer.getArtifact(), true );
279
280 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
281 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
282 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
283
284 List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location );
285
286 Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(), checksums, listener );
287 task.run();
288 }
289
290 for ( MetadataUpload transfer : safe( metadataUploads ) )
291 {
292 URI location = layout.getLocation( transfer.getMetadata(), true );
293
294 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
295 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
296 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
297
298 List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location );
299
300 Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
301 task.run();
302 }
303 }
304
305 private static <T> Collection<T> safe( Collection<T> items )
306 {
307 return ( items != null ) ? items : Collections.<T>emptyList();
308 }
309
310 private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
311 {
312 return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
313 }
314
315 private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
316 {
317 TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
318 if ( upload )
319 {
320 builder.setRequestType( TransferEvent.RequestType.PUT );
321 }
322 else if ( !peek )
323 {
324 builder.setRequestType( TransferEvent.RequestType.GET );
325 }
326 else
327 {
328 builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
329 }
330 return builder;
331 }
332
333 private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
334 {
335 return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
336 }
337
338 @Override
339 public String toString()
340 {
341 return String.valueOf( repository );
342 }
343
344 abstract class TaskRunner
345 implements Runnable
346 {
347
348 protected final URI path;
349
350 protected final TransferTransportListener<?> listener;
351
352 TaskRunner( URI path, TransferTransportListener<?> listener )
353 {
354 this.path = path;
355 this.listener = listener;
356 }
357
358 public void run()
359 {
360 try
361 {
362 listener.transferInitiated();
363 runTask();
364 listener.transferSucceeded();
365 }
366 catch ( Exception e )
367 {
368 listener.transferFailed( e, transporter.classify( e ) );
369 }
370 }
371
372 protected abstract void runTask()
373 throws Exception;
374
375 }
376
377 class PeekTaskRunner
378 extends TaskRunner
379 {
380
381 PeekTaskRunner( URI path, TransferTransportListener<?> listener )
382 {
383 super( path, listener );
384 }
385
386 protected void runTask()
387 throws Exception
388 {
389 transporter.peek( new PeekTask( path ) );
390 }
391
392 }
393
394 class GetTaskRunner
395 extends TaskRunner
396 implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher
397 {
398
399 private final File file;
400
401 private final ChecksumValidator checksumValidator;
402
403 GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
404 List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener )
405 {
406 super( path, listener );
407 this.file = requireNonNull( file, "destination file cannot be null" );
408 checksumValidator =
409 new ChecksumValidator( file, fileProcessor, this, checksumPolicy, safe( checksums ) );
410 }
411
412 public void checkRemoteAccess()
413 throws Exception
414 {
415 transporter.peek( new PeekTask( path ) );
416 }
417
418 public boolean fetchChecksum( URI remote, File local )
419 throws Exception
420 {
421 try
422 {
423 transporter.get( new GetTask( remote ).setDataFile( local ) );
424 }
425 catch ( Exception e )
426 {
427 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
428 {
429 return false;
430 }
431 throw e;
432 }
433 return true;
434 }
435
436 protected void runTask()
437 throws Exception
438 {
439 fileProcessor.mkdirs( file.getParentFile() );
440
441 PartialFile partFile = partialFileFactory.newInstance( file, this );
442 if ( partFile == null )
443 {
444 LOGGER.debug( "Concurrent download of {} just finished, skipping download", file );
445 return;
446 }
447
448 try
449 {
450 File tmp = partFile.getFile();
451 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
452 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ )
453 {
454 boolean resume = partFile.isResume() && trial <= firstTrial;
455 GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener );
456 transporter.get( task );
457 try
458 {
459 checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
460 : null );
461 break;
462 }
463 catch ( ChecksumFailureException e )
464 {
465 boolean retry = trial < lastTrial && e.isRetryWorthy();
466 if ( !retry && !checksumValidator.handle( e ) )
467 {
468 throw e;
469 }
470 listener.transferCorrupted( e );
471 if ( retry )
472 {
473 checksumValidator.retry();
474 }
475 else
476 {
477 break;
478 }
479 }
480 }
481 fileProcessor.move( tmp, file );
482 if ( persistedChecksums )
483 {
484 checksumValidator.commit();
485 }
486 }
487 finally
488 {
489 partFile.close();
490 checksumValidator.close();
491 }
492 }
493
494 }
495
496 class PutTaskRunner
497 extends TaskRunner
498 {
499
500 private final File file;
501
502 private final FileTransformer fileTransformer;
503
504 private final Collection<RepositoryLayout.Checksum> checksums;
505
506 PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums,
507 TransferTransportListener<?> listener )
508 {
509 this( path, file, null, checksums, listener );
510 }
511
512
513
514
515
516
517
518
519
520
521
522 PutTaskRunner( URI path, File file, FileTransformer fileTransformer, List<RepositoryLayout.Checksum> checksums,
523 TransferTransportListener<?> listener )
524 {
525 super( path, listener );
526 this.file = requireNonNull( file, "source file cannot be null" );
527 this.fileTransformer = fileTransformer;
528 this.checksums = safe( checksums );
529 }
530
531 protected void runTask()
532 throws Exception
533 {
534 if ( fileTransformer != null )
535 {
536
537 ByteArrayOutputStream baos = new ByteArrayOutputStream();
538 byte[] buffer = new byte[1024];
539
540 try ( InputStream transformData = fileTransformer.transformData( file ) )
541 {
542 for ( int read; ( read = transformData.read( buffer, 0, buffer.length ) ) != -1; )
543 {
544 baos.write( buffer, 0, read );
545 }
546 }
547
548 byte[] bytes = baos.toByteArray();
549 transporter.put( new PutTask( path ).setDataBytes( bytes ).setListener( listener ) );
550 uploadChecksums( file, bytes, path );
551 }
552 else
553 {
554 transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
555 uploadChecksums( file, null , path );
556 }
557 }
558
559
560
561
562
563
564
565 private void uploadChecksums( File file, byte[] bytes, URI location )
566 {
567 if ( checksums.isEmpty() )
568 {
569 return;
570 }
571 try
572 {
573 Set<String> algos = new HashSet<>();
574 for ( RepositoryLayout.Checksum checksum : checksums )
575 {
576 algos.add( checksum.getAlgorithm() );
577 }
578
579 Map<String, Object> sumsByAlgo;
580 if ( bytes != null )
581 {
582 sumsByAlgo = ChecksumUtils.calc( bytes, algos );
583 }
584 else
585 {
586 sumsByAlgo = ChecksumUtils.calc( file, algos );
587 }
588
589 for ( RepositoryLayout.Checksum checksum : checksums )
590 {
591 uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) );
592 }
593 }
594 catch ( IOException e )
595 {
596 String msg = "Failed to upload checksums for " + file + ": " + e.getMessage();
597 if ( LOGGER.isDebugEnabled() )
598 {
599 LOGGER.warn( msg, e );
600 }
601 else
602 {
603 LOGGER.warn( msg );
604 }
605 }
606 }
607
608 private void uploadChecksum( URI location, Object checksum )
609 {
610 try
611 {
612 if ( checksum instanceof Exception )
613 {
614 throw (Exception) checksum;
615 }
616 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
617 }
618 catch ( Exception e )
619 {
620 String msg = "Failed to upload checksum " + location + ": " + e.getMessage();
621 if ( LOGGER.isDebugEnabled() )
622 {
623 LOGGER.warn( msg, e );
624 }
625 else
626 {
627 LOGGER.warn( msg );
628 }
629 }
630 }
631
632 }
633
634 private static class DirectExecutor
635 implements Executor
636 {
637
638 static final Executor INSTANCE = new DirectExecutor();
639
640 public void execute( Runnable command )
641 {
642 command.run();
643 }
644
645 }
646
647 }