1 package org.eclipse.aether.connector.basic;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.net.URI;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import static java.util.Objects.requireNonNull;
31 import java.util.Set;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37
38 import org.eclipse.aether.ConfigurationProperties;
39 import org.eclipse.aether.RepositorySystemSession;
40 import org.eclipse.aether.RequestTrace;
41 import org.eclipse.aether.repository.RemoteRepository;
42 import org.eclipse.aether.spi.connector.ArtifactDownload;
43 import org.eclipse.aether.spi.connector.ArtifactUpload;
44 import org.eclipse.aether.spi.connector.MetadataDownload;
45 import org.eclipse.aether.spi.connector.MetadataUpload;
46 import org.eclipse.aether.spi.connector.RepositoryConnector;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
49 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
51 import org.eclipse.aether.spi.connector.transport.GetTask;
52 import org.eclipse.aether.spi.connector.transport.PeekTask;
53 import org.eclipse.aether.spi.connector.transport.PutTask;
54 import org.eclipse.aether.spi.connector.transport.Transporter;
55 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
56 import org.eclipse.aether.spi.io.FileProcessor;
57 import org.eclipse.aether.spi.log.Logger;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.NoTransporterException;
62 import org.eclipse.aether.transfer.TransferEvent;
63 import org.eclipse.aether.transfer.TransferResource;
64 import org.eclipse.aether.util.ChecksumUtils;
65 import org.eclipse.aether.util.ConfigUtils;
66 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
67 import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
68
69
70
71 final class BasicRepositoryConnector
72 implements RepositoryConnector
73 {
74
75 private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
76
77 private static final String CONFIG_PROP_RESUME = "aether.connector.resumeDownloads";
78
79 private static final String CONFIG_PROP_RESUME_THRESHOLD = "aether.connector.resumeThreshold";
80
81 private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
82
83 private final Logger logger;
84
85 private final FileProcessor fileProcessor;
86
87 private final RemoteRepository repository;
88
89 private final RepositorySystemSession session;
90
91 private final Transporter transporter;
92
93 private final RepositoryLayout layout;
94
95 private final ChecksumPolicyProvider checksumPolicyProvider;
96
97 private final PartialFile.Factory partialFileFactory;
98
99 private final int maxThreads;
100
101 private final boolean smartChecksums;
102
103 private final boolean persistedChecksums;
104
105 private Executor executor;
106
107 private boolean closed;
108
109 public BasicRepositoryConnector( RepositorySystemSession session, RemoteRepository repository,
110 TransporterProvider transporterProvider, RepositoryLayoutProvider layoutProvider,
111 ChecksumPolicyProvider checksumPolicyProvider, FileProcessor fileProcessor,
112 Logger logger )
113 throws NoRepositoryConnectorException
114 {
115 try
116 {
117 layout = layoutProvider.newRepositoryLayout( session, repository );
118 }
119 catch ( NoRepositoryLayoutException e )
120 {
121 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
122 }
123 try
124 {
125 transporter = transporterProvider.newTransporter( session, repository );
126 }
127 catch ( NoTransporterException e )
128 {
129 throw new NoRepositoryConnectorException( repository, e.getMessage(), e );
130 }
131 this.checksumPolicyProvider = checksumPolicyProvider;
132
133 this.session = session;
134 this.repository = repository;
135 this.fileProcessor = fileProcessor;
136 this.logger = logger;
137
138 maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
139 smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
140 persistedChecksums =
141 ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
142 ConfigurationProperties.PERSISTED_CHECKSUMS );
143
144 boolean resumeDownloads =
145 ConfigUtils.getBoolean( session, true, CONFIG_PROP_RESUME + '.' + repository.getId(), CONFIG_PROP_RESUME );
146 long resumeThreshold =
147 ConfigUtils.getLong( session, 64 * 1024, CONFIG_PROP_RESUME_THRESHOLD + '.' + repository.getId(),
148 CONFIG_PROP_RESUME_THRESHOLD );
149 int requestTimeout =
150 ConfigUtils.getInteger( session, ConfigurationProperties.DEFAULT_REQUEST_TIMEOUT,
151 ConfigurationProperties.REQUEST_TIMEOUT + '.' + repository.getId(),
152 ConfigurationProperties.REQUEST_TIMEOUT );
153 partialFileFactory = new PartialFile.Factory( resumeDownloads, resumeThreshold, requestTimeout, logger );
154 }
155
156 private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
157 {
158 if ( maxThreads <= 1 )
159 {
160 return DirectExecutor.INSTANCE;
161 }
162 int tasks = safe( artifacts ).size() + safe( metadatas ).size();
163 if ( tasks <= 1 )
164 {
165 return DirectExecutor.INSTANCE;
166 }
167 if ( executor == null )
168 {
169 executor =
170 new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
171 new LinkedBlockingQueue<Runnable>(),
172 new WorkerThreadFactory( getClass().getSimpleName() + '-'
173 + repository.getHost() + '-' ) );
174 }
175 return executor;
176 }
177
178 @Override
179 protected void finalize()
180 throws Throwable
181 {
182 try
183 {
184 close();
185 }
186 finally
187 {
188 super.finalize();
189 }
190 }
191
192 public void close()
193 {
194 if ( !closed )
195 {
196 closed = true;
197 if ( executor instanceof ExecutorService )
198 {
199 ( (ExecutorService) executor ).shutdown();
200 }
201 transporter.close();
202 }
203 }
204
205 public void get( Collection<? extends ArtifactDownload> artifactDownloads,
206 Collection<? extends MetadataDownload> metadataDownloads )
207 {
208 if ( closed )
209 {
210 throw new IllegalStateException( "connector closed" );
211 }
212
213 Executor executor = getExecutor( artifactDownloads, metadataDownloads );
214 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
215
216 for ( MetadataDownload transfer : safe( metadataDownloads ) )
217 {
218 URI location = layout.getLocation( transfer.getMetadata(), false );
219
220 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
221 TransferEvent.Builder builder = newEventBuilder( resource, false, false );
222 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
223
224 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
225 List<RepositoryLayout.Checksum> checksums = null;
226 if ( checksumPolicy != null )
227 {
228 checksums = layout.getChecksums( transfer.getMetadata(), false, location );
229 }
230
231 Runnable task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
232 executor.execute( errorForwarder.wrap( task ) );
233 }
234
235 for ( ArtifactDownload transfer : safe( artifactDownloads ) )
236 {
237 URI location = layout.getLocation( transfer.getArtifact(), false );
238
239 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
240 TransferEvent.Builder builder = newEventBuilder( resource, false, transfer.isExistenceCheck() );
241 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
242
243 Runnable task;
244 if ( transfer.isExistenceCheck() )
245 {
246 task = new PeekTaskRunner( location, listener );
247 }
248 else
249 {
250 ChecksumPolicy checksumPolicy = newChecksumPolicy( transfer.getChecksumPolicy(), resource );
251 List<RepositoryLayout.Checksum> checksums = null;
252 if ( checksumPolicy != null )
253 {
254 checksums = layout.getChecksums( transfer.getArtifact(), false, location );
255 }
256
257 task = new GetTaskRunner( location, transfer.getFile(), checksumPolicy, checksums, listener );
258 }
259 executor.execute( errorForwarder.wrap( task ) );
260 }
261
262 errorForwarder.await();
263 }
264
265 public void put( Collection<? extends ArtifactUpload> artifactUploads,
266 Collection<? extends MetadataUpload> metadataUploads )
267 {
268 if ( closed )
269 {
270 throw new IllegalStateException( "connector closed" );
271 }
272
273 for ( ArtifactUpload transfer : safe( artifactUploads ) )
274 {
275 URI location = layout.getLocation( transfer.getArtifact(), true );
276
277 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
278 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
279 ArtifactTransportListener listener = new ArtifactTransportListener( transfer, repository, builder );
280
281 List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getArtifact(), true, location );
282
283 Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
284 task.run();
285 }
286
287 for ( MetadataUpload transfer : safe( metadataUploads ) )
288 {
289 URI location = layout.getLocation( transfer.getMetadata(), true );
290
291 TransferResource resource = newTransferResource( location, transfer.getFile(), transfer.getTrace() );
292 TransferEvent.Builder builder = newEventBuilder( resource, true, false );
293 MetadataTransportListener listener = new MetadataTransportListener( transfer, repository, builder );
294
295 List<RepositoryLayout.Checksum> checksums = layout.getChecksums( transfer.getMetadata(), true, location );
296
297 Runnable task = new PutTaskRunner( location, transfer.getFile(), checksums, listener );
298 task.run();
299 }
300 }
301
302 private static <T> Collection<T> safe( Collection<T> items )
303 {
304 return ( items != null ) ? items : Collections.<T>emptyList();
305 }
306
307 private TransferResource newTransferResource( URI path, File file, RequestTrace trace )
308 {
309 return new TransferResource( repository.getId(), repository.getUrl(), path.toString(), file, trace );
310 }
311
312 private TransferEvent.Builder newEventBuilder( TransferResource resource, boolean upload, boolean peek )
313 {
314 TransferEvent.Builder builder = new TransferEvent.Builder( session, resource );
315 if ( upload )
316 {
317 builder.setRequestType( TransferEvent.RequestType.PUT );
318 }
319 else if ( !peek )
320 {
321 builder.setRequestType( TransferEvent.RequestType.GET );
322 }
323 else
324 {
325 builder.setRequestType( TransferEvent.RequestType.GET_EXISTENCE );
326 }
327 return builder;
328 }
329
330 private ChecksumPolicy newChecksumPolicy( String policy, TransferResource resource )
331 {
332 return checksumPolicyProvider.newChecksumPolicy( session, repository, resource, policy );
333 }
334
335 @Override
336 public String toString()
337 {
338 return String.valueOf( repository );
339 }
340
341 abstract class TaskRunner
342 implements Runnable
343 {
344
345 protected final URI path;
346
347 protected final TransferTransportListener<?> listener;
348
349 public TaskRunner( URI path, TransferTransportListener<?> listener )
350 {
351 this.path = path;
352 this.listener = listener;
353 }
354
355 public void run()
356 {
357 try
358 {
359 listener.transferInitiated();
360 runTask();
361 listener.transferSucceeded();
362 }
363 catch ( Exception e )
364 {
365 listener.transferFailed( e, transporter.classify( e ) );
366 }
367 }
368
369 protected abstract void runTask()
370 throws Exception;
371
372 }
373
374 class PeekTaskRunner
375 extends TaskRunner
376 {
377
378 public PeekTaskRunner( URI path, TransferTransportListener<?> listener )
379 {
380 super( path, listener );
381 }
382
383 protected void runTask()
384 throws Exception
385 {
386 transporter.peek( new PeekTask( path ) );
387 }
388
389 }
390
391 class GetTaskRunner
392 extends TaskRunner
393 implements PartialFile.RemoteAccessChecker, ChecksumValidator.ChecksumFetcher
394 {
395
396 private final File file;
397
398 private final ChecksumValidator checksumValidator;
399
400 public GetTaskRunner( URI path, File file, ChecksumPolicy checksumPolicy,
401 List<RepositoryLayout.Checksum> checksums, TransferTransportListener<?> listener )
402 {
403 super( path, listener );
404 this.file = requireNonNull( file, "destination file cannot be null" );
405 checksumValidator =
406 new ChecksumValidator( logger, file, fileProcessor, this, checksumPolicy, safe( checksums ) );
407 }
408
409 public void checkRemoteAccess()
410 throws Exception
411 {
412 transporter.peek( new PeekTask( path ) );
413 }
414
415 public boolean fetchChecksum( URI remote, File local )
416 throws Exception
417 {
418 try
419 {
420 transporter.get( new GetTask( remote ).setDataFile( local ) );
421 }
422 catch ( Exception e )
423 {
424 if ( transporter.classify( e ) == Transporter.ERROR_NOT_FOUND )
425 {
426 return false;
427 }
428 throw e;
429 }
430 return true;
431 }
432
433 protected void runTask()
434 throws Exception
435 {
436 fileProcessor.mkdirs( file.getParentFile() );
437
438 PartialFile partFile = partialFileFactory.newInstance( file, this );
439 if ( partFile == null )
440 {
441 logger.debug( "Concurrent download of " + file + " just finished, skipping download" );
442 return;
443 }
444
445 try
446 {
447 File tmp = partFile.getFile();
448 listener.setChecksumCalculator( checksumValidator.newChecksumCalculator( tmp ) );
449 for ( int firstTrial = 0, lastTrial = 1, trial = firstTrial;; trial++ )
450 {
451 boolean resume = partFile.isResume() && trial <= firstTrial;
452 GetTask task = new GetTask( path ).setDataFile( tmp, resume ).setListener( listener );
453 transporter.get( task );
454 try
455 {
456 checksumValidator.validate( listener.getChecksums(), smartChecksums ? task.getChecksums()
457 : null );
458 break;
459 }
460 catch ( ChecksumFailureException e )
461 {
462 boolean retry = trial < lastTrial && e.isRetryWorthy();
463 if ( !retry && !checksumValidator.handle( e ) )
464 {
465 throw e;
466 }
467 listener.transferCorrupted( e );
468 if ( retry )
469 {
470 checksumValidator.retry();
471 }
472 else
473 {
474 break;
475 }
476 }
477 }
478 fileProcessor.move( tmp, file );
479 if ( persistedChecksums )
480 {
481 checksumValidator.commit();
482 }
483 }
484 finally
485 {
486 partFile.close();
487 checksumValidator.close();
488 }
489 }
490
491 }
492
493 class PutTaskRunner
494 extends TaskRunner
495 {
496
497 private final File file;
498
499 private final Collection<RepositoryLayout.Checksum> checksums;
500
501 public PutTaskRunner( URI path, File file, List<RepositoryLayout.Checksum> checksums,
502 TransferTransportListener<?> listener )
503 {
504 super( path, listener );
505 this.file = requireNonNull( file, "source file cannot be null" );
506 this.checksums = safe( checksums );
507 }
508
509 protected void runTask()
510 throws Exception
511 {
512 transporter.put( new PutTask( path ).setDataFile( file ).setListener( listener ) );
513 uploadChecksums( file, path );
514 }
515
516 private void uploadChecksums( File file, URI location )
517 {
518 if ( checksums.isEmpty() )
519 {
520 return;
521 }
522 try
523 {
524 Set<String> algos = new HashSet<String>();
525 for ( RepositoryLayout.Checksum checksum : checksums )
526 {
527 algos.add( checksum.getAlgorithm() );
528 }
529 Map<String, Object> sumsByAlgo = ChecksumUtils.calc( file, algos );
530 for ( RepositoryLayout.Checksum checksum : checksums )
531 {
532 uploadChecksum( checksum.getLocation(), sumsByAlgo.get( checksum.getAlgorithm() ) );
533 }
534 }
535 catch ( IOException e )
536 {
537 String msg = "Failed to upload checksums for " + file + ": " + e.getMessage();
538 if ( logger.isDebugEnabled() )
539 {
540 logger.warn( msg, e );
541 }
542 else
543 {
544 logger.warn( msg );
545 }
546 }
547 }
548
549 private void uploadChecksum( URI location, Object checksum )
550 {
551 try
552 {
553 if ( checksum instanceof Exception )
554 {
555 throw (Exception) checksum;
556 }
557 transporter.put( new PutTask( location ).setDataString( (String) checksum ) );
558 }
559 catch ( Exception e )
560 {
561 String msg = "Failed to upload checksum " + location + ": " + e.getMessage();
562 if ( logger.isDebugEnabled() )
563 {
564 logger.warn( msg, e );
565 }
566 else
567 {
568 logger.warn( msg );
569 }
570 }
571 }
572
573 }
574
575 private static class DirectExecutor
576 implements Executor
577 {
578
579 static final Executor INSTANCE = new DirectExecutor();
580
581 public void execute( Runnable command )
582 {
583 command.run();
584 }
585
586 }
587
588 }