1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.aether.connector.basic;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.io.UncheckedIOException;
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.eclipse.aether.RepositorySystemSession;
34 import org.eclipse.aether.RequestTrace;
35 import org.eclipse.aether.metadata.Metadata;
36 import org.eclipse.aether.repository.RemoteRepository;
37 import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
38 import org.eclipse.aether.spi.connector.ArtifactDownload;
39 import org.eclipse.aether.spi.connector.ArtifactUpload;
40 import org.eclipse.aether.spi.connector.MetadataDownload;
41 import org.eclipse.aether.spi.connector.MetadataUpload;
42 import org.eclipse.aether.spi.connector.RepositoryConnector;
43 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
44 import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
45 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
46 import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
47 import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
48 import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
49 import org.eclipse.aether.spi.connector.transport.GetTask;
50 import org.eclipse.aether.spi.connector.transport.PeekTask;
51 import org.eclipse.aether.spi.connector.transport.PutTask;
52 import org.eclipse.aether.spi.connector.transport.Transporter;
53 import org.eclipse.aether.spi.connector.transport.TransporterProvider;
54 import org.eclipse.aether.spi.io.FileProcessor;
55 import org.eclipse.aether.transfer.ChecksumFailureException;
56 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
57 import org.eclipse.aether.transfer.NoRepositoryLayoutException;
58 import org.eclipse.aether.transfer.NoTransporterException;
59 import org.eclipse.aether.transfer.TransferEvent;
60 import org.eclipse.aether.transfer.TransferResource;
61 import org.eclipse.aether.util.ConfigUtils;
62 import org.eclipse.aether.util.FileUtils;
63 import org.eclipse.aether.util.concurrency.ExecutorUtils;
64 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 import static java.util.Objects.requireNonNull;
69 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
70 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
71 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
72 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
73 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
74 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
75 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
76 import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
77
78
79
80
81 final class BasicRepositoryConnector implements RepositoryConnector {
82 private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
83
84 private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
85
86 private final FileProcessor fileProcessor;
87
88 private final RemoteRepository repository;
89
90 private final RepositorySystemSession session;
91
92 private final Transporter transporter;
93
94 private final RepositoryLayout layout;
95
96 private final ChecksumPolicyProvider checksumPolicyProvider;
97
98 private final int maxThreads;
99
100 private final boolean smartChecksums;
101
102 private final boolean parallelPut;
103
104 private final boolean persistedChecksums;
105
106 private Executor executor;
107
108 private final AtomicBoolean closed;
109
/**
 * Creates a connector bound to the given session and remote repository.
 *
 * <p>The repository layout is resolved first and the transporter second; a failure of either lookup is
 * reported as a {@link NoRepositoryConnectorException} carrying the original exception as its cause.
 * Afterwards the connector settings (thread count, smart checksums, parallel put, persisted checksums)
 * are read from the session configuration.
 *
 * @param session the session the connector works for
 * @param repository the remote repository to talk to
 * @param transporterProvider provider used to obtain the transporter for the repository
 * @param layoutProvider provider used to obtain the repository layout
 * @param checksumPolicyProvider provider used later to create per-transfer checksum policies
 * @param fileProcessor file processor handed to the checksum validator of each download
 * @param providedChecksumsSources sources consulted for externally provided artifact checksums
 * @throws NoRepositoryConnectorException if no layout or no transporter is available for the repository
 */
BasicRepositoryConnector(
        RepositorySystemSession session,
        RemoteRepository repository,
        TransporterProvider transporterProvider,
        RepositoryLayoutProvider layoutProvider,
        ChecksumPolicyProvider checksumPolicyProvider,
        FileProcessor fileProcessor,
        Map<String, ProvidedChecksumsSource> providedChecksumsSources)
        throws NoRepositoryConnectorException {
    try {
        layout = layoutProvider.newRepositoryLayout(session, repository);
    } catch (NoRepositoryLayoutException e) {
        throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
    }
    try {
        transporter = transporterProvider.newTransporter(session, repository);
    } catch (NoTransporterException e) {
        throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
    }
    this.checksumPolicyProvider = checksumPolicyProvider;

    this.session = session;
    this.repository = repository;
    this.fileProcessor = fileProcessor;
    this.providedChecksumsSources = providedChecksumsSources;
    this.closed = new AtomicBoolean(false);

    try {
        // NOTE(review): ExecutorUtils.threadCount presumably rejects invalid thread counts with an
        // unchecked exception - confirm against its documentation.
        maxThreads = ExecutorUtils.threadCount(session, DEFAULT_THREADS, CONFIG_PROP_THREADS);
        smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
        // The repository specific key is listed before the global one.
        parallelPut = ConfigUtils.getBoolean(
                session,
                DEFAULT_PARALLEL_PUT,
                CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
                CONFIG_PROP_PARALLEL_PUT);
        persistedChecksums =
                ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
    } catch (RuntimeException e) {
        // The transporter is already open at this point and nobody will ever call close() on a
        // connector whose constructor failed, so release it here before propagating the failure.
        try {
            transporter.close();
        } catch (RuntimeException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
147
148 private Executor getExecutor(int tasks) {
149 if (maxThreads <= 1) {
150 return ExecutorUtils.DIRECT_EXECUTOR;
151 }
152 if (tasks <= 1) {
153 return ExecutorUtils.DIRECT_EXECUTOR;
154 }
155 if (executor == null) {
156 executor =
157 ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
158 }
159 return executor;
160 }
161
162 @Override
163 public void close() {
164 if (closed.compareAndSet(false, true)) {
165 ExecutorUtils.shutdown(executor);
166 transporter.close();
167 }
168 }
169
170 private void failIfClosed() {
171 if (closed.get()) {
172 throw new IllegalStateException("connector already closed");
173 }
174 }
175
176 @Override
177 public void get(
178 Collection<? extends ArtifactDownload> artifactDownloads,
179 Collection<? extends MetadataDownload> metadataDownloads) {
180 failIfClosed();
181
182 Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
183 Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
184
185 Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
186 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
187 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
188
189 boolean first = true;
190
191 for (MetadataDownload transfer : safeMetadataDownloads) {
192 URI location = layout.getLocation(transfer.getMetadata(), false);
193
194 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
195 TransferEvent.Builder builder = newEventBuilder(resource, false, false);
196 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
197
198 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
199 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
200 if (checksumPolicy != null) {
201 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
202 }
203
204 Runnable task = new GetTaskRunner(
205 location,
206 transfer.getFile(),
207 checksumPolicy,
208 checksumAlgorithmFactories,
209 checksumLocations,
210 null,
211 listener);
212 if (first) {
213 task.run();
214 first = false;
215 } else {
216 executor.execute(errorForwarder.wrap(task));
217 }
218 }
219
220 for (ArtifactDownload transfer : safeArtifactDownloads) {
221 Map<String, String> providedChecksums = Collections.emptyMap();
222 for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
223 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
224 session, transfer, repository, checksumAlgorithmFactories);
225
226 if (provided != null) {
227 providedChecksums = provided;
228 break;
229 }
230 }
231
232 URI location = layout.getLocation(transfer.getArtifact(), false);
233
234 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
235 TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
236 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
237
238 Runnable task;
239 if (transfer.isExistenceCheck()) {
240 task = new PeekTaskRunner(location, listener);
241 } else {
242 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
243 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
244 if (checksumPolicy != null) {
245 checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
246 }
247
248 task = new GetTaskRunner(
249 location,
250 transfer.getFile(),
251 checksumPolicy,
252 checksumAlgorithmFactories,
253 checksumLocations,
254 providedChecksums,
255 listener);
256 }
257 if (first) {
258 task.run();
259 first = false;
260 } else {
261 executor.execute(errorForwarder.wrap(task));
262 }
263 }
264
265 errorForwarder.await();
266 }
267
268 @Override
269 public void put(
270 Collection<? extends ArtifactUpload> artifactUploads,
271 Collection<? extends MetadataUpload> metadataUploads) {
272 failIfClosed();
273
274 Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
275 Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
276
277 Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
278 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
279
280 boolean first = true;
281
282 for (ArtifactUpload transfer : safeArtifactUploads) {
283 URI location = layout.getLocation(transfer.getArtifact(), true);
284
285 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
286 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
287 ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
288
289 List<RepositoryLayout.ChecksumLocation> checksumLocations =
290 layout.getChecksumLocations(transfer.getArtifact(), true, location);
291
292 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
293 if (first) {
294 task.run();
295 first = false;
296 } else {
297 executor.execute(errorForwarder.wrap(task));
298 }
299 }
300
301 errorForwarder.await();
302
303 for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
304 for (MetadataUpload transfer : transferGroup) {
305 URI location = layout.getLocation(transfer.getMetadata(), true);
306
307 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
308 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
309 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
310
311 List<RepositoryLayout.ChecksumLocation> checksumLocations =
312 layout.getChecksumLocations(transfer.getMetadata(), true, location);
313
314 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
315 if (first) {
316 task.run();
317 first = false;
318 } else {
319 executor.execute(errorForwarder.wrap(task));
320 }
321 }
322
323 errorForwarder.await();
324 }
325 }
326
327
328
329
330
331
332 private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
333 ArrayList<MetadataUpload> v = new ArrayList<>();
334 ArrayList<MetadataUpload> a = new ArrayList<>();
335 ArrayList<MetadataUpload> g = new ArrayList<>();
336 ArrayList<MetadataUpload> r = new ArrayList<>();
337
338 for (MetadataUpload transfer : metadataUploads) {
339 Metadata metadata = transfer.getMetadata();
340 if (!"".equals(metadata.getVersion())) {
341 v.add(transfer);
342 } else if (!"".equals(metadata.getArtifactId())) {
343 a.add(transfer);
344 } else if (!"".equals(metadata.getGroupId())) {
345 g.add(transfer);
346 } else {
347 r.add(transfer);
348 }
349 }
350
351 List<List<MetadataUpload>> result = new ArrayList<>(4);
352 if (!v.isEmpty()) {
353 result.add(v);
354 }
355 if (!a.isEmpty()) {
356 result.add(a);
357 }
358 if (!g.isEmpty()) {
359 result.add(g);
360 }
361 if (!r.isEmpty()) {
362 result.add(r);
363 }
364 return result;
365 }
366
367 private static <T> Collection<T> safe(Collection<T> items) {
368 return (items != null) ? items : Collections.emptyList();
369 }
370
371 private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
372 return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
373 }
374
375 private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
376 TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
377 if (upload) {
378 builder.setRequestType(TransferEvent.RequestType.PUT);
379 } else if (!peek) {
380 builder.setRequestType(TransferEvent.RequestType.GET);
381 } else {
382 builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
383 }
384 return builder;
385 }
386
387 private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
388 return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
389 }
390
391 @Override
392 public String toString() {
393 return String.valueOf(repository);
394 }
395
396 abstract class TaskRunner implements Runnable {
397
398 protected final URI path;
399
400 protected final TransferTransportListener<?> listener;
401
402 TaskRunner(URI path, TransferTransportListener<?> listener) {
403 this.path = path;
404 this.listener = listener;
405 }
406
407 @Override
408 public void run() {
409 try {
410 listener.transferInitiated();
411 runTask();
412 listener.transferSucceeded();
413 } catch (Exception e) {
414 listener.transferFailed(e, transporter.classify(e));
415 }
416 }
417
418 protected abstract void runTask() throws Exception;
419 }
420
421 class PeekTaskRunner extends TaskRunner {
422
423 PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
424 super(path, listener);
425 }
426
427 @Override
428 protected void runTask() throws Exception {
429 transporter.peek(new PeekTask(path));
430 }
431 }
432
433 class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
434
435 private final File file;
436
437 private final ChecksumValidator checksumValidator;
438
439 GetTaskRunner(
440 URI path,
441 File file,
442 ChecksumPolicy checksumPolicy,
443 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
444 List<RepositoryLayout.ChecksumLocation> checksumLocations,
445 Map<String, String> providedChecksums,
446 TransferTransportListener<?> listener) {
447 super(path, listener);
448 this.file = requireNonNull(file, "destination file cannot be null");
449 checksumValidator = new ChecksumValidator(
450 file,
451 checksumAlgorithmFactories,
452 fileProcessor,
453 this,
454 checksumPolicy,
455 providedChecksums,
456 safe(checksumLocations));
457 }
458
459 @Override
460 public boolean fetchChecksum(URI remote, File local) throws Exception {
461 try {
462 transporter.get(new GetTask(remote).setDataFile(local));
463 } catch (Exception e) {
464 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
465 return false;
466 }
467 throw e;
468 }
469 return true;
470 }
471
472 @Override
473 protected void runTask() throws Exception {
474 try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
475 final File tmp = tempFile.getPath().toFile();
476 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
477 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
478 GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
479 transporter.get(task);
480 try {
481 checksumValidator.validate(
482 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
483 break;
484 } catch (ChecksumFailureException e) {
485 boolean retry = trial < lastTrial && e.isRetryWorthy();
486 if (!retry && !checksumValidator.handle(e)) {
487 throw e;
488 }
489 listener.transferCorrupted(e);
490 if (retry) {
491 checksumValidator.retry();
492 } else {
493 break;
494 }
495 }
496 }
497 tempFile.move();
498 if (persistedChecksums) {
499 checksumValidator.commit();
500 }
501 }
502 }
503 }
504
505 class PutTaskRunner extends TaskRunner {
506
507 private final File file;
508
509 private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
510
511 PutTaskRunner(
512 URI path,
513 File file,
514 List<RepositoryLayout.ChecksumLocation> checksumLocations,
515 TransferTransportListener<?> listener) {
516 super(path, listener);
517 this.file = requireNonNull(file, "source file cannot be null");
518 this.checksumLocations = safe(checksumLocations);
519 }
520
521 @SuppressWarnings("checkstyle:innerassignment")
522 @Override
523 protected void runTask() throws Exception {
524 transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
525 uploadChecksums(file, null);
526 }
527
528
529
530
531
532 private void uploadChecksums(File file, byte[] bytes) {
533 if (checksumLocations.isEmpty()) {
534 return;
535 }
536 try {
537 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
538 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
539 algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
540 }
541
542 Map<String, String> sumsByAlgo;
543 if (bytes != null) {
544 sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
545 } else {
546 sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
547 }
548
549 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
550 uploadChecksum(
551 checksumLocation.getLocation(),
552 sumsByAlgo.get(checksumLocation
553 .getChecksumAlgorithmFactory()
554 .getName()));
555 }
556 } catch (IOException e) {
557 LOGGER.warn("Failed to upload checksums for {}", file, e);
558 throw new UncheckedIOException(e);
559 }
560 }
561
562 private void uploadChecksum(URI location, Object checksum) {
563 try {
564 if (checksum instanceof Exception) {
565 throw (Exception) checksum;
566 }
567 transporter.put(new PutTask(location).setDataString((String) checksum));
568 } catch (Exception e) {
569 LOGGER.warn("Failed to upload checksum to {}", location, e);
570 }
571 }
572 }
573 }