#!/usr/bin/env perl ############################################################################ # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #################################################################### # SUB: Multiquery # Configuration for the Pig 'MultiQuery' test group: every test below is a Pig Latin script ('pig') that stores more than one output (':OUTPATH:.1', ':OUTPATH:.2', ...) together with an 'sql' string holding one SELECT per stored output. # NOTE(review): the 'sql' statements are presumably what the harness runs to produce the expected results - confirm against the driver. # - _TEST_ The first example; one that is defined in the bug with one split # - in the map phase # - _TEST_ Multiple side files, all in map phase. # - _TEST_ Two loads and two stores in map phase. # - _TEST_ One split added in reduce phase and map-only splitee. # - _TEST_ One split added in reduce phase and one map-reduce splitee # - _TEST_ One split in reduce phase and two Map-Reduce splitees. # - _TEST_ Two loads and two stores in reduce phase # - _TEST_ Explicit split with two side files. # - _TEST_ Explicit split with order by and two side files. # - _TEST_ Implicit split with multiple side files. # - _TEST_ Streaming with multiple stores. # - _TEST_ Script with intermediate stores. # - _TEST_ Implicit split with order by and multiple side files. # - _TEST_ Self join using fragment replicate join with multiple side files. # - _TEST_ PigMix Test Case L12. # - _TEST_ One split in map phase and two Map-Reduce splitees with mixed combiners. 
# - _TEST_ One split in map phase and two Map-Reduce splitees without combiners. # - _TEST_ Splittees with different map key types and nested splits. # - _TEST_ Splittees with different map key type. # - _TEST_ Streaming in demux. # - _TEST_ Streaming in nested demux. # - _TEST_ PigMix Test Case L12 version 2 # - _TEST_ PigMix Test Case L12 version 3 (different map key types in the inner split). # - _TEST_ Pig-976: two group-bys (by name and by age) over one load. # - _TEST_ Pig-976: group all and group by age over one load. # - _TEST_ Pig-983: multiple group-bys following a join. # - _TEST_ Pig-976: group by name that also emits the group key, and group by age. # NOTE(review): $cfg is assigned without 'my', so it is a package variable; presumably the test harness reads it after loading this file - confirm before making it lexical. $cfg = { 'driver' => 'Pig', 'nummachines' => 5, 'groups' => [ { 'name' => 'MultiQuery', 'floatpostprocess' => 1, 'delimiter' => ' ', 'tests' => [ { # The first example; one that is defined in the bug with one split # in the map phase 'num' => 1, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); b = filter a by age < 22; store b into ':OUTPATH:.1'; c = group b by age; d = foreach c generate group, SUM(b.gpa); store d into ':OUTPATH:.2'; #, 'sql' => "select name, age, gpa from studenttab10k where age < 22; select age, sum(gpa) from studenttab10k where age < 22 group by age;", }, { # Multiple side files, all in map phase. 'num' => 2, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); b = filter a by age < 22; store b into ':OUTPATH:.1'; c = filter b by gpa > 3.0; store c into ':OUTPATH:.2'; d = filter c by name < 'm'; store d into ':OUTPATH:.3'; #, 'sql' => "select name, age, gpa from studenttab10k where age < 22; select name, age, gpa from studenttab10k where age < 22 and gpa > 3.0; select name, age, gpa from studenttab10k where age < 22 and gpa > 3.0 and name < 'm';", }, { # Two loads and two stores in map phase. 
'num' => 3, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); c = filter a by age < 20; d = filter b by age < 20; store c into ':OUTPATH:.1'; store d into ':OUTPATH:.2'; e = cogroup c by name, d by name; f = foreach e generate flatten(c), flatten(d); store f into ':OUTPATH:.3'; #, 'sql' => "select name, age, gpa from studenttab10k where age < 20; select name, age, registration, contributions from votertab10k where age < 20; select a.name, a.age, a.gpa, b.name, b.age, b.registration, b.contributions from studenttab10k as a full outer join votertab10k as b using(name) where a.age < 20 and b.age < 20;", }, { # One split added in reduce phase and map-only splitee. 'num' => 4, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); b = filter a by gpa < 3.0; c = group b by age; d = foreach c generate group, AVG(b.gpa); store d into ':OUTPATH:.1'; e = filter d by $1 > 1.5; store e into ':OUTPATH:.2'; #, 'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age; select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age having avg(gpa) > 1.5;", }, { # One split added in reduce phase and one map-reduce splitee 'num' => 5, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); b = filter a by gpa < 3.0; c = group b by age; d = foreach c generate group, AVG(b.gpa); store d into ':OUTPATH:.1'; e = filter d by $1 > 1.5; f = group e by $1; g = foreach f generate group, SUM(e.$0); store g into ':OUTPATH:.2'; #, 'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age; select t.b, sum(t.a) from (select age as a, avg(gpa) as b from studenttab10k where gpa < 3.0 group by age having avg(gpa) > 1.5) as t group by t.b;", }, { # One split in reduce phase and two Map-Reduce splitees. 
'num' => 6, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); b = filter a by gpa < 3.0; c = group b by age; d = foreach c generate group, AVG(b.gpa); e = filter d by $1 > 1.5; e1= group e by $1; e2 = foreach e1 generate group, SUM(e.$0); store e2 into ':OUTPATH:.1'; f = filter d by $1 <= 1.5; f1 = group f by $1; f2 = foreach f1 generate group, COUNT(f.$0); store f2 into ':OUTPATH:.2'; #, 'sql' => "select t.c1, sum(t.c0) from (select age as c0, avg(gpa) as c1 from studenttab10k where gpa < 3.0 group by age having avg(gpa) > 1.5) as t group by t.c1; select t.c1, count(t.c0) from (select age as c0, avg(gpa) as c1 from studenttab10k where gpa < 3.0 group by age having avg(gpa) <= 1.5) as t group by t.c1;", }, { # Two loads and two stores in reduce phase 'num' => 7, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); c = filter a by age < 20; d = filter b by age < 20; e = cogroup c by name, d by name; f = foreach e generate flatten(c), flatten(d); g = group f by d::age; h = foreach g generate group, SUM(f.gpa); store h into ':OUTPATH:.1'; e = filter f by c::gpa < 3.0; store e into ':OUTPATH:.2'; #, 'sql' => "select c5, sum(c3) from (select a.name as c1, a.age as c2, a.gpa as c3, b.name as c4, b.age as c5, b.registration as c6, b.contributions as c7 from studenttab10k as a full outer join votertab10k as b using(name) where a.age < 20 and b.age < 20) as t group by t.c5; select a.name, a.age, a.gpa, b.name, b.age, b.registration, b.contributions from studenttab10k as a full outer join votertab10k as b using(name) where a.age < 20 and b.age < 20 and a.gpa < 3.0;", }, { # Explicit split with two side files. 
'num'=> 8, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); split a into a1 if name > 'm', a2 if name <= 'm'; store a1 into ':OUTPATH:.1'; store a2 into ':OUTPATH:.2'; b = cogroup a1 by age, a2 by age; c = foreach b generate flatten(a1), flatten(a2); store c into ':OUTPATH:.3'; #, 'sql' => "select name, age, gpa from studenttab10k where name > 'm'; select name, age, gpa from studenttab10k where name <= 'm'; select A.name, A.age, A.gpa, B.name, B.age, B.gpa from (select * from studenttab10k where name > 'm') as A join (select * from studenttab10k where name <= 'm') as B using (age);", }, { # Explicit split with order by and two side files. 'num'=> 9, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); split a into a1 if age > 50, a2 if name < 'm'; b2 = distinct a2; b1 = order a1 by name; store b2 into ':OUTPATH:.2'; store b1 into ':OUTPATH:.1'; c = cogroup b2 by name, b1 by name; d = foreach c generate flatten(group), COUNT($1), COUNT($2); store d into ':OUTPATH:.3'; #, 'sql' => "select name, age, gpa from studenttab10k where age > 50 order by name; select distinct name, age, gpa from studenttab10k where name < 'm'; select name, count(A.name), count(B.name) from (select distinct name from studenttab10k where name < 'm') as A join (select name from studenttab10k where age > 50) as B using (name) group by name;", 'verify_with_pig' => 1, 'verify_pig_version' => 'old', }, { # Implicit split with multiple side files. 
'num'=> 10, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); b = filter a by age > 50; c = filter a by gpa > 3.0; store c into ':OUTPATH:.1'; d = cogroup b by name, c by name; e = foreach d generate flatten(b), flatten(c); store e into ':OUTPATH:.2'; f = filter e by b::age < 75; store f into ':OUTPATH:.3'; #, 'sql' => "select name, age, gpa from studenttab10k where gpa > 3.0; select A.name, A.age, A.gpa, B.name, B.age, B.gpa from (select * from studenttab10k where age > 50) as A join (select * from studenttab10k where gpa > 3.0) as B using (name); select A.name, A.age, A.gpa, B.name, B.age, B.gpa from (select * from studenttab10k where age > 50) as A join (select * from studenttab10k where gpa > 3.0) as B using (name) where A.age < 75;", }, { # Streaming with multiple stores 'num' => 11, 'pig' => q# define CMD1 `perl -ne 'print $_;'`; define CMD2 `perl -ne 'print $_;'`; A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); B = stream A through CMD1 as (name, age, gpa); store B into ':OUTPATH:.1'; C = stream B through CMD2 as (name, age, gpa); D = JOIN B by name, C by name; store D into ':OUTPATH:.2'; #, 'sql' => "select name, age, gpa from studenttab10k; select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);", }, { # With intermediate store 'num' => 12, 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); store A into ':OUTPATH:.1'; B = load ':OUTPATH:.1'; store B into ':OUTPATH:.2'; #, 'sql' => "select name, age, gpa from studenttab10k; select name, age, gpa from studenttab10k;", }, { # Implicit split with order by and multiple side files. # NOTE(review): in the script below alias f is assigned by the order statement and then reassigned by the filter before any store uses it, so the ordered relation never reaches an output - confirm this is intended. 
'num'=>13, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); b = filter a by age > 50; c = filter a by gpa > 3.0; store c into ':OUTPATH:.1'; d = cogroup b by name, c by name; e = foreach d generate flatten(b), flatten(c); f = order e by b::name; store e into ':OUTPATH:.2'; f = filter e by b::age < 75; store f into ':OUTPATH:.3'; #, 'sql' => "select name, age, gpa from studenttab10k where gpa > 3.0; select A.name, A.age, A.gpa, B.name, B.age, B.gpa from (select * from studenttab10k where age > 50) as A join (select * from studenttab10k where gpa > 3.0) as B using (name) order by A.name; select A.name, A.age, A.gpa, B.name, B.age, B.gpa from (select * from studenttab10k where age > 50) as A join (select * from studenttab10k where gpa > 3.0) as B using (name) where A.age < 75 order by A.name;", }, # Self join using fragment replicate join with multiple side files { 'num' => 14, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); b = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); c = filter a by age > 50; store c into ':OUTPATH:.1'; d = filter b by gpa > 3.0; store d into ':OUTPATH:.2'; e = join c by gpa, d by gpa using 'repl'; store e into ':OUTPATH:.3'; #, 'sql' => "select name, age, gpa from studenttab10k where age > 50; select name, age, gpa from studenttab10k where gpa > 3.0; select a.name, a.age, a.gpa, b.name, b.age, b.gpa from studenttab10k as a join studenttab10k as b using(gpa) where a.age > 50 and b.gpa > 3.0;", }, # PigMix Test Case L12 { 'num' => 15, 'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); b = foreach a generate name, age, contributions; split b into c1 if age > 50, c2 if age <= 50; split c1 into d1 if name < 'm', d2 if name >= 'm'; e = group c2 by name; e1 = foreach e generate group, SUM(c2.contributions); store e1 into ':OUTPATH:.1'; f = group d1 by name; f1 = 
foreach f generate group, MAX(d1.contributions); store f1 into ':OUTPATH:.2'; g = group d2 by name; g1 = foreach g generate group, COUNT(d2); store g1 into ':OUTPATH:.3'; #, 'sql' => "select name, sum(contributions) from votertab10k where age <= 50 group by name; select name, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name; select name, count(*) from votertab10k where (age > 50 and name >= 'm') group by name;", }, # One split in map phase and two Map-Reduce splitees with mixed combiner. { 'num' => 16, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); b = filter a by gpa < 3.0; c = filter a by gpa >= 3.0; d = group b by age; e = foreach d generate group, AVG(b.gpa); store e into ':OUTPATH:.1'; f = group c by age; g = foreach f generate group, MAX(c.gpa) - MIN(c.gpa); store g into ':OUTPATH:.2'; #, 'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age; select age, max(gpa) - min(gpa) from studenttab10k where gpa >= 3.0 group by age;", }, # One split in map phase and two Map-Reduce splitees without combiner. 
{ 'num' => 17, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); b = filter a by gpa < 3.0; c = filter a by gpa >= 3.0; d = group b by age; e = foreach d generate group, MAX(b.gpa) + MIN(b.gpa); store e into ':OUTPATH:.1'; f = group c by age; g = foreach f generate group, MAX(c.gpa) - MIN(c.gpa); store g into ':OUTPATH:.2'; #, 'sql' => "select age, max(gpa) + min(gpa) from studenttab10k where gpa < 3.0 group by age; select age, max(gpa) - min(gpa) from studenttab10k where gpa >= 3.0 group by age;", }, # Splittees with different map key types and nested splits { 'num' => 18, 'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name: chararray, age:int, registration, contributions:double); b = foreach a generate name, age, contributions; split b into c1 if age > 10, c2 if age <= 60; split c1 into d1 if name < 'y', d2 if name >= 'c'; e = group c2 by name parallel 2; e1 = foreach e generate group, SUM(c2.contributions); store e1 into ':OUTPATH:.1'; f = group d1 by name parallel 3; f1 = foreach f generate group, MAX(d1.contributions); store f1 into ':OUTPATH:.2'; g = group d2 by age parallel 4; g1 = foreach g generate group, COUNT(d2); store g1 into ':OUTPATH:.3'; #, 'sql' => "select name, sum(contributions) from votertab10k where age <= 60 group by name; select name, max(contributions) from votertab10k where (age > 10 and name < 'y') group by name; select age, count(*) from votertab10k where (age > 10 and name >= 'c') group by age;", }, # Splittees with different map key types { 'num' => 19, 'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name: chararray, age:int, registration, contributions:double); b = foreach a generate name, age, contributions; split b into c1 if age > 50, c2 if age <= 50; e = group c2 by name; e1 = foreach e generate group, SUM(c2.contributions); store e1 into ':OUTPATH:.1'; f = group c1 by age; f1 = foreach f generate group, MAX(c1.contributions); store f1 into ':OUTPATH:.2'; #, 
'sql' => "select name, sum(contributions) from votertab10k where age <= 50 group by name; select age, max(contributions) from votertab10k where age > 50 group by age;", }, # Streaming in demux { 'num' => 20, 'execonly' => 'mapred', 'pig' => q# define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); A = load ':INPATH:/singlefile/studenttab10k'; split A into A1 if $0 < 'm', A2 if $0 >= 'm'; B = group A1 by $0; C = foreach B generate flatten(A1); D = stream C through CMD; store D into ':OUTPATH:.1'; E = group A2 by $0; F = foreach E generate group, COUNT(A2); store F into ':OUTPATH:.2';#, 'sql' => "select name, count(*) from studenttab10k where name < 'm' group by name; select name, count(*) from studenttab10k where name >= 'm' group by name;", }, # Streaming in nested demux { 'num' => 21, 'execonly' => 'mapred', 'pig' => q# define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); A = load ':INPATH:/singlefile/studenttab10k'; split A into A1 if $0 < 'm', A2 if $0 >= 'm'; split A1 into A3 if $1 < 30, A4 if $1 >= 30; B = group A3 by $0; C = foreach B generate flatten(A3); D = stream C through CMD; store D into ':OUTPATH:.1'; E = group A2 by $0; F = foreach E generate group, COUNT(A2); store F into ':OUTPATH:.2'; G = group A4 by $0; H = foreach G generate group, COUNT(A4); store H into ':OUTPATH:.3';#, 'sql' => "select name, count(*) from studenttab10k where name < 'm' and age < 30 group by name; select name, count(*) from studenttab10k where name >= 'm' group by name; select name, count(*) from studenttab10k where name < 'm' and age >= 30 group by name;", }, # PigMix Test Case L12 version 2 { 'num' => 22, 'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); b = foreach a generate name, age, contributions; split b into c1 if age > 50, c2 if age <= 50; split c1 into d1 if name < 'm', d2 if name >= 'm'; e = group c2 by (name, age); e1 = foreach e generate flatten(group), SUM(c2.contributions); 
store e1 into ':OUTPATH:.1'; f = group d1 by (name, age); f1 = foreach f generate flatten(group), MAX(d1.contributions); store f1 into ':OUTPATH:.2'; g = group d2 by (name, age); g1 = foreach g generate flatten(group), COUNT(d2); store g1 into ':OUTPATH:.3'; #, 'sql' => "select name, age, sum(contributions) from votertab10k where age <= 50 group by name, age; select name, age, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name, age; select name, age, count(*) from votertab10k where (age > 50 and name >= 'm') group by name, age;", }, # PigMix Test Case L12 version 3 (modified to have different map key types in inner split) { 'num' => 23, 'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); b = foreach a generate name, age, contributions; split b into c1 if age > 50, c2 if age <= 50; split c1 into d1 if name < 'm', d2 if name >= 'm'; f = group d1 by name; f1 = foreach f generate flatten(group), MAX(d1.contributions); store f1 into ':OUTPATH:.1'; g = group d2 by (name, age); g1 = foreach g generate flatten(group), COUNT(d2); store g1 into ':OUTPATH:.2'; e = group c2 by (name, age); e1 = foreach e generate flatten(group), SUM(c2.contributions); store e1 into ':OUTPATH:.3'; #, 'sql' => "select name, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name; select name, age, count(*) from votertab10k where (age > 50 and name >= 'm') group by name, age; select name, age, sum(contributions) from votertab10k where age <= 50 group by name, age;", }, # Pig-976: Multi-query optimization throws ClassCastException { 'num' => 24, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); b = group a by name; c = group a by age; d = foreach b generate MAX(a.age); e = foreach c generate group, SUM(a.gpa); store d into ':OUTPATH:.1'; store e into ':OUTPATH:.2'; #, 'sql' => "select max(age) from studenttab10k group by name; select age, sum(gpa) 
from studenttab10k group by age;", }, # Pig-976: Multi-query optimization throws ClassCastException { 'num' => 25, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); b = group a all; c = group a by age; d = foreach b generate COUNT(a), MAX(a.age); e = foreach c generate group, SUM(a.gpa); store d into ':OUTPATH:.1'; store e into ':OUTPATH:.2'; #, 'sql' => "select count(*), max(age) from studenttab10k; select age, sum(gpa) from studenttab10k group by age;", }, # Pig-983: multi-query optimization on multiple group bys following a join or cogroup { 'num' => 26, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); b = load ':INPATH:/singlefile/votertab10k' as (name:chararray, age:int, registration, contributions:double); c = join a by name, b by name; d = group c by a::age; e = group c by b::age; d1 = foreach d generate group, COUNT(c), MAX(c.a::gpa); e1 = foreach e generate group, SUM(c.b::contributions); store d1 into ':OUTPATH:.1'; store e1 into ':OUTPATH:.2'; #, 'sql' => "select a.age, count(*), max(a.gpa) from studenttab10k as a inner join votertab10k as b on (a.name = b.name) group by a.age; select b.age, sum(b.contributions) from studenttab10k as a inner join votertab10k as b on (a.name = b.name) group by b.age;", }, # Pig-976: Multi-query optimization throws ClassCastException { 'num' => 27, 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); b = group a by name; c = group a by age; d = foreach b generate MAX(a.age), group; e = foreach c generate group, SUM(a.gpa); store d into ':OUTPATH:.1'; store e into ':OUTPATH:.2'; #, 'sql' => "select max(age), name from studenttab10k group by name; select age, sum(gpa) from studenttab10k group by age;", }, ] # end of tests }, ] # end of groups } ;