/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.coord.CoordinatorXCommand;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.util.ParamChecker;

/**
 * Base command for all job-related commands. It drives the status changes of a job so that every job
 * follows the same state machine: move to the next status, update the job, then notify the parent.
 *
 * @param <T> the result type returned by {@link #execute()}
 */
public abstract class TransitionXCommand<T> extends XCommand<T> {

    /** The job this command operates on; assigned through {@link #setJob(Job)}. */
    protected Job job;

    /** Pending update entries; read by {@link #generateEvents(CoordinatorJobBean, Date)}. */
    protected List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();

    /** Pending beans to insert; not read by this class itself. */
    protected List<JsonBean> insertList = new ArrayList<JsonBean>();

    /**
     * Creates the command, forwarding all arguments to the superclass constructor.
     *
     * @param name the command name
     * @param type the command type
     * @param priority the command priority
     */
    public TransitionXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }

    /**
     * Creates the command, forwarding all arguments to the superclass constructor.
     *
     * @param name the command name
     * @param type the command type
     * @param priority the command priority
     * @param dryrun the dry-run flag handed to the superclass
     */
    public TransitionXCommand(String name, String type, int priority, boolean dryrun) {
        super(name, type, priority, dryrun);
    }

    /**
     * Transits to the next status based on the result of the job.
     *
     * @throws CommandException if the transition fails
     */
    public abstract void transitToNext() throws CommandException;

    /**
     * Updates the parent job.
     *
     * @throws CommandException if the update fails
     */
    public abstract void updateJob() throws CommandException;

    /**
     * Notifies the parent about the status of this particular job.
     *
     * @throws CommandException if the notification fails
     */
    public abstract void notifyParent() throws CommandException;

    /**
     * Generates job notification events on status changes.
     * <p>
     * Walks {@link #updateList} and, for each entry whose bean is a {@link CoordinatorActionBean}, sets
     * that bean's job id to the id of {@code coordJob} and hands it to
     * {@code CoordinatorXCommand.generateEvent} together with the job's user, the job's application name
     * and {@code startTime}. Entries holding any other kind of bean are skipped.
     *
     * @param coordJob the coordinator job supplying the id, user and application name
     * @param startTime the start time passed along to event generation
     * @throws CommandException declared by the signature; this method contains no direct throw
     */
    public void generateEvents(CoordinatorJobBean coordJob, Date startTime) throws CommandException {
        for (UpdateEntry pending : updateList) {
            // TODO generate Coord Job event
            JsonBean candidate = pending.getBean();
            if (!(candidate instanceof CoordinatorActionBean)) {
                continue;
            }
            CoordinatorActionBean coordAction = (CoordinatorActionBean) candidate;
            coordAction.setJobId(coordJob.getId());
            CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(),
                    startTime);
        }
    }

    /**
     * Performs all the writes of this command; implementations are expected to do so atomically.
     *
     * @throws CommandException if the writes fail
     */
    public abstract void performWrites() throws CommandException;

    /**
     * Runs the transition: {@link #transitToNext()}, then {@link #updateJob()}, then
     * {@link #notifyParent()}. {@link #performWrites()} is not invoked from here.
     *
     * @return always {@code null}
     * @throws CommandException if any of the three steps throws it
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected T execute() throws CommandException {
        transitToNext();
        updateJob();
        notifyParent();
        return null;
    }

    /**
     * Gets the job for the command.
     *
     * @return the job
     */
    public Job getJob() {
        return job;
    }

    /**
     * Sets the job for the command. The argument is first passed through
     * {@code ParamChecker.notNull(job, "job")} (presumably rejecting {@code null} - confirm against
     * {@code ParamChecker}) and the value it returns is stored.
     *
     * @param job the job
     */
    public void setJob(Job job) {
        Job verified = ParamChecker.notNull(job, "job");
        this.job = verified;
    }

}