001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.Date; 021 import java.util.HashMap; 022 import java.util.List; 023 import java.util.Map; 024 025 import org.apache.oozie.BundleActionBean; 026 import org.apache.oozie.BundleJobBean; 027 import org.apache.oozie.CoordinatorJobBean; 028 import org.apache.oozie.ErrorCode; 029 import org.apache.oozie.XException; 030 import org.apache.oozie.client.Job; 031 import org.apache.oozie.client.rest.RestConstants; 032 import org.apache.oozie.command.CommandException; 033 import org.apache.oozie.command.RerunTransitionXCommand; 034 import org.apache.oozie.command.coord.CoordRerunXCommand; 035 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 036 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.JPAExecutorException; 040 import org.apache.oozie.service.JPAService; 041 import org.apache.oozie.service.Services; 042 import org.apache.oozie.util.DateUtils; 043 import org.apache.oozie.util.LogUtils; 044 import org.apache.oozie.util.ParamChecker; 045 import org.apache.oozie.util.XLog; 046 047 /** 048 * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup. 049 * <p/> 050 * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls 051 * <p/> 052 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions 053 */ 054 public class BundleRerunXCommand extends RerunTransitionXCommand<Void> { 055 056 private final String coordScope; 057 private final String dateScope; 058 private final boolean refresh; 059 private final boolean noCleanup; 060 private BundleJobBean bundleJob; 061 private List<BundleActionBean> bundleActions; 062 protected boolean prevPending; 063 064 private JPAService jpaService = null; 065 066 /** 067 * The constructor for class {@link BundleRerunXCommand} 068 * 069 * @param jobId the bundle job id 070 * @param coordScope the rerun scope for coordinator job names separated by "," 071 * @param dateScope the rerun scope for coordinator nominal times separated by "," 072 * @param refresh true if user wants to refresh input/outpur dataset urls 073 * @param noCleanup false if user wants to cleanup output events for given rerun actions 074 */ 075 public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) { 076 super("bundle_rerun", "bundle_rerun", 1); 077 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 078 this.coordScope = coordScope; 079 this.dateScope = dateScope; 080 this.refresh = refresh; 081 this.noCleanup = noCleanup; 082 } 083 084 /* (non-Javadoc) 085 * @see org.apache.oozie.command.XCommand#loadState() 086 */ 087 @Override 088 protected void loadState() throws CommandException { 089 try { 090 jpaService = Services.get().get(JPAService.class); 091 092 if (jpaService != null) { 093 this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 094 this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId)); 095 LogUtils.setLogInfo(bundleJob, logInfo); 096 super.setJob(bundleJob); 097 prevPending = bundleJob.isPending(); 098 } 099 else { 100 throw new CommandException(ErrorCode.E0610); 101 } 102 } 103 catch (XException ex) { 104 throw new CommandException(ex); 105 } 106 107 } 108 109 /* (non-Javadoc) 110 * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren() 111 */ 112 @Override 113 public void rerunChildren() throws CommandException { 114 boolean isUpdateActionDone = false; 115 Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>(); 116 if (bundleActions != null) { 117 for (BundleActionBean action : bundleActions) { 118 if (action.getCoordName() != null) { 119 coordNameToBAMapping.put(action.getCoordName(), action); 120 } 121 } 122 } 123 124 if (coordScope != null && !coordScope.isEmpty()) { 125 String[] list = coordScope.split(","); 126 for (String coordName : list) { 127 coordName = coordName.trim(); 128 if (coordNameToBAMapping.keySet().contains(coordName)) { 129 String coordId = coordNameToBAMapping.get(coordName).getCoordId(); 130 if (coordId == null) { 131 LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName); 132 continue; 133 } 134 CoordinatorJobBean coordJob = getCoordJob(coordId); 135 136 String rerunDateScope; 137 if (dateScope != null && !dateScope.isEmpty()) { 138 rerunDateScope = dateScope; 139 } 140 else { 141 String coordStart = DateUtils.formatDateOozieTZ(coordJob.getStartTime()); 142 String coordEnd = DateUtils.formatDateOozieTZ(coordJob.getEndTime()); 143 rerunDateScope = coordStart + "::" + coordEnd; 144 } 145 LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle " 146 + bundleJob.getId()); 147 queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_RERUN_DATE, rerunDateScope, refresh, 148 noCleanup)); 149 updateBundleAction(coordNameToBAMapping.get(coordName)); 150 isUpdateActionDone = true; 151 } 152 else { 153 LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId()); 154 } 155 } 156 } 157 else if (dateScope != null && !dateScope.isEmpty()) { 158 if (bundleActions != null) { 159 for (BundleActionBean action : bundleActions) { 160 if (action.getCoordId() == null) { 161 LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name " 162 + action.getCoordName()); 163 continue; 164 } 165 LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle " 166 + bundleJob.getId()); 167 queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_RERUN_DATE, dateScope, 168 refresh, noCleanup)); 169 updateBundleAction(action); 170 isUpdateActionDone = true; 171 } 172 } 173 } 174 if (!isUpdateActionDone) { 175 transitToPrevious(); 176 } 177 LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId); 178 } 179 180 private final void transitToPrevious() throws CommandException { 181 bundleJob.setStatus(getPrevStatus()); 182 if (!prevPending) { 183 bundleJob.resetPending(); 184 } 185 else { 186 bundleJob.setPending(); 187 } 188 updateJob(); 189 } 190 191 /** 192 * Update bundle action 193 * 194 * @param action the bundle action 195 * @throws CommandException thrown if failed to update bundle action 196 */ 197 private void updateBundleAction(BundleActionBean action) { 198 action.incrementAndGetPending(); 199 action.setLastModifiedTime(new Date()); 200 updateList.add(action); 201 } 202 203 /* (non-Javadoc) 204 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 205 */ 206 @Override 207 public void updateJob() { 208 // rerun a paused bundle job will keep job status at paused and pending at previous pending 209 if (getPrevStatus() != null) { 210 Job.Status bundleJobStatus = getPrevStatus(); 211 if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) { 212 bundleJob.setStatus(bundleJobStatus); 213 if (prevPending) { 214 bundleJob.setPending(); 215 } 216 else { 217 bundleJob.resetPending(); 218 } 219 } 220 } 221 updateList.add(bundleJob); 222 } 223 224 /* (non-Javadoc) 225 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites() 226 */ 227 @Override 228 public void performWrites() throws CommandException { 229 try { 230 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 231 } 232 catch (JPAExecutorException e) { 233 throw new CommandException(e); 234 } 235 } 236 237 /* (non-Javadoc) 238 * @see org.apache.oozie.command.XCommand#getEntityKey() 239 */ 240 @Override 241 public String getEntityKey() { 242 return jobId; 243 } 244 245 /* (non-Javadoc) 246 * @see org.apache.oozie.command.XCommand#isLockRequired() 247 */ 248 @Override 249 protected boolean isLockRequired() { 250 return true; 251 } 252 253 /* 254 * (non-Javadoc) 255 * @see org.apache.oozie.command.TransitionXCommand#getJob() 256 */ 257 @Override 258 public Job getJob() { 259 return bundleJob; 260 } 261 262 /* (non-Javadoc) 263 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 264 */ 265 @Override 266 public void notifyParent() throws CommandException { 267 268 } 269 270 /* (non-Javadoc) 271 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog() 272 */ 273 @Override 274 public XLog getLog() { 275 return LOG; 276 } 277 278 private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException { 279 try { 280 CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(coordId)); 281 return job; 282 } 283 catch (JPAExecutorException je) { 284 throw new CommandException(je); 285 } 286 287 } 288 289 }