View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.eclipse.aether.connector.basic;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.io.UncheckedIOException;
24  import java.net.URI;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  import org.eclipse.aether.RepositorySystemSession;
34  import org.eclipse.aether.RequestTrace;
35  import org.eclipse.aether.metadata.Metadata;
36  import org.eclipse.aether.repository.RemoteRepository;
37  import org.eclipse.aether.spi.checksums.ProvidedChecksumsSource;
38  import org.eclipse.aether.spi.connector.ArtifactDownload;
39  import org.eclipse.aether.spi.connector.ArtifactUpload;
40  import org.eclipse.aether.spi.connector.MetadataDownload;
41  import org.eclipse.aether.spi.connector.MetadataUpload;
42  import org.eclipse.aether.spi.connector.RepositoryConnector;
43  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmFactory;
44  import org.eclipse.aether.spi.connector.checksum.ChecksumAlgorithmHelper;
45  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicy;
46  import org.eclipse.aether.spi.connector.checksum.ChecksumPolicyProvider;
47  import org.eclipse.aether.spi.connector.layout.RepositoryLayout;
48  import org.eclipse.aether.spi.connector.layout.RepositoryLayoutProvider;
49  import org.eclipse.aether.spi.connector.transport.GetTask;
50  import org.eclipse.aether.spi.connector.transport.PeekTask;
51  import org.eclipse.aether.spi.connector.transport.PutTask;
52  import org.eclipse.aether.spi.connector.transport.Transporter;
53  import org.eclipse.aether.spi.connector.transport.TransporterProvider;
54  import org.eclipse.aether.spi.io.FileProcessor;
55  import org.eclipse.aether.transfer.ChecksumFailureException;
56  import org.eclipse.aether.transfer.NoRepositoryConnectorException;
57  import org.eclipse.aether.transfer.NoRepositoryLayoutException;
58  import org.eclipse.aether.transfer.NoTransporterException;
59  import org.eclipse.aether.transfer.TransferEvent;
60  import org.eclipse.aether.transfer.TransferResource;
61  import org.eclipse.aether.util.ConfigUtils;
62  import org.eclipse.aether.util.FileUtils;
63  import org.eclipse.aether.util.concurrency.ExecutorUtils;
64  import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
65  import org.slf4j.Logger;
66  import org.slf4j.LoggerFactory;
67  
68  import static java.util.Objects.requireNonNull;
69  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PARALLEL_PUT;
70  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_PERSISTED_CHECKSUMS;
71  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_SMART_CHECKSUMS;
72  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.CONFIG_PROP_THREADS;
73  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PARALLEL_PUT;
74  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_PERSISTED_CHECKSUMS;
75  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_SMART_CHECKSUMS;
76  import static org.eclipse.aether.connector.basic.BasicRepositoryConnectorConfigurationKeys.DEFAULT_THREADS;
77  
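/**
 * A {@link RepositoryConnector} that moves artifacts and metadata between the local file system and a
 * single {@link RemoteRepository}. Remote locations are computed by a {@link RepositoryLayout}, the
 * actual transfer is delegated to a {@link Transporter}, and downloads are validated with the
 * {@link ChecksumPolicy} obtained from a {@link ChecksumPolicyProvider}. Once {@link #close()} has been
 * called, {@code get} and {@code put} fail with an {@link IllegalStateException}.
 */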
81  final class BasicRepositoryConnector implements RepositoryConnector {
82      private static final Logger LOGGER = LoggerFactory.getLogger(BasicRepositoryConnector.class);
83  
84      private final Map<String, ProvidedChecksumsSource> providedChecksumsSources;
85  
86      private final FileProcessor fileProcessor;
87  
88      private final RemoteRepository repository;
89  
90      private final RepositorySystemSession session;
91  
92      private final Transporter transporter;
93  
94      private final RepositoryLayout layout;
95  
96      private final ChecksumPolicyProvider checksumPolicyProvider;
97  
98      private final int maxThreads;
99  
100     private final boolean smartChecksums;
101 
102     private final boolean parallelPut;
103 
104     private final boolean persistedChecksums;
105 
106     private Executor executor;
107 
108     private final AtomicBoolean closed;
109 
110     BasicRepositoryConnector(
111             RepositorySystemSession session,
112             RemoteRepository repository,
113             TransporterProvider transporterProvider,
114             RepositoryLayoutProvider layoutProvider,
115             ChecksumPolicyProvider checksumPolicyProvider,
116             FileProcessor fileProcessor,
117             Map<String, ProvidedChecksumsSource> providedChecksumsSources)
118             throws NoRepositoryConnectorException {
119         try {
120             layout = layoutProvider.newRepositoryLayout(session, repository);
121         } catch (NoRepositoryLayoutException e) {
122             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
123         }
124         try {
125             transporter = transporterProvider.newTransporter(session, repository);
126         } catch (NoTransporterException e) {
127             throw new NoRepositoryConnectorException(repository, e.getMessage(), e);
128         }
129         this.checksumPolicyProvider = checksumPolicyProvider;
130 
131         this.session = session;
132         this.repository = repository;
133         this.fileProcessor = fileProcessor;
134         this.providedChecksumsSources = providedChecksumsSources;
135         this.closed = new AtomicBoolean(false);
136 
137         maxThreads = ExecutorUtils.threadCount(session, DEFAULT_THREADS, CONFIG_PROP_THREADS);
138         smartChecksums = ConfigUtils.getBoolean(session, DEFAULT_SMART_CHECKSUMS, CONFIG_PROP_SMART_CHECKSUMS);
139         parallelPut = ConfigUtils.getBoolean(
140                 session,
141                 DEFAULT_PARALLEL_PUT,
142                 CONFIG_PROP_PARALLEL_PUT + "." + repository.getId(),
143                 CONFIG_PROP_PARALLEL_PUT);
144         persistedChecksums =
145                 ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
146     }
147 
148     private Executor getExecutor(int tasks) {
149         if (maxThreads <= 1) {
150             return ExecutorUtils.DIRECT_EXECUTOR;
151         }
152         if (tasks <= 1) {
153             return ExecutorUtils.DIRECT_EXECUTOR;
154         }
155         if (executor == null) {
156             executor =
157                     ExecutorUtils.threadPool(maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-');
158         }
159         return executor;
160     }
161 
162     @Override
163     public void close() {
164         if (closed.compareAndSet(false, true)) {
165             ExecutorUtils.shutdown(executor);
166             transporter.close();
167         }
168     }
169 
170     private void failIfClosed() {
171         if (closed.get()) {
172             throw new IllegalStateException("connector already closed");
173         }
174     }
175 
176     @Override
177     public void get(
178             Collection<? extends ArtifactDownload> artifactDownloads,
179             Collection<? extends MetadataDownload> metadataDownloads) {
180         failIfClosed();
181 
182         Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
183         Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
184 
185         Executor executor = getExecutor(safeArtifactDownloads.size() + safeMetadataDownloads.size());
186         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
187         List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
188 
189         boolean first = true;
190 
191         for (MetadataDownload transfer : safeMetadataDownloads) {
192             URI location = layout.getLocation(transfer.getMetadata(), false);
193 
194             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
195             TransferEvent.Builder builder = newEventBuilder(resource, false, false);
196             MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
197 
198             ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
199             List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
200             if (checksumPolicy != null) {
201                 checksumLocations = layout.getChecksumLocations(transfer.getMetadata(), false, location);
202             }
203 
204             Runnable task = new GetTaskRunner(
205                     location,
206                     transfer.getFile(),
207                     checksumPolicy,
208                     checksumAlgorithmFactories,
209                     checksumLocations,
210                     null,
211                     listener);
212             if (first) {
213                 task.run();
214                 first = false;
215             } else {
216                 executor.execute(errorForwarder.wrap(task));
217             }
218         }
219 
220         for (ArtifactDownload transfer : safeArtifactDownloads) {
221             Map<String, String> providedChecksums = Collections.emptyMap();
222             for (ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values()) {
223                 Map<String, String> provided = providedChecksumsSource.getProvidedArtifactChecksums(
224                         session, transfer, repository, checksumAlgorithmFactories);
225 
226                 if (provided != null) {
227                     providedChecksums = provided;
228                     break;
229                 }
230             }
231 
232             URI location = layout.getLocation(transfer.getArtifact(), false);
233 
234             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
235             TransferEvent.Builder builder = newEventBuilder(resource, false, transfer.isExistenceCheck());
236             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
237 
238             Runnable task;
239             if (transfer.isExistenceCheck()) {
240                 task = new PeekTaskRunner(location, listener);
241             } else {
242                 ChecksumPolicy checksumPolicy = newChecksumPolicy(transfer.getChecksumPolicy(), resource);
243                 List<RepositoryLayout.ChecksumLocation> checksumLocations = null;
244                 if (checksumPolicy != null) {
245                     checksumLocations = layout.getChecksumLocations(transfer.getArtifact(), false, location);
246                 }
247 
248                 task = new GetTaskRunner(
249                         location,
250                         transfer.getFile(),
251                         checksumPolicy,
252                         checksumAlgorithmFactories,
253                         checksumLocations,
254                         providedChecksums,
255                         listener);
256             }
257             if (first) {
258                 task.run();
259                 first = false;
260             } else {
261                 executor.execute(errorForwarder.wrap(task));
262             }
263         }
264 
265         errorForwarder.await();
266     }
267 
268     @Override
269     public void put(
270             Collection<? extends ArtifactUpload> artifactUploads,
271             Collection<? extends MetadataUpload> metadataUploads) {
272         failIfClosed();
273 
274         Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
275         Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
276 
277         Executor executor = getExecutor(parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
278         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
279 
280         boolean first = true;
281 
282         for (ArtifactUpload transfer : safeArtifactUploads) {
283             URI location = layout.getLocation(transfer.getArtifact(), true);
284 
285             TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
286             TransferEvent.Builder builder = newEventBuilder(resource, true, false);
287             ArtifactTransportListener listener = new ArtifactTransportListener(transfer, repository, builder);
288 
289             List<RepositoryLayout.ChecksumLocation> checksumLocations =
290                     layout.getChecksumLocations(transfer.getArtifact(), true, location);
291 
292             Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
293             if (first) {
294                 task.run();
295                 first = false;
296             } else {
297                 executor.execute(errorForwarder.wrap(task));
298             }
299         }
300 
301         errorForwarder.await(); // make sure all artifacts are PUT before we go with Metadata
302 
303         for (List<? extends MetadataUpload> transferGroup : groupUploads(safeMetadataUploads)) {
304             for (MetadataUpload transfer : transferGroup) {
305                 URI location = layout.getLocation(transfer.getMetadata(), true);
306 
307                 TransferResource resource = newTransferResource(location, transfer.getFile(), transfer.getTrace());
308                 TransferEvent.Builder builder = newEventBuilder(resource, true, false);
309                 MetadataTransportListener listener = new MetadataTransportListener(transfer, repository, builder);
310 
311                 List<RepositoryLayout.ChecksumLocation> checksumLocations =
312                         layout.getChecksumLocations(transfer.getMetadata(), true, location);
313 
314                 Runnable task = new PutTaskRunner(location, transfer.getFile(), checksumLocations, listener);
315                 if (first) {
316                     task.run();
317                     first = false;
318                 } else {
319                     executor.execute(errorForwarder.wrap(task));
320                 }
321             }
322 
323             errorForwarder.await(); // make sure each group is done before starting next group
324         }
325     }
326 
327     /**
328      * This method "groups" the Metadata to be uploaded by their level (version, artifact, group and root). This is MUST
329      * as clients consume metadata in opposite order (root, group, artifact, version), and hence, we must deploy and
330      * ensure (in case of parallel deploy) that all V level metadata is deployed before we start deploying A level, etc.
331      */
332     private static List<List<MetadataUpload>> groupUploads(Collection<? extends MetadataUpload> metadataUploads) {
333         ArrayList<MetadataUpload> v = new ArrayList<>();
334         ArrayList<MetadataUpload> a = new ArrayList<>();
335         ArrayList<MetadataUpload> g = new ArrayList<>();
336         ArrayList<MetadataUpload> r = new ArrayList<>();
337 
338         for (MetadataUpload transfer : metadataUploads) {
339             Metadata metadata = transfer.getMetadata();
340             if (!"".equals(metadata.getVersion())) {
341                 v.add(transfer);
342             } else if (!"".equals(metadata.getArtifactId())) {
343                 a.add(transfer);
344             } else if (!"".equals(metadata.getGroupId())) {
345                 g.add(transfer);
346             } else {
347                 r.add(transfer);
348             }
349         }
350 
351         List<List<MetadataUpload>> result = new ArrayList<>(4);
352         if (!v.isEmpty()) {
353             result.add(v);
354         }
355         if (!a.isEmpty()) {
356             result.add(a);
357         }
358         if (!g.isEmpty()) {
359             result.add(g);
360         }
361         if (!r.isEmpty()) {
362             result.add(r);
363         }
364         return result;
365     }
366 
367     private static <T> Collection<T> safe(Collection<T> items) {
368         return (items != null) ? items : Collections.emptyList();
369     }
370 
371     private TransferResource newTransferResource(URI path, File file, RequestTrace trace) {
372         return new TransferResource(repository.getId(), repository.getUrl(), path.toString(), file, trace);
373     }
374 
375     private TransferEvent.Builder newEventBuilder(TransferResource resource, boolean upload, boolean peek) {
376         TransferEvent.Builder builder = new TransferEvent.Builder(session, resource);
377         if (upload) {
378             builder.setRequestType(TransferEvent.RequestType.PUT);
379         } else if (!peek) {
380             builder.setRequestType(TransferEvent.RequestType.GET);
381         } else {
382             builder.setRequestType(TransferEvent.RequestType.GET_EXISTENCE);
383         }
384         return builder;
385     }
386 
387     private ChecksumPolicy newChecksumPolicy(String policy, TransferResource resource) {
388         return checksumPolicyProvider.newChecksumPolicy(session, repository, resource, policy);
389     }
390 
391     @Override
392     public String toString() {
393         return String.valueOf(repository);
394     }
395 
396     abstract class TaskRunner implements Runnable {
397 
398         protected final URI path;
399 
400         protected final TransferTransportListener<?> listener;
401 
402         TaskRunner(URI path, TransferTransportListener<?> listener) {
403             this.path = path;
404             this.listener = listener;
405         }
406 
407         @Override
408         public void run() {
409             try {
410                 listener.transferInitiated();
411                 runTask();
412                 listener.transferSucceeded();
413             } catch (Exception e) {
414                 listener.transferFailed(e, transporter.classify(e));
415             }
416         }
417 
418         protected abstract void runTask() throws Exception;
419     }
420 
421     class PeekTaskRunner extends TaskRunner {
422 
423         PeekTaskRunner(URI path, TransferTransportListener<?> listener) {
424             super(path, listener);
425         }
426 
427         @Override
428         protected void runTask() throws Exception {
429             transporter.peek(new PeekTask(path));
430         }
431     }
432 
433     class GetTaskRunner extends TaskRunner implements ChecksumValidator.ChecksumFetcher {
434 
435         private final File file;
436 
437         private final ChecksumValidator checksumValidator;
438 
439         GetTaskRunner(
440                 URI path,
441                 File file,
442                 ChecksumPolicy checksumPolicy,
443                 List<ChecksumAlgorithmFactory> checksumAlgorithmFactories,
444                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
445                 Map<String, String> providedChecksums,
446                 TransferTransportListener<?> listener) {
447             super(path, listener);
448             this.file = requireNonNull(file, "destination file cannot be null");
449             checksumValidator = new ChecksumValidator(
450                     file,
451                     checksumAlgorithmFactories,
452                     fileProcessor,
453                     this,
454                     checksumPolicy,
455                     providedChecksums,
456                     safe(checksumLocations));
457         }
458 
459         @Override
460         public boolean fetchChecksum(URI remote, File local) throws Exception {
461             try {
462                 transporter.get(new GetTask(remote).setDataFile(local));
463             } catch (Exception e) {
464                 if (transporter.classify(e) == Transporter.ERROR_NOT_FOUND) {
465                     return false;
466                 }
467                 throw e;
468             }
469             return true;
470         }
471 
472         @Override
473         protected void runTask() throws Exception {
474             try (FileUtils.CollocatedTempFile tempFile = FileUtils.newTempFile(file.toPath())) {
475                 final File tmp = tempFile.getPath().toFile();
476                 listener.setChecksumCalculator(checksumValidator.newChecksumCalculator(tmp));
477                 for (int firstTrial = 0, lastTrial = 1, trial = firstTrial; ; trial++) {
478                     GetTask task = new GetTask(path).setDataFile(tmp, false).setListener(listener);
479                     transporter.get(task);
480                     try {
481                         checksumValidator.validate(
482                                 listener.getChecksums(), smartChecksums ? task.getChecksums() : null);
483                         break;
484                     } catch (ChecksumFailureException e) {
485                         boolean retry = trial < lastTrial && e.isRetryWorthy();
486                         if (!retry && !checksumValidator.handle(e)) {
487                             throw e;
488                         }
489                         listener.transferCorrupted(e);
490                         if (retry) {
491                             checksumValidator.retry();
492                         } else {
493                             break;
494                         }
495                     }
496                 }
497                 tempFile.move();
498                 if (persistedChecksums) {
499                     checksumValidator.commit();
500                 }
501             }
502         }
503     }
504 
505     class PutTaskRunner extends TaskRunner {
506 
507         private final File file;
508 
509         private final Collection<RepositoryLayout.ChecksumLocation> checksumLocations;
510 
511         PutTaskRunner(
512                 URI path,
513                 File file,
514                 List<RepositoryLayout.ChecksumLocation> checksumLocations,
515                 TransferTransportListener<?> listener) {
516             super(path, listener);
517             this.file = requireNonNull(file, "source file cannot be null");
518             this.checksumLocations = safe(checksumLocations);
519         }
520 
521         @SuppressWarnings("checkstyle:innerassignment")
522         @Override
523         protected void runTask() throws Exception {
524             transporter.put(new PutTask(path).setDataFile(file).setListener(listener));
525             uploadChecksums(file, null);
526         }
527 
528         /**
529          * @param file  source
530          * @param bytes transformed data from file or {@code null}
531          */
532         private void uploadChecksums(File file, byte[] bytes) {
533             if (checksumLocations.isEmpty()) {
534                 return;
535             }
536             try {
537                 ArrayList<ChecksumAlgorithmFactory> algorithms = new ArrayList<>();
538                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
539                     algorithms.add(checksumLocation.getChecksumAlgorithmFactory());
540                 }
541 
542                 Map<String, String> sumsByAlgo;
543                 if (bytes != null) {
544                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(bytes, algorithms);
545                 } else {
546                     sumsByAlgo = ChecksumAlgorithmHelper.calculate(file, algorithms);
547                 }
548 
549                 for (RepositoryLayout.ChecksumLocation checksumLocation : checksumLocations) {
550                     uploadChecksum(
551                             checksumLocation.getLocation(),
552                             sumsByAlgo.get(checksumLocation
553                                     .getChecksumAlgorithmFactory()
554                                     .getName()));
555                 }
556             } catch (IOException e) {
557                 LOGGER.warn("Failed to upload checksums for {}", file, e);
558                 throw new UncheckedIOException(e);
559             }
560         }
561 
562         private void uploadChecksum(URI location, Object checksum) {
563             try {
564                 if (checksum instanceof Exception) {
565                     throw (Exception) checksum;
566                 }
567                 transporter.put(new PutTask(location).setDataString((String) checksum));
568             } catch (Exception e) {
569                 LOGGER.warn("Failed to upload checksum to {}", location, e);
570             }
571         }
572     }
573 }