Coverage Report - org.apache.maven.archiva.scheduled.DefaultArchivaTaskScheduler
 
Classes in this File Line Coverage Branch Coverage Complexity
DefaultArchivaTaskScheduler
0%
0/142
0%
0/42
0
 
 1  
 package org.apache.maven.archiva.scheduled;
 2  
 
 3  
 /*
 4  
  * Licensed to the Apache Software Foundation (ASF) under one
 5  
  * or more contributor license agreements.  See the NOTICE file
 6  
  * distributed with this work for additional information
 7  
  * regarding copyright ownership.  The ASF licenses this file
 8  
  * to you under the Apache License, Version 2.0 (the
 9  
  * "License"); you may not use this file except in compliance
 10  
  * with the License.  You may obtain a copy of the License at
 11  
  *
 12  
  *   http://www.apache.org/licenses/LICENSE-2.0
 13  
  *
 14  
  * Unless required by applicable law or agreed to in writing,
 15  
  * software distributed under the License is distributed on an
 16  
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 17  
  * KIND, either express or implied.  See the License for the
 18  
  * specific language governing permissions and limitations
 19  
  * under the License.
 20  
  */
 21  
 
 22  
 import java.text.ParseException;
 23  
 import java.util.ArrayList;
 24  
 import java.util.HashSet;
 25  
 import java.util.List;
 26  
 import java.util.Set;
 27  
 
 28  
 import org.apache.maven.archiva.common.ArchivaException;
 29  
 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
 30  
 import org.apache.maven.archiva.configuration.ConfigurationEvent;
 31  
 import org.apache.maven.archiva.configuration.ConfigurationListener;
 32  
 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
 33  
 import org.apache.maven.archiva.database.ArchivaDAO;
 34  
 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
 35  
 import org.apache.maven.archiva.repository.scanner.RepositoryScanStatistics;
 36  
 import org.apache.maven.archiva.scheduled.tasks.ArtifactIndexingTask;
 37  
 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
 38  
 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
 39  
 import org.apache.maven.archiva.scheduled.tasks.TaskCreator;
 40  
 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
 41  
 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
 42  
 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
 43  
 import org.codehaus.plexus.scheduler.CronExpressionValidator;
 44  
 import org.codehaus.plexus.scheduler.Scheduler;
 45  
 import org.codehaus.plexus.taskqueue.Task;
 46  
 import org.codehaus.plexus.taskqueue.TaskQueue;
 47  
 import org.codehaus.plexus.taskqueue.TaskQueueException;
 48  
 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
 49  
 import org.quartz.CronTrigger;
 50  
 import org.quartz.JobDataMap;
 51  
 import org.quartz.JobDetail;
 52  
 import org.quartz.SchedulerException;
 53  
 import org.slf4j.Logger;
 54  
 import org.slf4j.LoggerFactory;
 55  
 
 56  
 /**
 57  
  * Default implementation of a scheduling component for archiva.
 58  
  *
 59  
  * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
 60  
  */
 61  0
 public class DefaultArchivaTaskScheduler
 62  
     implements ArchivaTaskScheduler, Startable, ConfigurationListener
 63  
 {
 64  0
     private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
 65  
     
 66  
     /**
 67  
      * @plexus.requirement
 68  
      */
 69  
     private Scheduler scheduler;
 70  
 
 71  
     /**
 72  
      * @plexus.requirement role-hint="database-update"
 73  
      */
 74  
     private TaskQueue databaseUpdateQueue;
 75  
 
 76  
     /**
 77  
      * @plexus.requirement role-hint="repository-scanning"
 78  
      */
 79  
     private TaskQueue repositoryScanningQueue;
 80  
     
 81  
     /**
 82  
      * @plexus.requirement role-hint="indexing"
 83  
      */
 84  
     private TaskQueue indexingQueue;
 85  
 
 86  
     /**
 87  
      * @plexus.requirement
 88  
      */
 89  
     private ArchivaConfiguration archivaConfiguration;
 90  
     
 91  
     /**
 92  
      * @plexus.requirement role-hint="jdo"
 93  
      */
 94  
     private ArchivaDAO dao;
 95  
 
 96  
     private static final String DATABASE_SCAN_GROUP = "dbg";
 97  
 
 98  
     private static final String DATABASE_JOB = "dbj";
 99  
 
 100  
     private static final String DATABASE_JOB_TRIGGER = "dbt";
 101  
 
 102  
     private static final String REPOSITORY_SCAN_GROUP = "rg";
 103  
 
 104  
     private static final String REPOSITORY_JOB = "rj";
 105  
 
 106  
     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
 107  
 
 108  
     static final String TASK_QUEUE = "TASK_QUEUE";
 109  
 
 110  
     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
 111  
 
 112  
     public static final String CRON_HOURLY = "0 0 * * * ?";
 113  
 
 114  0
     private Set<String> jobs = new HashSet<String>();
 115  
     
 116  0
     private List<String> queuedRepos = new ArrayList<String>();
 117  
 
 118  
     public void startup()
 119  
         throws ArchivaException
 120  
     {
 121  0
         archivaConfiguration.addListener( this );
 122  
 
 123  
         try
 124  
         {
 125  0
             start();
 126  
         }
 127  0
         catch ( StartingException e )
 128  
         {
 129  0
             throw new ArchivaException( e.getMessage(), e );
 130  0
         }
 131  0
     }
 132  
     
 133  
     public void start()
 134  
         throws StartingException
 135  
     {
 136  
         try
 137  
         {
 138  0
             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
 139  
                 .getManagedRepositories();
 140  
 
 141  0
             for ( ManagedRepositoryConfiguration repoConfig : repositories )
 142  
             {
 143  0
                 if ( repoConfig.isScanned() )
 144  
                 {
 145  0
                     scheduleRepositoryJobs( repoConfig );
 146  
                     
 147  0
                     if( !isPreviouslyScanned( repoConfig ) )
 148  
                     {
 149  0
                         queueInitialRepoScan( repoConfig );
 150  
                     }
 151  
                 }
 152  
             }
 153  
 
 154  0
             scheduleDatabaseJobs();
 155  
         }
 156  0
         catch ( SchedulerException e )
 157  
         {
 158  0
             throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
 159  0
         }
 160  0
     }
 161  
 
 162  
     public void stop()
 163  
         throws StoppingException
 164  
     {
 165  
         try
 166  
         {
 167  0
             scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
 168  
 
 169  0
             for ( String job : jobs )
 170  
             {
 171  0
                 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
 172  
             }
 173  0
             jobs.clear();
 174  0
             queuedRepos.clear();
 175  
         }
 176  0
         catch ( SchedulerException e )
 177  
         {
 178  0
             throw new StoppingException( "Unable to unschedule tasks", e );
 179  0
         }
 180  0
     }
 181  
 
 182  
     /**
 183  
      * @see ArchivaTaskScheduler#scheduleDatabaseTasks()
 184  
      */
 185  
     public void scheduleDatabaseTasks()
 186  
         throws TaskExecutionException
 187  
     {
 188  
         try
 189  
         {
 190  0
             scheduleDatabaseJobs();
 191  
         }
 192  0
         catch ( SchedulerException e )
 193  
         {
 194  0
             throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
 195  
 
 196  0
         }
 197  0
     }
 198  
 
 199  
     /**
 200  
      * @see ArchivaTaskScheduler#isProcessingRepositoryTask(String)
 201  
      */
 202  
     @SuppressWarnings("unchecked")
 203  
     public boolean isProcessingRepositoryTask( String repositoryId )
 204  
     {
 205  0
         synchronized( repositoryScanningQueue )
 206  
         {
 207  0
             List<RepositoryTask> queue = null;
 208  
     
 209  
             try
 210  
             {
 211  0
                 queue = repositoryScanningQueue.getQueueSnapshot();
 212  
             }
 213  0
             catch ( TaskQueueException e )
 214  
             {
 215  
                 // not possible with plexus-taskqueue implementation, ignore
 216  0
             }
 217  
     
 218  0
             for ( RepositoryTask queuedTask : queue )
 219  
             {
 220  0
                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
 221  
                 {
 222  0
                     return true;
 223  
                 }
 224  
             }
 225  0
             return false;
 226  0
         }
 227  
     }
 228  
     
 229  
     /**
 230  
      * @see ArchivaTaskScheduler#isProcessingIndexingTaskWithName(String)
 231  
      */
 232  
     @SuppressWarnings("unchecked")
 233  
     private boolean isProcessingRepositoryTask( RepositoryTask task )
 234  
     {
 235  0
         synchronized( repositoryScanningQueue )
 236  
         {
 237  0
             List<RepositoryTask> queue = null;
 238  
     
 239  
             try
 240  
             {
 241  0
                 queue = repositoryScanningQueue.getQueueSnapshot();
 242  
             }
 243  0
             catch ( TaskQueueException e )
 244  
             {
 245  
                 // not possible with plexus-taskqueue implementation, ignore
 246  0
             }
 247  
     
 248  0
             for ( RepositoryTask queuedTask : queue )
 249  
             {
 250  0
                 if ( task.equals( queuedTask ) )
 251  
                 {
 252  0
                     return true;
 253  
                 }
 254  
             }
 255  0
             return false;
 256  0
         }
 257  
     }
 258  
 
 259  
     /**
 260  
      * @see ArchivaTaskScheduler#isProcessingDatabaseTask()
 261  
      */
 262  
     @SuppressWarnings("unchecked")
 263  
     public boolean isProcessingDatabaseTask()
 264  
     {
 265  0
         List<? extends Task> queue = null;
 266  
 
 267  
         try
 268  
         {
 269  0
             queue = databaseUpdateQueue.getQueueSnapshot();
 270  
         }
 271  0
         catch ( TaskQueueException e )
 272  
         {
 273  
             // not possible with plexus-taskqueue implementation, ignore
 274  0
         }
 275  
 
 276  0
         return !queue.isEmpty();
 277  
     }
 278  
 
 279  
     /**
 280  
      * @see ArchivaTaskScheduler#queueRepositoryTask(RepositoryTask)
 281  
      */
 282  
     public void queueRepositoryTask( RepositoryTask task )
 283  
         throws TaskQueueException
 284  
     {
 285  0
         synchronized ( repositoryScanningQueue )
 286  
         {
 287  0
             if ( isProcessingRepositoryTask( task ) )
 288  
             {
 289  0
                 log.debug( "Repository task '" + task + "' is already queued. Skipping task." );
 290  
             }
 291  
             else
 292  
             {
 293  
                 // add check if the task is already queued if it is a file scan
 294  0
                 repositoryScanningQueue.put( task );
 295  
             }
 296  0
         }
 297  0
     }
 298  
 
 299  
     /**
 300  
      * @see ArchivaTaskScheduler#queueDatabaseTask(DatabaseTask)
 301  
      */
 302  
     public void queueDatabaseTask( DatabaseTask task )
 303  
         throws TaskQueueException
 304  
     {
 305  0
         databaseUpdateQueue.put( task );
 306  0
     }
 307  
     
 308  
     /**
 309  
      * @see ArchivaTaskScheduler#queueIndexingTask(ArtifactIndexingTask)
 310  
      */
 311  
     public void queueIndexingTask( ArtifactIndexingTask task )
 312  
         throws TaskQueueException
 313  
     {
 314  0
         indexingQueue.put( task );
 315  0
     }
 316  
 
 317  
     public void configurationEvent( ConfigurationEvent event )
 318  
     {
 319  0
         if ( event.getType() == ConfigurationEvent.SAVED )
 320  
         {
 321  
             try
 322  
             {
 323  0
                 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
 324  
 
 325  0
                 scheduleDatabaseJobs();
 326  
             }
 327  0
             catch ( SchedulerException e )
 328  
             {
 329  0
                 log.error( "Error restarting the database scanning job after property change." );
 330  0
             }
 331  
 
 332  0
             for ( String job : jobs )
 333  
             {
 334  
                 try
 335  
                 {
 336  0
                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
 337  
                 }
 338  0
                 catch ( SchedulerException e )
 339  
                 {
 340  0
                     log.error( "Error restarting the repository scanning job after property change." );
 341  0
                 }
 342  
             }
 343  0
             jobs.clear();
 344  
 
 345  0
             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
 346  
 
 347  0
             for ( ManagedRepositoryConfiguration repoConfig : repositories )
 348  
             {
 349  0
                 if ( repoConfig.getRefreshCronExpression() != null )
 350  
                 {
 351  
                     try
 352  
                     {
 353  0
                         scheduleRepositoryJobs( repoConfig );
 354  
                     }
 355  0
                     catch ( SchedulerException e )
 356  
                     {
 357  0
                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
 358  0
                     }
 359  
                 }
 360  
             }
 361  
         }
 362  0
     }
 363  
     
 364  
     @SuppressWarnings("unchecked")
 365  
     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
 366  
     {
 367  0
         List<RepositoryScanStatistics> results =
 368  
             (List<RepositoryScanStatistics>) dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
 369  
 
 370  0
         if ( results != null && !results.isEmpty() )
 371  
         {
 372  0
             return true;
 373  
         }
 374  
 
 375  0
         return false;
 376  
     }
 377  
     
 378  
     // MRM-848: Pre-configured repository initially appear to be empty
 379  
     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
 380  
     {
 381  0
         String repoId = repoConfig.getId();        
 382  0
         RepositoryTask task = TaskCreator.createRepositoryTask( repoId );
 383  
 
 384  0
         if ( !queuedRepos.contains( repoId ) )
 385  
         {
 386  0
             log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
 387  
 
 388  
             try
 389  
             {
 390  0
                 queuedRepos.add( repoConfig.getId() );
 391  0
                 this.queueRepositoryTask( task );
 392  
             }
 393  0
             catch ( TaskQueueException e )
 394  
             {
 395  0
                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
 396  0
             }
 397  
         }
 398  0
     }
 399  
     
 400  
     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
 401  
         throws SchedulerException
 402  
     {
 403  0
         if ( repoConfig.getRefreshCronExpression() == null )
 404  
         {
 405  0
             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
 406  0
             return;
 407  
         }
 408  
         
 409  0
         if ( !repoConfig.isScanned() )
 410  
         {
 411  0
             log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
 412  0
             return;
 413  
         }
 414  
 
 415  
         // get the cron string for these database scanning jobs
 416  0
         String cronString = repoConfig.getRefreshCronExpression();
 417  
 
 418  0
         CronExpressionValidator cronValidator = new CronExpressionValidator();
 419  0
         if ( !cronValidator.validate( cronString ) )
 420  
         {
 421  0
             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
 422  
                 "] is invalid.  Defaulting to hourly." );
 423  0
             cronString = CRON_HOURLY;
 424  
         }
 425  
 
 426  
         // setup the unprocessed artifact job
 427  0
         JobDetail repositoryJob =
 428  
             new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
 429  
 
 430  0
         JobDataMap dataMap = new JobDataMap();
 431  0
         dataMap.put( DefaultArchivaTaskScheduler.TASK_QUEUE, repositoryScanningQueue );
 432  0
         dataMap.put( DefaultArchivaTaskScheduler.TASK_REPOSITORY, repoConfig.getId() );
 433  0
         repositoryJob.setJobDataMap( dataMap );
 434  
 
 435  
         try
 436  
         {
 437  0
             CronTrigger trigger =
 438  
                 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
 439  
 
 440  0
             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
 441  0
             scheduler.scheduleJob( repositoryJob, trigger );
 442  
         }
 443  0
         catch ( ParseException e )
 444  
         {
 445  0
             log.error(
 446  
                 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
 447  
                     repoConfig.getId() + "': " + e.getMessage() );
 448  0
         }
 449  
 
 450  0
     }
 451  
 
 452  
     private synchronized void scheduleDatabaseJobs()
 453  
         throws SchedulerException
 454  
     {
 455  0
         String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
 456  
 
 457  
         // setup the unprocessed artifact job
 458  0
         JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
 459  
 
 460  0
         JobDataMap dataMap = new JobDataMap();
 461  0
         dataMap.put( TASK_QUEUE, databaseUpdateQueue );
 462  0
         databaseJob.setJobDataMap( dataMap );
 463  
 
 464  0
         CronExpressionValidator cronValidator = new CronExpressionValidator();
 465  0
         if ( !cronValidator.validate( cronString ) )
 466  
         {
 467  0
             log.warn(
 468  
                 "Cron expression [" + cronString + "] for database update is invalid.  Defaulting to hourly." );
 469  0
             cronString = CRON_HOURLY;
 470  
         }
 471  
 
 472  
         try
 473  
         {
 474  0
             CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
 475  
 
 476  0
             scheduler.scheduleJob( databaseJob, trigger );
 477  
         }
 478  0
         catch ( ParseException e )
 479  
         {
 480  0
             log.error(
 481  
                 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
 482  0
         }
 483  
 
 484  0
     }
 485  
 }