001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.email;
020
021import java.io.File;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Properties;
030
031import javax.activation.DataHandler;
032import javax.activation.DataSource;
033import javax.mail.Authenticator;
034import javax.mail.Message;
035import javax.mail.Message.RecipientType;
036import javax.mail.MessagingException;
037import javax.mail.Multipart;
038import javax.mail.NoSuchProviderException;
039import javax.mail.PasswordAuthentication;
040import javax.mail.Session;
041import javax.mail.Transport;
042import javax.mail.internet.AddressException;
043import javax.mail.internet.InternetAddress;
044import javax.mail.internet.MimeBodyPart;
045import javax.mail.internet.MimeMessage;
046import javax.mail.internet.MimeMultipart;
047
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.oozie.action.ActionExecutor;
052import org.apache.oozie.action.ActionExecutorException;
053import org.apache.oozie.action.ActionExecutorException.ErrorType;
054import org.apache.oozie.client.WorkflowAction;
055import org.apache.oozie.service.ConfigurationService;
056import org.apache.oozie.service.HadoopAccessorException;
057import org.apache.oozie.service.Services;
058import org.apache.oozie.service.HadoopAccessorService;
059import org.apache.oozie.util.XLog;
060import org.apache.oozie.util.XmlUtils;
061import org.jdom.Element;
062import org.jdom.Namespace;
063
064/**
065 * Email action executor. It takes to, cc addresses along with a subject and body and sends
066 * out an email.
067 */
068public class EmailActionExecutor extends ActionExecutor {
069
070    public static final String CONF_PREFIX = "oozie.email.";
071    public static final String EMAIL_SMTP_HOST = CONF_PREFIX + "smtp.host";
072    public static final String EMAIL_SMTP_PORT = CONF_PREFIX + "smtp.port";
073    public static final String EMAIL_SMTP_AUTH = CONF_PREFIX + "smtp.auth";
074    public static final String EMAIL_SMTP_USER = CONF_PREFIX + "smtp.username";
075    public static final String EMAIL_SMTP_PASS = CONF_PREFIX + "smtp.password";
076    public static final String EMAIL_SMTP_FROM = CONF_PREFIX + "from.address";
077    public static final String EMAIL_ATTACHMENT_ENABLED = CONF_PREFIX + "attachment.enabled";
078
079    private final static String TO = "to";
080    private final static String CC = "cc";
081    private final static String SUB = "subject";
082    private final static String BOD = "body";
083    private final static String ATTACHMENT = "attachment";
084    private final static String COMMA = ",";
085    private final static String CONTENT_TYPE = "content_type";
086
087    private final static String DEFAULT_CONTENT_TYPE = "text/plain";
088    private XLog LOG = XLog.getLog(getClass());
089    public static final String EMAIL_ATTACHMENT_ERROR_MSG =
090            "\n Note: This email is missing configured email attachments "
091            + "as sending attachments in email action is disabled in the Oozie server. "
092            + "It could be for security compliance with data protection or other reasons";
093
094    public EmailActionExecutor() {
095        super("email");
096    }
097
098    @Override
099    public void initActionType() {
100        super.initActionType();
101    }
102
103    @Override
104    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
105        try {
106            context.setStartData("-", "-", "-");
107            Element actionXml = XmlUtils.parseXml(action.getConf());
108            validateAndMail(context, actionXml);
109            context.setExecutionData("OK", null);
110        }
111        catch (Exception ex) {
112            throw convertException(ex);
113        }
114    }
115
116    @SuppressWarnings("unchecked")
117    protected void validateAndMail(Context context, Element element) throws ActionExecutorException {
118        // The XSD does the min/max occurrence validation for us.
119        Namespace ns = element.getNamespace();
120        String tos[] = new String[0];
121        String ccs[] = new String[0];
122        String subject = "";
123        String body = "";
124        String attachments[] = new String[0];
125        String contentType;
126        Element child = null;
127
128        // <to> - One ought to exist.
129        String text = element.getChildTextTrim(TO, ns);
130        if (text.isEmpty()) {
131            throw new ActionExecutorException(ErrorType.ERROR, "EM001", "No receipents were specified in the to-address field.");
132        }
133        tos = text.split(COMMA);
134
135        // <cc> - Optional, but only one ought to exist.
136        try {
137            ccs = element.getChildTextTrim(CC, ns).split(COMMA);
138        } catch (Exception e) {
139            // It is alright for cc to be given empty or not be present.
140            ccs = new String[0];
141        }
142
143        // <subject> - One ought to exist.
144        subject = element.getChildTextTrim(SUB, ns);
145
146        // <body> - One ought to exist.
147        body = element.getChildTextTrim(BOD, ns);
148
149        // <attachment> - Optional
150        String attachment = element.getChildTextTrim(ATTACHMENT, ns);
151        if(attachment != null) {
152            attachments = attachment.split(COMMA);
153        }
154
155        contentType = element.getChildTextTrim(CONTENT_TYPE, ns);
156        if (contentType == null || contentType.isEmpty()) {
157            contentType = DEFAULT_CONTENT_TYPE;
158        }
159
160        // All good - lets try to mail!
161        email(tos, ccs, subject, body, attachments, contentType, context.getWorkflow().getUser());
162    }
163
164    public void email(String[] to, String[] cc, String subject, String body, String[] attachments, String contentType,
165            String user) throws ActionExecutorException {
166        // Get mailing server details.
167        String smtpHost = getOozieConf().get(EMAIL_SMTP_HOST, "localhost");
168        String smtpPort = getOozieConf().get(EMAIL_SMTP_PORT, "25");
169        Boolean smtpAuth = getOozieConf().getBoolean(EMAIL_SMTP_AUTH, false);
170        String smtpUser = getOozieConf().get(EMAIL_SMTP_USER, "");
171        String smtpPassword = getOozieConf().get(EMAIL_SMTP_PASS, "");
172        String fromAddr = getOozieConf().get(EMAIL_SMTP_FROM, "oozie@localhost");
173
174        Properties properties = new Properties();
175        properties.setProperty("mail.smtp.host", smtpHost);
176        properties.setProperty("mail.smtp.port", smtpPort);
177        properties.setProperty("mail.smtp.auth", smtpAuth.toString());
178
179        Session session;
180        // Do not use default instance (i.e. Session.getDefaultInstance)
181        // (cause it may lead to issues when used second time).
182        if (!smtpAuth) {
183            session = Session.getInstance(properties);
184        } else {
185            session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword));
186        }
187
188        Message message = new MimeMessage(session);
189        InternetAddress from;
190        List<InternetAddress> toAddrs = new ArrayList<InternetAddress>(to.length);
191        List<InternetAddress> ccAddrs = new ArrayList<InternetAddress>(cc.length);
192
193        try {
194            from = new InternetAddress(fromAddr);
195            message.setFrom(from);
196        } catch (AddressException e) {
197            throw new ActionExecutorException(ErrorType.ERROR, "EM002", "Bad from address specified in ${oozie.email.from.address}.", e);
198        } catch (MessagingException e) {
199            throw new ActionExecutorException(ErrorType.ERROR, "EM003", "Error setting a from address in the message.", e);
200        }
201
202        try {
203            // Add all <to>
204            for (String toStr : to) {
205                toAddrs.add(new InternetAddress(toStr.trim()));
206            }
207            message.addRecipients(RecipientType.TO, toAddrs.toArray(new InternetAddress[0]));
208
209            // Add all <cc>
210            for (String ccStr : cc) {
211                ccAddrs.add(new InternetAddress(ccStr.trim()));
212            }
213            message.addRecipients(RecipientType.CC, ccAddrs.toArray(new InternetAddress[0]));
214
215            // Set subject
216            message.setSubject(subject);
217
218            // when there is attachment
219            if (attachments != null && attachments.length > 0 && ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) {
220                Multipart multipart = new MimeMultipart();
221
222                // Set body text
223                MimeBodyPart bodyTextPart = new MimeBodyPart();
224                bodyTextPart.setText(body);
225                multipart.addBodyPart(bodyTextPart);
226
227                for (String attachment : attachments) {
228                    URI attachUri = new URI(attachment);
229                    if (attachUri.getScheme() != null && attachUri.getScheme().equals("file")) {
230                        throw new ActionExecutorException(ErrorType.ERROR, "EM008",
231                                "Encountered an error when attaching a file. A local file cannot be attached:"
232                                        + attachment);
233                    }
234                    MimeBodyPart messageBodyPart = new MimeBodyPart();
235                    DataSource source = new URIDataSource(attachUri, user);
236                    messageBodyPart.setDataHandler(new DataHandler(source));
237                    messageBodyPart.setFileName(new File(attachment).getName());
238                    multipart.addBodyPart(messageBodyPart);
239                }
240                message.setContent(multipart);
241            }
242            else {
243                if (attachments != null && attachments.length > 0 && !ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) {
244                    body = body + EMAIL_ATTACHMENT_ERROR_MSG;
245                }
246                message.setContent(body, contentType);
247            }
248        }
249        catch (AddressException e) {
250            throw new ActionExecutorException(ErrorType.ERROR, "EM004", "Bad address format in <to> or <cc>.", e);
251        }
252        catch (MessagingException e) {
253            throw new ActionExecutorException(ErrorType.ERROR, "EM005", "An error occured while adding recipients.", e);
254        }
255        catch (URISyntaxException e) {
256            throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e);
257        }
258        catch (HadoopAccessorException e) {
259            throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e);
260        }
261
262        try {
263            // Send over SMTP Transport
264            // (Session+Message has adequate details.)
265            Transport.send(message);
266        } catch (NoSuchProviderException e) {
267            throw new ActionExecutorException(ErrorType.ERROR, "EM006", "Could not find an SMTP transport provider to email.", e);
268        } catch (MessagingException e) {
269            throw new ActionExecutorException(ErrorType.ERROR, "EM007", "Encountered an error while sending the email message over SMTP.", e);
270        }
271    }
272
273    @Override
274    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
275        String externalStatus = action.getExternalStatus();
276        WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
277                                       WorkflowAction.Status.ERROR;
278        context.setEndData(status, getActionSignal(status));
279    }
280
281    @Override
282    public void check(Context context, WorkflowAction action)
283            throws ActionExecutorException {
284
285    }
286
287    @Override
288    public void kill(Context context, WorkflowAction action)
289            throws ActionExecutorException {
290
291    }
292
293    @Override
294    public boolean isCompleted(String externalStatus) {
295        return true;
296    }
297
298    public static class JavaMailAuthenticator extends Authenticator {
299
300        String user;
301        String password;
302
303        public JavaMailAuthenticator(String user, String password) {
304            this.user = user;
305            this.password = password;
306        }
307
308        @Override
309        protected PasswordAuthentication getPasswordAuthentication() {
310           return new PasswordAuthentication(user, password);
311        }
312    }
313
314    class URIDataSource implements DataSource{
315
316        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
317        FileSystem fs;
318        URI uri;
319        public URIDataSource(URI uri, String user) throws HadoopAccessorException {
320            this.uri = uri;
321            Configuration fsConf = has.createJobConf(uri.getAuthority());
322            fs = has.createFileSystem(user, uri, fsConf);
323        }
324
325        public InputStream getInputStream() throws IOException {
326            return fs.open(new Path(uri));
327        }
328
329        public OutputStream getOutputStream() throws IOException {
330            return fs.create(new Path(uri));
331        }
332
333        public String getContentType() {
334            return "application/octet-stream";
335        }
336
337        public String getName() {
338            return uri.getPath();
339        }
340    }
341}