001    package org.apache.archiva.scheduler.repository;
002    
003    /*
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *   http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing,
015     * software distributed under the License is distributed on an
016     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017     * KIND, either express or implied.  See the License for the
018     * specific language governing permissions and limitations
019     * under the License.
020     */
021    
022    import org.apache.archiva.common.ArchivaException;
023    import org.apache.archiva.configuration.ArchivaConfiguration;
024    import org.apache.archiva.configuration.ConfigurationEvent;
025    import org.apache.archiva.configuration.ConfigurationListener;
026    import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
027    import org.apache.archiva.metadata.repository.MetadataRepository;
028    import org.apache.archiva.metadata.repository.MetadataRepositoryException;
029    import org.apache.archiva.metadata.repository.RepositorySession;
030    import org.apache.archiva.metadata.repository.RepositorySessionFactory;
031    import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
032    import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
033    import org.apache.archiva.redback.components.scheduler.Scheduler;
034    import org.apache.archiva.redback.components.taskqueue.TaskQueue;
035    import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
036    import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
037    import org.apache.archiva.scheduler.repository.model.RepositoryTask;
038    import org.apache.commons.lang.time.StopWatch;
039    import org.quartz.SchedulerException;
040    import org.quartz.impl.JobDetailImpl;
041    import org.quartz.impl.triggers.CronTriggerImpl;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    import org.springframework.stereotype.Service;
045    
046    import javax.annotation.PostConstruct;
047    import javax.annotation.PreDestroy;
048    import javax.inject.Inject;
049    import javax.inject.Named;
050    import java.text.ParseException;
051    import java.util.ArrayList;
052    import java.util.HashSet;
053    import java.util.List;
054    import java.util.Set;
055    
056    /**
057     * Default implementation of a scheduling component for archiva.
058     */
059    @Service( "archivaTaskScheduler#repository" )
060    public class DefaultRepositoryArchivaTaskScheduler
061        implements RepositoryArchivaTaskScheduler, ConfigurationListener
062    {
063        private Logger log = LoggerFactory.getLogger( getClass() );
064    
065        /**
066         *
067         */
068        @Inject
069        private Scheduler scheduler;
070    
071        @Inject
072        private CronExpressionValidator cronValidator;
073    
074        /**
075         *
076         */
077        @Inject
078        @Named( value = "taskQueue#repository-scanning" )
079        private TaskQueue repositoryScanningQueue;
080    
081        /**
082         *
083         */
084        @Inject
085        private ArchivaConfiguration archivaConfiguration;
086    
087        /**
088         *
089         */
090        @Inject
091        @Named( value = "repositoryStatisticsManager#default" )
092        private RepositoryStatisticsManager repositoryStatisticsManager;
093    
094        /**
095         * TODO: could have multiple implementations
096         */
097        @Inject
098        private RepositorySessionFactory repositorySessionFactory;
099    
100        private static final String REPOSITORY_SCAN_GROUP = "rg";
101    
102        private static final String REPOSITORY_JOB = "rj";
103    
104        private static final String REPOSITORY_JOB_TRIGGER = "rjt";
105    
106        static final String TASK_QUEUE = "TASK_QUEUE";
107    
108        static final String TASK_REPOSITORY = "TASK_REPOSITORY";
109    
110        public static final String CRON_HOURLY = "0 0 * * * ?";
111    
112        private Set<String> jobs = new HashSet<String>();
113    
114        private List<String> queuedRepos = new ArrayList<String>();
115    
116        @PostConstruct
117        public void startup()
118            throws ArchivaException
119        {
120    
121            StopWatch stopWatch = new StopWatch();
122            stopWatch.start();
123    
124            archivaConfiguration.addListener( this );
125    
126            List<ManagedRepositoryConfiguration> repositories =
127                archivaConfiguration.getConfiguration().getManagedRepositories();
128    
129            RepositorySession repositorySession = repositorySessionFactory.createSession();
130            try
131            {
132                MetadataRepository metadataRepository = repositorySession.getRepository();
133                for ( ManagedRepositoryConfiguration repoConfig : repositories )
134                {
135                    if ( repoConfig.isScanned() )
136                    {
137                        try
138                        {
139                            scheduleRepositoryJobs( repoConfig );
140                        }
141                        catch ( SchedulerException e )
142                        {
143                            throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
144                        }
145    
146                        try
147                        {
148                            if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
149                            {
150                                queueInitialRepoScan( repoConfig );
151                            }
152                        }
153                        catch ( MetadataRepositoryException e )
154                        {
155                            log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: "
156                                          + e.getMessage(), e );
157                        }
158                    }
159                }
160            }
161            finally
162            {
163                repositorySession.close();
164            }
165    
166            stopWatch.stop();
167            log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
168        }
169    
170    
171        @PreDestroy
172        public void stop()
173            throws SchedulerException
174        {
175            for ( String job : jobs )
176            {
177                scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
178            }
179            jobs.clear();
180            queuedRepos.clear();
181    
182        }
183    
184        @SuppressWarnings( "unchecked" )
185        public boolean isProcessingRepositoryTask( String repositoryId )
186        {
187            synchronized ( repositoryScanningQueue )
188            {
189                List<RepositoryTask> queue = null;
190    
191                try
192                {
193                    queue = repositoryScanningQueue.getQueueSnapshot();
194                }
195                catch ( TaskQueueException e )
196                {
197                    // not possible with plexus-taskqueue implementation, ignore
198                }
199    
200                for ( RepositoryTask queuedTask : queue )
201                {
202                    if ( queuedTask.getRepositoryId().equals( repositoryId ) )
203                    {
204                        return true;
205                    }
206                }
207                return false;
208            }
209        }
210    
211        public boolean isProcessingRepositoryTask( RepositoryTask task )
212        {
213            synchronized ( repositoryScanningQueue )
214            {
215                List<RepositoryTask> queue = null;
216    
217                try
218                {
219                    queue = repositoryScanningQueue.getQueueSnapshot();
220                }
221                catch ( TaskQueueException e )
222                {
223                    // not possible with plexus-taskqueue implementation, ignore
224                }
225    
226                for ( RepositoryTask queuedTask : queue )
227                {
228                    if ( task.equals( queuedTask ) )
229                    {
230                        return true;
231                    }
232                }
233                return false;
234            }
235        }
236    
237        public void queueTask( RepositoryTask task )
238            throws TaskQueueException
239        {
240            synchronized ( repositoryScanningQueue )
241            {
242                if ( isProcessingRepositoryTask( task ) )
243                {
244                    log.debug( "Repository task '{}' is already queued. Skipping task.", task );
245                }
246                else
247                {
248                    // add check if the task is already queued if it is a file scan
249                    repositoryScanningQueue.put( task );
250                }
251            }
252        }
253    
254        public boolean unQueueTask( RepositoryTask task )
255            throws TaskQueueException
256        {
257            synchronized ( repositoryScanningQueue )
258            {
259                if ( !isProcessingRepositoryTask( task ) )
260                {
261                    log.info( "cannot unqueue Repository task '{}' not already queued.", task );
262                    return false;
263                }
264                else
265                {
266                    return repositoryScanningQueue.remove( task );
267                }
268            }
269        }
270    
271        public void configurationEvent( ConfigurationEvent event )
272        {
273            if ( event.getType() == ConfigurationEvent.SAVED )
274            {
275                for ( String job : jobs )
276                {
277                    try
278                    {
279                        scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
280                    }
281                    catch ( SchedulerException e )
282                    {
283                        log.error( "Error restarting the repository scanning job after property change." );
284                    }
285                }
286                jobs.clear();
287    
288                List<ManagedRepositoryConfiguration> repositories =
289                    archivaConfiguration.getConfiguration().getManagedRepositories();
290    
291                for ( ManagedRepositoryConfiguration repoConfig : repositories )
292                {
293                    if ( repoConfig.getRefreshCronExpression() != null )
294                    {
295                        try
296                        {
297                            scheduleRepositoryJobs( repoConfig );
298                        }
299                        catch ( SchedulerException e )
300                        {
301                            log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
302                        }
303                    }
304                }
305            }
306        }
307    
308        private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
309                                             MetadataRepository metadataRepository )
310            throws MetadataRepositoryException
311        {
312            long start = System.currentTimeMillis();
313    
314            boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() );
315    
316            long end = System.currentTimeMillis();
317    
318            log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
319    
320            return res;
321        }
322    
323        // MRM-848: Pre-configured repository initially appear to be empty
324        private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
325        {
326            String repoId = repoConfig.getId();
327            RepositoryTask task = new RepositoryTask();
328            task.setRepositoryId( repoId );
329    
330            if ( !queuedRepos.contains( repoId ) )
331            {
332                log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
333    
334                try
335                {
336                    queuedRepos.add( repoConfig.getId() );
337                    this.queueTask( task );
338                }
339                catch ( TaskQueueException e )
340                {
341                    log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
342                }
343            }
344        }
345    
346        private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
347            throws SchedulerException
348        {
349            if ( repoConfig.getRefreshCronExpression() == null )
350            {
351                log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
352                return;
353            }
354    
355            if ( !repoConfig.isScanned() )
356            {
357                log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
358                return;
359            }
360    
361            // get the cron string for these database scanning jobs
362            String cronString = repoConfig.getRefreshCronExpression();
363    
364            if ( !cronValidator.validate( cronString ) )
365            {
366                log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
367                          repoConfig.getId() );
368                cronString = CRON_HOURLY;
369            }
370    
371            // setup the unprocessed artifact job
372            JobDetailImpl repositoryJob =
373                new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
374                                   RepositoryTaskJob.class );
375    
376            repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue );
377            repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() );
378    
379            try
380            {
381                CronTriggerImpl trigger =
382                    new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
383                                         cronString );
384    
385                jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
386                scheduler.scheduleJob( repositoryJob, trigger );
387            }
388            catch ( ParseException e )
389            {
390                log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '"
391                               + repoConfig.getId() + "': " + e.getMessage() );
392            }
393    
394        }
395    }