View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.connector.basic;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.io.UncheckedIOException;
24  import java.net.URI;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  import org.eclipse.aether.ConfigurationProperties;
34  import org.eclipse.aether.RepositorySystemSession;
35  import org.eclipse.aether.RequestTrace;
36  import org.eclipse.aether.metadata.Metadata;
37  import org.eclipse.aether.repository.RemoteRepository;
38  import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
39  import org.eclipse.aether.spi.connector.ArtifactDownload;
40  import org.eclipse.aether.spi.connector.ArtifactUpload;
41  import org.eclipse.aether.spi.connector.MetadataDownload;
42  import org.eclipse.aether.spi.connector.MetadataUpload;
43  import org.eclipse.aether.spi.connector.RepositoryConnector;
44  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
45  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
46  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
47  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
48  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
49  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
50  import org.eclipse.aether.spi.connector.transport.GetTask;
51  import org.eclipse.aether.spi.connector.transport.PeekTask;
52  import org.eclipse.aether.spi.connector.transport.PutTask;
53  import org.eclipse.aether.spi.connector.transport.Transporter;
54  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
55  import org.eclipse.aether.spi.io.FileProcessor;
56  import org.eclipse.aether.transfer.ChecksumFailureException;
57  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
58  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
59  import org.eclipse.aether.transfer.NoTransporterException;
60  import org.eclipse.aether.transfer.TransferEvent;
61  import org.eclipse.aether.transfer.TransferResource;
62  import org.eclipse.aether.util.ConfigUtils;
63  import org.eclipse.aether.util.FileUtils;
64  import org.eclipse.aether.util.concurrency.ExecutorUtils;
65  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
66  import org.slf4j.Logger;
67  import org.slf4j.LoggerFactory;
68  
69  import static java.util.Objects.requireNonNull;
70  
71  /**
72   *
73   */
74  final class BasicRepositoryConnector implements RepositoryConnector {
75  
76      private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";
77  
78      private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";
79  
80      private static final String CONFIG_PROP_PARALLEL_PUT = "aether.connector.basic.parallelPut";
81  
82      private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
83  
84      private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
85  
86      private final FileProcessor fileProcessor;
87  
88      private final RemoteRepository repository;
89  
90      private final RepositorySystemSession session;
91  
92      private final Transporter transporter;
93  
94      private final RepositoryLayout layout;
95  
96      private final ChecksumPolicyProvider checksumPolicyProvider;
97  
98      private final int maxThreads;
99  
100     private final boolean smartChecksums;
101 
102     private final boolean parallelPut;
103 
104     private final boolean persistedChecksums;
105 
106     private Executor executor;
107 
108     private final AtomicBoolean closed;
109 
110     BasicRepositoryConnector(
111             RepositorySystemSession session,
112             RemoteRepository repository,
113             TransporterProvider transporterProvider,
114             RepositoryLayoutProvider layoutProvider,
115             ChecksumPolicyProvider checksumPolicyProvider,
116             FileProcessor fileProcessor,
117             Map<String, ProvidedChecksumsSource> providedChecksumsSources)
118             throws NoRepositoryConnectorException {
119         try {
120             layout = layoutProvider.newRepositoryLayout(session, repository);
121         } catch (NoRepositoryLayoutException e) {
122             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
123         }
124         try {
125             transporter = transporterProvider.newTransporter(session, repository);
126         } catch (NoTransporterException e) {
127             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
128         }
129         this.checksumPolicyProvider = checksumPolicyProvider;
130 
131         this.session = session;
132         this.repository = repository;
133         this.fileProcessor = fileProcessor;
134         this.providedChecksumsSources = providedChecksumsSources;
135         this.closed = new AtomicBoolean(false);
136 
137         maxThreads = ExecutorUtils.threadCount(session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads");
138         smartChecksums = ConfigUtils.getBoolean(session, true, CONFIG_PROP_SMART_CHECKSUMS);
139         parallelPut = ConfigUtils.getBoolean(
140                 session, true, CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(), CONFIG_PROP_PARALLEL_PUT);
141         persistedChecksums = ConfigUtils.getBoolean(
142                 session,
143                 ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
144                 ConfigurationProperties.PERSISTED_CHECKSUMS);
145     }
146 
147     private Executor getExecutor(int tasks) {
148         if (maxThreads <= 1) {
149             return ExecutorUtils.DIRECT_EXECUTOR;
150         }
151         if (tasks <= 1) {
152             return ExecutorUtils.DIRECT_EXECUTOR;
153         }
154         if (executor == null) {
155             executor =
156                     ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
157         }
158         return executor;
159     }
160 
161     @Override
162     public void close() {
163         if (closed.compareAndSet(false, true)) {
164             ExecutorUtils.shutdown(executor);
165             transporter.close();
166         }
167     }
168 
169     private void failIfClosed() {
170         if (closed.get()) {
171             throw new IllegalStateException("connector already closed");
172         }
173     }
174 
175     @Override
176     public void get(
177             Collection<? extends ArtifactDownload> artifactDownloads,
178             Collection<? extends MetadataDownload> metadataDownloads) {
179         failIfClosed();
180 
181         Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
182         Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
183 
184         Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
185         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
186         List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
187 
188         boolean first = true;
189 
190         for (MetadataDownload transfer : safeMetadataDownloads) {
191             URI location = layout.getLocation(transfer.getMetadata(), false);
192 
193             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
194             TransferEvent.Builder builder = newEventBuilder(resource, false, false);
195             MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
196 
197             ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
198             List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
199             if (checksumPolicy != null) {
200                 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
201             }
202 
203             Runnable task = new GetTaskRunner(
204                     location,
205                     transfer.getFile(),
206                     checksumPolicy,
207                     checksumAlgorithmFactories,
208                     checksumLocations,
209                     null,
210                     listener);
211             if (first) {
212                 task.run();
213                 first = false;
214             } else {
215                 executor.execute(errorForwarder.wrap(task));
216             }
217         }
218 
219         for (ArtifactDownload transfer : safeArtifactDownloads) {
220             Map<String, String> providedChecksums = Collections.emptyMap();
221             for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
222                 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
223                         session, transfer, repository, checksumAlgorithmFactories);
224 
225                 if (provided != null) {
226                     providedChecksums = provided;
227                     break;
228                 }
229             }
230 
231             URI location = layout.getLocation(transfer.getArtifact(), false);
232 
233             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
234             TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
235             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
236 
237             Runnable task;
238             if (transfer.isExistenceCheck()) {
239                 task = new PeekTaskRunner(location, listener);
240             } else {
241                 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
242                 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
243                 if (checksumPolicy != null) {
244                     checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
245                 }
246 
247                 task = new GetTaskRunner(
248                         location,
249                         transfer.getFile(),
250                         checksumPolicy,
251                         checksumAlgorithmFactories,
252                         checksumLocations,
253                         providedChecksums,
254                         listener);
255             }
256             if (first) {
257                 task.run();
258                 first = false;
259             } else {
260                 executor.execute(errorForwarder.wrap(task));
261             }
262         }
263 
264         errorForwarder.await();
265     }
266 
267     @Override
268     public void put(
269             Collection<? extends ArtifactUpload> artifactUploads,
270             Collection<? extends MetadataUpload> metadataUploads) {
271         failIfClosed();
272 
273         Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
274         Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
275 
276         Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
277         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
278 
279         boolean first = true;
280 
281         for (ArtifactUpload transfer : safeArtifactUploads) {
282             URI location = layout.getLocation(transfer.getArtifact(), true);
283 
284             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
285             TransferEvent.Builder builder = newEventBuilder(resource, true, false);
286             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
287 
288             List<RepositoryLayout.ChecksumLocation> checksumLocations =
289                     layout.getChecksumLocations(transfer.getArtifact(), true, location);
290 
291             Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
292             if (first) {
293                 task.run();
294                 first = false;
295             } else {
296                 executor.execute(errorForwarder.wrap(task));
297             }
298         }
299 
300         errorForwarder.await(); // make sure all artifacts are PUT before we go with Metadata
301 
302         for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
303             for (MetadataUpload transfer : transferGroup) {
304                 URI location = layout.getLocation(transfer.getMetadata(), true);
305 
306                 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
307                 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
308                 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
309 
310                 List<RepositoryLayout.ChecksumLocation> checksumLocations =
311                         layout.getChecksumLocations(transfer.getMetadata(), true, location);
312 
313                 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
314                 if (first) {
315                     task.run();
316                     first = false;
317                 } else {
318                     executor.execute(errorForwarder.wrap(task));
319                 }
320             }
321 
322             errorForwarder.await(); // make sure each group is done before starting next group
323         }
324     }
325 
326     /**
327      * This method "groups" the Metadata to be uploaded by their level (version, artifact, group and root). This is MUST
328      * as clients consume metadata in opposite order (root, group, artifact, version), and hence, we must deploy and
329      * ensure (in case of parallel deploy) that all V level metadata is deployed before we start deploying A level, etc.
330      */
331     private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
332         ArrayList<MetadataUpload> v = new ArrayList<>();
333         ArrayList<MetadataUpload> a = new ArrayList<>();
334         ArrayList<MetadataUpload> g = new ArrayList<>();
335         ArrayList<MetadataUpload> r = new ArrayList<>();
336 
337         for (MetadataUpload transfer : metadataUploads) {
338             Metadata metadata = transfer.getMetadata();
339             if (!"".equals(metadata.getVersion())) {
340                 v.add(transfer);
341             } else if (!"".equals(metadata.getArtifactId())) {
342                 a.add(transfer);
343             } else if (!"".equals(metadata.getGroupId())) {
344                 g.add(transfer);
345             } else {
346                 r.add(transfer);
347             }
348         }
349 
350         List<List<MetadataUpload>> result = new ArrayList<>(4);
351         if (!v.isEmpty()) {
352             result.add(v);
353         }
354         if (!a.isEmpty()) {
355             result.add(a);
356         }
357         if (!g.isEmpty()) {
358             result.add(g);
359         }
360         if (!r.isEmpty()) {
361             result.add(r);
362         }
363         return result;
364     }
365 
366     private static <T> Collection<T> safe(Collection<T> items) {
367         return (items != null) ? items : Collections.emptyList();
368     }
369 
370     private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
371         return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
372     }
373 
374     private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
375         TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
376         if (upload) {
377             builder.setRequestType(TransferEvent.RequestType.PUT);
378         } else if (!peek) {
379             builder.setRequestType(TransferEvent.RequestType.GET);
380         } else {
381             builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
382         }
383         return builder;
384     }
385 
386     private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
387         return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
388     }
389 
390     @Override
391     public String toString() {
392         return String.valueOf(repository);
393     }
394 
395     abstract class TaskRunner implements Runnable {
396 
397         protected final URI path;
398 
399         protected final TransferTransportListener<?> listener;
400 
401         TaskRunner(URI path, TransferTransportListener<?> listener) {
402             this.path = path;
403             this.listener = listener;
404         }
405 
406         @Override
407         public void run() {
408             try {
409                 listener.transferInitiated();
410                 runTask();
411                 listener.transferSucceeded();
412             } catch (Exception e) {
413                 listener.transferFailed(e, transporter.classify(e));
414             }
415         }
416 
417         protected abstract void runTask() throws Exception;
418     }
419 
420     class PeekTaskRunner extends TaskRunner {
421 
422         PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
423             super(path, listener);
424         }
425 
426         @Override
427         protected void runTask() throws Exception {
428             transporter.peek(new PeekTask(path));
429         }
430     }
431 
432     class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
433 
434         private final File file;
435 
436         private final ChecksumValidator checksumValidator;
437 
438         GetTaskRunner(
439                 URI path,
440                 File file,
441                 ChecksumPolicy checksumPolicy,
442                 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
443                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
444                 Map<String, String> providedChecksums,
445                 TransferTransportListener<?> listener) {
446             super(path, listener);
447             this.file = requireNonNull(file, "destination file cannot be null");
448             checksumValidator = new ChecksumValidator(
449                     file,
450                     checksumAlgorithmFactories,
451                     fileProcessor,
452                     this,
453                     checksumPolicy,
454                     providedChecksums,
455                     safe(checksumLocations));
456         }
457 
458         @Override
459         public boolean fetchChecksum(URI remote, File local) throws Exception {
460             try {
461                 transporter.get(new GetTask(remote).setDataFile(local));
462             } catch (Exception e) {
463                 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
464                     return false;
465                 }
466                 throw e;
467             }
468             return true;
469         }
470 
471         @Override
472         protected void runTask() throws Exception {
473             try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
474                 final File tmp = tempFile.getPath().toFile();
475                 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
476                 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
477                     GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
478                     transporter.get(task);
479                     try {
480                         checksumValidator.validate(
481                                 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
482                         break;
483                     } catch (ChecksumFailureException e) {
484                         boolean retry = trial < lastTrial && e.isRetryWorthy();
485                         if (!retry && !checksumValidator.handle(e)) {
486                             throw e;
487                         }
488                         listener.transferCorrupted(e);
489                         if (retry) {
490                             checksumValidator.retry();
491                         } else {
492                             break;
493                         }
494                     }
495                 }
496                 tempFile.move();
497                 if (persistedChecksums) {
498                     checksumValidator.commit();
499                 }
500             }
501         }
502     }
503 
504     class PutTaskRunner extends TaskRunner {
505 
506         private final File file;
507 
508         private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
509 
510         PutTaskRunner(
511                 URI path,
512                 File file,
513                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
514                 TransferTransportListener<?> listener) {
515             super(path, listener);
516             this.file = requireNonNull(file, "source file cannot be null");
517             this.checksumLocations = safe(checksumLocations);
518         }
519 
520         @SuppressWarnings("checkstyle:innerassignment")
521         @Override
522         protected void runTask() throws Exception {
523             transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
524             uploadChecksums(file, null);
525         }
526 
527         /**
528          * @param file  source
529          * @param bytes transformed data from file or {@code null}
530          */
531         private void uploadChecksums(File file, byte[] bytes) {
532             if (checksumLocations.isEmpty()) {
533                 return;
534             }
535             try {
536                 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
537                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
538                     algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
539                 }
540 
541                 Map<String, String> sumsByAlgo;
542                 if (bytes != null) {
543                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
544                 } else {
545                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
546                 }
547 
548                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
549                     uploadChecksum(
550                             checksumLocation.getLocation(),
551                             sumsByAlgo.get(checksumLocation
552                                     .getChecksumAlgorithmFactory()
553                                     .getName()));
554                 }
555             } catch (IOException e) {
556                 LOGGER.warn("Failed to upload checksums for {}", file, e);
557                 throw new UncheckedIOException(e);
558             }
559         }
560 
561         private void uploadChecksum(URI location, Object checksum) {
562             try {
563                 if (checksum instanceof Exception) {
564                     throw (Exception) checksum;
565                 }
566                 transporter.put(new PutTask(location).setDataString((String) checksum));
567             } catch (Exception e) {
568                 LOGGER.warn("Failed to upload checksum to {}", location, e);
569             }
570         }
571     }
572 }