001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.sql.Timestamp;
025import java.text.MessageFormat;
026import java.util.Date;
027import java.util.List;
028
029import javax.persistence.Basic;
030import javax.persistence.Column;
031import javax.persistence.Entity;
032import javax.persistence.Id;
033import javax.persistence.Lob;
034import javax.persistence.NamedQueries;
035import javax.persistence.NamedQuery;
036import javax.persistence.Table;
037
038import org.apache.hadoop.io.Writable;
039import org.apache.oozie.client.CoordinatorAction;
040import org.apache.oozie.client.rest.JsonBean;
041import org.apache.oozie.client.rest.JsonTags;
042import org.apache.oozie.client.rest.JsonUtils;
043import org.apache.oozie.util.DateUtils;
044import org.apache.oozie.util.WritableUtils;
045import org.apache.openjpa.persistence.jdbc.Index;
046import org.apache.openjpa.persistence.jdbc.Strategy;
047import org.json.simple.JSONArray;
048import org.json.simple.JSONObject;
049
050
051@Entity
052@NamedQueries({
053
054        @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.statusStr = :status where w.id = :id"),
055
056        @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.statusStr = :status where w.id = :id"),
057        // Query to update the action status, pending status and last modified time stamp of a Coordinator action
058        @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.statusStr =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
059        // Update query for InputCheck
060        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
061        // Update query for Push-based missing dependency check
062        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime,  w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
063        // Update query for both the pull-based and push-based missing dependencies
064        @NamedQuery(name = "UPDATE_COORD_ACTION_DEPENDENCIES", query = "update CoordinatorActionBean w set w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
065        // Update query for Start
066        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.statusStr =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage  where w.id = :id"),
067
068        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
069
070        @NamedQuery(name = "UPDATE_COORD_ACTION_RERUN", query = "update CoordinatorActionBean w set w.actionXml =:actionXml, w.statusStr = :status, w.externalId = :externalId, w.externalStatus = :externalStatus, w.rerunTimestamp = :rerunTime, w.lastModifiedTimestamp = :lastModifiedTime, w.createdTimestamp = :createdTime, w.createdConf = :createdConf, w.runConf = :runConf, w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
071
072        @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"),
073
074        @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id IN (:actionId)"),
075
076        @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"),
077
078        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_COORDINATOR", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId"),
079
080        // Query used by XTestcase to setup tables
081        @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
082        // Select query used only by test cases
083        @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
084
085        // Select query used by SLAService on restart
086        @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
087        // Select query used by ActionInfo command
088        @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
089        // Select Query used by Timeout and skip commands
090        @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
091        // Select query used by InputCheck command
092        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut, a.externalId from CoordinatorActionBean a where a.id = :id"),
093        // Select query used by CoordActionUpdate command
094        @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
095        // Select query used by Check command
096        @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
097        // Select query used by Start command
098        @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.statusStr, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
099
100        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp"),
101
102        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp desc"),
103
104        @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'RUNNING' OR a.statusStr='SUBMITTED')"),
105
106        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
107
108        @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'WAITING'"),
109
110        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED')"),
111
112        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.statusStr = :status"),
113
114        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
115        // Query to retrieve Coordinator actions sorted by nominal time
116        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
117        // Query to maintain backward compatibility for coord job info command
118        @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
119        // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
120        @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'FAILED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'IGNORED'"),
121
122        // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
123        @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'RUNNING'"),
124
125        // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
126        @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'SUSPENDED'"),
127
128        // Query to retrieve count of Coordinator actions which are pending
129        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"),
130
131        // Query to retrieve status of Coordinator actions
132        @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"),
133
134        // Query to retrieve the status of a single Coordinator action by id
135        @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"),
136
137        @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
138
139        @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),
140
141        //Used by coordinator store only
142        @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'RUNNING'"),
143
144        @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
145
146        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
147
148        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
149        // Select query used by rerun, requires almost all columns so select * is used
150        @NamedQuery(name = "GET_TERMINATED_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
151        // Select query used by log
152        @NamedQuery(name = "GET_TERMINATED_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
153        // Select query used by rerun, requires almost all columns so select * is used
154        @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
155
156        @NamedQuery(name = "GET_ACTIVE_ACTIONS_FOR_DATES", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'WAITING' OR a.statusStr = 'READY' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'RUNNING'  OR a.statusStr = 'SUSPENDED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
157
158        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"),
159
160        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query = "select count(w) from CoordinatorActionBean w where w.statusStr = 'RUNNING' and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
161
162        @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
163
164         @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime"),
165
166         @NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml  from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT'  AND a.statusStr <> 'IGNORED')"),
167
168         @NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml  from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT'  AND a.statusStr <> 'IGNORED')")
169 })
170
171@Table(name = "COORD_ACTIONS")
172public class CoordinatorActionBean implements
173        Writable,CoordinatorAction,JsonBean {
174
175    @Id
176    private String id;
177
178    @Basic
179    @Index
180    @Column(name = "job_id")
181    private String jobId;
182
183    @Basic
184    @Index
185    @Column(name = "status")
186    private String statusStr = CoordinatorAction.Status.WAITING.toString();
187
188    @Basic
189    @Index
190    @Column(name = "nominal_time")
191    private java.sql.Timestamp nominalTimestamp = null;
192
193    @Basic
194    @Index
195    @Column(name = "last_modified_time")
196    private java.sql.Timestamp lastModifiedTimestamp = null;
197
198    @Basic
199    @Index
200    @Column(name = "created_time")
201    private java.sql.Timestamp createdTimestamp = null;
202
203    @Basic
204    @Index
205    @Column(name = "rerun_time")
206    private java.sql.Timestamp rerunTimestamp = null;
207
208    @Basic
209    @Index
210    @Column(name = "external_id")
211    private String externalId;
212
213    @Basic
214    @Column(name = "sla_xml")
215    @Lob
216    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
217    private StringBlob slaXml = null;
218
219    @Basic
220    @Column(name = "pending")
221    private int pending = 0;
222
223    @Basic
224    @Column(name = "job_type")
225    private String type;
226
227    @Basic
228    @Column(name = "action_number")
229    private int actionNumber;
230
231    @Basic
232    @Column(name = "created_conf")
233    @Lob
234    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
235    private StringBlob createdConf;
236
237    @Basic
238    @Column(name = "time_out")
239    private int timeOut = 0;
240
241    @Basic
242    @Column(name = "run_conf")
243    @Lob
244    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
245    private StringBlob runConf;
246
247    @Basic
248    @Column(name = "action_xml")
249    @Lob
250    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
251    private StringBlob actionXml;
252
253    @Basic
254    @Column(name = "missing_dependencies")
255    @Lob
256    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
257    private StringBlob missingDependencies;
258
259    @Basic
260    @Column(name = "push_missing_dependencies")
261    @Lob
262    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
263    private StringBlob pushMissingDependencies;
264
265    @Basic
266    @Column(name = "external_status")
267    private String externalStatus;
268
269    @Basic
270    @Column(name = "tracker_uri")
271    private String trackerUri;
272
273    @Basic
274    @Column(name = "console_url")
275    private String consoleUrl;
276
277    @Basic
278    @Column(name = "error_code")
279    private String errorCode;
280
281    @Basic
282    @Column(name = "error_message")
283    private String errorMessage;
284
285    @SuppressWarnings("unchecked")
286    public JSONObject toJSONObject() {
287        return toJSONObject("GMT");
288    }
289
290    public CoordinatorActionBean() {
291    }
292
293    /**
294     * Serialize the coordinator bean to a data output.
295     *
296     * @param dataOutput data output.
297     * @throws IOException thrown if the coordinator bean could not be
298     *         serialized.
299     */
300    @Override
301    public void write(DataOutput dataOutput) throws IOException {
302        WritableUtils.writeStr(dataOutput, getJobId());
303        WritableUtils.writeStr(dataOutput, getType());
304        WritableUtils.writeStr(dataOutput, getId());
305        WritableUtils.writeStr(dataOutput, getCreatedConf());
306        WritableUtils.writeStr(dataOutput, getStatus().toString());
307        dataOutput.writeInt(getActionNumber());
308        WritableUtils.writeStr(dataOutput, getRunConf());
309        WritableUtils.writeStr(dataOutput, getExternalStatus());
310        WritableUtils.writeStr(dataOutput, getTrackerUri());
311        WritableUtils.writeStr(dataOutput, getConsoleUrl());
312        WritableUtils.writeStr(dataOutput, getErrorCode());
313        WritableUtils.writeStr(dataOutput, getErrorMessage());
314        dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
315        dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
316    }
317
318    /**
319     * Deserialize a coordinator bean from a data input.
320     *
321     * @param dataInput data input.
322     * @throws IOException thrown if the workflow bean could not be
323     *         deserialized.
324     */
325    @Override
326    public void readFields(DataInput dataInput) throws IOException {
327        setJobId(WritableUtils.readStr(dataInput));
328        setType(WritableUtils.readStr(dataInput));
329        setId(WritableUtils.readStr(dataInput));
330        setCreatedConf(WritableUtils.readStr(dataInput));
331        setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
332        setActionNumber(dataInput.readInt());
333        setRunConf(WritableUtils.readStr(dataInput));
334        setExternalStatus(WritableUtils.readStr(dataInput));
335        setTrackerUri(WritableUtils.readStr(dataInput));
336        setConsoleUrl(WritableUtils.readStr(dataInput));
337        setErrorCode(WritableUtils.readStr(dataInput));
338        setErrorMessage(WritableUtils.readStr(dataInput));
339        long d = dataInput.readLong();
340        if (d != -1) {
341            setCreatedTime(new Date(d));
342        }
343        d = dataInput.readLong();
344        if (d != -1) {
345            setLastModifiedTime(new Date(d));
346        }
347    }
348
349    @Override
350    public String getJobId() {
351        return this.jobId;
352    }
353
354    public void setJobId(String id) {
355        this.jobId = id;
356    }
357
358    @Override
359    public Status getStatus() {
360        return Status.valueOf(statusStr);
361    }
362
363    /**
364     * Return the status in string
365     * @return
366     */
367    public String getStatusStr() {
368        return statusStr;
369    }
370
371    public void setStatus(Status status) {
372        this.statusStr = status.toString();
373    }
374
375    public void setStatusStr(String statusStr) {
376        this.statusStr = statusStr;
377    }
378
379    public void setCreatedTime(Date createdTime) {
380        this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
381    }
382
383    public void setRerunTime(Date rerunTime) {
384        this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
385    }
386
387    public void setNominalTime(Date nominalTime) {
388        this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
389    }
390
391    public void setLastModifiedTime(Date lastModifiedTime) {
392        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
393    }
394
395    public Date getCreatedTime() {
396        return DateUtils.toDate(createdTimestamp);
397    }
398
399    public Timestamp getCreatedTimestamp() {
400        return createdTimestamp;
401    }
402
403    public Date getRerunTime() {
404        return DateUtils.toDate(rerunTimestamp);
405    }
406
407    public Timestamp getRerunTimestamp() {
408        return rerunTimestamp;
409    }
410
411    @Override
412    public Date getLastModifiedTime() {
413        return DateUtils.toDate(lastModifiedTimestamp);
414    }
415
416    public Timestamp getLastModifiedTimestamp() {
417        return lastModifiedTimestamp;
418    }
419
420    @Override
421    public Date getNominalTime() {
422        return DateUtils.toDate(nominalTimestamp);
423    }
424
425    public Timestamp getNominalTimestamp() {
426        return nominalTimestamp;
427    }
428
429    @Override
430    public String getExternalId() {
431        return externalId;
432    }
433
434    public void setExternalId(String externalId) {
435        this.externalId = externalId;
436    }
437
438    public StringBlob getSlaXmlBlob() {
439        return slaXml;
440    }
441
442    public void setSlaXmlBlob(StringBlob slaXml) {
443        this.slaXml = slaXml;
444    }
445
446    public String getSlaXml() {
447        return slaXml == null ? null : slaXml.getString();
448    }
449
450    public void setSlaXml(String slaXml) {
451        if (this.slaXml == null) {
452            this.slaXml = new StringBlob(slaXml);
453        }
454        else {
455            this.slaXml.setString(slaXml);
456        }
457    }
458
459    /**
460     * @return true if in terminal status
461     */
462    public boolean isTerminalStatus() {
463        boolean isTerminal = true;
464        switch (getStatus()) {
465            case WAITING:
466            case READY:
467            case SUBMITTED:
468            case RUNNING:
469            case SUSPENDED:
470                isTerminal = false;
471                break;
472            default:
473                isTerminal = true;
474                break;
475        }
476        return isTerminal;
477    }
478
479    /**
480     * Return if the action is complete with failure.
481     *
482     * @return if the action is complete with failure.
483     */
484    public boolean isTerminalWithFailure() {
485        boolean result = false;
486        switch (getStatus()) {
487            case FAILED:
488            case KILLED:
489            case TIMEDOUT:
490                result = true;
491        }
492        return result;
493    }
494
495    /**
496     * Set some actions are in progress for particular coordinator action.
497     *
498     * @param pending set pending to true
499     */
500    public void setPending(int pending) {
501        this.pending = pending;
502    }
503
504    /**
505     * increment pending and return it
506     *
507     * @return pending
508     */
509    public int incrementAndGetPending() {
510        this.pending++;
511        return pending;
512    }
513
514    /**
515     * decrement pending and return it
516     *
517     * @return pending
518     */
519    public int decrementAndGetPending() {
520        this.pending = Math.max(this.pending - 1, 0);
521        return pending;
522    }
523
524    /**
525     * Get some actions are in progress for particular bundle action.
526     *
527     * @return pending
528     */
529    public int getPending() {
530        return this.pending;
531    }
532
533    /**
534     * Return if the action is pending.
535     *
536     * @return if the action is pending.
537     */
538    public boolean isPending() {
539        return pending > 0 ? true : false;
540    }
541
542    @Override
543    public String getId() {
544        return id;
545    }
546
547    public void setId(String id) {
548        this.id = id;
549    }
550
551    public String getType() {
552        return type;
553    }
554
555    public void setType(String type) {
556        this.type = type;
557    }
558
559    public void setActionNumber(int actionNumber) {
560        this.actionNumber = actionNumber;
561    }
562
563    @Override
564    public int getActionNumber() {
565        return actionNumber;
566    }
567
568    @Override
569    public String getCreatedConf() {
570        return createdConf == null ? null : createdConf.getString();
571    }
572
573    public void setCreatedConf(String createdConf) {
574        if (this.createdConf == null) {
575            this.createdConf = new StringBlob(createdConf);
576        }
577        else {
578            this.createdConf.setString(createdConf);
579        }
580    }
581
582    public void setCreatedConfBlob(StringBlob createdConf) {
583        this.createdConf = createdConf;
584    }
585
586    public StringBlob getCreatedConfBlob() {
587        return createdConf;
588    }
589
590    public void setRunConf(String runConf) {
591        if (this.runConf == null) {
592            this.runConf = new StringBlob(runConf);
593        }
594        else {
595            this.runConf.setString(runConf);
596        }
597    }
598
599    @Override
600    public String getRunConf() {
601        return runConf == null ? null : runConf.getString();
602    }
603
604    public void setRunConfBlob(StringBlob runConf) {
605        this.runConf = runConf;
606    }
607
608    public StringBlob getRunConfBlob() {
609        return runConf;
610    }
611
612
613    public void setMissingDependencies(String missingDependencies) {
614        if (this.missingDependencies == null) {
615            this.missingDependencies = new StringBlob(missingDependencies);
616        }
617        else {
618            this.missingDependencies.setString(missingDependencies);
619        }
620    }
621
622    @Override
623    public String getMissingDependencies() {
624        return missingDependencies == null ? null : missingDependencies.getString();
625    }
626
627    public void setMissingDependenciesBlob(StringBlob missingDependencies) {
628        this.missingDependencies = missingDependencies;
629    }
630
631    public StringBlob getMissingDependenciesBlob() {
632        return missingDependencies;
633    }
634
635    @Override
636    public String getPushMissingDependencies() {
637        return pushMissingDependencies == null ? null : pushMissingDependencies.getString();
638    }
639
640    public void setPushMissingDependencies(String pushMissingDependencies) {
641        if (this.pushMissingDependencies == null) {
642            this.pushMissingDependencies = new StringBlob(pushMissingDependencies);
643        }
644        else {
645            this.pushMissingDependencies.setString(pushMissingDependencies);
646        }
647    }
648
649    public void setPushMissingDependenciesBlob(StringBlob pushMissingDependencies) {
650        this.pushMissingDependencies = pushMissingDependencies;
651    }
652
653    public StringBlob getPushMissingDependenciesBlob() {
654        return pushMissingDependencies;
655    }
656
657    public String getExternalStatus() {
658        return externalStatus;
659    }
660
661    public void setExternalStatus(String externalStatus) {
662        this.externalStatus = externalStatus;
663    }
664
665    @Override
666    public String getTrackerUri() {
667        return trackerUri;
668    }
669
670    public void setTrackerUri(String trackerUri) {
671        this.trackerUri = trackerUri;
672    }
673
674    @Override
675    public String getConsoleUrl() {
676        return consoleUrl;
677    }
678
679    public void setConsoleUrl(String consoleUrl) {
680        this.consoleUrl = consoleUrl;
681    }
682
683    @Override
684    public String getErrorCode() {
685        return errorCode;
686    }
687
688    @Override
689    public String getErrorMessage() {
690        return errorMessage;
691    }
692
693    public void setErrorInfo(String errorCode, String errorMessage) {
694        this.errorCode = errorCode;
695        this.errorMessage = errorMessage;
696    }
697
698    public String getActionXml() {
699        return actionXml == null ? null : actionXml.getString();
700    }
701
702    public void setActionXml(String actionXml) {
703        if (this.actionXml == null) {
704            this.actionXml = new StringBlob(actionXml);
705        }
706        else {
707            this.actionXml.setString(actionXml);
708        }
709    }
710
711    public void setActionXmlBlob(StringBlob actionXml) {
712        this.actionXml = actionXml;
713    }
714
715    public StringBlob getActionXmlBlob() {
716        return actionXml;
717    }
718
719    @Override
720    public String toString() {
721        return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]",
722                                    getId(), getStatus());
723    }
724
725    public int getTimeOut() {
726        return timeOut;
727    }
728
729    public void setTimeOut(int timeOut) {
730        this.timeOut = timeOut;
731    }
732
733
734    public void setErrorCode(String errorCode) {
735        this.errorCode = errorCode;
736    }
737
738    public void setErrorMessage(String errorMessage) {
739        this.errorMessage = errorMessage;
740    }
741
742    @SuppressWarnings("unchecked")
743    public JSONObject toJSONObject(String timeZoneId) {
744        JSONObject json = new JSONObject();
745        json.put(JsonTags.COORDINATOR_ACTION_ID, id);
746        json.put(JsonTags.COORDINATOR_JOB_ID, jobId);
747        json.put(JsonTags.COORDINATOR_ACTION_TYPE, type);
748        json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber);
749        json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf());
750        json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils
751                .formatDateRfc822(getCreatedTime(), timeZoneId));
752        json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils
753                .formatDateRfc822(getNominalTime(), timeZoneId));
754        json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId);
755        // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
756        // .formatDateRfc822(startTime), timeZoneId);
757        json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr);
758        json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf());
759        json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, JsonUtils
760                .formatDateRfc822(getLastModifiedTime(), timeZoneId));
761        // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
762        // .formatDateRfc822(startTime), timeZoneId);
763        // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
764        // .formatDateRfc822(endTime), timeZoneId);
765        json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getMissingDependencies());
766        json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushMissingDependencies());
767        json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
768        json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
769        json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
770        json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, errorCode);
771        json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, errorMessage);
772        json.put(JsonTags.TO_STRING, toString());
773        return json;
774    }
775
776    /**
777     * Convert a nodes list into a JSONArray.
778     *
779     * @param actions nodes list.
780     * @param timeZoneId time zone to use for dates in the JSON array.
781     * @return the corresponding JSON array.
782     */
783    @SuppressWarnings("unchecked")
784    public static JSONArray toJSONArray(List<CoordinatorActionBean> actions, String timeZoneId) {
785        JSONArray array = new JSONArray();
786        for (CoordinatorActionBean action : actions) {
787            array.add(action.toJSONObject(timeZoneId));
788        }
789        return array;
790    }
791
792    @Override
793    public int hashCode() {
794        final int prime = 31;
795        int result = 1;
796        result = prime * result + ((id == null) ? 0 : id.hashCode());
797        return result;
798    }
799
800    @Override
801    public boolean equals(Object obj) {
802        if (this == obj) {
803            return true;
804        }
805        if (obj == null) {
806            return false;
807        }
808        if (getClass() != obj.getClass()) {
809            return false;
810        }
811        CoordinatorActionBean other = (CoordinatorActionBean) obj;
812        if (id == null) {
813            if (other.id != null) {
814                return false;
815            }
816        }
817        else if (!id.equals(other.id)) {
818            return false;
819        }
820        return true;
821    }
822
823
824}