1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.UncheckedIOException;
26 import java.net.URI;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 import org.eclipse.aether.ConfigurationProperties;
36 import org.eclipse.aether.RepositorySystemSession;
37 import org.eclipse.aether.RequestTrace;
38 import org.eclipse.aether.metadata.Metadata;
39 import org.eclipse.aether.repository.RemoteRepository;
40 import org.eclipse.aether.spi.connector.ArtifactDownload;
41 import org.eclipse.aether.spi.connector.ArtifactUpload;
42 import org.eclipse.aether.spi.connector.MetadataDownload;
43 import org.eclipse.aether.spi.connector.MetadataUpload;
44 import org.eclipse.aether.spi.connector.RepositoryConnector;
45 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
47 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
48 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
49 import org.eclipse.aether.spi.connector.checksum.ProvidedChecksumsSource;
50 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
51 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
52 import org.eclipse.aether.spi.connector.transport.GetTask;
53 import org.eclipse.aether.spi.connector.transport.PeekTask;
54 import org.eclipse.aether.spi.connector.transport.PutTask;
55 import org.eclipse.aether.spi.connector.transport.Transporter;
56 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
57 import org.eclipse.aether.spi.io.FileProcessor;
58 import org.eclipse.aether.transfer.ChecksumFailureException;
59 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
60 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
61 import org.eclipse.aether.transfer.NoTransporterException;
62 import org.eclipse.aether.transfer.TransferEvent;
63 import org.eclipse.aether.transfer.TransferResource;
64 import org.eclipse.aether.transform.FileTransformer;
65 import org.eclipse.aether.util.ConfigUtils;
66 import org.eclipse.aether.util.FileUtils;
67 import org.eclipse.aether.util.concurrency.ExecutorUtils;
68 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 import static java.util.Objects.requireNonNull;
73
74
75
76
77 final class BasicRepositoryConnector implements RepositoryConnector {
78
79 private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
80
81 private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
82
83 private static final String CONFIG_PROP_PARALLEL_PUT = "aether.connector.basic.parallelPut";
84
85 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
86
87 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
88
89 private final FileProcessor fileProcessor;
90
91 private final RemoteRepository repository;
92
93 private final RepositorySystemSession session;
94
95 private final Transporter transporter;
96
97 private final RepositoryLayout layout;
98
99 private final ChecksumPolicyProvider checksumPolicyProvider;
100
101 private final int maxThreads;
102
103 private final boolean smartChecksums;
104
105 private final boolean parallelPut;
106
107 private final boolean persistedChecksums;
108
109 private Executor executor;
110
111 private final AtomicBoolean closed;
112
113 BasicRepositoryConnector(
114 RepositorySystemSession session,
115 RemoteRepository repository,
116 TransporterProvider transporterProvider,
117 RepositoryLayoutProvider layoutProvider,
118 ChecksumPolicyProvider checksumPolicyProvider,
119 FileProcessor fileProcessor,
120 Map<String, ProvidedChecksumsSource> providedChecksumsSources)
121 throws NoRepositoryConnectorException {
122 try {
123 layout = layoutProvider.newRepositoryLayout(session, repository);
124 } catch (NoRepositoryLayoutException e) {
125 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
126 }
127 try {
128 transporter = transporterProvider.newTransporter(session, repository);
129 } catch (NoTransporterException e) {
130 throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
131 }
132 this.checksumPolicyProvider = checksumPolicyProvider;
133
134 this.session = session;
135 this.repository = repository;
136 this.fileProcessor = fileProcessor;
137 this.providedChecksumsSources = providedChecksumsSources;
138 this.closed = new AtomicBoolean(false);
139
140 maxThreads = ExecutorUtils.threadCount(session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads");
141 smartChecksums = ConfigUtils.getBoolean(session, true, CONFIG_PROP_SMART_CHECKSUMS);
142 parallelPut = ConfigUtils.getBoolean(
143 session, true, CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(), CONFIG_PROP_PARALLEL_PUT);
144 persistedChecksums = ConfigUtils.getBoolean(
145 session,
146 ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
147 ConfigurationProperties.PERSISTED_CHECKSUMS);
148 }
149
150 private Executor getExecutor(int tasks) {
151 if (maxThreads <= 1) {
152 return ExecutorUtils.DIRECT_EXECUTOR;
153 }
154 if (tasks <= 1) {
155 return ExecutorUtils.DIRECT_EXECUTOR;
156 }
157 if (executor == null) {
158 executor =
159 ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
160 }
161 return executor;
162 }
163
164 @Override
165 public void close() {
166 if (closed.compareAndSet(false, true)) {
167 ExecutorUtils.shutdown(executor);
168 transporter.close();
169 }
170 }
171
172 private void failIfClosed() {
173 if (closed.get()) {
174 throw new IllegalStateException("connector already closed");
175 }
176 }
177
178 @Override
179 public void get(
180 Collection<? extends ArtifactDownload> artifactDownloads,
181 Collection<? extends MetadataDownload> metadataDownloads) {
182 failIfClosed();
183
184 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
185 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
186
187 Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
188 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
189 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
190
191 boolean first = true;
192
193 for (MetadataDownload transfer : safeMetadataDownloads) {
194 URI location = layout.getLocation(transfer.getMetadata(), false);
195
196 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
197 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
198 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
199
200 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
201 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
202 if (checksumPolicy != null) {
203 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
204 }
205
206 Runnable task = new GetTaskRunner(
207 location,
208 transfer.getFile(),
209 checksumPolicy,
210 checksumAlgorithmFactories,
211 checksumLocations,
212 null,
213 listener);
214 if (first) {
215 task.run();
216 first = false;
217 } else {
218 executor.execute(errorForwarder.wrap(task));
219 }
220 }
221
222 for (ArtifactDownload transfer : safeArtifactDownloads) {
223 Map<String, String> providedChecksums = Collections.emptyMap();
224 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
225 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
226 session, transfer, checksumAlgorithmFactories);
227
228 if (provided != null) {
229 providedChecksums = provided;
230 break;
231 }
232 }
233
234 URI location = layout.getLocation(transfer.getArtifact(), false);
235
236 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
237 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
238 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
239
240 Runnable task;
241 if (transfer.isExistenceCheck()) {
242 task = new PeekTaskRunner(location, listener);
243 } else {
244 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
245 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
246 if (checksumPolicy != null) {
247 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
248 }
249
250 task = new GetTaskRunner(
251 location,
252 transfer.getFile(),
253 checksumPolicy,
254 checksumAlgorithmFactories,
255 checksumLocations,
256 providedChecksums,
257 listener);
258 }
259 if (first) {
260 task.run();
261 first = false;
262 } else {
263 executor.execute(errorForwarder.wrap(task));
264 }
265 }
266
267 errorForwarder.await();
268 }
269
270 @Override
271 public void put(
272 Collection<? extends ArtifactUpload> artifactUploads,
273 Collection<? extends MetadataUpload> metadataUploads) {
274 failIfClosed();
275
276 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
277 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
278
279 Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
280 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
281
282 boolean first = true;
283
284 for (ArtifactUpload transfer : safeArtifactUploads) {
285 URI location = layout.getLocation(transfer.getArtifact(), true);
286
287 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
288 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
289 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
290
291 List<RepositoryLayout.ChecksumLocation> checksumLocations =
292 layout.getChecksumLocations(transfer.getArtifact(), true, location);
293
294 Runnable task = new PutTaskRunner(
295 location, transfer.getFile(), transfer.getFileTransformer(), checksumLocations, listener);
296 if (first) {
297 task.run();
298 first = false;
299 } else {
300 executor.execute(errorForwarder.wrap(task));
301 }
302 }
303
304 errorForwarder.await();
305
306 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
307 for (MetadataUpload transfer : transferGroup) {
308 URI location = layout.getLocation(transfer.getMetadata(), true);
309
310 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
311 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
312 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
313
314 List<RepositoryLayout.ChecksumLocation> checksumLocations =
315 layout.getChecksumLocations(transfer.getMetadata(), true, location);
316
317 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
318 if (first) {
319 task.run();
320 first = false;
321 } else {
322 executor.execute(errorForwarder.wrap(task));
323 }
324 }
325
326 errorForwarder.await();
327 }
328 }
329
330
331
332
333
334
335 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
336 ArrayList<MetadataUpload> v = new ArrayList<>();
337 ArrayList<MetadataUpload> a = new ArrayList<>();
338 ArrayList<MetadataUpload> g = new ArrayList<>();
339 ArrayList<MetadataUpload> r = new ArrayList<>();
340
341 for (MetadataUpload transfer : metadataUploads) {
342 Metadata metadata = transfer.getMetadata();
343 if (!"".equals(metadata.getVersion())) {
344 v.add(transfer);
345 } else if (!"".equals(metadata.getArtifactId())) {
346 a.add(transfer);
347 } else if (!"".equals(metadata.getGroupId())) {
348 g.add(transfer);
349 } else {
350 r.add(transfer);
351 }
352 }
353
354 List<List<MetadataUpload>> result = new ArrayList<>(4);
355 if (!v.isEmpty()) {
356 result.add(v);
357 }
358 if (!a.isEmpty()) {
359 result.add(a);
360 }
361 if (!g.isEmpty()) {
362 result.add(g);
363 }
364 if (!r.isEmpty()) {
365 result.add(r);
366 }
367 return result;
368 }
369
370 private static <T> Collection<T> safe(Collection<T> items) {
371 return (items != null) ? items : Collections.emptyList();
372 }
373
374 private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
375 return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
376 }
377
378 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
379 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
380 if (upload) {
381 builder.setRequestType(TransferEvent.RequestType.PUT);
382 } else if (!peek) {
383 builder.setRequestType(TransferEvent.RequestType.GET);
384 } else {
385 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
386 }
387 return builder;
388 }
389
390 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
391 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
392 }
393
394 @Override
395 public String toString() {
396 return String.valueOf(repository);
397 }
398
399 abstract class TaskRunner implements Runnable {
400
401 protected final URI path;
402
403 protected final TransferTransportListener<?> listener;
404
405 TaskRunner(URI path, TransferTransportListener<?> listener) {
406 this.path = path;
407 this.listener = listener;
408 }
409
410 @Override
411 public void run() {
412 try {
413 listener.transferInitiated();
414 runTask();
415 listener.transferSucceeded();
416 } catch (Exception e) {
417 listener.transferFailed(e, transporter.classify(e));
418 }
419 }
420
421 protected abstract void runTask() throws Exception;
422 }
423
424 class PeekTaskRunner extends TaskRunner {
425
426 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
427 super(path, listener);
428 }
429
430 @Override
431 protected void runTask() throws Exception {
432 transporter.peek(new PeekTask(path));
433 }
434 }
435
436 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
437
438 private final File file;
439
440 private final ChecksumValidator checksumValidator;
441
442 GetTaskRunner(
443 URI path,
444 File file,
445 ChecksumPolicy checksumPolicy,
446 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
447 List<RepositoryLayout.ChecksumLocation> checksumLocations,
448 Map<String, String> providedChecksums,
449 TransferTransportListener<?> listener) {
450 super(path, listener);
451 this.file = requireNonNull(file, "destination file cannot be null");
452 checksumValidator = new ChecksumValidator(
453 file,
454 checksumAlgorithmFactories,
455 fileProcessor,
456 this,
457 checksumPolicy,
458 providedChecksums,
459 safe(checksumLocations));
460 }
461
462 @Override
463 public boolean fetchChecksum(URI remote, File local) throws Exception {
464 try {
465 transporter.get(new GetTask(remote).setDataFile(local));
466 } catch (Exception e) {
467 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
468 return false;
469 }
470 throw e;
471 }
472 return true;
473 }
474
475 @Override
476 protected void runTask() throws Exception {
477 try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
478 final File tmp = tempFile.getPath().toFile();
479 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
480 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
481 GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
482 transporter.get(task);
483 try {
484 checksumValidator.validate(
485 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
486 break;
487 } catch (ChecksumFailureException e) {
488 boolean retry = trial < lastTrial && e.isRetryWorthy();
489 if (!retry && !checksumValidator.handle(e)) {
490 throw e;
491 }
492 listener.transferCorrupted(e);
493 if (retry) {
494 checksumValidator.retry();
495 } else {
496 break;
497 }
498 }
499 }
500 tempFile.move();
501 if (persistedChecksums) {
502 checksumValidator.commit();
503 }
504 }
505 }
506 }
507
508 class PutTaskRunner extends TaskRunner {
509
510 private final File file;
511
512 private final FileTransformer fileTransformer;
513
514 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
515
516 PutTaskRunner(
517 URI path,
518 File file,
519 List<RepositoryLayout.ChecksumLocation> checksumLocations,
520 TransferTransportListener<?> listener) {
521 this(path, file, null, checksumLocations, listener);
522 }
523
524
525
526
527
528
529
530
531
532
533
534 PutTaskRunner(
535 URI path,
536 File file,
537 FileTransformer fileTransformer,
538 List<RepositoryLayout.ChecksumLocation> checksumLocations,
539 TransferTransportListener<?> listener) {
540 super(path, listener);
541 this.file = requireNonNull(file, "source file cannot be null");
542 this.fileTransformer = fileTransformer;
543 this.checksumLocations = safe(checksumLocations);
544 }
545
546 @SuppressWarnings("checkstyle:innerassignment")
547 @Override
548 protected void runTask() throws Exception {
549 if (fileTransformer != null) {
550
551 ByteArrayOutputStream baos = new ByteArrayOutputStream();
552 byte[] buffer = new byte[1024];
553
554 try (InputStream transformData = fileTransformer.transformData(file)) {
555 for (int read; (read = transformData.read(buffer, 0, buffer.length)) != -1; ) {
556 baos.write(buffer, 0, read);
557 }
558 }
559
560 byte[] bytes = baos.toByteArray();
561 transporter.put(new PutTask(path).setDataBytes(bytes).setListener(listener));
562 uploadChecksums(file, bytes);
563 } else {
564 transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
565 uploadChecksums(file, null);
566 }
567 }
568
569
570
571
572
573 private void uploadChecksums(File file, byte[] bytes) {
574 if (checksumLocations.isEmpty()) {
575 return;
576 }
577 try {
578 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
579 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
580 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
581 }
582
583 Map<String, String> sumsByAlgo;
584 if (bytes != null) {
585 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
586 } else {
587 sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
588 }
589
590 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
591 uploadChecksum(
592 checksumLocation.getLocation(),
593 sumsByAlgo.get(checksumLocation
594 .getChecksumAlgorithmFactory()
595 .getName()));
596 }
597 } catch (IOException e) {
598 LOGGER.warn("Failed to upload checksums for {}", file, e);
599 throw new UncheckedIOException(e);
600 }
601 }
602
603 private void uploadChecksum(URI location, Object checksum) {
604 try {
605 if (checksum instanceof Exception) {
606 throw (Exception) checksum;
607 }
608 transporter.put(new PutTask(location).setDataString((String) checksum));
609 } catch (Exception e) {
610 LOGGER.warn("Failed to upload checksum to {}", location, e);
611 }
612 }
613 }
614 }