1 package org.eclipse.aether.connector.basic;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import static java.util.Objects.requireNonNull;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.File;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.net.URI;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.LinkedBlockingQueue;
38 import java.util.concurrent.ThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40
41 import org.eclipse.aether.ConfigurationProperties;
42 import org.eclipse.aether.RepositorySystemSession;
43 import org.eclipse.aether.RequestTrace;
44 import org.eclipse.aether.repository.RemoteRepository;
45 import org.eclipse.aether.spi.connector.ArtifactDownload;
46 import org.eclipse.aether.spi.connector.ArtifactUpload;
47 import org.eclipse.aether.spi.connector.MetadataDownload;
48 import org.eclipse.aether.spi.connector.MetadataUpload;
49 import org.eclipse.aether.spi.connector.RepositoryConnector;
50 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
51 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
52 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
53 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
54 import org.eclipse.aether.spi.connector.transport.GetTask;
55 import org.eclipse.aether.spi.connector.transport.PeekTask;
56 import org.eclipse.aether.spi.connector.transport.PutTask;
57 import org.eclipse.aether.spi.connector.transport.Transporter;
58 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
59 import org.eclipse.aether.spi.io.FileProcessor;
60 import org.eclipse.aether.transfer.ChecksumFailureException;
61 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
62 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
63 import org.eclipse.aether.transfer.NoTransporterException;
64 import org.eclipse.aether.transfer.TransferEvent;
65 import org.eclipse.aether.transfer.TransferResource;
66 import org.eclipse.aether.transform.FileTransformer;
67 import org.eclipse.aether.util.ChecksumUtils;
68 import org.eclipse.aether.util.ConfigUtils;
69 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
70 import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74
75
76 final class BasicRepositoryConnector
77 implements RepositoryConnector
78 {
79
80 private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
81
82 private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads";
83
84 private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold";
85
86 private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
87
88 private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class );
89
90 private final FileProcessor fileProcessor;
91
92 private final RemoteRepository repository;
93
94 private final RepositorySystemSession session;
95
96 private final Transporter transporter;
97
98 private final RepositoryLayout layout;
99
100 private final ChecksumPolicyProvider checksumPolicyProvider;
101
102 private final PartialFile.Factory partialFileFactory;
103
104 private final int maxThreads;
105
106 private final boolean smartChecksums;
107
108 private final boolean persistedChecksums;
109
110 private Executor executor;
111
112 private boolean closed;
113
114 BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository,
115 TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider,
116 ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor )
117 throws NoRepositoryConnectorException
118 {
119 try
120 {
121 layout = layoutProvider.newRepositoryLayout( session, repository );
122 }
123 catch ( NoRepositoryLayoutException e )
124 {
125 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
126 }
127 try
128 {
129 transporter = transporterProvider.newTransporter( session, repository );
130 }
131 catch ( NoTransporterException e )
132 {
133 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
134 }
135 this.checksumPolicyProvider = checksumPolicyProvider;
136
137 this.session = session;
138 this.repository = repository;
139 this.fileProcessor = fileProcessor;
140
141 maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
142 smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
143 persistedChecksums =
144 ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
145 ConfigurationProperties.PERSISTED_CHECKSUMS );
146
147 boolean resumeDownloads =
148 ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME );
149 long resumeThreshold =
150 ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(),
151 CONFIG_PROP_RESUME_THRESHOLD );
152 int requestTimeout =
153 ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
154 ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(),
155 ConfigurationProperties.REQUEST_TIMEOUT );
156 partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout );
157 }
158
159 private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
160 {
161 if ( maxThreads <= 1 )
162 {
163 return DirectExecutor.INSTANCE;
164 }
165 int tasks = safe( artifacts ).size() + safe( metadatas ).size();
166 if ( tasks <= 1 )
167 {
168 return DirectExecutor.INSTANCE;
169 }
170 if ( executor == null )
171 {
172 executor =
173 new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
174 new LinkedBlockingQueue<Runnable>(),
175 new WorkerThreadFactory( getClass().getSimpleName() + '-'
176 + repository.getHost() + '-' ) );
177 }
178 return executor;
179 }
180
181 @Override
182 protected void finalize()
183 throws Throwable
184 {
185 try
186 {
187 close();
188 }
189 finally
190 {
191 super.finalize();
192 }
193 }
194
195 public void close()
196 {
197 if ( !closed )
198 {
199 closed = true;
200 if ( executor instanceof ExecutorService )
201 {
202 ( (ExecutorService) executor ).shutdown();
203 }
204 transporter.close();
205 }
206 }
207
208 public void get( Collection<? extends ArtifactDownload> artifactDownloads,
209 Collection<? extends MetadataDownload> metadataDownloads )
210 {
211 if ( closed )
212 {
213 throw new IllegalStateException( "connector closed" );
214 }
215
216 Executor executor = getExecutor( artifactDownloads, metadataDownloads );
217 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
218
219 for ( MetadataDownload transfer : safe( metadataDownloads ) )
220 {
221 URI location = layout.getLocation( transfer.getMetadata(), false );
222
223 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
224 TransferEvent.Builder builder = newEventBuilder( resource, false, false );
225 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
226
227 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
228 List<RepositoryLayout.Checksum> checksums = null;
229 if ( checksumPolicy != null )
230 {
231 checksums = layout.getChecksums( transfer.getMetadata(), false, location );
232 }
233
234 Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
235 executor.execute( errorForwarder.wrap( task ) );
236 }
237
238 for ( ArtifactDownload transfer : safe( artifactDownloads ) )
239 {
240 URI location = layout.getLocation( transfer.getArtifact(), false );
241
242 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
243 TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
244 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
245
246 Runnable task;
247 if ( transfer.isExistenceCheck() )
248 {
249 task = new PeekTaskRunner( location, listener );
250 }
251 else
252 {
253 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
254 List<RepositoryLayout.Checksum> checksums = null;
255 if ( checksumPolicy != null )
256 {
257 checksums = layout.getChecksums( transfer.getArtifact(), false, location );
258 }
259
260 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
261 }
262 executor.execute( errorForwarder.wrap( task ) );
263 }
264
265 errorForwarder.await();
266 }
267
268 public void put( Collection<? extends ArtifactUpload> artifactUploads,
269 Collection<? extends MetadataUpload> metadataUploads )
270 {
271 if ( closed )
272 {
273 throw new IllegalStateException( "connector closed" );
274 }
275
276 for ( ArtifactUpload transfer : safe( artifactUploads ) )
277 {
278 URI location = layout.getLocation( transfer.getArtifact(), true );
279
280 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
281 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
282 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
283
284 List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location );
285
286 Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(), checksums,
287 listener );
288 task.run();
289 }
290
291 for ( MetadataUpload transfer : safe( metadataUploads ) )
292 {
293 URI location = layout.getLocation( transfer.getMetadata(), true );
294
295 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
296 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
297 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
298
299 List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location );
300
301 Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
302 task.run();
303 }
304 }
305
306 private static <T> Collection<T> safe( Collection<T> items )
307 {
308 return ( items != null ) ? items : Collections.<T>emptyList();
309 }
310
311 private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
312 {
313 return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
314 }
315
316 private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
317 {
318 TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
319 if ( upload )
320 {
321 builder.setRequestType( TransferEvent.RequestType.PUT );
322 }
323 else if ( !peek )
324 {
325 builder.setRequestType( TransferEvent.RequestType.GET );
326 }
327 else
328 {
329 builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
330 }
331 return builder;
332 }
333
334 private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
335 {
336 return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
337 }
338
339 @Override
340 public String toString()
341 {
342 return String.valueOf( repository );
343 }
344
345 abstract class TaskRunner
346 implements Runnable
347 {
348
349 protected final URI path;
350
351 protected final TransferTransportListener<?> listener;
352
353 TaskRunner( URI path, TransferTransportListener<?> listener )
354 {
355 this.path = path;
356 this.listener = listener;
357 }
358
359 public void run()
360 {
361 try
362 {
363 listener.transferInitiated();
364 runTask();
365 listener.transferSucceeded();
366 }
367 catch ( Exception e )
368 {
369 listener.transferFailed( e, transporter.classify( e ) );
370 }
371 }
372
373 protected abstract void runTask()
374 throws Exception;
375
376 }
377
378 class PeekTaskRunner
379 extends TaskRunner
380 {
381
382 PeekTaskRunner( URI path, TransferTransportListener<?> listener )
383 {
384 super( path, listener );
385 }
386
387 protected void runTask()
388 throws Exception
389 {
390 transporter.peek( new PeekTask( path ) );
391 }
392
393 }
394
395 class GetTaskRunner
396 extends TaskRunner
397 implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher
398 {
399
400 private final File file;
401
402 private final ChecksumValidator checksumValidator;
403
404 GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
405 List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener )
406 {
407 super( path, listener );
408 this.file = requireNonNull( file, "destination file cannot be null" );
409 checksumValidator =
410 new ChecksumValidator( file, fileProcessor, this, checksumPolicy, safe( checksums ) );
411 }
412
413 public void checkRemoteAccess()
414 throws Exception
415 {
416 transporter.peek( new PeekTask( path ) );
417 }
418
419 public boolean fetchChecksum( URI remote, File local )
420 throws Exception
421 {
422 try
423 {
424 transporter.get( new GetTask( remote ).setDataFile( local ) );
425 }
426 catch ( Exception e )
427 {
428 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
429 {
430 return false;
431 }
432 throw e;
433 }
434 return true;
435 }
436
437 protected void runTask()
438 throws Exception
439 {
440 fileProcessor.mkdirs( file.getParentFile() );
441
442 PartialFile partFile = partialFileFactory.newInstance( file, this );
443 if ( partFile == null )
444 {
445 LOGGER.debug( "Concurrent download of {} just finished, skipping download", file );
446 return;
447 }
448
449 try
450 {
451 File tmp = partFile.getFile();
452 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
453 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ )
454 {
455 boolean resume = partFile.isResume() && trial <= firstTrial;
456 GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener );
457 transporter.get( task );
458 try
459 {
460 checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
461 : null );
462 break;
463 }
464 catch ( ChecksumFailureException e )
465 {
466 boolean retry = trial < lastTrial && e.isRetryWorthy();
467 if ( !retry && !checksumValidator.handle( e ) )
468 {
469 throw e;
470 }
471 listener.transferCorrupted( e );
472 if ( retry )
473 {
474 checksumValidator.retry();
475 }
476 else
477 {
478 break;
479 }
480 }
481 }
482 fileProcessor.move( tmp, file );
483 if ( persistedChecksums )
484 {
485 checksumValidator.commit();
486 }
487 }
488 finally
489 {
490 partFile.close();
491 checksumValidator.close();
492 }
493 }
494
495 }
496
497 class PutTaskRunner
498 extends TaskRunner
499 {
500
501 private final File file;
502
503 private final FileTransformer fileTransformer;
504
505 private final Collection<RepositoryLayout.Checksum> checksums;
506
507 PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums,
508 TransferTransportListener<?> listener )
509 {
510 this( path, file, null, checksums, listener );
511 }
512
513
514
515
516
517
518
519
520
521
522
523 PutTaskRunner( URI path, File file, FileTransformer fileTransformer, List<RepositoryLayout.Checksum> checksums,
524 TransferTransportListener<?> listener )
525 {
526 super( path, listener );
527 this.file = requireNonNull( file, "source file cannot be null" );
528 this.fileTransformer = fileTransformer;
529 this.checksums = safe( checksums );
530 }
531
532 @SuppressWarnings( "checkstyle:innerassignment" )
533 protected void runTask()
534 throws Exception
535 {
536 if ( fileTransformer != null )
537 {
538
539 ByteArrayOutputStream baos = new ByteArrayOutputStream();
540 byte[] buffer = new byte[1024];
541
542 try ( InputStream transformData = fileTransformer.transformData( file ) )
543 {
544 for ( int read; ( read = transformData.read( buffer, 0, buffer.length ) ) != -1; )
545 {
546 baos.write( buffer, 0, read );
547 }
548 }
549
550 byte[] bytes = baos.toByteArray();
551 transporter.put( new PutTask( path ).setDataBytes( bytes ).setListener( listener ) );
552 uploadChecksums( file, bytes );
553 }
554 else
555 {
556 transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
557 uploadChecksums( file, null );
558 }
559 }
560
561
562
563
564
565
566 private void uploadChecksums( File file, byte[] bytes )
567 {
568 if ( checksums.isEmpty() )
569 {
570 return;
571 }
572 try
573 {
574 Set<String> algos = new HashSet<>();
575 for ( RepositoryLayout.Checksum checksum : checksums )
576 {
577 algos.add( checksum.getAlgorithm() );
578 }
579
580 Map<String, Object> sumsByAlgo;
581 if ( bytes != null )
582 {
583 sumsByAlgo = ChecksumUtils.calc( bytes, algos );
584 }
585 else
586 {
587 sumsByAlgo = ChecksumUtils.calc( file, algos );
588 }
589
590 for ( RepositoryLayout.Checksum checksum : checksums )
591 {
592 uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) );
593 }
594 }
595 catch ( IOException e )
596 {
597 LOGGER.warn( "Failed to upload checksums for {}", file, e );
598 }
599 }
600
601 private void uploadChecksum( URI location, Object checksum )
602 {
603 try
604 {
605 if ( checksum instanceof Exception )
606 {
607 throw (Exception) checksum;
608 }
609 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
610 }
611 catch ( Exception e )
612 {
613 LOGGER.warn( "Failed to upload checksum to {}", location, e );
614 }
615 }
616
617 }
618
619 private static class DirectExecutor
620 implements Executor
621 {
622
623 static final Executor INSTANCE = new DirectExecutor();
624
625 public void execute( Runnable command )
626 {
627 command.run();
628 }
629
630 }
631
632 }