#!/usr/bin/env perl
############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements.  See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

###############################################################################
# Tests for pig streaming.
#
# This configuration file follows streaming functional spec: http://wiki.apache.org/pig/PigStreamingFunctionalSpec
#
# Layout of $cfg:
#   'driver', 'nummachines' - top-level settings.
#   'groups'                - list of test groups.  Each group carries a
#                             'name', the result-handling flags
#                             'sortBenchmark', 'sortResults',
#                             'floatpostprocess' and 'delimiter', and a list
#                             of 'tests'.
#   Each test has a 'num', a 'pig' script and a 'sql' query; some tests also
#   carry 'execonly', 'notmq' or 'pig_params'.
#
# The pig scripts are single-quoted (q#...# or q@...@), so $0, $1, $_ and
# $script_name reach Pig verbatim and are never interpolated by Perl.
#
# NOTE(review): presumably the harness substitutes the :INPATH:, :OUTPATH:,
# :SCRIPTHOMEPATH: and :FUNCPATH: placeholders and compares the pig output
# against the result of the 'sql' query - confirm against the driver.
# NOTE(review): 'delimiter' is a single whitespace character here, while the
# scripts treat the data as tab-separated (FS = "\t", GroupBy.pl '\t');
# confirm the literal is the intended character.
#
# $cfg is deliberately left as a package variable (no "my"): presumably the
# harness reads it after loading this file - verify before changing.

$cfg = {
    'driver'      => 'Pig',
    'nummachines' => 5,

    'groups' => [
        {
            # This group corresponds to Section 1: Computation Specification
            'name'             => 'ComputeSpec',
            'sortBenchmark'    => 1,
            'sortResults'      => 1,
            'floatpostprocess' => 1,
            'delimiter'        => ' ',
            'tests'            => [
                {
                    # Section 1.1: inline script
                    'num' => 1,
                    'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $2, $1, $0;
C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $3, $2, $1}'`;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 1.1: inline script with schema and projection
                    'num' => 2,
                    'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $2, $1, $0;
C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $3, $2, $1}'` as (name, age, gpa);
D = foreach C generate name, age;
store D into ':OUTPATH:';#,
                    'sql' => "select name, age from studenttab10k;",
                },
                {
                    # Section 1.1: inline script with pipe
                    'num' => 3,
                    'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `cut -f 1 | sort`;
C = order B by $0;
store C into ':OUTPATH:';#,
                    'sql' => "select name from studenttab10k order by name;",
                },
                {
                    # Section 1.1: perl script, no parameters, autoship (Section 2.1)
                    'num'      => 4,
                    'execonly' => 'mapred',
                    'pig'      => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through `perl PigStreaming.pl`;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 1.2: perl script that takes parameters; explicit ship of script (Section 2.1)
                    'num'      => 5,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreaming.pl - -` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD' limit 1);
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 1.3: define clause; explicit ship of script (Section 2.1)
                    'num'      => 6,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD');
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through CMD as (name, age, gpa);
D = foreach C generate name, gpa;
store D into ':OUTPATH:';#,
                    'sql' => "select name, gpa from studenttab10k;",
                },
                {
                    # Section 1.4: grouped data
                    'num'      => 7,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B generate flatten(A);
D = stream C through CMD;
store D into ':OUTPATH:';#,
                    'sql' => "select name, count(*) from studenttab10k group by name;",
                },
                {
                    # Section 1.4: grouped and ordered data
                    'num'      => 8,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
    D = order A by $1;
    generate flatten(D);
};
E = stream C through CMD;
store E into ':OUTPATH:';#,
                    'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
                },
                {
                    # Section 1.5: multiple streaming operators - adjacent - map side
                    'num'      => 9,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `perl PigStreaming.pl`;
C = stream B through CMD as (name, age, gpa);
D = foreach C generate name, age;
store D into ':OUTPATH:';#,
                    'sql' => "select name, age from studenttab10k;",
                },
                {
                    # Section 1.5: multiple streaming operators - not adjacent - map side
                    # (CMD is deliberately redefined halfway through the script)
                    'num'      => 10,
                    'execonly' => 'mapred',
                    'pig'      => q#
A = load ':INPATH:/singlefile/studenttab10k';
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
B = stream A through CMD as (name, age, gpa);
C = filter B by age < '20';
D = foreach C generate name;
define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
E = stream D through CMD;
store E into ':OUTPATH:';#,
                    'sql' => "select UPPER(name) from studenttab10k where age < '20';",
                },
                {
                    # Section 1.5: multiple streaming operators - adjacent - reduce side
                    'num'      => 11,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD1 `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl') stderr('CMD1');
define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm') stderr('CMD2');
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
    D = order A by $1;
    generate flatten(D);
};
E = stream C through CMD1;
F = stream E through CMD2;
store F into ':OUTPATH:';#,
                    'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
                },
                {
                    # Section 1.5: multiple streaming operators - one on map and one on reduce side
                    # same alias name (B is assigned twice on purpose)
                    'num'      => 12,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD1 `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD2;
C = group B by $0;
D = foreach C generate flatten(B);
B = stream D through CMD1;
store B into ':OUTPATH:';#,
                    'sql' => "select name, count(*) from studenttab10k group by name;",
                },
                {
                    # Section 1.5: multiple streaming operators - adjacent - map side
                    # (the same CMD is applied twice in a row)
                    'num'      => 13,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
C = stream B through CMD as (name, age, gpa);
D = foreach C generate name, age;
store D into ':OUTPATH:';#,
                    'sql' => "select name, age from studenttab10k;",
                },
                # TODO python script and binary tests for Section 1.1
            ]
        },
        {
            # This group corresponds to Section 2: JobManagement
            'name'             => 'JobManagement',
            'sortBenchmark'    => 1,
            'sortResults'      => 1,
            'floatpostprocess' => 1,
            'delimiter'        => ' ',
            'tests'            => [
                {
                    # Section 2.1: perl script and its dependency shipped
                    # Also covers part of section 3.1: custom serializer
                    'num'      => 1,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 2.1: perl script and supported data file is shipped
                    'num'      => 2,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD as (name);
D = group C by name;
E = foreach D generate group, COUNT(C);
store E into ':OUTPATH:';#,
                    'sql' => "select upper(name) as nm, count(*) from studenttab10k group by nm;",
                },
                {
                    # Section 2.2: script is shipped while the supporting file is cached
                    # Uses q@...@ because the cache() path itself contains a '#'.
                    'num'      => 3,
                    'execonly' => 'mapred',
                    'pig'      => q@
define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') cache(':INPATH:/nameMap/part-00000#nameMap');
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD as (name);
D = group C by name;
E = foreach D generate group, COUNT(C);
store E into ':OUTPATH:';@,
                    'sql' => "select upper(name) as nm, count(*) from studenttab10k group by nm;",
                },
                # TODO skip path
                # TODO ship python script
            ]
        },
        {
            # This group corresponds to Section 3: Input/Output Handling
            'name'             => 'StreamingIO',
            'sortBenchmark'    => 1,
            'sortResults'      => 1,
            'floatpostprocess' => 1,
            'delimiter'        => ' ',
            'tests'            => [
                {
                    # Section 3.1: use of custom deserializer
                    'num'      => 1,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreaming.pl` output(stdout) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.1: use of custom serializer and deserializer
                    'num'      => 2,
                    'execonly' => 'mapred',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl PigStreaming.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD as (name, age, gpa);
C = foreach B generate name, age;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age from studenttab10k;",
                },
                {
                    # Section 3.3: streaming application reads from file rather than stdin
                    'num'      => 3,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreaming.pl foo -` input('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.4: streaming application writes single output to a file
                    'num'      => 4,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreaming.pl - foo nameMap` output('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                    'sql' => "select upper(name) from studenttab10k;",
                },
                {
                    # Section 3.4: streaming application writes multiple outputs to file
                    'num'      => 5,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin) output('sio_5_1', 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.4: streaming application writes multiple outputs: 1 to file and 1 to stdout
                    'num'      => 6,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreamingDepend.pl - - sio_5_2` input(stdin) output(stdout, 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
            ]
        },
        {
            # This group corresponds to Section 4: Non-streaming Features
            'name'             => 'NonStreaming',
            'sortBenchmark'    => 1,
            'sortResults'      => 1,
            'floatpostprocess' => 1,
            'delimiter'        => ' ',
            'tests'            => [
                {
                    # Section 4.3: integration with parameter substitution
                    # $script_name is left for Pig to substitute from pig_params.
                    'num'        => 1,
                    'execonly'   => 'mapred',
                    'pig_params' => ['-p', qq(script_name='PigStreaming.pl')],
                    'pig'        => q#
define CMD `perl $script_name - - nameMap` ship(':SCRIPTHOMEPATH:/$script_name', ':SCRIPTHOMEPATH:/nameMap');
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD as (name);
D = group C by name;
E = foreach D generate group, COUNT(C);
store E into ':OUTPATH:';#,
                    'sql' => "select upper(name) as nm, count(*) from studenttab10k group by nm;",
                },
            ]
        },
        {
            # This group corresponds to Section 5: Performance
            'name'             => 'StreamingPerformance',
            'sortBenchmark'    => 1,
            'sortResults'      => 1,
            'floatpostprocess' => 1,
            'delimiter'        => ' ',
            'tests'            => [
                {
                    # Section 5.1: load/store optimization
                    'num'      => 1,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD');
A = load ':INPATH:/singlefile/studenttab10k';
C = stream A through CMD;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # PIG-272: problem with optimization and intermediate store
                    'num' => 2,
                    'pig' => q#
define CMD1 `perl -ne 'print $_;'`;
define CMD2 `perl -ne 'print $_;'`;
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through CMD1 as (name, age, gpa);
store B into ':OUTPATH:.intermediate';
C = stream B through CMD2 as (name, age, gpa);
D = JOIN B by name, C by name;
store D into ':OUTPATH:';#,
                    'notmq' => 1,
                    'sql'   => "select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
                },
                {
                    # PIG-272: problem with optimization and intermediate store
                    'num'      => 3,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`;
define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
C = stream B through CMD1;
D = stream C through CMD2;
store D into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # PIG-272: problem with optimization and intermediate store
                    'num'      => 4,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD1 `perl -ne 'print $_;'`;
define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
store B into ':OUTPATH:.intermediate';
C = stream B through CMD1;
D = stream C through CMD2;
E = JOIN B by $0, D by $0;
store E into ':OUTPATH:';#,
                    'notmq' => 1,
                    'sql'   => "select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
                },
                {
                    # Make sure join with stream optimization works
                    # optimization only on load side
                    'num'      => 5,
                    'execonly' => 'mapred',
                    'pig'      => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `cat` as (name:chararray, age:int, gpa:double);
C = foreach A generate $0, $1 + 100, $2 + 100.0;
D = join C by $0, B by $0;
store D into ':OUTPATH:';#,
                    'sql' => "select A.name, A.age + 100, A.gpa + 100.0, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
                },
                {
                    # Make sure join with stream optimization works
                    # optimization only on store side
                    'num'      => 6,
                    'execonly' => 'mapred',
                    'pig'      => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = filter A by $1 > 25;
C = stream B through `cat` as (name:chararray, age:int, gpa:double);
store C into ':OUTPATH:.intermediate';
D = join A by $0, C by $0;
store D into ':OUTPATH:';#,
                    'notmq' => 1,
                    'sql'   => "select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name) where b.age > 25;",
                },
                {
                    # Make sure join with stream optimization works
                    # optimization on load and store
                    'num'      => 7,
                    'execonly' => 'mapred',
                    'pig'      => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `cat` as (name:chararray, age:int, gpa:double);
store B into ':OUTPATH:.intermediate';
C = foreach A generate $0, $1 + 100, $2 + 100.0;
D = join C by $0, B by $0;
store D into ':OUTPATH:';#,
                    'notmq' => 1,
                    'sql'   => "select A.name, A.age + 100, A.gpa + 100.0, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
                },
            ]
        },
        {
            # This group is for testing cases where there could
            # be race conditions
            'name'             => 'RaceConditions',
            'sortBenchmark'    => 1,
            'sortResults'      => 1,
            'floatpostprocess' => 1,
            'delimiter'        => ' ',
            'tests'            => [
                {
                    # case where binary finishes normally
                    # BEFORE all input has been passed to it
                    'num' => 1,
                    'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `head -2`;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k limit 2;",
                },
                {
                    # case where binary finishes normally
                    # BEFORE all input has been passed to it
                    'num'      => 2,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    # the above pig script should return a 0 byte output;
                    # to simulate that, use a query which returns 0 results
                    'sql' => "select name, age, gpa from studenttab10k where name = 'pigtester';",
                },
                {
                    # Join with one side having a stream
                    # where binary finishes normally
                    # BEFORE all input has been passed to it
                    'num' => 3,
                    'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through `head -1` as (name, age, gpa);
C = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
D = join B by name, C by name;
store D into ':OUTPATH:';#,
                    'sql' => "select a.name, a.age, a.gpa, b.name, b.age, b.gpa from studenttab10k as a join studenttab10k as b using(name) where a.name = 'ulysses thompson' and a.age = 64;",
                },
                {
                    # Join with both sides having a stream
                    # where binary finishes normally
                    # BEFORE all input has been passed to it
                    # FIXME: in local mode
                    'num'      => 4,
                    'execonly' => 'mapred',
                    'pig'      => q#
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through `head -1` as (name, age, gpa);
C = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
D = stream C through `head -1` as (name, age, gpa);
E = join B by name, D by name;
store E into ':OUTPATH:';#,
                    'sql' => "select a.name, a.age, a.gpa, b.name, b.age, b.gpa from studenttab10k as a join studenttab10k as b using(name) where a.name = 'ulysses thompson' and a.age = 64 and b.name = 'ulysses thompson' and b.age = 64;",
                },
                {
                    # Union with one side having a stream
                    # where binary finishes normally
                    # BEFORE all input has been passed to it
                    # and emits no output
                    'num'      => 5,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through CMD as (name, age, gpa);
C = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
D = union B,C;
store D into ':OUTPATH:';#,
                    'sql' => "select * from studenttab10k ;",
                },
                {
                    # Union with one side having two stream binaries
                    # one of them is head -10 followed by another
                    # binary which finishes normally
                    # BEFORE all input has been passed to it
                    # and emits no output
                    #
                    # C must consume B (the head -10 output), not A; otherwise
                    # B is dead and the two-binary chain is never exercised.
                    'num'      => 6,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through `head -10` as (name, age, gpa);
C = stream B through CMD as (name, age, gpa);
D = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
E = union C,D;
store E into ':OUTPATH:';#,
                    'sql' => "select * from studenttab10k ;",
                },
                {
                    # two stream operators one after another where first
                    # one emits no output
                    'num'      => 7,
                    'execonly' => 'mapred',
                    'pig'      => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through CMD as (name, age, gpa);
C = stream B through `head -10` as (name, age, gpa);
store C into ':OUTPATH:';#,
                    # the above pig script should return a 0 byte output;
                    # to simulate that, use a query which returns 0 results
                    'sql' => "select name, age, gpa from studenttab10k where name = 'pigtester';",
                },
                {
                    # stream followed by split
                    'num' => 8,
                    'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through `head -10` as (name, age, gpa);
split B into C if name == 'ulysses thompson', D if name != 'ulysses thompson';
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k where name = 'ulysses thompson' and age= 64;",
                },
            ]
        },
    ]
}
;