Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
FtpFileDownloadStage |
|
| 0.0;0 | ||||
FtpFileDownloadStage$Criterion |
|
| 0.0;0 | ||||
FtpFileDownloadStage$FileDateMatchCriterion |
|
| 0.0;0 | ||||
FtpFileDownloadStage$FileNameMatchCriterion |
|
| 0.0;0 | ||||
FtpFileDownloadStage$FileSpec |
|
| 0.0;0 | ||||
FtpFileDownloadStage$FileSpec$FileType |
|
| 0.0;0 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.commons.pipeline.stage; | |
19 | ||
20 | import java.io.File; | |
21 | import java.io.FileOutputStream; | |
22 | import java.io.IOException; | |
23 | import java.io.OutputStream; | |
24 | import java.util.Calendar; | |
25 | import java.util.Date; | |
26 | import java.util.HashSet; | |
27 | import java.util.regex.Pattern; | |
28 | import java.util.Set; | |
29 | import org.apache.commons.logging.Log; | |
30 | import org.apache.commons.logging.LogFactory; | |
31 | import org.apache.commons.net.ftp.FTPClient; | |
32 | import org.apache.commons.net.ftp.FTPFile; | |
33 | import org.apache.commons.net.ftp.FTPReply; | |
34 | import org.apache.commons.pipeline.StageException; | |
35 | ||
36 | /** | |
37 | * <p>This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the | |
38 | * functionality needed to retrieve data from an FTP URL. Multipart responses | |
39 | * are not yet supported.</p> | |
40 | */ | |
41 | public class FtpFileDownloadStage extends BaseStage { | |
42 | 0 | private final Log log = LogFactory.getLog(FtpFileDownloadStage.class); |
43 | ||
44 | 0 | private String workDir = "/tmp"; |
45 | private File fworkDir; | |
46 | 0 | private FTPClient client = new FTPClient(); |
47 | ||
48 | /** Holds value of property host. */ | |
49 | private String host; | |
50 | ||
51 | /** Holds value of property user. */ | |
52 | private String user; | |
53 | ||
54 | /** Holds value of property password. */ | |
55 | private String password; | |
56 | ||
57 | /** Holds value of property port. */ | |
58 | private int port; | |
59 | ||
/**
 * Creates a stage that stores downloads in the default work directory,
 * {@code /tmp}. Nothing is created on disk by this constructor; the
 * directory is only set up when {@link #preprocess()} runs.
 */
public FtpFileDownloadStage() {
    super();
}
65 | ||
/**
 * Creates a stage that stores downloads in the given work directory.
 * Nothing is created on disk by this constructor; the directory is only
 * set up when {@link #preprocess()} runs.
 * @param workDir local directory in which to store downloaded files
 */
public FtpFileDownloadStage(String workDir) {
    super();
    this.workDir = workDir;
}
73 | ||
74 | /** | |
75 | * Creates the download directory {@link #setWorkDir(String) workDir} uf it does | |
76 | * not exist and makes a connection to the remote FTP server. | |
77 | * @throws org.apache.commons.pipeline.StageException if a connection to the remote FTP server cannot be established, or the login to | |
78 | * the remote system fails | |
79 | */ | |
80 | public void preprocess() throws StageException { | |
81 | 0 | super.preprocess(); |
82 | 0 | if (fworkDir == null) fworkDir = new File(workDir); |
83 | 0 | if (!this.fworkDir.exists()) fworkDir.mkdirs(); |
84 | ||
85 | try { | |
86 | //connect to the ftp site | |
87 | 0 | client.connect(host, port); |
88 | 0 | log.debug(client.getReplyString()); |
89 | 0 | if(!FTPReply.isPositiveCompletion(client.getReplyCode())) { |
90 | 0 | throw new IOException("FTP server at host " + host + " refused connection."); |
91 | } | |
92 | ||
93 | 0 | client.login(user, password); |
94 | 0 | log.debug(client.getReplyString()); |
95 | 0 | if(!FTPReply.isPositiveCompletion(client.getReplyCode())) { |
96 | 0 | throw new StageException(this, "FTP login failed for user " + user + ": " + client.getReplyString()); |
97 | } | |
98 | 0 | } catch (IOException e) { |
99 | 0 | throw new StageException(this, e); |
100 | 0 | } |
101 | 0 | } |
102 | ||
103 | /** | |
104 | * Retrieves files that match the specified FileSpec from the FTP server | |
105 | * and stores them in the work directory. | |
106 | * @param obj incoming {@link FileSpec} that indicates the file to download along with some flags to | |
107 | * control the download behavior | |
108 | * @throws org.apache.commons.pipeline.StageException if there are errors navigating the remote directory structure or file download | |
109 | * fails | |
110 | */ | |
111 | public void process(Object obj) throws StageException { | |
112 | 0 | if (!this.fworkDir.exists()) throw new StageException(this, "The work directory for file download " + workDir.toString() + " does not exist."); |
113 | ||
114 | 0 | FileSpec spec = (FileSpec) obj; |
115 | ||
116 | try { | |
117 | 0 | client.setFileType(spec.type.intValue()); |
118 | 0 | client.changeWorkingDirectory(spec.path); |
119 | 0 | if(!FTPReply.isPositiveCompletion(client.getReplyCode())) { |
120 | 0 | throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString()); |
121 | } | |
122 | ||
123 | 0 | log.debug("FTP connection successfully established to " + host + ":" + spec.path); |
124 | ||
125 | //get the list of files | |
126 | 0 | client.enterLocalPassiveMode(); |
127 | 0 | searchCurrentDirectory("", spec); |
128 | 0 | } catch (IOException e) { |
129 | 0 | throw new StageException(this, e); |
130 | 0 | } |
131 | 0 | } |
132 | ||
133 | ||
134 | /** | |
135 | * Search the current working directory of the FTP client, saving files | |
136 | * to the path specified by workDir + the path to the file on the FTP server. | |
137 | * This method will optionally recursively search directories on the remote server. | |
138 | */ | |
139 | private void searchCurrentDirectory(String path, FileSpec spec) throws IOException { | |
140 | 0 | FTPFile[] files = client.listFiles(); |
141 | 0 | if(!FTPReply.isPositiveCompletion(client.getReplyCode())) { |
142 | 0 | throw new IOException("FTP client could not obtain file list : " + client.getReplyString()); |
143 | } | |
144 | ||
145 | 0 | search: for (FTPFile file : files) { |
146 | 0 | String localPath = path + File.separatorChar + file.getName(); |
147 | ||
148 | 0 | if (file.isDirectory() && spec.recursive) { |
149 | 0 | log.debug("Recursing into directory " + file.getName()); |
150 | 0 | client.changeWorkingDirectory(file.getName()); |
151 | 0 | searchCurrentDirectory(localPath, spec); |
152 | 0 | client.changeToParentDirectory(); |
153 | } else { | |
154 | 0 | log.debug("Examining file " + localPath); |
155 | 0 | for (Criterion crit : spec.criteria) { |
156 | 0 | if (!crit.matches(file)) { |
157 | 0 | log.info("File " + localPath + " failed criterion check " + crit); |
158 | 0 | continue search; |
159 | } | |
160 | } | |
161 | ||
162 | 0 | boolean getFile = true; |
163 | 0 | File localFile = new File(workDir + File.separatorChar + localPath); |
164 | 0 | if (localFile.exists()) { |
165 | 0 | if (spec.overwrite) { |
166 | 0 | log.info("Replacing existing local file " + localFile.getPath()); |
167 | 0 | getFile = true; |
168 | } else { | |
169 | 0 | if (spec.ignoreExisting) { |
170 | 0 | log.info("Ignoring existing local file " + localFile.getPath()); |
171 | 0 | continue search; |
172 | } else { | |
173 | 0 | log.info("Using existing local file " + localFile.getPath()); |
174 | 0 | getFile = false; |
175 | } | |
176 | } | |
177 | } else { | |
178 | 0 | getFile = true; |
179 | } | |
180 | ||
181 | 0 | if (getFile) { |
182 | 0 | if (! localFile.getParentFile().exists()) localFile.getParentFile().mkdir(); |
183 | ||
184 | 0 | OutputStream out = new FileOutputStream(localFile); |
185 | try { | |
186 | 0 | client.retrieveFile(file.getName(), out); |
187 | } finally { | |
188 | 0 | out.flush(); |
189 | 0 | out.close(); |
190 | 0 | } |
191 | } | |
192 | ||
193 | 0 | this.emit(localFile); |
194 | } | |
195 | } | |
196 | 0 | } |
197 | ||
198 | /** | |
199 | * Disconnects from FTP server. Errors are logged. | |
200 | */ | |
201 | public void release() { | |
202 | try { | |
203 | 0 | client.disconnect(); //close ftp connection |
204 | 0 | } catch (IOException e) { |
205 | 0 | log.error(e.getMessage(), e); |
206 | 0 | } |
207 | 0 | } |
208 | ||
209 | /** | |
210 | * Sets the working directory for the file download. If the directory does | |
211 | * not already exist, it will be created during the preprocess() step. | |
212 | * @param workDir local directory to receive file downloads | |
213 | */ | |
214 | public void setWorkDir(String workDir) { | |
215 | 0 | this.workDir = workDir; |
216 | 0 | } |
217 | ||
218 | /** | |
219 | * Returns the name of the file download directory. | |
220 | * @return the string containing the local working directory | |
221 | */ | |
222 | public String getWorkDir() { | |
223 | 0 | return this.workDir; |
224 | } | |
225 | ||
226 | /** Getter for property host. | |
227 | * @return Value of property host. | |
228 | * | |
229 | */ | |
230 | public String getHost() { | |
231 | 0 | return this.host; |
232 | } | |
233 | ||
234 | /** Setter for property host. | |
235 | * @param host New value of property host. | |
236 | * | |
237 | */ | |
238 | public void setHost(String host) { | |
239 | 0 | this.host = host; |
240 | 0 | } |
241 | ||
242 | /** Getter for property user. | |
243 | * @return Value of property user. | |
244 | * | |
245 | */ | |
246 | public String getUser() { | |
247 | 0 | return this.user; |
248 | } | |
249 | ||
250 | /** Setter for property user. | |
251 | * @param user New value of property user. | |
252 | * | |
253 | */ | |
254 | public void setUser(String user) { | |
255 | 0 | this.user = user; |
256 | 0 | } |
257 | ||
258 | /** Setter for property password. | |
259 | * @param password New value of property password. | |
260 | * | |
261 | */ | |
262 | public void setPassword(String password) { | |
263 | 0 | this.password = password; |
264 | 0 | } |
265 | ||
266 | /** | |
267 | * Getter for property port. | |
268 | * @return Value of property port. | |
269 | */ | |
270 | public int getPort() { | |
271 | 0 | return this.port; |
272 | } | |
273 | ||
274 | /** | |
275 | * Setter for property port. | |
276 | * @param port New value of property port. | |
277 | */ | |
278 | public void setPort(int port) { | |
279 | 0 | this.port = port; |
280 | 0 | } |
281 | ||
282 | /** | |
283 | * This class is used to specify a path and pattern of file for the FtpFileDownload | |
284 | * to retrieve. There are some parameters that can be configured in the filespec | |
285 | * that will control download behavior for <CODE>recursive</CODE> searching, the | |
286 | * <CODE>overwrite</CODE> of locally existing files, and to | |
287 | * <CODE>ignoreExisting</CODE> files. | |
288 | * <p> | |
289 | * If a file already exists in the local directory, it is only replaced if | |
290 | * <CODE>overwrite</CODE> is set to <CODE>true</CODE>. If it is replaced, then the | |
291 | * filename is passed on to the next stage. Existing files are passed on to the | |
292 | * stage unless <CODE>ignoreExisting</CODE> is <CODE>true</CODE>. Note that the | |
293 | * <CODE>ignoreExisting</CODE> flag is only used if <CODE>overwrite</CODE> is | |
294 | * <CODE>false</CODE> (it's assumed that if a file will be downloaded, then it | |
295 | * shouldn't be ignored). | |
296 | * <p> | |
297 | * Pseudocode to summarize the interaction of <CODE>overwrite</CODE> and | |
298 | * <CODE>ignoreExisting</CODE>: <PRE> | |
299 | * if (file exists) { | |
300 | * if (overwrite) { | |
301 | * download file over existing local copy | |
302 | * and pass it on to the next stage | |
303 | * } else { | |
304 | * if (ignoreExisting) { | |
305 | * skip this file | |
306 | * } else { | |
307 | * pass existing file on to the next stage | |
308 | * } | |
309 | * } | |
310 | * } else { | |
311 | * download new file | |
312 | * and pass it on to the next stage | |
313 | * } | |
314 | * </PRE> | |
315 | */ | |
316 | 0 | public static class FileSpec { |
317 | /** | |
318 | * Enumeration of legal FTP file tranfer types | |
319 | */ | |
320 | 0 | public enum FileType { |
321 | /** | |
322 | * ASCII text transfer mode, with end of line conversion. | |
323 | */ | |
324 | 0 | ASCII(FTPClient.ASCII_FILE_TYPE), |
325 | /** | |
326 | * Binary transfer mode, no changes made to data stream. | |
327 | */ | |
328 | 0 | BINARY(FTPClient.BINARY_FILE_TYPE); |
329 | ||
330 | private int type; | |
331 | ||
332 | 0 | private FileType(int type) { |
333 | 0 | this.type = type; |
334 | 0 | } |
335 | ||
336 | /** | |
337 | * Get the integer value of the FTP transfer mode enumeration. | |
338 | * @return the integer equivalent to the FTP transfer mode setting | |
339 | */ | |
340 | public int intValue() { | |
341 | 0 | return this.type; |
342 | } | |
343 | } | |
344 | ||
345 | /** Holds value of property path. */ | |
346 | 0 | private String path = "/"; |
347 | ||
348 | /** Holds flag that determines whether or not to perform recursive search of the specified path */ | |
349 | private boolean recursive; | |
350 | ||
351 | // Holds flag that determines whether or not to overwrite local files | |
352 | 0 | private boolean overwrite = false; |
353 | ||
354 | /** | |
355 | * Holds flag that determines if existing files are passed to the next stage. | |
356 | */ | |
357 | 0 | private boolean ignoreExisting = false; |
358 | ||
359 | // Type of file (ascii or binary) | |
360 | 0 | private FileType type = FileType.BINARY; |
361 | ||
362 | // List of criteria that the retrieved file must satisfy. | |
363 | 0 | private Set<Criterion> criteria = new HashSet<Criterion>(); |
364 | ||
365 | /** Getter for property path. | |
366 | * @return Value of property path. | |
367 | * | |
368 | */ | |
369 | public String getPath() { | |
370 | 0 | return this.path; |
371 | } | |
372 | ||
373 | /** Setter for property path. | |
374 | * @param path New value of property path. | |
375 | * | |
376 | */ | |
377 | public void setPath(String path) { | |
378 | 0 | this.path = path; |
379 | 0 | } |
380 | ||
381 | /** Getter for property pattern. | |
382 | * @return Value of property pattern. | |
383 | * @deprecated - not retrievable from criterion | |
384 | */ | |
385 | public String getPattern() { | |
386 | 0 | return null; |
387 | } | |
388 | ||
389 | /** Setter for property pattern. | |
390 | * @param pattern New value of property pattern. | |
391 | * | |
392 | */ | |
393 | public void setPattern(String pattern) { | |
394 | 0 | this.criteria.add(new FileNameMatchCriterion(pattern)); |
395 | 0 | } |
396 | ||
397 | /** | |
398 | * Add a criterion to the set of criteria that must be matched for files | |
399 | * to be downloaded | |
400 | * @param crit {@link Criterion} used to match desired files for download, typically a filename pattern | |
401 | */ | |
402 | public void addCriterion(Criterion crit) { | |
403 | 0 | this.criteria.add(crit); |
404 | 0 | } |
405 | ||
406 | /** | |
407 | * Sets the flag determining whether or not the stage will recursively | |
408 | * traverse the directory tree to find files. | |
409 | * @param recursive this value is <CODE>true</CODE> to recursively search the remote directories for matches to | |
410 | * the criterion, <CODE>false</CODE> to turn off recursive searching | |
411 | */ | |
412 | public void setRecursive(boolean recursive) { | |
413 | 0 | this.recursive = recursive; |
414 | 0 | } |
415 | ||
416 | /** | |
417 | * Returns whether or not the stage will recursively | |
418 | * traverse the directory tree to find files. | |
419 | * @return the current recursive search setting | |
420 | */ | |
421 | public boolean isRecursive() { | |
422 | 0 | return this.recursive; |
423 | } | |
424 | ||
425 | /** | |
426 | * Sets the file type for the transfer. Legal values are "ascii" and "binary". | |
427 | * Binary transfers are the default. | |
428 | * @param fileType the FTP transfer type to use, "<CODE>ascii</CODE>" or "<CODE>binary</CODE>" | |
429 | */ | |
430 | public void setFileType(String fileType) { | |
431 | 0 | if ("ascii".equalsIgnoreCase(fileType)) { |
432 | 0 | this.type = FileType.ASCII; |
433 | } else { | |
434 | 0 | this.type = FileType.BINARY; |
435 | } | |
436 | 0 | } |
437 | ||
438 | /** | |
439 | * Returns the file type for the transfer. | |
440 | * @return the current FTP transfer type setting | |
441 | */ | |
442 | public String getFileType() { | |
443 | 0 | return this.type.toString(); |
444 | } | |
445 | ||
446 | /** | |
447 | * Getter for property overwrite. The default value for this flag is | |
448 | * <CODE>false</CODE>, so existing local files will not be replaced by downloading | |
449 | * remote files. This flag should be set to <CODE>true</CODE> if it is expected | |
450 | * that the remote file is periodically updated and the local file is and out of | |
451 | * date copy from a previous run of this pipeline. | |
452 | * @return Value of property overwrite. | |
453 | */ | |
454 | public boolean isOverwrite() { | |
455 | 0 | return this.overwrite; |
456 | } | |
457 | ||
458 | /** | |
459 | * Setter for property overwrite. | |
460 | * @param overwrite New value of property overwrite. | |
461 | */ | |
462 | public void setOverwrite(boolean overwrite) { | |
463 | 0 | this.overwrite = overwrite; |
464 | 0 | } |
465 | ||
466 | /** | |
467 | * Getter for property ignoreExisting. The default value for this flag is | |
468 | * <CODE>false</CODE>, so existing files that aren't downloaded are still passed | |
469 | * on to the next stage. | |
470 | * @return Value of property ignoreExisting. | |
471 | */ | |
472 | public boolean isIgnoreExisting() { | |
473 | 0 | return this.ignoreExisting; |
474 | } | |
475 | ||
476 | /** | |
477 | * Setter for property ignoreExisting. | |
478 | * @param ignoreExisting New value of property ignoreExisting. | |
479 | */ | |
480 | public void setIgnoreExisting(boolean ignoreExisting) { | |
481 | 0 | this.ignoreExisting = ignoreExisting; |
482 | 0 | } |
483 | } | |
484 | ||
485 | /** | |
486 | * This class is used to specify a criterion that the downloaded file | |
487 | * must satisfy. | |
488 | */ | |
489 | public interface Criterion { | |
490 | /** | |
491 | * Interface defining matches for FTP file downloading. Those remote files that | |
492 | * match the criterion will be downloaded. | |
493 | * @param file file to compare criterion to | |
494 | * @return <CODE>true</CODE> if the file meets the Criterion, <CODE>false</CODE> otherwise | |
495 | */ | |
496 | public boolean matches(FTPFile file); | |
497 | } | |
498 | ||
499 | /** | |
500 | * Matches file names based upon the Java regex supplied in the constructor. | |
501 | */ | |
502 | public static class FileNameMatchCriterion implements Criterion { | |
503 | // precompiled pattern used to match filenames | |
504 | private Pattern pattern; | |
505 | private String _pattern; | |
506 | ||
507 | /** | |
508 | * Construct a new criterion to match on file names. | |
509 | * @param pattern Java regex pattern specifying acceptable file names | |
510 | */ | |
511 | 0 | public FileNameMatchCriterion(String pattern) { |
512 | 0 | this._pattern = pattern; |
513 | 0 | this.pattern = Pattern.compile(pattern); |
514 | 0 | } |
515 | ||
516 | /** | |
517 | * Test the given file's name against this criterion. | |
518 | * @param file file to compare to | |
519 | * @return <CODE>true</CODE> if the filename matches the filename pattern of this criterion, | |
520 | * <CODE>false</CODE> otherwise | |
521 | */ | |
522 | public boolean matches(FTPFile file) { | |
523 | 0 | return pattern.matcher(file.getName()).matches(); |
524 | } | |
525 | ||
526 | /** | |
527 | * Printable version of this Criterion indicating the Java regex used for filename | |
528 | * matching. | |
529 | * @return a string containing the regex used to construct this filename criterion | |
530 | */ | |
531 | public String toString() { | |
532 | 0 | return "filename matches pattern " + _pattern; |
533 | } | |
534 | } | |
535 | ||
536 | /** | |
537 | * Matches files by matching their filesystem timestamp to a date range. | |
538 | */ | |
539 | public static class FileDateMatchCriterion implements Criterion { | |
540 | private Date startDate; | |
541 | private Date endDate; | |
542 | ||
543 | /** | |
544 | * Construct a new criterion to match file timestamp to a range of dates. | |
545 | * @param startDate starting date (inclusive) of the date range | |
546 | * @param endDate ending date (inclusive) of the date range | |
547 | */ | |
548 | 0 | public FileDateMatchCriterion(Date startDate, Date endDate) { |
549 | 0 | this.startDate = startDate; |
550 | 0 | this.endDate = endDate; |
551 | 0 | } |
552 | ||
553 | /** | |
554 | * Test the given file's date against this criterion. | |
555 | * @param file file to compare to | |
556 | * @return <CODE>true</CODE> if the file date falls into the time window of | |
557 | * [startDate, endDate], <CODE>false</CODE> otherwise | |
558 | */ | |
559 | public boolean matches(FTPFile file) { | |
560 | 0 | Calendar cal = file.getTimestamp(); |
561 | 0 | if ((startDate != null && cal.getTime().before(startDate)) || (endDate != null && cal.getTime().after(endDate))) { |
562 | 0 | return false; |
563 | } else { | |
564 | 0 | return true; |
565 | } | |
566 | } | |
567 | ||
568 | /** | |
569 | * Printable version of this Criterion indicating the inclusive date range used | |
570 | * for file date matching. | |
571 | * @return a string noting the startDate and endDate | |
572 | */ | |
573 | public String toString() { | |
574 | 0 | return "file date is between " + startDate + " and " + endDate; |
575 | } | |
576 | } | |
577 | } |