001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import org.apache.oozie.CoordinatorActionBean; 027import org.apache.oozie.CoordinatorJobBean; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.SLAEventBean; 030import org.apache.oozie.WorkflowJobBean; 031import org.apache.oozie.XException; 032import org.apache.oozie.service.EventHandlerService; 033import org.apache.oozie.service.JPAService; 034import org.apache.oozie.service.Services; 035import org.apache.oozie.util.InstrumentUtils; 036import org.apache.oozie.util.LogUtils; 037import org.apache.oozie.util.ParamChecker; 038import org.apache.oozie.util.db.SLADbOperations; 039import org.apache.oozie.client.CoordinatorAction; 040import org.apache.oozie.client.WorkflowJob; 041import org.apache.oozie.client.SLAEvent.SlaAppType; 042import org.apache.oozie.client.SLAEvent.Status; 043import org.apache.oozie.client.rest.JsonBean; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.command.PreconditionException; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor; 047import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; 048import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 049import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor; 050import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; 051import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 052import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 053 054/** 055 * The command checks workflow status for coordinator action. 056 */ 057@SuppressWarnings("deprecation") 058public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> { 059 private String actionId; 060 private int actionCheckDelay; 061 private CoordinatorActionBean coordAction = null; 062 private CoordinatorJobBean coordJob; 063 private WorkflowJobBean workflowJob; 064 private JPAService jpaService = null; 065 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 066 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 067 068 public CoordActionCheckXCommand(String actionId, int actionCheckDelay) { 069 super("coord_action_check", "coord_action_check", 0); 070 this.actionId = ParamChecker.notEmpty(actionId, "actionId"); 071 this.actionCheckDelay = actionCheckDelay; 072 } 073 074 @Override 075 protected void setLogInfo() { 076 LogUtils.setLogInfo(actionId); 077 } 078 079 /* (non-Javadoc) 080 * @see org.apache.oozie.command.XCommand#execute() 081 */ 082 @Override 083 protected Void execute() throws CommandException { 084 try { 085 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 086 Status slaStatus = null; 087 CoordinatorAction.Status initialStatus = coordAction.getStatus(); 088 089 if (workflowJob.getStatus() == WorkflowJob.Status.SUCCEEDED) { 090 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); 091 // set pending to false as the status is SUCCEEDED 092 coordAction.setPending(0); 093 slaStatus = Status.SUCCEEDED; 094 } 095 else { 096 if (workflowJob.getStatus() == WorkflowJob.Status.FAILED) { 097 coordAction.setStatus(CoordinatorAction.Status.FAILED); 098 slaStatus = Status.FAILED; 099 // set pending to false as the status is FAILED 100 coordAction.setPending(0); 101 } 102 else { 103 if (workflowJob.getStatus() == WorkflowJob.Status.KILLED) { 104 coordAction.setStatus(CoordinatorAction.Status.KILLED); 105 slaStatus = Status.KILLED; 106 // set pending to false as the status is KILLED 107 coordAction.setPending(0); 108 } 109 else { 110 LOG.warn("Unexpected workflow " + workflowJob.getId() + " STATUS " + workflowJob.getStatus()); 111 coordAction.setLastModifiedTime(new Date()); 112 CoordActionQueryExecutor.getInstance().executeUpdate( 113 CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, 114 coordAction); 115 return null; 116 } 117 } 118 } 119 120 LOG.debug("Updating Coordinator actionId :" + coordAction.getId() + "status to =" 121 + coordAction.getStatus()); 122 coordAction.setLastModifiedTime(new Date()); 123 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, 124 coordAction)); 125 126 if (slaStatus != null) { 127 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, 128 SlaAppType.COORDINATOR_ACTION, LOG); 129 if(slaEvent != null) { 130 insertList.add(slaEvent); 131 } 132 } 133 134 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 135 CoordinatorAction.Status endStatus = coordAction.getStatus(); 136 if (endStatus != initialStatus && EventHandlerService.isEnabled()) { 137 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime()); 138 } 139 } 140 catch (XException ex) { 141 LOG.warn("CoordActionCheckCommand Failed ", ex); 142 throw new CommandException(ex); 143 } 144 return null; 145 } 146 147 /* (non-Javadoc) 148 * @see org.apache.oozie.command.XCommand#getEntityKey() 149 */ 150 @Override 151 public String getEntityKey() { 152 return actionId; 153 } 154 155 @Override 156 public String getKey() { 157 return getName() + "_" + actionId; 158 } 159 160 /* (non-Javadoc) 161 * @see org.apache.oozie.command.XCommand#isLockRequired() 162 */ 163 @Override 164 protected boolean isLockRequired() { 165 return true; 166 } 167 168 /* (non-Javadoc) 169 * @see org.apache.oozie.command.XCommand#loadState() 170 */ 171 @Override 172 protected void loadState() throws CommandException { 173 try { 174 jpaService = Services.get().get(JPAService.class); 175 176 if (jpaService != null) { 177 coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId)); 178 coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor( 179 coordAction.getJobId())); 180 workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId())); 181 LogUtils.setLogInfo(coordAction); 182 } 183 else { 184 throw new CommandException(ErrorCode.E0610); 185 } 186 } 187 catch (XException ex) { 188 throw new CommandException(ex); 189 } 190 } 191 192 /* (non-Javadoc) 193 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 194 */ 195 @Override 196 protected void verifyPrecondition() throws CommandException, PreconditionException { 197 // if the action has been updated, quit this command 198 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000); 199 Timestamp cactionLmt = coordAction.getLastModifiedTimestamp(); 200 if (cactionLmt.after(actionCheckTs)) { 201 throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId 202 + " has been udated. Ignore CoordActionCheckCommand!"); 203 } 204 if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED) 205 || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED) 206 || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) { 207 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must not have status " 208 + CoordinatorAction.Status.SUCCEEDED.name() + ", " + CoordinatorAction.Status.FAILED.name() 209 + ", or " + CoordinatorAction.Status.KILLED.name() + " but has status [" + coordAction.getStatus().name() 210 + "]"); 211 } 212 } 213}