db = $db; if ( is_int( $executionId ) ) { $this->loadExecution( $executionId ); } $this->properties['definitionStorage'] = new ezcWorkflowDatabaseDefinitionStorage( $db ); } /** * Start workflow execution. * * @param int $parentId * @throws ezcDbException */ protected function doStart( $parentId ) { $this->db->beginTransaction(); $query = $this->db->createInsertQuery(); $query->insertInto( 'execution' ) ->set( 'workflow_id', $query->bindValue( (int)$this->workflow->id ) ) ->set( 'execution_parent', $query->bindValue( (int)$parentId ) ) ->set( 'execution_started', $query->bindValue( time() ) ) ->set( 'execution_variables', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->variables ) ) ) ->set( 'execution_waiting_for', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->waitingFor ) ) ) ->set( 'execution_threads', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->threads ) ) ) ->set( 'execution_next_thread_id', $query->bindValue( (int)$this->nextThreadId ) ); $statement = $query->prepare(); $statement->execute(); $this->id = (int)$this->db->lastInsertId( 'execution_execution_id_seq' ); } /** * Suspend workflow execution. * * @throws ezcDbException */ protected function doSuspend() { $query = $this->db->createUpdateQuery(); $query->update( 'execution' ) ->where( $query->expr->eq( 'execution_id', $query->bindValue( (int)$this->id ) ) ) ->set( 'execution_variables', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->variables ) ) ) ->set( 'execution_waiting_for', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->waitingFor ) ) ) ->set( 'execution_threads', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $this->threads ) ) ) ->set( 'execution_next_thread_id', $query->bindValue( (int)$this->nextThreadId ) ); $statement = $query->prepare(); $statement->execute(); foreach ( $this->activatedNodes as $node ) { $query = $this->db->createInsertQuery(); $query->insertInto( 'execution_state' ) ->set( 'execution_id', $query->bindValue( (int)$this->id ) ) ->set( 'node_id', $query->bindValue( (int)$node->getId() ) ) ->set( 'node_state', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $node->getState() ) ) ) ->set( 'node_activated_from', $query->bindValue( ezcWorkflowDatabaseUtil::serialize( $node->getActivatedFrom() ) ) ) ->set( 'node_thread_id', $query->bindValue( (int)$node->getThreadId() ) ); $statement = $query->prepare(); $statement->execute(); } $this->db->commit(); } /** * Resume workflow execution. * * @throws ezcDbException */ protected function doResume() { $this->db->beginTransaction(); $this->cleanupTable( 'execution_state' ); } /** * End workflow execution. * * @throws ezcDbException */ protected function doEnd() { $this->cleanupTable( 'execution' ); $this->cleanupTable( 'execution_state' ); $this->db->commit(); } /** * Returns a new execution object for a sub workflow. * * @param int $id * @return ezcWorkflowExecution */ protected function doGetSubExecution( $id = null ) { return new ezcWorkflowDatabaseExecution( $this->db, $id ); } /** * Cleanup execution / execution_state tables. * * @param string $tableName * @throws ezcDbException */ protected function cleanupTable( $tableName ) { $query = $this->db->createDeleteQuery(); $query->deleteFrom( $tableName ) ->where( $query->expr->eq( 'execution_id', $query->bindValue( (int)$this->id ) ) ); $statement = $query->prepare(); $statement->execute(); } /** * Load execution state. * * @param int $executionId ID of the execution to load. * @throws ezcWorkflowExecutionException */ protected function loadExecution( $executionId ) { $query = $this->db->createSelectQuery(); $query->select( 'workflow_id, execution_variables, execution_threads, execution_next_thread_id, execution_waiting_for' ) ->from( 'execution' ) ->where( $query->expr->eq( 'execution_id', $query->bindValue( (int)$executionId ) ) ); $stmt = $query->prepare(); $stmt->execute(); $result = $stmt->fetchAll( PDO::FETCH_ASSOC ); if ( $result === false || empty( $result ) ) { throw new ezcWorkflowExecutionException( 'Could not load execution state.' ); } $this->id = $executionId; $this->nextThreadId = $result[0]['execution_next_thread_id']; $this->threads = ezcWorkflowDatabaseUtil::unserialize( $result[0]['execution_threads'] ); $this->variables = ezcWorkflowDatabaseUtil::unserialize( $result[0]['execution_variables'] ); $this->waitingFor = ezcWorkflowDatabaseUtil::unserialize( $result[0]['execution_waiting_for'] ); $definition = new ezcWorkflowDatabaseDefinitionStorage( $this->db ); $workflowId = $result[0]['workflow_id']; $this->workflow = $definition->loadById( $workflowId ); $query = $this->db->createSelectQuery(); $query->select( 'node_id, node_state, node_activated_from, node_thread_id' ) ->from( 'execution_state' ) ->where( $query->expr->eq( 'execution_id', $query->bindValue( (int)$executionId ) ) ); $stmt = $query->prepare(); $stmt->execute(); $result = $stmt->fetchAll( PDO::FETCH_ASSOC ); $activatedNodes = array(); foreach ( $result as $row ) { $activatedNodes[$row['node_id']] = array( 'state' => $row['node_state'], 'activated_from' => $row['node_activated_from'], 'thread_id' => $row['node_thread_id'] ); } foreach ( $this->workflow->nodes as $node ) { $nodeId = $node->getId(); if ( isset( $activatedNodes[$nodeId] ) ) { $node->setActivationState( ezcWorkflowNode::WAITING_FOR_EXECUTION ); $node->setThreadId( $activatedNodes[$nodeId]['thread_id'] ); $node->setState( ezcWorkflowDatabaseUtil::unserialize( $activatedNodes[$nodeId]['state'], null ) ); $node->setActivatedFrom( ezcWorkflowDatabaseUtil::unserialize( $activatedNodes[$nodeId]['activated_from'] ) ); $this->activate( $node, false ); } } $this->loaded = true; } } ?>