001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.IOException;
021    import java.io.Writer;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.StringTokenizer;
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.oozie.client.CoordinatorJob;
033    import org.apache.oozie.client.OozieClient;
034    import org.apache.oozie.client.WorkflowJob;
035    import org.apache.oozie.client.rest.RestConstants;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.coord.CoordActionInfoCommand;
038    import org.apache.oozie.command.coord.CoordActionInfoXCommand;
039    import org.apache.oozie.util.CoordActionsInDateRange;
040    import org.apache.oozie.command.coord.CoordChangeCommand;
041    import org.apache.oozie.command.coord.CoordChangeXCommand;
042    import org.apache.oozie.command.coord.CoordJobCommand;
043    import org.apache.oozie.command.coord.CoordJobXCommand;
044    import org.apache.oozie.command.coord.CoordJobsCommand;
045    import org.apache.oozie.command.coord.CoordJobsXCommand;
046    import org.apache.oozie.command.coord.CoordKillCommand;
047    import org.apache.oozie.command.coord.CoordKillXCommand;
048    import org.apache.oozie.command.coord.CoordRerunCommand;
049    import org.apache.oozie.command.coord.CoordRerunXCommand;
050    import org.apache.oozie.command.coord.CoordResumeCommand;
051    import org.apache.oozie.command.coord.CoordResumeXCommand;
052    import org.apache.oozie.command.coord.CoordSubmitCommand;
053    import org.apache.oozie.command.coord.CoordSubmitXCommand;
054    import org.apache.oozie.command.coord.CoordSuspendCommand;
055    import org.apache.oozie.command.coord.CoordSuspendXCommand;
056    import org.apache.oozie.service.DagXLogInfoService;
057    import org.apache.oozie.service.Services;
058    import org.apache.oozie.service.XLogService;
059    import org.apache.oozie.util.ParamChecker;
060    import org.apache.oozie.util.XLog;
061    import org.apache.oozie.util.XLogStreamer;
062    
063    public class CoordinatorEngine extends BaseEngine {
064        private static boolean useXCommand = true;
065        private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
066    
067        /**
068         * Create a system Coordinator engine, with no user and no group.
069         */
070        public CoordinatorEngine() {
071            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
072                useXCommand = false;
073                LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
074            }
075            else {
076                LOG.debug("Oozie CoordinatorEngine is using XCommands.");
077            }
078        }
079    
080        /**
081         * Create a Coordinator engine to perform operations on behave of a user.
082         *
083         * @param user user name.
084         * @param authToken the authentication token.
085         */
086        public CoordinatorEngine(String user, String authToken) {
087            this();
088            this.user = ParamChecker.notEmpty(user, "user");
089            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
090        }
091    
092        /*
093         * (non-Javadoc)
094         *
095         * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
096         */
097        @Override
098        public String getDefinition(String jobId) throws BaseEngineException {
099            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
100            return job.getOrigJobXml();
101        }
102    
103        /**
104         * @param jobId
105         * @return CoordinatorJobBean
106         * @throws BaseEngineException
107         */
108        private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
109            try {
110                if (useXCommand) {
111                    return new CoordJobXCommand(jobId).call();
112                }
113                else {
114                    return new CoordJobCommand(jobId).call();
115                }
116            }
117            catch (CommandException ex) {
118                throw new BaseEngineException(ex);
119            }
120        }
121    
122        /**
123         * @param actionId
124         * @return CoordinatorActionBean
125         * @throws BaseEngineException
126         */
127        public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
128            try {
129                if (useXCommand) {
130                    return new CoordActionInfoXCommand(actionId).call();
131                }
132                else {
133                    return new CoordActionInfoCommand(actionId).call();
134                }
135            }
136            catch (CommandException ex) {
137                throw new BaseEngineException(ex);
138            }
139        }
140    
141        /*
142         * (non-Javadoc)
143         *
144         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
145         */
146        @Override
147        public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
148            try {
149                if (useXCommand) {
150                    return new CoordJobXCommand(jobId).call();
151                }
152                else {
153                    return new CoordJobCommand(jobId).call();
154                }
155            }
156            catch (CommandException ex) {
157                throw new BaseEngineException(ex);
158            }
159        }
160    
161        /*
162         * (non-Javadoc)
163         *
164         * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
165         */
166        @Override
167        public CoordinatorJobBean getCoordJob(String jobId, int start, int length) throws BaseEngineException {
168            try {
169                if (useXCommand) {
170                    return new CoordJobXCommand(jobId, start, length).call();
171                }
172                else {
173                    return new CoordJobCommand(jobId, start, length).call();
174                }
175            }
176            catch (CommandException ex) {
177                throw new BaseEngineException(ex);
178            }
179        }
180    
181        /*
182         * (non-Javadoc)
183         *
184         * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
185         */
186        @Override
187        public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
188            return null;
189        }
190    
191        /*
192         * (non-Javadoc)
193         *
194         * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
195         */
196        @Override
197        public void kill(String jobId) throws CoordinatorEngineException {
198            try {
199                if (useXCommand) {
200                    new CoordKillXCommand(jobId).call();
201                }
202                else {
203                    new CoordKillCommand(jobId).call();
204                }
205                LOG.info("User " + user + " killed the Coordinator job " + jobId);
206            }
207            catch (CommandException e) {
208                throw new CoordinatorEngineException(e);
209            }
210        }
211    
212        /* (non-Javadoc)
213         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
214         */
215        @Override
216        public void change(String jobId, String changeValue) throws CoordinatorEngineException {
217            try {
218                if (useXCommand) {
219                    new CoordChangeXCommand(jobId, changeValue).call();
220                }
221                else {
222                    new CoordChangeCommand(jobId, changeValue).call();
223                }
224                LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
225            }
226            catch (CommandException e) {
227                throw new CoordinatorEngineException(e);
228            }
229        }
230    
231        @Override
232        @Deprecated
233        public void reRun(String jobId, Configuration conf) throws BaseEngineException {
234            throw new BaseEngineException(new XException(ErrorCode.E0301));
235        }
236    
237        /**
238         * Rerun coordinator actions for given rerunType
239         *
240         * @param jobId
241         * @param rerunType
242         * @param scope
243         * @param refresh
244         * @param noCleanup
245         * @throws BaseEngineException
246         */
247        public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
248                throws BaseEngineException {
249            try {
250                if (useXCommand) {
251                    return new CoordRerunXCommand(jobId, rerunType, scope, refresh, noCleanup).call();
252                }
253                else {
254                    return new CoordRerunCommand(jobId, rerunType, scope, refresh, noCleanup).call();
255                }
256            }
257            catch (CommandException ex) {
258                throw new BaseEngineException(ex);
259            }
260        }
261    
262        /*
263         * (non-Javadoc)
264         *
265         * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
266         */
267        @Override
268        public void resume(String jobId) throws CoordinatorEngineException {
269            try {
270                if (useXCommand) {
271                    new CoordResumeXCommand(jobId).call();
272                }
273                else {
274                    new CoordResumeCommand(jobId).call();
275                }
276            }
277            catch (CommandException e) {
278                throw new CoordinatorEngineException(e);
279            }
280        }
281    
282        @Override
283        @Deprecated
284        public void start(String jobId) throws BaseEngineException {
285            throw new BaseEngineException(new XException(ErrorCode.E0301));
286        }
287    
288        /*
289         * (non-Javadoc)
290         *
291         * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
292         * java.io.Writer)
293         */
294        @Override
295        public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
296            XLogStreamer.Filter filter = new XLogStreamer.Filter();
297            filter.setParameter(DagXLogInfoService.JOB, jobId);
298    
299            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
300            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
301        }
302    
303        /**
304         * Add list of actions to the filter based on conditions
305         *
306         * @param jobId Job Id
307         * @param logRetrievalScope Value for the retrieval type
308         * @param logRetrievalType Based on which filter criteria the log is retrieved
309         * @param writer writer to stream the log to
310         * @throws IOException
311         * @throws BaseEngineException
312         * @throws CommandException
313         */
314        public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
315                throws IOException, BaseEngineException, CommandException {
316            XLogStreamer.Filter filter = new XLogStreamer.Filter();
317            filter.setParameter(DagXLogInfoService.JOB, jobId);
318            if (logRetrievalScope != null && logRetrievalType != null) {
319                // if coordinator action logs are to be retrieved based on action id range
320                if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
321                    Set<String> actions = new HashSet<String>();
322                    String[] list = logRetrievalScope.split(",");
323                    for (String s : list) {
324                        s = s.trim();
325                        if (s.contains("-")) {
326                            String[] range = s.split("-");
327                            if (range.length != 2) {
328                                throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
329                                        + "'");
330                            }
331                            int start;
332                            int end;
333                            try {
334                                start = Integer.parseInt(range[0].trim());
335                                end = Integer.parseInt(range[1].trim());
336                                if (start > end) {
337                                    throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
338                                            + "'");
339                                }
340                            }
341                            catch (NumberFormatException ne) {
342                                throw new CommandException(ErrorCode.E0302, ne);
343                            }
344                            for (int i = start; i <= end; i++) {
345                                actions.add(jobId + "@" + i);
346                            }
347                        }
348                        else {
349                            try {
350                                Integer.parseInt(s);
351                            }
352                            catch (NumberFormatException ne) {
353                                throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
354                                        + "'. Integer only.");
355                            }
356                            actions.add(jobId + "@" + s);
357                        }
358                    }
359    
360                    Iterator<String> actionsIterator = actions.iterator();
361                    StringBuilder orSeparatedActions = new StringBuilder("");
362                    boolean orRequired = false;
363                    while (actionsIterator.hasNext()) {
364                        if (orRequired) {
365                            orSeparatedActions.append("|");
366                        }
367                        orSeparatedActions.append(actionsIterator.next().toString());
368                        orRequired = true;
369                    }
370                    if (actions.size() > 1 && orRequired) {
371                        orSeparatedActions.insert(0, "(");
372                        orSeparatedActions.append(")");
373                    }
374                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
375                }
376                // if coordinator action logs are to be retrieved based on date range
377                // this block gets the corresponding list of coordinator actions to be used by the log filter
378                if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
379                    List<CoordinatorActionBean> actionsList = null;
380                    try {
381                        actionsList = CoordActionsInDateRange.getCoordActionsFromDates(jobId, logRetrievalScope);
382                    }
383                    catch (XException xe) {
384                        throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
385                    }
386                    StringBuilder orSeparatedActions = new StringBuilder("");
387                    boolean orRequired = false;
388                    for (CoordinatorActionBean coordAction : actionsList) {
389                        if (orRequired) {
390                            orSeparatedActions.append("|");
391                        }
392                        orSeparatedActions.append(coordAction.getId());
393                        orRequired = true;
394                    }
395                    if (actionsList.size() > 1 && orRequired) {
396                        orSeparatedActions.insert(0, "(");
397                        orSeparatedActions.append(")");
398                    }
399                    filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
400                }
401            }
402            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
403            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
404        }
405    
406        /*
407         * (non-Javadoc)
408         *
409         * @see
410         * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
411         * , boolean)
412         */
413        @Override
414        public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
415            try {
416                String jobId;
417                if (useXCommand) {
418                    CoordSubmitXCommand submit = new CoordSubmitXCommand(conf, getAuthToken());
419                    jobId = submit.call();
420                }
421                else {
422                    CoordSubmitCommand submit = new CoordSubmitCommand(conf, getAuthToken());
423                    jobId = submit.call();
424                }
425                return jobId;
426            }
427            catch (CommandException ex) {
428                throw new CoordinatorEngineException(ex);
429            }
430        }
431    
432        /*
433         * (non-Javadoc)
434         *
435         * @see
436         * org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration
437         * , boolean)
438         */
439        @Override
440        public String dryrunSubmit(Configuration conf, boolean startJob) throws CoordinatorEngineException {
441            try {
442                String jobId;
443                if (useXCommand) {
444                    CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf, getAuthToken());
445                    jobId = submit.call();
446                }
447                else {
448                    CoordSubmitCommand submit = new CoordSubmitCommand(true, conf, getAuthToken());
449                    jobId = submit.call();
450                }
451                return jobId;
452            }
453            catch (CommandException ex) {
454                throw new CoordinatorEngineException(ex);
455            }
456        }
457    
458        /*
459         * (non-Javadoc)
460         *
461         * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
462         */
463        @Override
464        public void suspend(String jobId) throws CoordinatorEngineException {
465            try {
466                if (useXCommand) {
467                    new CoordSuspendXCommand(jobId).call();
468                }
469                else {
470                    new CoordSuspendCommand(jobId).call();
471                }
472            }
473            catch (CommandException e) {
474                throw new CoordinatorEngineException(e);
475            }
476    
477        }
478    
479        /*
480         * (non-Javadoc)
481         *
482         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
483         */
484        @Override
485        public WorkflowJob getJob(String jobId) throws BaseEngineException {
486            throw new BaseEngineException(new XException(ErrorCode.E0301));
487        }
488    
489        /*
490         * (non-Javadoc)
491         *
492         * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
493         */
494        @Override
495        public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
496            throw new BaseEngineException(new XException(ErrorCode.E0301));
497        }
498    
499        private static final Set<String> FILTER_NAMES = new HashSet<String>();
500    
501        static {
502            FILTER_NAMES.add(OozieClient.FILTER_USER);
503            FILTER_NAMES.add(OozieClient.FILTER_NAME);
504            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
505            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
506            FILTER_NAMES.add(OozieClient.FILTER_ID);
507            FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
508        }
509    
510        /**
511         * @param filterStr
512         * @param start
513         * @param len
514         * @return CoordinatorJobInfo
515         * @throws CoordinatorEngineException
516         */
517        public CoordinatorJobInfo getCoordJobs(String filterStr, int start, int len) throws CoordinatorEngineException {
518            Map<String, List<String>> filter = parseFilter(filterStr);
519    
520            try {
521                if (useXCommand) {
522                    return new CoordJobsXCommand(filter, start, len).call();
523                }
524                else {
525                    return new CoordJobsCommand(filter, start, len).call();
526                }
527            }
528            catch (CommandException ex) {
529                throw new CoordinatorEngineException(ex);
530            }
531        }
532    
533        /**
534         * @param filter
535         * @return Map<String, List<String>>
536         * @throws CoordinatorEngineException
537         */
538        private Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
539            Map<String, List<String>> map = new HashMap<String, List<String>>();
540            if (filter != null) {
541                StringTokenizer st = new StringTokenizer(filter, ";");
542                while (st.hasMoreTokens()) {
543                    String token = st.nextToken();
544                    if (token.contains("=")) {
545                        String[] pair = token.split("=");
546                        if (pair.length != 2) {
547                            throw new CoordinatorEngineException(ErrorCode.E0420, filter,
548                                    "elements must be name=value pairs");
549                        }
550                        if (!FILTER_NAMES.contains(pair[0])) {
551                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
552                                    pair[0]));
553                        }
554                        if (pair[0].equals("status")) {
555                            try {
556                                CoordinatorJob.Status.valueOf(pair[1]);
557                            }
558                            catch (IllegalArgumentException ex) {
559                                throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
560                                        "invalid status [{0}]", pair[1]));
561                            }
562                        }
563                        List<String> list = map.get(pair[0]);
564                        if (list == null) {
565                            list = new ArrayList<String>();
566                            map.put(pair[0], list);
567                        }
568                        list.add(pair[1]);
569                    }
570                    else {
571                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
572                    }
573                }
574            }
575            return map;
576        }
577    }