1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.io.UncheckedIOException;
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.eclipse.aether.ConfigurationProperties;
34 import org.eclipse.aether.RepositorySystemSession;
35 import org.eclipse.aether.RequestTrace;
36 import org.eclipse.aether.metadata.Metadata;
37 import org.eclipse.aether.repository.RemoteRepository;
38 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
39 import org.eclipse.aether.spi.connector.ArtifactDownload;
40 import org.eclipse.aether.spi.connector.ArtifactUpload;
41 import org.eclipse.aether.spi.connector.MetadataDownload;
42 import org.eclipse.aether.spi.connector.MetadataUpload;
43 import org.eclipse.aether.spi.connector.RepositoryConnector;
44 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
45 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
48 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
49 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
50 import org.eclipse.aether.spi.connector.transport.GetTask;
51 import org.eclipse.aether.spi.connector.transport.PeekTask;
52 import org.eclipse.aether.spi.connector.transport.PutTask;
53 import org.eclipse.aether.spi.connector.transport.Transporter;
54 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
55 import org.eclipse.aether.spi.io.FileProcessor;
56 import org.eclipse.aether.transfer.ChecksumFailureException;
57 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
58 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
59 import org.eclipse.aether.transfer.NoTransporterException;
60 import org.eclipse.aether.transfer.TransferEvent;
61 import org.eclipse.aether.transfer.TransferResource;
62 import org.eclipse.aether.util.ConfigUtils;
63 import org.eclipse.aether.util.FileUtils;
64 import org.eclipse.aether.util.concurrency.ExecutorUtils;
65 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 import static java.util.Objects.requireNonNull;
70
71
72
73
74 final class BasicRepositoryConnector implements RepositoryConnector {
75
76 private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
77
78 private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
79
80 private static final String CONFIG_PROP_PARALLEL_PUT = "aether.connector.basic.parallelPut";
81
82 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
83
84 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
85
86 private final FileProcessor fileProcessor;
87
88 private final RemoteRepository repository;
89
90 private final RepositorySystemSession session;
91
92 private final Transporter transporter;
93
94 private final RepositoryLayout layout;
95
96 private final ChecksumPolicyProvider checksumPolicyProvider;
97
98 private final int maxThreads;
99
100 private final boolean smartChecksums;
101
102 private final boolean parallelPut;
103
104 private final boolean persistedChecksums;
105
106 private Executor executor;
107
108 private final AtomicBoolean closed;
109
110 BasicRepositoryConnector(
111 RepositorySystemSession session,
112 RemoteRepository repository,
113 TransporterProvider transporterProvider,
114 RepositoryLayoutProvider layoutProvider,
115 ChecksumPolicyProvider checksumPolicyProvider,
116 FileProcessor fileProcessor,
117 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
118 throws NoRepositoryConnectorException {
119 try {
120 layout = layoutProvider.newRepositoryLayout(session, repository);
121 } catch (NoRepositoryLayoutException e) {
122 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
123 }
124 try {
125 transporter = transporterProvider.newTransporter(session, repository);
126 } catch (NoTransporterException e) {
127 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
128 }
129 this.checksumPolicyProvider = checksumPolicyProvider;
130
131 this.session = session;
132 this.repository = repository;
133 this.fileProcessor = fileProcessor;
134 this.providedChecksumsSources = providedChecksumsSources;
135 this.closed = new AtomicBoolean(false);
136
137 maxThreads = ExecutorUtils.threadCount(session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads");
138 smartChecksums = ConfigUtils.getBoolean(session, true, CONFIG_PROP_SMART_CHECKSUMS);
139 parallelPut = ConfigUtils.getBoolean(
140 session, true, CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(), CONFIG_PROP_PARALLEL_PUT);
141 persistedChecksums = ConfigUtils.getBoolean(
142 session,
143 ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
144 ConfigurationProperties.PERSISTED_CHECKSUMS);
145 }
146
147 private Executor getExecutor(int tasks) {
148 if (maxThreads <= 1) {
149 return ExecutorUtils.DIRECT_EXECUTOR;
150 }
151 if (tasks <= 1) {
152 return ExecutorUtils.DIRECT_EXECUTOR;
153 }
154 if (executor == null) {
155 executor =
156 ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
157 }
158 return executor;
159 }
160
161 @Override
162 public void close() {
163 if (closed.compareAndSet(false, true)) {
164 ExecutorUtils.shutdown(executor);
165 transporter.close();
166 }
167 }
168
169 private void failIfClosed() {
170 if (closed.get()) {
171 throw new IllegalStateException("connector already closed");
172 }
173 }
174
175 @Override
176 public void get(
177 Collection<? extends ArtifactDownload> artifactDownloads,
178 Collection<? extends MetadataDownload> metadataDownloads) {
179 failIfClosed();
180
181 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
182 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
183
184 Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
185 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
186 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
187
188 boolean first = true;
189
190 for (MetadataDownload transfer : safeMetadataDownloads) {
191 URI location = layout.getLocation(transfer.getMetadata(), false);
192
193 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
194 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
195 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
196
197 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
198 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
199 if (checksumPolicy != null) {
200 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
201 }
202
203 Runnable task = new GetTaskRunner(
204 location,
205 transfer.getFile(),
206 checksumPolicy,
207 checksumAlgorithmFactories,
208 checksumLocations,
209 null,
210 listener);
211 if (first) {
212 task.run();
213 first = false;
214 } else {
215 executor.execute(errorForwarder.wrap(task));
216 }
217 }
218
219 for (ArtifactDownload transfer : safeArtifactDownloads) {
220 Map<String, String> providedChecksums = Collections.emptyMap();
221 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
222 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
223 session, transfer, repository, checksumAlgorithmFactories);
224
225 if (provided != null) {
226 providedChecksums = provided;
227 break;
228 }
229 }
230
231 URI location = layout.getLocation(transfer.getArtifact(), false);
232
233 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
234 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
235 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
236
237 Runnable task;
238 if (transfer.isExistenceCheck()) {
239 task = new PeekTaskRunner(location, listener);
240 } else {
241 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
242 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
243 if (checksumPolicy != null) {
244 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
245 }
246
247 task = new GetTaskRunner(
248 location,
249 transfer.getFile(),
250 checksumPolicy,
251 checksumAlgorithmFactories,
252 checksumLocations,
253 providedChecksums,
254 listener);
255 }
256 if (first) {
257 task.run();
258 first = false;
259 } else {
260 executor.execute(errorForwarder.wrap(task));
261 }
262 }
263
264 errorForwarder.await();
265 }
266
267 @Override
268 public void put(
269 Collection<? extends ArtifactUpload> artifactUploads,
270 Collection<? extends MetadataUpload> metadataUploads) {
271 failIfClosed();
272
273 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
274 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
275
276 Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
277 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
278
279 boolean first = true;
280
281 for (ArtifactUpload transfer : safeArtifactUploads) {
282 URI location = layout.getLocation(transfer.getArtifact(), true);
283
284 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
285 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
286 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
287
288 List<RepositoryLayout.ChecksumLocation> checksumLocations =
289 layout.getChecksumLocations(transfer.getArtifact(), true, location);
290
291 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
292 if (first) {
293 task.run();
294 first = false;
295 } else {
296 executor.execute(errorForwarder.wrap(task));
297 }
298 }
299
300 errorForwarder.await();
301
302 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
303 for (MetadataUpload transfer : transferGroup) {
304 URI location = layout.getLocation(transfer.getMetadata(), true);
305
306 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
307 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
308 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
309
310 List<RepositoryLayout.ChecksumLocation> checksumLocations =
311 layout.getChecksumLocations(transfer.getMetadata(), true, location);
312
313 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
314 if (first) {
315 task.run();
316 first = false;
317 } else {
318 executor.execute(errorForwarder.wrap(task));
319 }
320 }
321
322 errorForwarder.await();
323 }
324 }
325
326
327
328
329
330
331 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
332 ArrayList<MetadataUpload> v = new ArrayList<>();
333 ArrayList<MetadataUpload> a = new ArrayList<>();
334 ArrayList<MetadataUpload> g = new ArrayList<>();
335 ArrayList<MetadataUpload> r = new ArrayList<>();
336
337 for (MetadataUpload transfer : metadataUploads) {
338 Metadata metadata = transfer.getMetadata();
339 if (!"".equals(metadata.getVersion())) {
340 v.add(transfer);
341 } else if (!"".equals(metadata.getArtifactId())) {
342 a.add(transfer);
343 } else if (!"".equals(metadata.getGroupId())) {
344 g.add(transfer);
345 } else {
346 r.add(transfer);
347 }
348 }
349
350 List<List<MetadataUpload>> result = new ArrayList<>(4);
351 if (!v.isEmpty()) {
352 result.add(v);
353 }
354 if (!a.isEmpty()) {
355 result.add(a);
356 }
357 if (!g.isEmpty()) {
358 result.add(g);
359 }
360 if (!r.isEmpty()) {
361 result.add(r);
362 }
363 return result;
364 }
365
366 private static <T> Collection<T> safe(Collection<T> items) {
367 return (items != null) ? items : Collections.emptyList();
368 }
369
370 private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
371 return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
372 }
373
374 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
375 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
376 if (upload) {
377 builder.setRequestType(TransferEvent.RequestType.PUT);
378 } else if (!peek) {
379 builder.setRequestType(TransferEvent.RequestType.GET);
380 } else {
381 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
382 }
383 return builder;
384 }
385
386 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
387 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
388 }
389
390 @Override
391 public String toString() {
392 return String.valueOf(repository);
393 }
394
395 abstract class TaskRunner implements Runnable {
396
397 protected final URI path;
398
399 protected final TransferTransportListener<?> listener;
400
401 TaskRunner(URI path, TransferTransportListener<?> listener) {
402 this.path = path;
403 this.listener = listener;
404 }
405
406 @Override
407 public void run() {
408 try {
409 listener.transferInitiated();
410 runTask();
411 listener.transferSucceeded();
412 } catch (Exception e) {
413 listener.transferFailed(e, transporter.classify(e));
414 }
415 }
416
417 protected abstract void runTask() throws Exception;
418 }
419
420 class PeekTaskRunner extends TaskRunner {
421
422 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
423 super(path, listener);
424 }
425
426 @Override
427 protected void runTask() throws Exception {
428 transporter.peek(new PeekTask(path));
429 }
430 }
431
432 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
433
434 private final File file;
435
436 private final ChecksumValidator checksumValidator;
437
438 GetTaskRunner(
439 URI path,
440 File file,
441 ChecksumPolicy checksumPolicy,
442 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
443 List<RepositoryLayout.ChecksumLocation> checksumLocations,
444 Map<String, String> providedChecksums,
445 TransferTransportListener<?> listener) {
446 super(path, listener);
447 this.file = requireNonNull(file, "destination file cannot be null");
448 checksumValidator = new ChecksumValidator(
449 file,
450 checksumAlgorithmFactories,
451 fileProcessor,
452 this,
453 checksumPolicy,
454 providedChecksums,
455 safe(checksumLocations));
456 }
457
458 @Override
459 public boolean fetchChecksum(URI remote, File local) throws Exception {
460 try {
461 transporter.get(new GetTask(remote).setDataFile(local));
462 } catch (Exception e) {
463 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
464 return false;
465 }
466 throw e;
467 }
468 return true;
469 }
470
471 @Override
472 protected void runTask() throws Exception {
473 try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
474 final File tmp = tempFile.getPath().toFile();
475 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
476 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
477 GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
478 transporter.get(task);
479 try {
480 checksumValidator.validate(
481 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
482 break;
483 } catch (ChecksumFailureException e) {
484 boolean retry = trial < lastTrial && e.isRetryWorthy();
485 if (!retry && !checksumValidator.handle(e)) {
486 throw e;
487 }
488 listener.transferCorrupted(e);
489 if (retry) {
490 checksumValidator.retry();
491 } else {
492 break;
493 }
494 }
495 }
496 tempFile.move();
497 if (persistedChecksums) {
498 checksumValidator.commit();
499 }
500 }
501 }
502 }
503
504 class PutTaskRunner extends TaskRunner {
505
506 private final File file;
507
508 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
509
510 PutTaskRunner(
511 URI path,
512 File file,
513 List<RepositoryLayout.ChecksumLocation> checksumLocations,
514 TransferTransportListener<?> listener) {
515 super(path, listener);
516 this.file = requireNonNull(file, "source file cannot be null");
517 this.checksumLocations = safe(checksumLocations);
518 }
519
520 @SuppressWarnings("checkstyle:innerassignment")
521 @Override
522 protected void runTask() throws Exception {
523 transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
524 uploadChecksums(file, null);
525 }
526
527
528
529
530
531 private void uploadChecksums(File file, byte[] bytes) {
532 if (checksumLocations.isEmpty()) {
533 return;
534 }
535 try {
536 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
537 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
538 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
539 }
540
541 Map<String, String> sumsByAlgo;
542 if (bytes != null) {
543 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
544 } else {
545 sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
546 }
547
548 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
549 uploadChecksum(
550 checksumLocation.getLocation(),
551 sumsByAlgo.get(checksumLocation
552 .getChecksumAlgorithmFactory()
553 .getName()));
554 }
555 } catch (IOException e) {
556 LOGGER.warn("Failed to upload checksums for {}", file, e);
557 throw new UncheckedIOException(e);
558 }
559 }
560
561 private void uploadChecksum(URI location, Object checksum) {
562 try {
563 if (checksum instanceof Exception) {
564 throw (Exception) checksum;
565 }
566 transporter.put(new PutTask(location).setDataString((String) checksum));
567 } catch (Exception e) {
568 LOGGER.warn("Failed to upload checksum to {}", location, e);
569 }
570 }
571 }
572 }