001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.email; 020 021import java.io.File; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Properties; 030 031import javax.activation.DataHandler; 032import javax.activation.DataSource; 033import javax.mail.Authenticator; 034import javax.mail.Message; 035import javax.mail.Message.RecipientType; 036import javax.mail.MessagingException; 037import javax.mail.Multipart; 038import javax.mail.NoSuchProviderException; 039import javax.mail.PasswordAuthentication; 040import javax.mail.Session; 041import javax.mail.Transport; 042import javax.mail.internet.AddressException; 043import javax.mail.internet.InternetAddress; 044import javax.mail.internet.MimeBodyPart; 045import javax.mail.internet.MimeMessage; 046import javax.mail.internet.MimeMultipart; 047 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.oozie.action.ActionExecutor; 052import org.apache.oozie.action.ActionExecutorException; 053import org.apache.oozie.action.ActionExecutorException.ErrorType; 054import org.apache.oozie.client.WorkflowAction; 055import org.apache.oozie.service.ConfigurationService; 056import org.apache.oozie.service.HadoopAccessorException; 057import org.apache.oozie.service.Services; 058import org.apache.oozie.service.HadoopAccessorService; 059import org.apache.oozie.util.XLog; 060import org.apache.oozie.util.XmlUtils; 061import org.jdom.Element; 062import org.jdom.Namespace; 063 064/** 065 * Email action executor. It takes to, cc addresses along with a subject and body and sends 066 * out an email. 067 */ 068public class EmailActionExecutor extends ActionExecutor { 069 070 public static final String CONF_PREFIX = "oozie.email."; 071 public static final String EMAIL_SMTP_HOST = CONF_PREFIX + "smtp.host"; 072 public static final String EMAIL_SMTP_PORT = CONF_PREFIX + "smtp.port"; 073 public static final String EMAIL_SMTP_AUTH = CONF_PREFIX + "smtp.auth"; 074 public static final String EMAIL_SMTP_USER = CONF_PREFIX + "smtp.username"; 075 public static final String EMAIL_SMTP_PASS = CONF_PREFIX + "smtp.password"; 076 public static final String EMAIL_SMTP_FROM = CONF_PREFIX + "from.address"; 077 public static final String EMAIL_ATTACHMENT_ENABLED = CONF_PREFIX + "attachment.enabled"; 078 079 private final static String TO = "to"; 080 private final static String CC = "cc"; 081 private final static String SUB = "subject"; 082 private final static String BOD = "body"; 083 private final static String ATTACHMENT = "attachment"; 084 private final static String COMMA = ","; 085 private final static String CONTENT_TYPE = "content_type"; 086 087 private final static String DEFAULT_CONTENT_TYPE = "text/plain"; 088 private XLog LOG = XLog.getLog(getClass()); 089 public static final String EMAIL_ATTACHMENT_ERROR_MSG = 090 "\n Note: This email is missing configured email attachments " 091 + "as sending attachments in email action is disabled in the Oozie server. " 092 + "It could be for security compliance with data protection or other reasons"; 093 094 public EmailActionExecutor() { 095 super("email"); 096 } 097 098 @Override 099 public void initActionType() { 100 super.initActionType(); 101 } 102 103 @Override 104 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 105 try { 106 context.setStartData("-", "-", "-"); 107 Element actionXml = XmlUtils.parseXml(action.getConf()); 108 validateAndMail(context, actionXml); 109 context.setExecutionData("OK", null); 110 } 111 catch (Exception ex) { 112 throw convertException(ex); 113 } 114 } 115 116 @SuppressWarnings("unchecked") 117 protected void validateAndMail(Context context, Element element) throws ActionExecutorException { 118 // The XSD does the min/max occurrence validation for us. 119 Namespace ns = element.getNamespace(); 120 String tos[] = new String[0]; 121 String ccs[] = new String[0]; 122 String subject = ""; 123 String body = ""; 124 String attachments[] = new String[0]; 125 String contentType; 126 Element child = null; 127 128 // <to> - One ought to exist. 129 String text = element.getChildTextTrim(TO, ns); 130 if (text.isEmpty()) { 131 throw new ActionExecutorException(ErrorType.ERROR, "EM001", "No receipents were specified in the to-address field."); 132 } 133 tos = text.split(COMMA); 134 135 // <cc> - Optional, but only one ought to exist. 136 try { 137 ccs = element.getChildTextTrim(CC, ns).split(COMMA); 138 } catch (Exception e) { 139 // It is alright for cc to be given empty or not be present. 140 ccs = new String[0]; 141 } 142 143 // <subject> - One ought to exist. 144 subject = element.getChildTextTrim(SUB, ns); 145 146 // <body> - One ought to exist. 147 body = element.getChildTextTrim(BOD, ns); 148 149 // <attachment> - Optional 150 String attachment = element.getChildTextTrim(ATTACHMENT, ns); 151 if(attachment != null) { 152 attachments = attachment.split(COMMA); 153 } 154 155 contentType = element.getChildTextTrim(CONTENT_TYPE, ns); 156 if (contentType == null || contentType.isEmpty()) { 157 contentType = DEFAULT_CONTENT_TYPE; 158 } 159 160 // All good - lets try to mail! 161 email(tos, ccs, subject, body, attachments, contentType, context.getWorkflow().getUser()); 162 } 163 164 public void email(String[] to, String[] cc, String subject, String body, String[] attachments, String contentType, 165 String user) throws ActionExecutorException { 166 // Get mailing server details. 167 String smtpHost = getOozieConf().get(EMAIL_SMTP_HOST, "localhost"); 168 String smtpPort = getOozieConf().get(EMAIL_SMTP_PORT, "25"); 169 Boolean smtpAuth = getOozieConf().getBoolean(EMAIL_SMTP_AUTH, false); 170 String smtpUser = getOozieConf().get(EMAIL_SMTP_USER, ""); 171 String smtpPassword = getOozieConf().get(EMAIL_SMTP_PASS, ""); 172 String fromAddr = getOozieConf().get(EMAIL_SMTP_FROM, "oozie@localhost"); 173 174 Properties properties = new Properties(); 175 properties.setProperty("mail.smtp.host", smtpHost); 176 properties.setProperty("mail.smtp.port", smtpPort); 177 properties.setProperty("mail.smtp.auth", smtpAuth.toString()); 178 179 Session session; 180 // Do not use default instance (i.e. Session.getDefaultInstance) 181 // (cause it may lead to issues when used second time). 182 if (!smtpAuth) { 183 session = Session.getInstance(properties); 184 } else { 185 session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword)); 186 } 187 188 Message message = new MimeMessage(session); 189 InternetAddress from; 190 List<InternetAddress> toAddrs = new ArrayList<InternetAddress>(to.length); 191 List<InternetAddress> ccAddrs = new ArrayList<InternetAddress>(cc.length); 192 193 try { 194 from = new InternetAddress(fromAddr); 195 message.setFrom(from); 196 } catch (AddressException e) { 197 throw new ActionExecutorException(ErrorType.ERROR, "EM002", "Bad from address specified in ${oozie.email.from.address}.", e); 198 } catch (MessagingException e) { 199 throw new ActionExecutorException(ErrorType.ERROR, "EM003", "Error setting a from address in the message.", e); 200 } 201 202 try { 203 // Add all <to> 204 for (String toStr : to) { 205 toAddrs.add(new InternetAddress(toStr.trim())); 206 } 207 message.addRecipients(RecipientType.TO, toAddrs.toArray(new InternetAddress[0])); 208 209 // Add all <cc> 210 for (String ccStr : cc) { 211 ccAddrs.add(new InternetAddress(ccStr.trim())); 212 } 213 message.addRecipients(RecipientType.CC, ccAddrs.toArray(new InternetAddress[0])); 214 215 // Set subject 216 message.setSubject(subject); 217 218 // when there is attachment 219 if (attachments != null && attachments.length > 0 && ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) { 220 Multipart multipart = new MimeMultipart(); 221 222 // Set body text 223 MimeBodyPart bodyTextPart = new MimeBodyPart(); 224 bodyTextPart.setText(body); 225 multipart.addBodyPart(bodyTextPart); 226 227 for (String attachment : attachments) { 228 URI attachUri = new URI(attachment); 229 if (attachUri.getScheme() != null && attachUri.getScheme().equals("file")) { 230 throw new ActionExecutorException(ErrorType.ERROR, "EM008", 231 "Encountered an error when attaching a file. A local file cannot be attached:" 232 + attachment); 233 } 234 MimeBodyPart messageBodyPart = new MimeBodyPart(); 235 DataSource source = new URIDataSource(attachUri, user); 236 messageBodyPart.setDataHandler(new DataHandler(source)); 237 messageBodyPart.setFileName(new File(attachment).getName()); 238 multipart.addBodyPart(messageBodyPart); 239 } 240 message.setContent(multipart); 241 } 242 else { 243 if (attachments != null && attachments.length > 0 && !ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) { 244 body = body + EMAIL_ATTACHMENT_ERROR_MSG; 245 } 246 message.setContent(body, contentType); 247 } 248 } 249 catch (AddressException e) { 250 throw new ActionExecutorException(ErrorType.ERROR, "EM004", "Bad address format in <to> or <cc>.", e); 251 } 252 catch (MessagingException e) { 253 throw new ActionExecutorException(ErrorType.ERROR, "EM005", "An error occured while adding recipients.", e); 254 } 255 catch (URISyntaxException e) { 256 throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); 257 } 258 catch (HadoopAccessorException e) { 259 throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e); 260 } 261 262 try { 263 // Send over SMTP Transport 264 // (Session+Message has adequate details.) 265 Transport.send(message); 266 } catch (NoSuchProviderException e) { 267 throw new ActionExecutorException(ErrorType.ERROR, "EM006", "Could not find an SMTP transport provider to email.", e); 268 } catch (MessagingException e) { 269 throw new ActionExecutorException(ErrorType.ERROR, "EM007", "Encountered an error while sending the email message over SMTP.", e); 270 } 271 } 272 273 @Override 274 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 275 String externalStatus = action.getExternalStatus(); 276 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 277 WorkflowAction.Status.ERROR; 278 context.setEndData(status, getActionSignal(status)); 279 } 280 281 @Override 282 public void check(Context context, WorkflowAction action) 283 throws ActionExecutorException { 284 285 } 286 287 @Override 288 public void kill(Context context, WorkflowAction action) 289 throws ActionExecutorException { 290 291 } 292 293 @Override 294 public boolean isCompleted(String externalStatus) { 295 return true; 296 } 297 298 public static class JavaMailAuthenticator extends Authenticator { 299 300 String user; 301 String password; 302 303 public JavaMailAuthenticator(String user, String password) { 304 this.user = user; 305 this.password = password; 306 } 307 308 @Override 309 protected PasswordAuthentication getPasswordAuthentication() { 310 return new PasswordAuthentication(user, password); 311 } 312 } 313 314 class URIDataSource implements DataSource{ 315 316 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 317 FileSystem fs; 318 URI uri; 319 public URIDataSource(URI uri, String user) throws HadoopAccessorException { 320 this.uri = uri; 321 Configuration fsConf = has.createJobConf(uri.getAuthority()); 322 fs = has.createFileSystem(user, uri, fsConf); 323 } 324 325 public InputStream getInputStream() throws IOException { 326 return fs.open(new Path(uri)); 327 } 328 329 public OutputStream getOutputStream() throws IOException { 330 return fs.create(new Path(uri)); 331 } 332 333 public String getContentType() { 334 return "application/octet-stream"; 335 } 336 337 public String getName() { 338 return uri.getPath(); 339 } 340 } 341}