001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.executor.jpa; 019 020import java.sql.Timestamp; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import javax.persistence.EntityManager; 026import javax.persistence.Query; 027 028import org.apache.oozie.BundleActionBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.service.JPAService; 031import org.apache.oozie.service.Services; 032 033/** 034 * Query Executor that provides API to run query for Bundle Action 035 */ 036public class BundleActionQueryExecutor extends 037 QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> { 038 039 public enum BundleActionQuery { 040 UPDATE_BUNDLE_ACTION_PENDING_MODTIME, 041 UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, 042 UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, 043 GET_BUNDLE_ACTION, 044 GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, 045 GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME, 046 GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, 047 GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE 048 }; 049 050 private static BundleActionQueryExecutor instance = new BundleActionQueryExecutor(); 051 052 private BundleActionQueryExecutor() { 053 } 054 055 public static QueryExecutor<BundleActionBean, BundleActionQueryExecutor.BundleActionQuery> getInstance() { 056 return BundleActionQueryExecutor.instance; 057 } 058 059 @Override 060 public Query getUpdateQuery(BundleActionQuery namedQuery, BundleActionBean baBean, EntityManager em) 061 throws JPAExecutorException { 062 063 Query query = em.createNamedQuery(namedQuery.name()); 064 switch (namedQuery) { 065 case UPDATE_BUNDLE_ACTION_PENDING_MODTIME: 066 query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp()); 067 query.setParameter("pending", baBean.getPending()); 068 query.setParameter("bundleActionId", baBean.getBundleActionId()); 069 break; 070 case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME: 071 query.setParameter("status", baBean.getStatusStr()); 072 query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp()); 073 query.setParameter("pending", baBean.getPending()); 074 query.setParameter("bundleActionId", baBean.getBundleActionId()); 075 break; 076 case UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID: 077 query.setParameter("status", baBean.getStatusStr()); 078 query.setParameter("lastModifiedTime", baBean.getLastModifiedTimestamp()); 079 query.setParameter("pending", baBean.getPending()); 080 query.setParameter("coordId", baBean.getCoordId()); 081 query.setParameter("bundleActionId", baBean.getBundleActionId()); 082 break; 083 default: 084 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 085 + namedQuery.name()); 086 } 087 return query; 088 } 089 090 @Override 091 public Query getSelectQuery(BundleActionQuery namedQuery, EntityManager em, Object... parameters) 092 throws JPAExecutorException { 093 Query query = em.createNamedQuery(namedQuery.name()); 094 switch (namedQuery) { 095 case GET_BUNDLE_ACTION: 096 query.setParameter("bundleActionId", parameters[0]); 097 break; 098 case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE: 099 query.setParameter("bundleId", parameters[0]); 100 break; 101 case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME: 102 query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime())); 103 break; 104 case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN: 105 Timestamp ts = new Timestamp(System.currentTimeMillis() - (Long)parameters[0] * 1000); 106 query.setParameter("lastModifiedTime", ts); 107 break; 108 case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE: 109 query.setParameter("bundleId", parameters[0]); 110 break; 111 default: 112 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " 113 + namedQuery.name()); 114 } 115 return query; 116 } 117 118 @Override 119 public int executeUpdate(BundleActionQuery namedQuery, BundleActionBean jobBean) throws JPAExecutorException { 120 JPAService jpaService = Services.get().get(JPAService.class); 121 EntityManager em = jpaService.getEntityManager(); 122 Query query = getUpdateQuery(namedQuery, jobBean, em); 123 int ret = jpaService.executeUpdate(namedQuery.name(), query, em); 124 return ret; 125 } 126 127 @Override 128 public BundleActionBean get(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 129 JPAService jpaService = Services.get().get(JPAService.class); 130 EntityManager em = jpaService.getEntityManager(); 131 Query query = getSelectQuery(namedQuery, em, parameters); 132 Object ret = jpaService.executeGet(namedQuery.name(), query, em); 133 if (ret == null) { 134 throw new JPAExecutorException(ErrorCode.E0604, query.toString()); 135 } 136 BundleActionBean bean = constructBean(namedQuery, ret); 137 return bean; 138 } 139 140 private BundleActionBean constructBean(BundleActionQuery namedQuery, Object ret) throws JPAExecutorException { 141 BundleActionBean bean; 142 Object[] arr; 143 switch (namedQuery) { 144 case GET_BUNDLE_ACTION: 145 case GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE: 146 bean = (BundleActionBean) ret; 147 break; 148 case GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME: 149 bean = new BundleActionBean(); 150 bean.setBundleId((String) ret); 151 break; 152 case GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN: 153 bean = new BundleActionBean(); 154 arr = (Object[]) ret; 155 bean.setBundleActionId((String) arr[0]); 156 bean.setBundleId((String) arr[1]); 157 bean.setStatusStr((String) arr[2]); 158 bean.setCoordId((String) arr[3]); 159 bean.setCoordName((String) arr[4]); 160 break; 161 case GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE: 162 bean = new BundleActionBean(); 163 arr = (Object[]) ret; 164 bean.setCoordId((String) arr[0]); 165 bean.setStatusStr((String) arr[1]); 166 bean.setPending((Integer) arr[2]); 167 break; 168 default: 169 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " 170 + namedQuery.name()); 171 } 172 return bean; 173 } 174 175 @Override 176 public List<BundleActionBean> getList(BundleActionQuery namedQuery, Object... parameters) 177 throws JPAExecutorException { 178 JPAService jpaService = Services.get().get(JPAService.class); 179 EntityManager em = jpaService.getEntityManager(); 180 Query query = getSelectQuery(namedQuery, em, parameters); 181 List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); 182 List<BundleActionBean> beanList = new ArrayList<BundleActionBean>(); 183 if (retList != null) { 184 for (Object ret : retList) { 185 beanList.add(constructBean(namedQuery, ret)); 186 } 187 } 188 return beanList; 189 } 190 191 @Override 192 public Object getSingleValue(BundleActionQuery namedQuery, Object... parameters) throws JPAExecutorException { 193 throw new UnsupportedOperationException(); 194 } 195 196}