/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.workflow.lite;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * {@link WorkflowInstance} implementation that keeps its whole state in memory and can be
 * serialized through the Hadoop {@link Writable} contract.
 * <p>
 * The instance tracks, per execution path, which node of the {@link LiteWorkflowApp} is
 * currently associated with that path and whether it has been started. Transitions are driven
 * by {@link #signal(String, String)}, which delegates node behavior to the {@link NodeHandler}
 * class declared by each {@link NodeDef}.
 * <p>
 * Persistent variables are written by {@link #write(DataOutput)} and restored by
 * {@link #readFields(DataInput)}; transient variables are not serialized.
 * <p>
 * The state-changing lifecycle methods ({@code start}, {@code signal}, {@code fail},
 * {@code kill}, {@code suspend}, {@code resume}) are {@code synchronized} on the instance; the
 * variable accessors and the serialization methods are not.
 */
public class LiteWorkflowInstance implements Writable, WorkflowInstance {
    /** Suffix of the persistent variable that records the transition taken when a node exits. */
    private static final String TRANSITION_TO = "transition.to";

    private XLog log = XLog.getLog(getClass());

    private static final String PATH_SEPARATOR = "/";
    private static final String ROOT = PATH_SEPARATOR;
    private static final String TRANSITION_SEPARATOR = "#";

    // Using unique string to indicate version. This is to make sure that it
    // doesn't match with user data.
    private static final String DATA_VERSION = "V==1";

    /**
     * Node bound to an execution path, together with its "has been entered" flag.
     */
    private static class NodeInstance {
        String nodeName;
        boolean started = false;

        private NodeInstance(String nodeName) {
            this.nodeName = nodeName;
        }
    }

    /**
     * {@link NodeHandler.Context} handed to node handlers. It exposes the node definition, the
     * execution path and the signal value of the current call, forwards variable access to the
     * enclosing instance, and records the job end status requested by the handler.
     */
    private class Context implements NodeHandler.Context {
        private NodeDef nodeDef;
        private String executionPath;
        private String exitState;
        // Remains RUNNING unless the handler calls failJob(), killJob() or completeJob().
        private Status status = Status.RUNNING;

        private Context(NodeDef nodeDef, String executionPath, String exitState) {
            this.nodeDef = nodeDef;
            this.executionPath = executionPath;
            this.exitState = exitState;
        }

        public NodeDef getNodeDef() {
            return nodeDef;
        }

        public String getExecutionPath() {
            return executionPath;
        }

        public String getParentExecutionPath(String executionPath) {
            return LiteWorkflowInstance.getParentPath(executionPath);
        }

        public String getSignalValue() {
            return exitState;
        }

        public String createExecutionPath(String name) {
            return LiteWorkflowInstance.createChildPath(executionPath, name);
        }

        public String createFullTransition(String executionPath, String transition) {
            return LiteWorkflowInstance.createFullTransition(executionPath, transition);
        }

        /**
         * Removes this context's execution path from the instance and moves the context to the
         * parent path.
         *
         * @throws IllegalStateException if the execution path is not registered in the instance.
         */
        public void deleteExecutionPath() {
            if (!executionPaths.containsKey(executionPath)) {
                throw new IllegalStateException("Execution path [" + executionPath + "] does not exist");
            }
            executionPaths.remove(executionPath);
            executionPath = LiteWorkflowInstance.getParentPath(executionPath);
        }

        public void failJob() {
            status = Status.FAILED;
        }

        public void killJob() {
            status = Status.KILLED;
        }

        public void completeJob() {
            status = Status.SUCCEEDED;
        }

        @Override
        public Object getTransientVar(String name) {
            return LiteWorkflowInstance.this.getTransientVar(name);
        }

        @Override
        public String getVar(String name) {
            return LiteWorkflowInstance.this.getVar(name);
        }

        @Override
        public void setTransientVar(String name, Object value) {
            LiteWorkflowInstance.this.setTransientVar(name, value);
        }

        @Override
        public void setVar(String name, String value) {
            LiteWorkflowInstance.this.setVar(name, value);
        }

        @Override
        public LiteWorkflowInstance getProcessInstance() {
            return LiteWorkflowInstance.this;
        }

    }

    private LiteWorkflowApp def;
    private Configuration conf;
    private String instanceId;
    private Status status;
    private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
    private Map<String, String> persistentVars = new HashMap<String, String>();
    private Map<String, Object> transientVars = new HashMap<String, Object>();

    /**
     * Creates an empty instance whose state is expected to be filled in by
     * {@link #readFields(DataInput)}. The logger is set up by the field initializer.
     */
    protected LiteWorkflowInstance() {
    }

    /**
     * Creates a new instance in {@code PREP} status.
     *
     * @param def workflow definition, must not be null.
     * @param conf job configuration, must not be null.
     * @param instanceId instance id, must not be null.
     */
    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
        this();
        this.def = ParamChecker.notNull(def, "def");
        this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
        this.conf = ParamChecker.notNull(conf, "conf");
        refreshLog();
        status = Status.PREP;
    }

    /**
     * Moves the instance from {@code PREP} to {@code RUNNING}, binds the start node to the root
     * execution path and signals it.
     *
     * @return <code>true</code> if the instance reached an end state while starting.
     * @throws WorkflowException with {@code E0719} if the instance is not in {@code PREP} status,
     * or any exception raised while signaling the start node.
     */
    public synchronized boolean start() throws WorkflowException {
        if (status != Status.PREP) {
            throw new WorkflowException(ErrorCode.E0719);
        }
        log.debug(XLog.STD, "Starting job");
        status = Status.RUNNING;
        executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
        return signal(ROOT, StartNodeDef.START);
    }

    //todo if suspended store signal and use when resuming

    /**
     * Signals the node bound to an execution path.
     * <p>
     * If the node has not been started yet its handler is entered first. When the handler
     * reports that the node is exiting, the transitions returned by the handler are recorded,
     * the new execution paths are registered and each of them is signaled in turn. If the
     * instance ends up in an end state, the nodes that are still started are terminated.
     *
     * @param executionPath execution path to signal, must not be empty.
     * @param signalValue signal value made available to the handler, must not be null.
     * @return <code>true</code> if the instance is in an end state after the signal.
     * @throws WorkflowException with {@code E0716} if the instance is not {@code RUNNING}, or
     * the exception thrown by a node handler (the instance status is set to {@code FAILED}
     * before it is rethrown).
     */
    public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
        ParamChecker.notEmpty(executionPath, "executionPath");
        ParamChecker.notNull(signalValue, "signalValue");

        if (status != Status.RUNNING) {
            throw new WorkflowException(ErrorCode.E0716);
        }

        NodeInstance nodeJob = executionPaths.get(executionPath);
        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}] for node [{2}]", executionPath,
                  signalValue, (nodeJob == null ? null : nodeJob.nodeName));
        if (nodeJob == null) {
            status = Status.FAILED;
            log.error("invalid execution path [{0}]", executionPath);
        }

        // nodeJob is only dereferenced while the status is not an end state, i.e. when it is not null.
        NodeDef nodeDef = null;
        if (!status.isEndState()) {
            nodeDef = def.getNode(nodeJob.nodeName);
            if (nodeDef == null) {
                status = Status.FAILED;
                log.error("invalid transition [{0}]", nodeJob.nodeName);
            }
        }

        if (!status.isEndState()) {
            NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
            boolean exiting = true;

            Context context = new Context(nodeDef, executionPath, signalValue);
            if (!nodeJob.started) {
                try {
                    nodeHandler.loopDetection(context);
                    exiting = nodeHandler.enter(context);
                    nodeJob.started = true;
                }
                catch (WorkflowException ex) {
                    status = Status.FAILED;
                    List<String> killedNodes = terminateNodes(Status.KILLED);
                    if (killedNodes.size() > 1) {
                        log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status,
                                 killedNodes.size());
                    }
                    throw ex;
                }
            }

            if (exiting) {
                List<String> pathsToStart = new ArrayList<String>();
                List<String> fullTransitions;
                try {
                    fullTransitions = nodeHandler.multiExit(context);
                    int last = fullTransitions.size() - 1;
                    // TEST THIS
                    if (last >= 0) {
                        String transitionTo = getTransitionNode(fullTransitions.get(last));
                        if (nodeDef instanceof ForkNodeDef) {
                            transitionTo = "*"; // WF action cannot hold all transitions for a fork.
                                                // transitions are hardcoded in the WF app.
                        }
                        persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
                                           transitionTo);
                    }
                }
                catch (WorkflowException ex) {
                    status = Status.FAILED;
                    throw ex;
                }

                if (context.status == Status.KILLED) {
                    status = Status.KILLED;
                    log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
                }
                else if (context.status == Status.FAILED) {
                    status = Status.FAILED;
                    log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
                }
                else if (context.status == Status.SUCCEEDED) {
                    status = Status.SUCCEEDED;
                    log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
                }
                else {
                    for (String fullTransition : fullTransitions) {
                        //this is the whole trick for forking, we need the executionpath and the transition.
                        //in case of no forking, last element of executionpath is different from transition.
                        //in case of forking, they are the same

                        log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
                                  fullTransition);

                        String execPathFromTransition = getExecutionPath(fullTransition);
                        String transition = getTransitionNode(fullTransition);
                        def.validateTransition(nodeJob.nodeName, transition);

                        NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
                        if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
                            // TODO explain this IF better
                            // If the WfJob is signaled with the parent
                            // execution executionPath again
                            // The Fork node will execute again.. and replace
                            // the Node WorkflowJobBean
                            // so this is required to prevent that..
                            // Question : Should we throw an error in this case
                            // ??
                            executionPaths.put(execPathFromTransition, new NodeInstance(transition));
                            pathsToStart.add(execPathFromTransition);
                        }

                    }

                    // signal all new synch transitions
                    for (String pathToStart : pathsToStart) {
                        signal(pathToStart, "::synch::");
                    }
                }
            }
        }

        if (status.isEndState()) {
            if (status == Status.FAILED) {
                List<String> failedNodes = terminateNodes(status);
                log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status,
                         failedNodes.size());
            }
            else {
                List<String> killedNodes = terminateNodes(Status.KILLED);

                if (killedNodes.size() > 1) {
                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status,
                             killedNodes.size());
                }
            }
        }

        return status.isEndState();
    }

    /**
     * Get NodeDef from workflow instance
     * @param executionPath execution path
     * @return node def, or <code>null</code> if the execution path is not registered or its node
     * is not defined in the workflow definition (an error is logged in both cases).
     */
    public NodeDef getNodeDef(String executionPath) {
        NodeInstance nodeJob = executionPaths.get(executionPath);
        NodeDef nodeDef = null;
        if (nodeJob == null) {
            log.error("invalid execution path [{0}]", executionPath);
        }
        else {
            nodeDef = def.getNode(nodeJob.nodeName);
            if (nodeDef == null) {
                log.error("invalid transition [{0}]", nodeJob.nodeName);
            }
        }
        return nodeDef;
    }

    /**
     * Fails the given node, kills every node that is still started and sets the instance status
     * to {@code FAILED}.
     *
     * @param nodeName name of the node to fail.
     * @throws WorkflowException with {@code E0718} if the instance is already in an end state.
     */
    public synchronized void fail(String nodeName) throws WorkflowException {
        if (status.isEndState()) {
            throw new WorkflowException(ErrorCode.E0718);
        }
        String failedNode = failNode(nodeName);
        if (failedNode != null) {
            log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
        }
        else {
            //TODO failed attempting to fail the action. EXCEPTION
        }
        List<String> killedNodes = killNodes();
        if (killedNodes.size() > 1) {
            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
        }
        status = Status.FAILED;
    }

    /**
     * Kills every started node and sets the instance status to {@code KILLED}.
     *
     * @throws WorkflowException with {@code E0718} if the instance is already in an end state.
     */
    public synchronized void kill() throws WorkflowException {
        if (status.isEndState()) {
            throw new WorkflowException(ErrorCode.E0718);
        }
        log.debug(XLog.STD, "Killing job");
        List<String> killedNodes = killNodes();
        if (killedNodes.size() > 1) {
            log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
        }
        status = Status.KILLED;
    }

    /**
     * Moves the instance from {@code RUNNING} to {@code SUSPENDED}.
     *
     * @throws WorkflowException with {@code E0716} if the instance is not {@code RUNNING}.
     */
    public synchronized void suspend() throws WorkflowException {
        if (status != Status.RUNNING) {
            throw new WorkflowException(ErrorCode.E0716);
        }
        log.debug(XLog.STD, "Suspending job");
        this.status = Status.SUSPENDED;
    }

    /**
     * @return <code>true</code> if the instance status is {@code SUSPENDED}.
     */
    public boolean isSuspended() {
        return (status == Status.SUSPENDED);
    }

    /**
     * Moves the instance from {@code SUSPENDED} back to {@code RUNNING}.
     *
     * @throws WorkflowException with {@code E0717} if the instance is not {@code SUSPENDED}.
     */
    public synchronized void resume() throws WorkflowException {
        if (status != Status.SUSPENDED) {
            throw new WorkflowException(ErrorCode.E0717);
        }
        log.debug(XLog.STD, "Resuming job");
        status = Status.RUNNING;
    }

    /**
     * Sets a persistent variable; a <code>null</code> value removes the variable.
     *
     * @param name variable name.
     * @param value variable value, or <code>null</code> to remove it.
     */
    public void setVar(String name, String value) {
        if (value != null) {
            persistentVars.put(name, value);
        }
        else {
            persistentVars.remove(name);
        }
    }

    /**
     * @return the live map of persistent variables held by this instance (not a copy).
     */
    @Override
    public Map<String, String> getAllVars() {
        return persistentVars;
    }

    /**
     * Adds all the given entries to the persistent variables, overwriting existing names.
     *
     * @param varMap variables to add.
     */
    @Override
    public void setAllVars(Map<String, String> varMap) {
        persistentVars.putAll(varMap);
    }

    /**
     * @param name variable name.
     * @return the persistent variable value, or <code>null</code> if it is not set.
     */
    public String getVar(String name) {
        return persistentVars.get(name);
    }


    /**
     * Sets a transient variable; a <code>null</code> value removes the variable.
     *
     * @param name variable name.
     * @param value variable value, or <code>null</code> to remove it.
     */
    public void setTransientVar(String name, Object value) {
        if (value != null) {
            transientVars.put(name, value);
        }
        else {
            transientVars.remove(name);
        }
    }

    /**
     * @param name variable name.
     * @return <code>true</code> if a transient variable with that name is set.
     */
    public boolean hasTransientVar(String name) {
        return transientVars.containsKey(name);
    }

    /**
     * @param name variable name.
     * @return the transient variable value, or <code>null</code> if it is not set.
     */
    public Object getTransientVar(String name) {
        return transientVars.get(name);
    }

    /**
     * @return <code>true</code> if the instance status is an end state.
     */
    public boolean hasEnded() {
        return status.isEndState();
    }

    /**
     * Invokes the kill or fail callback of the handler of every started node that is not a
     * control node. Handler exceptions are logged and do not stop the iteration.
     *
     * @param endStatus {@code KILLED} to call <code>kill</code>, {@code FAILED} to call
     * <code>fail</code>; for any other value no callback is invoked.
     * @return names of the nodes processed without a handler exception.
     */
    private List<String> terminateNodes(Status endStatus) {
        List<String> endNodes = new ArrayList<String>();
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            if (entry.getValue().started) {
                NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
                if (!(nodeDef instanceof ControlNodeDef)) {
                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                    try {
                        if (endStatus == Status.KILLED) {
                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
                        }
                        else {
                            if (endStatus == Status.FAILED) {
                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
                            }
                        }
                        endNodes.add(nodeDef.getName());
                    }
                    catch (Exception ex) {
                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
                                 nodeDef.getName(), ex);
                    }
                }
            }
        }
        return endNodes;
    }

    /**
     * Invokes the fail callback on the first started node with the given name and marks it as
     * not started. Only the first match is processed, whether or not the handler succeeds.
     *
     * @param nodeName name of the node to fail.
     * @return the name of the failed node, or <code>null</code> if no started node matched or
     * the handler threw an exception (which is logged).
     */
    private String failNode(String nodeName) {
        String failedNode = null;
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            String node = entry.getKey();
            NodeInstance nodeInstance = entry.getValue();
            if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                try {
                    nodeHandler.fail(new Context(nodeDef, node, null));
                    failedNode = nodeDef.getName();
                    nodeInstance.started = false;
                }
                catch (Exception ex) {
                    log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
                }
                return failedNode;
            }
        }
        return failedNode;
    }

    /**
     * Invokes the kill callback of the handler of every started node. Handler exceptions are
     * logged and do not stop the iteration.
     *
     * @return names of the nodes whose kill callback completed without an exception.
     */
    private List<String> killNodes() {
        List<String> killedNodes = new ArrayList<String>();
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            String node = entry.getKey();
            NodeInstance nodeInstance = entry.getValue();
            if (nodeInstance.started) {
                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                try {
                    nodeHandler.kill(new Context(nodeDef, node, null));
                    killedNodes.add(nodeDef.getName());
                }
                catch (Exception ex) {
                    log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
                }
            }
        }
        return killedNodes;
    }

    /**
     * @return the workflow definition this instance runs.
     */
    public LiteWorkflowApp getProcessDefinition() {
        return def;
    }

    /** Builds a child path as <code>path + child + "/"</code>. */
    private static String createChildPath(String path, String child) {
        return path + child + PATH_SEPARATOR;
    }

    /**
     * Returns the parent of a path by dropping its last character and then everything after
     * the last remaining separator; returns <code>null</code> for a one-character path such as
     * the root.
     */
    private static String getParentPath(String path) {
        path = path.substring(0, path.length() - 1);
        return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
    }

    /** Builds a full transition as <code>executionPath + "#" + transition</code>. */
    private static String createFullTransition(String executionPath, String transition) {
        return executionPath + TRANSITION_SEPARATOR + transition;
    }

    /**
     * Extracts the execution path, i.e. the text before the first <code>#</code>.
     *
     * @throws IllegalArgumentException if the full transition contains no separator.
     */
    private static String getExecutionPath(String fullTransition) {
        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
        if (index == -1) {
            throw new IllegalArgumentException("Invalid fullTransition");
        }
        return fullTransition.substring(0, index);
    }

    /**
     * Extracts the transition node name, i.e. the text after the first <code>#</code>.
     *
     * @throws IllegalArgumentException if the full transition contains no separator.
     */
    private static String getTransitionNode(String fullTransition) {
        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
        if (index == -1) {
            throw new IllegalArgumentException("Invalid fullTransition");
        }
        return fullTransition.substring(index + 1);
    }

    /** Instantiates a node handler through Hadoop's reflection utilities, without a configuration. */
    private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
        return (NodeHandler) ReflectionUtils.newInstance(handler, null);
    }

    /**
     * Publishes user, group, application name, log token and job id as log info parameters and
     * obtains a new logger afterwards.
     */
    private void refreshLog() {
        XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
        XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
        XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
        XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
        XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
        log = XLog.getLog(getClass());
    }

    /**
     * @return the current status of the instance.
     */
    public Status getStatus() {
        return status;
    }

    /**
     * Sets the status directly, without any of the checks done by the lifecycle methods.
     *
     * @param status new status.
     */
    public void setStatus(Status status) {
        this.status = status;
    }

    /**
     * Serializes the instance id, the configuration (as length-prefixed XML), the workflow
     * definition, the status, the execution paths and the persistent variables.
     * <p>
     * Variables whose value is <code>null</code> are not written: a <code>null</code> value
     * cannot be passed to {@link DataOutput#writeUTF(String)}, and {@link #setVar(String, String)}
     * already treats <code>null</code> as "not set".
     *
     * @param dOut output to write to.
     * @throws IOException if the underlying output fails.
     */
    @Override
    public void write(DataOutput dOut) throws IOException {
        dOut.writeUTF(instanceId);

        //Hadoop Configuration has to get its act right
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        conf.writeXml(baos);
        baos.close();
        byte[] array = baos.toByteArray();
        dOut.writeInt(array.length);
        dOut.write(array);

        def.write(dOut);
        dOut.writeUTF(status.toString());
        dOut.writeInt(executionPaths.size());
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            dOut.writeUTF(entry.getKey());
            dOut.writeUTF(entry.getValue().nodeName);
            dOut.writeBoolean(entry.getValue().started);
        }

        // The entry count must match what is actually written, so null values are counted out first.
        int numVars = 0;
        for (String value : persistentVars.values()) {
            if (value != null) {
                numVars++;
            }
        }
        dOut.writeInt(numVars);
        for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
            if (entry.getValue() != null) {
                dOut.writeUTF(entry.getKey());
                writeStringAsBytes(entry.getValue(), dOut);
            }
        }
    }

    /**
     * Restores the state written by {@link #write(DataOutput)}.
     * <p>
     * Execution paths and persistent variables held before the call are discarded, so the same
     * object can be reused for several deserializations. Transient variables are left untouched.
     *
     * @param dIn input to read from.
     * @throws IOException if the underlying input fails.
     */
    @Override
    public void readFields(DataInput dIn) throws IOException {
        instanceId = dIn.readUTF();

        //Hadoop Configuration has to get its act right
        int len = dIn.readInt();
        byte[] array = new byte[len];
        dIn.readFully(array);
        ByteArrayInputStream bais = new ByteArrayInputStream(array);
        conf = new XConfiguration(bais);

        def = new LiteWorkflowApp();
        def.readFields(dIn);
        status = Status.valueOf(dIn.readUTF());

        // Drop any state left from a previous use of this object before loading the new one.
        executionPaths.clear();
        int numExPaths = dIn.readInt();
        for (int x = 0; x < numExPaths; x++) {
            String path = dIn.readUTF();
            String nodeName = dIn.readUTF();
            boolean isStarted = dIn.readBoolean();
            NodeInstance nodeInstance = new NodeInstance(nodeName);
            nodeInstance.started = isStarted;
            executionPaths.put(path, nodeInstance);
        }

        persistentVars.clear();
        int numVars = dIn.readInt();
        for (int x = 0; x < numVars; x++) {
            String vName = dIn.readUTF();
            String vVal = readBytesAsString(dIn);
            persistentVars.put(vName, vVal);
        }
        refreshLog();
    }

    /**
     * Writes a non-null string as the {@link #DATA_VERSION} marker followed by its
     * length-prefixed UTF-8 bytes.
     *
     * @param value value to write, must not be null.
     * @param dOut output to write to.
     * @throws IOException if the underlying output fails.
     */
    private void writeStringAsBytes(String value, DataOutput dOut) throws IOException {
        dOut.writeUTF(DATA_VERSION);
        byte[] data = value.getBytes("UTF-8");
        dOut.writeInt(data.length);
        dOut.write(data);
    }

    /**
     * Reads a string written by {@link #writeStringAsBytes(String, DataOutput)}. If the first
     * UTF string read is not the {@link #DATA_VERSION} marker, that string itself is returned
     * as the value.
     *
     * @param dIn input to read from.
     * @return the value read.
     * @throws IOException if the underlying input fails.
     */
    private String readBytesAsString(DataInput dIn) throws IOException {
        String value = dIn.readUTF();
        if (DATA_VERSION.equals(value)) {
            int length = dIn.readInt();
            byte[] data = new byte[length];
            dIn.readFully(data);
            value = new String(data, "UTF-8");
        }
        return value;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public WorkflowApp getApp() {
        return def;
    }

    @Override
    public String getId() {
        return instanceId;
    }

    /**
     * Returns the transition recorded by {@link #signal(String, String)} when the given node
     * exited (<code>"*"</code> for fork nodes).
     *
     * @param node node name.
     * @return the recorded transition, or <code>null</code> if none has been recorded.
     */
    @Override
    public String getTransition(String node) {
        return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
    }

    /**
     * Two instances are equal when the other object is an instance of this object's class and
     * both have the same id. Ids that are <code>null</code> on both sides are considered equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if ((o == null) || !getClass().isInstance(o)) {
            return false;
        }
        String otherId = ((WorkflowInstance) o).getId();
        return (otherId == null) ? (instanceId == null) : otherId.equals(instanceId);
    }

    @Override
    public int hashCode() {
        // The id is null until the instance has been deserialized when created by the protected constructor.
        return (instanceId == null) ? 0 : instanceId.hashCode();
    }
}