001 package org.apache.archiva.scheduler.repository; 002 003 /* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022 import org.apache.archiva.common.ArchivaException; 023 import org.apache.archiva.configuration.ArchivaConfiguration; 024 import org.apache.archiva.configuration.ConfigurationEvent; 025 import org.apache.archiva.configuration.ConfigurationListener; 026 import org.apache.archiva.configuration.ManagedRepositoryConfiguration; 027 import org.apache.archiva.metadata.repository.MetadataRepository; 028 import org.apache.archiva.metadata.repository.MetadataRepositoryException; 029 import org.apache.archiva.metadata.repository.RepositorySession; 030 import org.apache.archiva.metadata.repository.RepositorySessionFactory; 031 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager; 032 import org.apache.archiva.redback.components.scheduler.CronExpressionValidator; 033 import org.apache.archiva.redback.components.scheduler.Scheduler; 034 import org.apache.archiva.redback.components.taskqueue.TaskQueue; 035 import org.apache.archiva.redback.components.taskqueue.TaskQueueException; 036 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler; 037 import org.apache.archiva.scheduler.repository.model.RepositoryTask; 038 import org.apache.commons.lang.time.StopWatch; 039 import org.quartz.SchedulerException; 040 import org.quartz.impl.JobDetailImpl; 041 import org.quartz.impl.triggers.CronTriggerImpl; 042 import org.slf4j.Logger; 043 import org.slf4j.LoggerFactory; 044 import org.springframework.stereotype.Service; 045 046 import javax.annotation.PostConstruct; 047 import javax.annotation.PreDestroy; 048 import javax.inject.Inject; 049 import javax.inject.Named; 050 import java.text.ParseException; 051 import java.util.ArrayList; 052 import java.util.HashSet; 053 import java.util.List; 054 import java.util.Set; 055 056 /** 057 * Default implementation of a scheduling component for archiva. 058 */ 059 @Service( "archivaTaskScheduler#repository" ) 060 public class DefaultRepositoryArchivaTaskScheduler 061 implements RepositoryArchivaTaskScheduler, ConfigurationListener 062 { 063 private Logger log = LoggerFactory.getLogger( getClass() ); 064 065 /** 066 * 067 */ 068 @Inject 069 private Scheduler scheduler; 070 071 @Inject 072 private CronExpressionValidator cronValidator; 073 074 /** 075 * 076 */ 077 @Inject 078 @Named( value = "taskQueue#repository-scanning" ) 079 private TaskQueue repositoryScanningQueue; 080 081 /** 082 * 083 */ 084 @Inject 085 private ArchivaConfiguration archivaConfiguration; 086 087 /** 088 * 089 */ 090 @Inject 091 @Named( value = "repositoryStatisticsManager#default" ) 092 private RepositoryStatisticsManager repositoryStatisticsManager; 093 094 /** 095 * TODO: could have multiple implementations 096 */ 097 @Inject 098 private RepositorySessionFactory repositorySessionFactory; 099 100 private static final String REPOSITORY_SCAN_GROUP = "rg"; 101 102 private static final String REPOSITORY_JOB = "rj"; 103 104 private static final String REPOSITORY_JOB_TRIGGER = "rjt"; 105 106 static final String TASK_QUEUE = "TASK_QUEUE"; 107 108 static final String TASK_REPOSITORY = "TASK_REPOSITORY"; 109 110 public static final String CRON_HOURLY = "0 0 * * * ?"; 111 112 private Set<String> jobs = new HashSet<String>(); 113 114 private List<String> queuedRepos = new ArrayList<String>(); 115 116 @PostConstruct 117 public void startup() 118 throws ArchivaException 119 { 120 121 StopWatch stopWatch = new StopWatch(); 122 stopWatch.start(); 123 124 archivaConfiguration.addListener( this ); 125 126 List<ManagedRepositoryConfiguration> repositories = 127 archivaConfiguration.getConfiguration().getManagedRepositories(); 128 129 RepositorySession repositorySession = repositorySessionFactory.createSession(); 130 try 131 { 132 MetadataRepository metadataRepository = repositorySession.getRepository(); 133 for ( ManagedRepositoryConfiguration repoConfig : repositories ) 134 { 135 if ( repoConfig.isScanned() ) 136 { 137 try 138 { 139 scheduleRepositoryJobs( repoConfig ); 140 } 141 catch ( SchedulerException e ) 142 { 143 throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e ); 144 } 145 146 try 147 { 148 if ( !isPreviouslyScanned( repoConfig, metadataRepository ) ) 149 { 150 queueInitialRepoScan( repoConfig ); 151 } 152 } 153 catch ( MetadataRepositoryException e ) 154 { 155 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: " 156 + e.getMessage(), e ); 157 } 158 } 159 } 160 } 161 finally 162 { 163 repositorySession.close(); 164 } 165 166 stopWatch.stop(); 167 log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() ); 168 } 169 170 171 @PreDestroy 172 public void stop() 173 throws SchedulerException 174 { 175 for ( String job : jobs ) 176 { 177 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP ); 178 } 179 jobs.clear(); 180 queuedRepos.clear(); 181 182 } 183 184 @SuppressWarnings( "unchecked" ) 185 public boolean isProcessingRepositoryTask( String repositoryId ) 186 { 187 synchronized ( repositoryScanningQueue ) 188 { 189 List<RepositoryTask> queue = null; 190 191 try 192 { 193 queue = repositoryScanningQueue.getQueueSnapshot(); 194 } 195 catch ( TaskQueueException e ) 196 { 197 // not possible with plexus-taskqueue implementation, ignore 198 } 199 200 for ( RepositoryTask queuedTask : queue ) 201 { 202 if ( queuedTask.getRepositoryId().equals( repositoryId ) ) 203 { 204 return true; 205 } 206 } 207 return false; 208 } 209 } 210 211 public boolean isProcessingRepositoryTask( RepositoryTask task ) 212 { 213 synchronized ( repositoryScanningQueue ) 214 { 215 List<RepositoryTask> queue = null; 216 217 try 218 { 219 queue = repositoryScanningQueue.getQueueSnapshot(); 220 } 221 catch ( TaskQueueException e ) 222 { 223 // not possible with plexus-taskqueue implementation, ignore 224 } 225 226 for ( RepositoryTask queuedTask : queue ) 227 { 228 if ( task.equals( queuedTask ) ) 229 { 230 return true; 231 } 232 } 233 return false; 234 } 235 } 236 237 public void queueTask( RepositoryTask task ) 238 throws TaskQueueException 239 { 240 synchronized ( repositoryScanningQueue ) 241 { 242 if ( isProcessingRepositoryTask( task ) ) 243 { 244 log.debug( "Repository task '{}' is already queued. Skipping task.", task ); 245 } 246 else 247 { 248 // add check if the task is already queued if it is a file scan 249 repositoryScanningQueue.put( task ); 250 } 251 } 252 } 253 254 public boolean unQueueTask( RepositoryTask task ) 255 throws TaskQueueException 256 { 257 synchronized ( repositoryScanningQueue ) 258 { 259 if ( !isProcessingRepositoryTask( task ) ) 260 { 261 log.info( "cannot unqueue Repository task '{}' not already queued.", task ); 262 return false; 263 } 264 else 265 { 266 return repositoryScanningQueue.remove( task ); 267 } 268 } 269 } 270 271 public void configurationEvent( ConfigurationEvent event ) 272 { 273 if ( event.getType() == ConfigurationEvent.SAVED ) 274 { 275 for ( String job : jobs ) 276 { 277 try 278 { 279 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP ); 280 } 281 catch ( SchedulerException e ) 282 { 283 log.error( "Error restarting the repository scanning job after property change." ); 284 } 285 } 286 jobs.clear(); 287 288 List<ManagedRepositoryConfiguration> repositories = 289 archivaConfiguration.getConfiguration().getManagedRepositories(); 290 291 for ( ManagedRepositoryConfiguration repoConfig : repositories ) 292 { 293 if ( repoConfig.getRefreshCronExpression() != null ) 294 { 295 try 296 { 297 scheduleRepositoryJobs( repoConfig ); 298 } 299 catch ( SchedulerException e ) 300 { 301 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() ); 302 } 303 } 304 } 305 } 306 } 307 308 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig, 309 MetadataRepository metadataRepository ) 310 throws MetadataRepositoryException 311 { 312 long start = System.currentTimeMillis(); 313 314 boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() ); 315 316 long end = System.currentTimeMillis(); 317 318 log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) ); 319 320 return res; 321 } 322 323 // MRM-848: Pre-configured repository initially appear to be empty 324 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig ) 325 { 326 String repoId = repoConfig.getId(); 327 RepositoryTask task = new RepositoryTask(); 328 task.setRepositoryId( repoId ); 329 330 if ( !queuedRepos.contains( repoId ) ) 331 { 332 log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." ); 333 334 try 335 { 336 queuedRepos.add( repoConfig.getId() ); 337 this.queueTask( task ); 338 } 339 catch ( TaskQueueException e ) 340 { 341 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() ); 342 } 343 } 344 } 345 346 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig ) 347 throws SchedulerException 348 { 349 if ( repoConfig.getRefreshCronExpression() == null ) 350 { 351 log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() ); 352 return; 353 } 354 355 if ( !repoConfig.isScanned() ) 356 { 357 log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() ); 358 return; 359 } 360 361 // get the cron string for these database scanning jobs 362 String cronString = repoConfig.getRefreshCronExpression(); 363 364 if ( !cronValidator.validate( cronString ) ) 365 { 366 log.warn( "Cron expression [{}] for repository [{}] is invalid. Defaulting to hourly.", cronString, 367 repoConfig.getId() ); 368 cronString = CRON_HOURLY; 369 } 370 371 // setup the unprocessed artifact job 372 JobDetailImpl repositoryJob = 373 new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, 374 RepositoryTaskJob.class ); 375 376 repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue ); 377 repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() ); 378 379 try 380 { 381 CronTriggerImpl trigger = 382 new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, 383 cronString ); 384 385 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() ); 386 scheduler.scheduleJob( repositoryJob, trigger ); 387 } 388 catch ( ParseException e ) 389 { 390 log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '" 391 + repoConfig.getId() + "': " + e.getMessage() ); 392 } 393 394 } 395 }