/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.wf;

import java.util.Properties;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;

/**
 * Command run when a callback reports an external status for a workflow action.
 * <p>
 * The action must be in {@code RUNNING} or {@code PREP} state. For a {@code RUNNING} action an
 * {@link ActionCheckXCommand} is queued when the action's executor considers the reported external
 * status to be a completed one. For a {@code PREP} action the callback is treated as having arrived
 * early and this command is requeued with a delay, up to the limit configured on
 * {@link CallbackService}.
 */
public class CompletedActionXCommand extends WorkflowXCommand<Void> {

    /** Id of the workflow action the callback refers to. */
    private final String actionId;

    /** External status reported by the callback. */
    private final String externalStatus;

    /** Action bean fetched by {@link #eagerLoadState()}. */
    private WorkflowActionBean wfactionBean;

    /** How many times this callback has already been requeued because the action was still in PREP. */
    private int earlyRequeueCount;

    /**
     * Creates the command.
     *
     * @param actionId id of the workflow action; must not be empty
     * @param externalStatus external status reported by the callback; must not be empty
     * @param actionData accepted for callers' convenience; this command does not read it
     * @param priority command priority
     * @param earlyRequeueCount number of early requeues that have already happened for this callback
     */
    public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData, int priority,
            int earlyRequeueCount) {
        super("callback", "callback", priority);
        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
        this.externalStatus = ParamChecker.notEmpty(externalStatus, "externalStatus");
        this.earlyRequeueCount = earlyRequeueCount;
    }

    /**
     * Creates the command with an early-requeue count of {@code 0}.
     *
     * @param actionId id of the workflow action; must not be empty
     * @param externalStatus external status reported by the callback; must not be empty
     * @param actionData accepted for callers' convenience; this command does not read it
     * @param priority command priority
     */
    public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData, int priority) {
        this(actionId, externalStatus, actionData, priority, 0);
    }

    /**
     * Creates the command with priority {@code 1} and an early-requeue count of {@code 0}.
     *
     * @param actionId id of the workflow action; must not be empty
     * @param externalStatus external status reported by the callback; must not be empty
     * @param actionData accepted for callers' convenience; this command does not read it
     */
    public CompletedActionXCommand(String actionId, String externalStatus, Properties actionData) {
        this(actionId, externalStatus, actionData, 1);
    }

    /**
     * Sets the log info from the action id.
     */
    @Override
    protected void setLogInfo() {
        LogUtils.setLogInfo(actionId);
    }

    /**
     * Fetches the action bean for {@code actionId} and sets the log info from it.
     *
     * @throws CommandException with {@link ErrorCode#E0603} wrapping any failure of the query
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        try {
            wfactionBean = WorkflowActionQueryExecutor.getInstance()
                    .get(WorkflowActionQuery.GET_ACTION_COMPLETED, actionId);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
        }
        LogUtils.setLogInfo(wfactionBean);
    }

    /**
     * Accepts the callback only while the action is in {@code RUNNING} or {@code PREP} state.
     *
     * @throws CommandException with {@link ErrorCode#E0800} when the action is in any other state
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        WorkflowActionBean.Status status = wfactionBean.getStatus();
        boolean callbackExpected = status == WorkflowActionBean.Status.RUNNING
                || status == WorkflowActionBean.Status.PREP;
        if (!callbackExpected) {
            throw new CommandException(ErrorCode.E0800, actionId, status);
        }
    }

    /**
     * Processes the callback according to the current action state.
     *
     * @return always {@code null}
     * @throws CommandException with {@link ErrorCode#E0822} when the action is still in {@code PREP}
     *         and the early-requeue limit has been reached
     */
    @Override
    protected Void execute() throws CommandException {
        if (wfactionBean.getStatus() != WorkflowActionBean.Status.PREP) {
            // RUNNING: notifications (e.g. from sub-workflows) are sent on every status change, not
            // only on completion, so only queue a check when the reported status is a completed one.
            ActionExecutor actionExecutor =
                    Services.get().get(ActionService.class).getExecutor(wfactionBean.getType());
            if (actionExecutor.isCompleted(externalStatus)) {
                queue(new ActionCheckXCommand(wfactionBean.getId(), getPriority(), -1));
            }
            return null;
        }

        // PREP: the callback probably arrived before the action was moved from PREP to RUNNING.
        // Requeue this command a limited number of times to give that transition a chance to happen.
        int requeueLimit = Services.get().get(CallbackService.class).getEarlyRequeueMaxRetries();
        if (earlyRequeueCount >= requeueLimit) {
            throw new CommandException(ErrorCode.E0822, actionId);
        }

        long requeueDelay = getRequeueDelay();
        LOG.warn("Received early callback for action still in PREP state; will wait [{0}]ms and requeue up to [{1}] more"
                + " times", requeueDelay, (requeueLimit - earlyRequeueCount));
        CompletedActionXCommand retry =
                new CompletedActionXCommand(actionId, externalStatus, null, getPriority(), earlyRequeueCount + 1);
        queue(retry, requeueDelay);
        return null;
    }

    /**
     * @return always {@code null}
     */
    @Override
    public String getEntityKey() {
        return null;
    }

    /**
     * @return always {@code false}
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /**
     * Delegates to {@link #eagerLoadState()}, so the action bean is fetched again.
     */
    @Override
    protected void loadState() throws CommandException {
        eagerLoadState();
    }

    /**
     * Delegates to {@link #eagerVerifyPrecondition()}.
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        eagerVerifyPrecondition();
    }
}