#!/usr/bin/env perl
############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements.  See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

###############################################################################
# Tests for pig streaming.
#
# This configuration file follows streaming functional spec:
# http://wiki.apache.org/pig/PigStreamingFunctionalSpec
#
# Layout of the structure assigned to $cfg:
#   driver / nummachines  - top-level settings
#   groups                - list of test groups; each group carries its own
#                           settings plus a 'tests' list
#   tests                 - one hash per test case with 'num', 'execonly',
#                           the Pig script under 'pig' and the reference
#                           query under 'sql' (some tests add 'pig_params'
#                           or 'notmq')
#
# Every Pig script is written as a q#...# literal. q is the non-interpolating
# quote operator, so $0, $1, $_ and $script_name reach Pig verbatim, and the
# backslash in '\t' is kept as a literal backslash followed by 't'. Because '#'
# is the delimiter, a Pig script must not contain a '#' character.
#
# Every comment must stay on its own line (or at the very end of a line):
# a '#' comment runs to the end of the physical line and would otherwise
# swallow the code that follows it.
#
# NOTE(review): $cfg is assigned without 'my' and the file has no strictures;
# presumably whatever loads this file reads $cfg by that exact name. Confirm
# against the loader before adding a declaration or 'use strict'.

$cfg = {
    'driver'      => 'Pig',
    'nummachines' => 5,

    'groups' => [
        {
            # This group is for local mode testing
            'name'             => 'StreamingLocal',
            'sortBenchmark'    => 1,
            'sortResults'      => 1,
            'floatpostprocess' => 1,

            # NOTE(review): the value is a single space as found in this file,
            # while the GroupBy.pl commands below are passed '\t' as their
            # first argument. Confirm whether a literal tab was intended here.
            'delimiter' => ' ',

            'tests' => [
                {
                    # Section 1.1: perl script, no parameters
                    'num'      => 1,
                    'execonly' => 'local',
                    'pig'      => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 1.3: define clause; perl script, with parameters
                    'num'      => 2,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - -`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 1.4: grouped data
                    'num'      => 3,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B generate flatten(A);
D = stream C through CMD;
store D into ':OUTPATH:';#,
                    'sql' => "select name, count(*) from studenttab10k group by name;",
                },
                {
                    # Section 1.4: grouped and ordered data
                    'num'      => 4,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
    D = order A by $1;
    generate flatten(D);
};
E = stream C through CMD;
store E into ':OUTPATH:';#,
                    'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
                },
                {
                    # Section 1.5: multiple streaming operators - adjacent - before local rearrange
                    'num'      => 5,
                    'execonly' => 'local',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
C = stream B through CMD as (name, age, gpa);
D = foreach C generate name, age;
store D into ':OUTPATH:';#,
                    'sql' => "select name, age from studenttab10k;",
                },
                {
                    # Section 1.5: multiple streaming operators - not adjacent - before local rearrange
                    # CMD is defined twice in this script; the second define
                    # is part of the test as written.
                    'num'      => 6,
                    'execonly' => 'local',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
A = load ':INPATH:/singlefile/studenttab10k';
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
B = stream A through CMD as (name, age, gpa);
C = filter B by age < '20';
D = foreach C generate name;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - - :SCRIPTHOMEPATH:/nameMap`;
E = stream D through CMD;
store E into ':OUTPATH:';#,
                    'sql' => "select UPPER(name) from studenttab10k where age < '20';",
                },
                {
                    # Section 1.5: multiple streaming operators - adjacent - after local rearrange
                    'num'      => 7,
                    'execonly' => 'local',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
    D = order A by $1;
    generate flatten(D);
};
E = stream C through CMD1;
F = stream E through CMD2;
store F into ':OUTPATH:';#,
                    'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
                },
                {
                    # Section 1.5: multiple streaming operators - one before and one after local rearrange
                    # same alias name
                    'num'      => 8,
                    'execonly' => 'local',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD2;
C = group B by $0;
D = foreach C generate flatten(B);
B = stream D through CMD1;
store B into ':OUTPATH:';#,
                    'sql' => "select name, count(*) from studenttab10k group by name;",
                },
                {
                    # Section 3.1: use of custom deserializer
                    'num'      => 9,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` output(stdout using PigStreaming());
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.1: use of custom serializer and deserializer
                    'num'      => 10,
                    'execonly' => 'local',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD as (name, age, gpa);
C = foreach B generate name, age;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age from studenttab10k;",
                },
                {
                    # Section 3.3: streaming application reads from file rather than stdin
                    'num'      => 11,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl foo -` input('foo');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.4: streaming application writes single output to a file
                    'num'      => 12,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - foo :SCRIPTHOMEPATH:/nameMap` output('foo' using PigStreaming);
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                    'sql' => "select upper(name) from studenttab10k;",
                },
                {
                    # Section 3.4: streaming application writes multiple outputs to file
                    'num'      => 13,
                    'execonly' => 'local',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output('sio_5_1', 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.4: streaming application writes multiple outputs: 1 to file and 1 to stdout
                    'num'      => 14,
                    'execonly' => 'local',
                    'pig'      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - - sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout, 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 4.3: integration with parameter substitution
                    # $script_name in the script below is not a Perl variable:
                    # q#...# does not interpolate, so it is passed through
                    # as-is; its value is supplied via 'pig_params'.
                    'num'        => 15,
                    'execonly'   => 'local',
                    'pig_params' => ['-p', qq(script_name='PigStreaming.pl')],
                    'pig'        => q#
define CMD `perl :SCRIPTHOMEPATH:/$script_name - - :SCRIPTHOMEPATH:/nameMap`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD as (name);
D = group C by name;
E = foreach D generate group, COUNT(C);
store E into ':OUTPATH:';#,
                    'sql' => "select upper(name) as nm, count(*) from studenttab10k group by nm;",
                },
                {
                    # Section 5.1: load/store optimization
                    'num'      => 16,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
C = stream A through CMD;
store C into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # PIG-272: problem with optimization and intermediate store
                    'num'      => 17,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`;
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(','));
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
C = stream B through CMD1;
D = stream C through CMD2;
store D into ':OUTPATH:';#,
                    'sql' => "select name, age, gpa from studenttab10k;",
                },
                {
                    # PIG-272: problem with optimization and intermediate store
                    'num'      => 18,
                    'execonly' => 'local',
                    'pig'      => q#
define CMD1 `perl -ne 'print $_;'`;
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(','));
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
store B into ':OUTPATH:.intermediate';
C = stream B through CMD1;
D = stream C through CMD2;
E = JOIN B by $0, D by $0;
store E into ':OUTPATH:';#,
                    'notmq' => 1,
                    'sql'   => "select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
                },
            ]
        },
    ]
};