#!/usr/bin/perl # # mk-corpus-link-farm - distribute a bunch of mail tidily into a set of corpora # (see EOF for an example/testcase) # # Note: creates symbolic links only; renaming/moving the originals will # cause breakage. # # <@LICENSE> # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to you under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # use strict; use warnings; sub usage { die " usage: mk-corpus-link-farm [options] [dest] src [...] dest: -dest outputdir [-num num] options: -most_recent: select the most recent messages (default) -after=N only test mails received after time_t N (negative values are an offset from current time, e.g. -86400 = last day) or after date as parsed by Time::ParseDate (e.g. '-6 months') "; } use Time::ParseDate; use Time::Local; use Cwd; use File::Path; use File::Find; use File::Basename; use Data::Dumper; use SDBM_File; use Fcntl; my $DEBUG;# $DEBUG=1; my @classes = qw(ham spam); my $srcs = [ ]; my $dests = [ ]; my $mbox_tmpdir = $ENV{TMPDIR} || "/tmp"; sub dbg; use Getopt::Long; use vars qw( $opt_most_recent $opt_after $opt_before ); $opt_most_recent = 0; tz_init(); my $curdest; GetOptions( 'dest=s' => sub { my ($switch, $dir) = @_; $curdest = { ham => { }, spam => { }, dir => $dir }; push (@$dests, $curdest); }, 'num=i' => sub { my ($switch, $num) = @_; $curdest->{num_msgs} = $num; }, 'most_recent' => \$opt_most_recent, 'before=s' => \$opt_before, 'after=s' => \$opt_after, ) or usage(); foreach my $arg (@ARGV) { push (@$srcs, { dir => $arg }); } # Deal with --before and --after foreach my $time ($opt_before, $opt_after) { if ($time && $time =~ /^-\d+$/) { $time = time + $time; } elsif ($time && $time !~ /^-?\d+$/) { $time = Time::ParseDate::parsedate($time, GMT => 1, PREFER_PAST => 1); } } # test data: $srcs = [ { dir => "/src1", ham => { dests => [ ], dir => # "/src1/ham", num => 100 }, spam => { dests => [ ], dir => "/src1/spam", num # => 100 }, }, { dir => "/src2", ham => { dests => [ ], dir => "/src2/ham", num # => 300 }, spam => { dests => [ ], dir => "/src2/spam", num => 300 }, }, { dir # => "/src3", ham => { dests => [ ], dir => "/src3/ham", num => 500 }, spam => # { dests => [ ], dir => "/src3/spam", num => 500 }, } ]; my $cwd = cwd(); # use an on-disk file -- this list can get pretty big! my $poss_del_path = "$mbox_tmpdir/dels.$$"; my $poss_delete; { my %h; tie(%h, 'SDBM_File', $poss_del_path, O_RDWR|O_CREAT, 0600) or die "Couldn't tie SDBM file $poss_del_path: $!; aborting"; $poss_delete = \%h; } my $mbox_work = "$mbox_tmpdir/mboxes.d"; if (-d $mbox_work) { mark_for_poss_deletion($mbox_work); } main(); # and clean up again. unlink $poss_del_path; unlink "$poss_del_path.dir"; unlink "$poss_del_path.pag"; exit; sub main { find_srcs(); dist_across_dests(); make_links_in_dests(); perform_poss_deletion(); } sub find_srcs { foreach my $src (@$srcs) { my $num_files; my @mboxes = (); my $cb = sub { if (-f $_ && -r _) { if ($_ =~ /\.mbox/i) { push @mboxes, $File::Find::name; } else { $num_files++; } } }; $src->{ham} = { num => 0, dests => [ ] }; $src->{spam} = { num => 0, dests => [ ] }; my $try_dir = "$src->{dir}/ham"; if (-d $try_dir) { $num_files = 0; @mboxes = (); File::Find::find({ wanted => $cb, follow => 1 }, $try_dir); foreach my $mbox (@mboxes) { $num_files += mbox_count($mbox); } $src->{ham}{subdir} = $try_dir; $src->{ham}{num} = $num_files; } $try_dir = "$src->{dir}/spam"; if (-d $try_dir) { $num_files = 0; @mboxes = (); File::Find::find({ wanted => $cb, follow => 1 }, $try_dir); foreach my $mbox (@mboxes) { $num_files += mbox_count($mbox); } $src->{spam}{subdir} = $try_dir; $src->{spam}{num} = $num_files; } print "$src->{dir}: found $src->{ham}{num} ham, $src->{spam}{num} spam\n"; } } sub dist_across_dests { my @srcorder = @$srcs; foreach my $dest (@$dests) { my %want = (); my $wantnum = $dest->{num_msgs} || 99999999; foreach my $class (@classes) { $want{$class} = $wantnum; } $dest->{srcs} = [ ]; print "\n$dest->{dir}: want $wantnum messages\n"; foreach my $class (@classes) { foreach my $src (@srcorder) { last unless ($want{$class} > 0); allocate ($src, $dest, \$want{$class}, $class); } } foreach my $class (@classes) { print "$class:"; ($class eq 'ham') and print " "; my $added = 0; foreach my $src (@{$dest->{$class}{srcs}}) { print " $src->{num} of $src->{from}{$class}{subdir}"; $added += $src->{num}; } print "\n"; if ($want{$class} > 0) { warn " WARNING: failed to fill $dest->{dir}/$class: ". "only $added, wanted $want{$class} more\n"; } } # for the next dest, try to take some more entries from # other sources as well. do this by moving the source that's # currently at the head of the list, to the end. my $first = shift @srcorder; @srcorder = (@srcorder, $first); } } sub make_links_in_dests { foreach my $class (@classes) { foreach my $dest (@$dests) { my $dir = $dest->{dir}.'/'.$class; if (-d $dir) { mark_for_poss_deletion($dir); } else { mkpath($dir) or warn "cannot mkdir $dir: $!"; } } foreach my $src (@$srcs) { _mklink($class, $src); } } } sub _mklink { my ($class, $src) = @_; my $srcdir = $src->{$class}{subdir}; if (!$srcdir) { dbg "no srcdir, skipping $src"; return; } if (!-d $srcdir) { warn "cannot read $srcdir, ignoring: $!"; return; } dbg "linking from $srcdir"; # create a hash of modtime -> filepath, so we can be sure we pick up # "new" files first if so desired. note that -M gives (now - modtime) in # days, so larger numbers means earlier. my %files = (); File::Find::find({ follow => 1, wanted => sub { return unless (-f $_ && -r _); # not a file my @stat = stat _; my $mtime = $stat[9]; return unless message_is_useful_by_date($mtime); if (!exists $files{$mtime}) { $files{$mtime} = [ ]; } if ($_ =~ /\.mbox/i) { push(@{$files{$mtime}}, mbox_extract_all($_)); } else { push(@{$files{$mtime}}, $File::Find::name); } } }, $srcdir); my @files = (); foreach my $key (sort { $b <=> $a } keys %files) { push (@files, @{$files{$key}}); } undef %files; # no longer need that # @files is now sorted with the "youngest" files first. check: if (scalar @files && $files[0] && $files[1] && -M $files[0] > -M $files[-1]) { warn "oops! files out of order, should be youngest first: ". join(' ',@files); } foreach my $destobj (@{$src->{$class}{dests}}) { my $dest = $destobj->{dest}; my $num = $destobj->{num}; my $destdir = $dest->{dir}; dbg " linking $num into $destdir"; my $i; for ($i = 0; $i < $num; $i++) { my $srcname = shift @files; if (!$srcname) { # die "oops! ran out of srcs. at $i / $num. dump: ".Dumper($destobj); last; } my $dstname = $srcname; $dstname =~ s/[^-_\.A-Za-z0-9]/_/gs; $dstname =~ s/_+/_/gs; $dstname =~ s/^_//gs; $dstname = $destdir."/".$class."/".$dstname; if ($srcname !~ m,^/,) { # unrooted. root it $srcname = $cwd.'/'.$srcname; } remove_from_poss_delete($dstname); if (-l $dstname) { my $link = readlink($dstname); if ($link eq $srcname) { dbg " $srcname already linked to $dstname"; next; } unlink $dstname; } if (symlink($srcname, $dstname)) { dbg " $srcname -> $dstname"; } else { warn "symlink $srcname -> $dstname failed: $!"; } } } } sub allocate { my ($src, $dest, $nhamref, $class) = @_; my $nsrc = $src->{$class}{num}; dbg "$class nsrc=$nsrc nwanted=$$nhamref"; if ($nsrc == 0) { dbg "already exhausted src"; } elsif ($nsrc <= $$nhamref) { dbg "exhausted src"; push (@{$dest->{$class}{srcs}}, { from => $src, num => $nsrc }); push (@{$src->{$class}{dests}}, { dest => $dest, num => $nsrc }); $$nhamref -= $nsrc; $src->{$class}{num} = 0; } else { dbg "filled dest, some left in src"; push (@{$dest->{$class}{srcs}}, { from => $src, num => $$nhamref }); push (@{$src->{$class}{dests}}, { dest => $dest, num => $$nhamref }); $src->{$class}{num} -= $$nhamref; $$nhamref = 0; } } sub mark_for_poss_deletion { my ($dir) = @_; File::Find::find({ follow => 1, wanted => sub { return if (/mboxcountcache$/); if (!-d $_) { my $fname = $File::Find::name; $poss_delete->{$fname} = 1; dbg("marked as deleteable: $fname"); } else { # TODO: delete dirs? for now, leave 'em behind } } }, $dir); } sub perform_poss_deletion { foreach my $fname (keys %{$poss_delete}) { unlink $fname or warn "cannot unlink $fname"; } } sub remove_from_poss_delete { my ($fname) = @_; if (exists $poss_delete->{$fname}) { delete $poss_delete->{$fname}; return 1; } else { return 0; } } sub mbox_count { my ($mboxpath) = @_; print "counting mbox: $mboxpath\n"; return _mbox_extract_all($mboxpath, 1); } sub mbox_extract_all { my ($mboxpath) = @_; print "extracting mbox: $mboxpath\n"; return _mbox_extract_all($mboxpath, 0); } sub _mbox_extract_all { my ($mboxpath, $justcount) = @_; # create an area to hold extracted mbox files # this cannot use $$, it must remain the same between runs if (!-d $mbox_work) { mkdir $mbox_work or die "cannot create tmpdir: $mbox_work"; # fatal error, could be an attack } my $countcache = get_mbox_name ($mboxpath, 0); $countcache =~ s/OFF\d+$/mboxcountcache/gs; if ($justcount && -f $countcache && -M $countcache < -M $mboxpath) { open (CACHE, "<$countcache"); my $count = + 0; close CACHE; return $count; } my $counter = 0; my @created_files = (); open (INPUT, "<$mboxpath") or die "cannot read $mboxpath"; binmode INPUT; # get stat details for the input mbox my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size, $atime,$mtime,$ctime) = stat INPUT; # assumption: get_mbox_name() uses the same dir for all offsets my $newname = get_mbox_name ($mboxpath, 0); my $dir = dirname ($newname); if (!-d $dir) { mkdir $dir or die "cannot mkdir $dir"; } chmod 0755, $dir or warn "cannot chmod $dir"; my $start = 0; # start of a message my $where = 0; # current byte offset my $in_header = 0; # are in we a header? my $fromline; while (!eof INPUT) { my $offset = $start; # byte offset of this message while () { nextfrom: last unless defined($_); if (substr($_,0,5) eq "From ") { $in_header = 1; $start = $where; $where = tell INPUT; $fromline = $_; last; } } last unless defined($_); # dbg "mbox From: $counter $start $where $fromline"; if ($fromline && mbox_new_enough($fromline)) { $counter++; if (!$justcount) { $newname = get_mbox_name ($mboxpath, $offset); if (-f $newname && (-M _ >= -M INPUT)) { # no need to recreate it, it's fresh my $past = 0; while () { if ($past) { last if (!defined($_) || substr($_,0,5) eq "From "); } else { $past = 1; } } } else { seek (INPUT, $where, 0); open (OUTPUT, ">$newname") or die "cannot write to $newname"; binmode OUTPUT; my $past = 0; while () { if ($past) { last if (!defined($_) || substr($_,0,5) eq "From "); } else { $past = 1; } print OUTPUT; } close OUTPUT or die "failed to write to $newname"; chmod 0644, $newname or warn "cannot chmod $newname"; utime $atime, $mtime, $newname or warn "failed to touch $newname"; } push @created_files, $newname; remove_from_poss_delete($newname); $where = tell INPUT; $offset = $where; # we've already read the next "From " line, parse it now goto nextfrom; } } } close INPUT; if ($justcount) { open (CACHE, ">$countcache"); print CACHE $counter; close CACHE; return $counter; } else { print "extracted: $mboxpath: $counter files\n"; return @created_files; } } sub get_mbox_name { my ($mboxpath, $where) = @_; my $dstname = $mboxpath; $dstname =~ s/[^-_\.A-Za-z0-9]/_/gs; $dstname =~ s/_+/_/gs; $dstname =~ s/^_//gs; $dstname = $mbox_work."/".$dstname."/OFF".$where; return $dstname; } sub mbox_new_enough { my ($fromline) = @_; # From xscludshmkjgc@yahoo.com Thu Apr 29 20:02:18 2004 return unless ($fromline && $fromline =~ /^From \S+ +(.*)$/); $fromline = $1; $fromline .= " ".local_tz() unless $fromline =~ /(?:[-+]\d{4}|\b[A-Z]{2,4}\b)/; my $time = first_date($fromline); return message_is_useful_by_date($time); } sub message_is_useful_by_date { my ($date) = @_; return 0 unless $date; # undef or 0 date = unusable if (!$opt_after && !$opt_before) { # Not using the feature return 1; } elsif (!$opt_before) { # Just care about after return $date > $opt_after; } else { return (($date < $opt_before) && ($date > $opt_after)); } } sub dbg { return unless $DEBUG; warn "debug: ".join("", @_)."\n"; } sub first_date { my (@strings) = @_; foreach my $string (@strings) { my $time = parse_rfc822_date($string); return $time if defined($time) && $time; } return undef; } ########################################################################### my %TZ; my %MONTH; my $LOCALTZ; sub tz_init { # timezone mappings: in case of conflicts, use RFC 2822, then most # common and least conflicting mapping %TZ = ( # standard 'UT' => '+0000', 'UTC' => '+0000', # US and Canada 'NDT' => '-0230', 'AST' => '-0400', 'ADT' => '-0300', 'NST' => '-0330', 'EST' => '-0500', 'EDT' => '-0400', 'CST' => '-0600', 'CDT' => '-0500', 'MST' => '-0700', 'MDT' => '-0600', 'PST' => '-0800', 'PDT' => '-0700', 'HST' => '-1000', 'AKST' => '-0900', 'AKDT' => '-0800', 'HADT' => '-0900', 'HAST' => '-1000', # Europe 'GMT' => '+0000', 'BST' => '+0100', 'IST' => '+0100', 'WET' => '+0000', 'WEST' => '+0100', 'CET' => '+0100', 'CEST' => '+0200', 'EET' => '+0200', 'EEST' => '+0300', 'MSK' => '+0300', 'MSD' => '+0400', 'MET' => '+0100', 'MEZ' => '+0100', 'MEST' => '+0200', 'MESZ' => '+0200', # South America 'BRST' => '-0200', 'BRT' => '-0300', # Australia 'AEST' => '+1000', 'AEDT' => '+1100', 'ACST' => '+0930', 'ACDT' => '+1030', 'AWST' => '+0800', # New Zealand 'NZST' => '+1200', 'NZDT' => '+1300', # Asia 'JST' => '+0900', 'KST' => '+0900', 'HKT' => '+0800', 'SGT' => '+0800', 'PHT' => '+0800', # Middle East 'IDT' => '+0300', ); # month mappings %MONTH = (jan => 1, feb => 2, mar => 3, apr => 4, may => 5, jun => 6, jul => 7, aug => 8, sep => 9, oct => 10, nov => 11, dec => 12); } sub local_tz { return $LOCALTZ if defined($LOCALTZ); # standard method for determining local timezone my $time = time; my @g = gmtime($time); my @t = localtime($time); my $z = $t[1]-$g[1]+($t[2]-$g[2])*60+($t[7]-$g[7])*1440+($t[5]-$g[5])*525600; $LOCALTZ = sprintf("%+.2d%.2d", $z/60, $z%60); return $LOCALTZ; } sub parse_rfc822_date { my ($date) = @_; local ($_); my ($yyyy, $mmm, $dd, $hh, $mm, $ss, $mon, $tzoff); # make it a bit easier to match $_ = " $date "; s/, */ /gs; s/\s+/ /gs; # now match it in parts. Date part first: if (s/ (\d+) (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (\d{4}) / /i) { $dd = $1; $mon = lc($2); $yyyy = $3; } elsif (s/ (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) +(\d+) \d+:\d+:\d+ (\d{4}) / /i) { $dd = $2; $mon = lc($1); $yyyy = $3; } elsif (s/ (\d+) (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (\d{2,3}) / /i) { $dd = $1; $mon = lc($2); $yyyy = $3; } else { dbg("util: time cannot be parsed: $date"); return undef; } # handle two and three digit dates as specified by RFC 2822 if (defined $yyyy) { if (length($yyyy) == 2 && $yyyy < 50) { $yyyy += 2000; } elsif (length($yyyy) != 4) { # three digit years and two digit years with values between 50 and 99 $yyyy += 1900; } } # hh:mm:ss if (s/ (\d?\d):(\d\d)(:(\d\d))? / /) { $hh = $1; $mm = $2; $ss = $4 || 0; } # numeric timezones if (s/ ([-+]\d{4}) / /) { $tzoff = $1; } # common timezones elsif (s/\b([A-Z]{2,4}(?:-DST)?)\b/ / && exists $TZ{$1}) { $tzoff = $TZ{$1}; } # all other timezones are considered equivalent to "-0000" $tzoff ||= '-0000'; # months if (exists $MONTH{$mon}) { $mmm = $MONTH{$mon}; } $hh ||= 0; $mm ||= 0; $ss ||= 0; $dd ||= 0; $mmm ||= 0; $yyyy ||= 0; # Time::Local (v1.10 at least) throws warnings when the dates cause # a 32-bit overflow. So force a min/max for year. if ($yyyy > 2037) { dbg("util: date after supported range, forcing year to 2037: $date"); $yyyy = 2037; } elsif ($yyyy < 1970) { dbg("util: date before supported range, forcing year to 1970: $date"); $yyyy = 1971; } my $time; eval { # could croak $time = timegm($ss, $mm, $hh, $dd, $mmm-1, $yyyy); }; if ($@) { dbg("util: time cannot be parsed: $date, $yyyy-$mmm-$dd $hh:$mm:$ss: $@"); return undef; } if ($tzoff =~ /([-+])(\d\d)(\d\d)$/) # convert to seconds difference { $tzoff = (($2 * 60) + $3) * 60; if ($1 eq '-') { $time += $tzoff; } else { $time -= $tzoff; } } return $time; } sub time_to_rfc822_date { my($time) = @_; my @days = qw/Sun Mon Tue Wed Thu Fri Sat/; my @months = qw/Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec/; my @localtime = localtime($time || time); $localtime[5]+=1900; sprintf("%s, %02d %s %4d %02d:%02d:%02d %s", $days[$localtime[6]], $localtime[3], $months[$localtime[4]], @localtime[5,2,1,0], local_tz()); } ########################################################################### __DATA__ Quick test/demo. Given the following input structure: src1/{ham,spam}/{1,2,3} src2/{ham,spam}/{1,2} src4/{ham,spam}/1 and this command: ../mk-corpus-link-farm \ -dest ./out1 -num 1 -dest ./out2 -num 2 -dest ./out3 -num 5 \ src* we want: out1/{ham,spam}/1 out2/{ham,spam}/{1,2} out3/{ham,spam}/{1,2,3} [and a warning that we exhausted the sources, because we actually asked for 5 mails in each class of out3.] test commands: mkdir t_splitcorpus; cd t_splitcorpus; mkdir -p src{1,2,3}/{ham,spam} for f in src1/{ham,spam}/{1,2,3} src2/{ham,spam}/{1,2} src3/{ham,spam}/1 do echo > $f ; done; ../mk-corpus-link-farm \ -dest ./out1 -num 1 -dest ./out2 -num 2 -dest ./out3 -num 5 \ src* ../mk-corpus-link-farm \ -dest ./out1 -num 1 -dest ./out2 -num 2 -dest ./out3 -num 5 \ src1/*.mbox src2 src3