001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.oozie.CoordinatorActionBean;
027import org.apache.oozie.CoordinatorJobBean;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.SLAEventBean;
030import org.apache.oozie.WorkflowJobBean;
031import org.apache.oozie.XException;
032import org.apache.oozie.service.EventHandlerService;
033import org.apache.oozie.service.JPAService;
034import org.apache.oozie.service.Services;
035import org.apache.oozie.util.InstrumentUtils;
036import org.apache.oozie.util.LogUtils;
037import org.apache.oozie.util.ParamChecker;
038import org.apache.oozie.util.db.SLADbOperations;
039import org.apache.oozie.client.CoordinatorAction;
040import org.apache.oozie.client.WorkflowJob;
041import org.apache.oozie.client.SLAEvent.SlaAppType;
042import org.apache.oozie.client.SLAEvent.Status;
043import org.apache.oozie.client.rest.JsonBean;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.command.PreconditionException;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor;
047import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
048import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
049import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
050import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
051import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
052import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
053
054/**
055 * The command checks workflow status for coordinator action.
056 */
057@SuppressWarnings("deprecation")
058public class CoordActionCheckXCommand extends CoordinatorXCommand<Void> {
059    private String actionId;
060    private int actionCheckDelay;
061    private CoordinatorActionBean coordAction = null;
062    private CoordinatorJobBean coordJob;
063    private WorkflowJobBean workflowJob;
064    private JPAService jpaService = null;
065    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
066    private List<JsonBean> insertList = new ArrayList<JsonBean>();
067
068    public CoordActionCheckXCommand(String actionId, int actionCheckDelay) {
069        super("coord_action_check", "coord_action_check", 0);
070        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
071        this.actionCheckDelay = actionCheckDelay;
072    }
073
074    @Override
075    protected void setLogInfo() {
076        LogUtils.setLogInfo(actionId);
077    }
078
079    /* (non-Javadoc)
080     * @see org.apache.oozie.command.XCommand#execute()
081     */
082    @Override
083    protected Void execute() throws CommandException {
084        try {
085            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
086            Status slaStatus = null;
087            CoordinatorAction.Status initialStatus = coordAction.getStatus();
088
089            if (workflowJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
090                coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
091                // set pending to false as the status is SUCCEEDED
092                coordAction.setPending(0);
093                slaStatus = Status.SUCCEEDED;
094            }
095            else {
096                if (workflowJob.getStatus() == WorkflowJob.Status.FAILED) {
097                    coordAction.setStatus(CoordinatorAction.Status.FAILED);
098                    slaStatus = Status.FAILED;
099                    // set pending to false as the status is FAILED
100                    coordAction.setPending(0);
101                }
102                else {
103                    if (workflowJob.getStatus() == WorkflowJob.Status.KILLED) {
104                        coordAction.setStatus(CoordinatorAction.Status.KILLED);
105                        slaStatus = Status.KILLED;
106                        // set pending to false as the status is KILLED
107                        coordAction.setPending(0);
108                    }
109                    else {
110                        LOG.warn("Unexpected workflow " + workflowJob.getId() + " STATUS " + workflowJob.getStatus());
111                        coordAction.setLastModifiedTime(new Date());
112                        CoordActionQueryExecutor.getInstance().executeUpdate(
113                                CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
114                                coordAction);
115                        return null;
116                    }
117                }
118            }
119
120            LOG.debug("Updating Coordinator actionId :" + coordAction.getId() + "status to ="
121                            + coordAction.getStatus());
122            coordAction.setLastModifiedTime(new Date());
123            updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
124                    coordAction));
125
126            if (slaStatus != null) {
127                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
128                        SlaAppType.COORDINATOR_ACTION, LOG);
129                if(slaEvent != null) {
130                    insertList.add(slaEvent);
131                }
132            }
133
134            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
135            CoordinatorAction.Status endStatus = coordAction.getStatus();
136            if (endStatus != initialStatus && EventHandlerService.isEnabled()) {
137                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime());
138            }
139        }
140        catch (XException ex) {
141            LOG.warn("CoordActionCheckCommand Failed ", ex);
142            throw new CommandException(ex);
143        }
144        return null;
145    }
146
147    /* (non-Javadoc)
148     * @see org.apache.oozie.command.XCommand#getEntityKey()
149     */
150    @Override
151    public String getEntityKey() {
152        return actionId;
153    }
154
155    @Override
156    public String getKey() {
157        return getName() + "_" + actionId;
158    }
159
160    /* (non-Javadoc)
161     * @see org.apache.oozie.command.XCommand#isLockRequired()
162     */
163    @Override
164    protected boolean isLockRequired() {
165        return true;
166    }
167
168    /* (non-Javadoc)
169     * @see org.apache.oozie.command.XCommand#loadState()
170     */
171    @Override
172    protected void loadState() throws CommandException {
173        try {
174            jpaService = Services.get().get(JPAService.class);
175
176            if (jpaService != null) {
177                coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
178                coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
179                        coordAction.getJobId()));
180                workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
181                LogUtils.setLogInfo(coordAction);
182            }
183            else {
184                throw new CommandException(ErrorCode.E0610);
185            }
186        }
187        catch (XException ex) {
188            throw new CommandException(ex);
189        }
190    }
191
192    /* (non-Javadoc)
193     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
194     */
195    @Override
196    protected void verifyPrecondition() throws CommandException, PreconditionException {
197        // if the action has been updated, quit this command
198        Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
199        Timestamp cactionLmt = coordAction.getLastModifiedTimestamp();
200        if (cactionLmt.after(actionCheckTs)) {
201            throw new PreconditionException(ErrorCode.E1100, "The coord action :" + actionId
202                    + " has been udated. Ignore CoordActionCheckCommand!");
203        }
204        if (coordAction.getStatus().equals(CoordinatorAction.Status.SUCCEEDED)
205                || coordAction.getStatus().equals(CoordinatorAction.Status.FAILED)
206                || coordAction.getStatus().equals(CoordinatorAction.Status.KILLED)) {
207            throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must not have status "
208                    + CoordinatorAction.Status.SUCCEEDED.name() + ", " + CoordinatorAction.Status.FAILED.name()
209                    + ", or " + CoordinatorAction.Status.KILLED.name() + " but has status [" + coordAction.getStatus().name()
210                    + "]");
211        }
212    }
213}