public final class GenMapRedUtils extends Object
Modifier and Type | Method and Description |
---|---|
static void | addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf, Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask). Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. |
static void | addStatsTask(FileSinkOperator nd, MoveTask mvTask, Task<? extends Serializable> currTask, HiveConf hconf). Add the StatsTask as a dependent task of the MoveTask, because the StatsTask will change the Table/Partition metadata. |
static ConditionalTask | createCondTask(HiveConf conf, Task<? extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork, String inputPath). Construct a conditional task given the current leaf task, the MoveWork, and the MapredWork. |
static MapWork | createMergeTask(FileSinkDesc fsInputDesc, org.apache.hadoop.fs.Path finalName, boolean hasDynamicPartitions). Create a block-level merge task for RCFiles or a stripe-level merge task for ORC files. |
static org.apache.hadoop.fs.Path | createMoveTask(Task<? extends Serializable> currTask, boolean chDir, FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks, HiveConf hconf, DependencyCollectionTask dependencyTask). Create and add any dependent move tasks. |
static void | createMRWorkForMergingFiles(FileSinkOperator fsInput, org.apache.hadoop.fs.Path finalName, DependencyCollectionTask dependencyTask, List<Task<MoveWork>> mvTasks, HiveConf conf, Task<? extends Serializable> currTask) |
static TableScanOperator | createTemporaryFile(Operator<? extends OperatorDesc> parent, Operator<? extends OperatorDesc> child, org.apache.hadoop.fs.Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx). Break the pipeline between parent and child, and write the data generated by parent to a temporary file stored in taskTmpDir. |
static TableScanOperator | createTemporaryTableScanOperator(RowSchema rowSchema) |
static String | findAlias(MapWork work, Operator<?> operator) |
static Set<String> | findAliases(MapWork work, Operator<?> startOp) |
static Task<MoveWork> | findMoveTask(List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) |
static Set<Operator<?>> | findTopOps(Operator<?> startOp, Class<?> clazz) |
static Set<Partition> | getConfirmedPartitionsForScan(TableScanOperator tableScanOp) |
static List<org.apache.hadoop.fs.Path> | getInputPathsForPartialScan(TableScanOperator tableScanOp, StringBuffer aggregationKey) |
static MapredWork | getMapRedWork(ParseContext parseCtx). Create a new plan and return it. |
static MapredWork | getMapRedWorkFromConf(HiveConf conf). Create a new plan and return it. |
static List<String> | getPartitionColumns(TableScanOperator tableScanOp) |
static void | initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx). Initialize the current plan by adding it to the root tasks. |
static void | initUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp, Task<? extends Serializable> currTask, boolean local) |
static void | initUnionPlan(ReduceSinkOperator op, UnionOperator currUnionOp, GenMRProcContext opProcCtx, Task<? extends Serializable> unionTask). Initialize the current union plan. |
static void | internTableDesc(Task<?> task, com.google.common.collect.Interner<TableDesc> interner) |
static boolean | isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp). Returns true iff the current query is an insert into for the given file sink. |
static boolean | isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp, Task<? extends Serializable> currTask, boolean isInsertTable). Returns true iff the fsOp requires a merge. |
static boolean | isSkewedStoredAsDirs(FileSinkDesc fsInputDesc). Check whether the table is skewed and stored as directories. |
static void | joinPlan(Task<? extends Serializable> currTask, Task<? extends Serializable> oldTask, GenMRProcContext opProcCtx). Merge the current task into the old task for the reducer. |
static void | joinUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp, Task<? extends Serializable> currentUnionTask, Task<? extends Serializable> existingTask, boolean local) |
static void | linkMoveTask(FileSinkOperator newOutput, ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf, DependencyCollectionTask dependencyTask). Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all possible subtrees branching from the ConditionalTask. |
static void | linkMoveTask(Task<MoveWork> mvTask, Task<? extends Serializable> task, HiveConf hconf, DependencyCollectionTask dependencyTask). Follows the task tree down from task and makes all leaves parents of mvTask. |
static boolean | needsTagging(ReduceWork rWork) |
static void | replaceMapWork(String sourceAlias, String targetAlias, MapWork source, MapWork target). Replace the Map-side operator tree associated with targetAlias in target with the Map-side operator tree associated with sourceAlias in source. |
static void | setKeyAndValueDesc(ReduceWork plan, Operator<? extends OperatorDesc> topOp). Set the key and value descriptor. |
static void | setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs). Set the key and value descriptor. |
static void | setKeyAndValueDescForTaskTree(Task<? extends Serializable> task). Set the key and value description for all the tasks rooted at the given task. |
static void | setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs, PrunedPartitionList partsList, Operator<? extends OperatorDesc> topOp, String alias_id, HiveConf conf, boolean local). Initialize the MapWork. |
static void | setTaskPlan(String alias_id, Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local, GenMRProcContext opProcCtx). Set the current task in the mapredWork. |
static void | setTaskPlan(String alias_id, Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local, GenMRProcContext opProcCtx, PrunedPartitionList pList). Set the current task in the mapredWork. |
static void | setTaskPlan(String path, String alias, Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local, TableDesc tt_desc). Set the current task in the mapredWork. |
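Taken together, the sink-related helpers listed above are typically chained while the planner walks a FileSinkOperator. A minimal sketch in that spirit; the wrapper class, the null DependencyCollectionTask, and the chDir choice are illustrative assumptions, not part of this API:

```java
import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.MoveWork;

public final class MoveTaskSketch {
  /**
   * Hypothetical planner fragment: decide whether fsOp's output needs a
   * file merge, then create and attach the move task. Passing chDir = true
   * when a merge follows is an assumption: the sink is redirected to a
   * temporary directory so a later merge job can produce the final files.
   */
  public static Path planMoveForSink(FileSinkOperator fsOp,
      Task<? extends Serializable> currTask, ParseContext parseCtx,
      List<Task<MoveWork>> mvTasks, HiveConf hconf) {
    boolean isInsertTable = GenMapRedUtils.isInsertInto(parseCtx, fsOp);
    boolean merge =
        GenMapRedUtils.isMergeRequired(mvTasks, hconf, fsOp, currTask, isInsertTable);
    // null DependencyCollectionTask: assumed acceptable for the simple case.
    return GenMapRedUtils.createMoveTask(currTask, merge /* chDir */, fsOp,
        parseCtx, mvTasks, hconf, null);
  }
}
```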
public static boolean needsTagging(ReduceWork rWork)
public static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx) throws SemanticException
Parameters:
op - the reduce sink operator encountered
opProcCtx - processing context
Throws:
SemanticException
public static void initUnionPlan(ReduceSinkOperator op, UnionOperator currUnionOp, GenMRProcContext opProcCtx, Task<? extends Serializable> unionTask) throws SemanticException
Parameters:
op - the reduce sink operator encountered
opProcCtx - processing context
Throws:
SemanticException
public static void initUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp, Task<? extends Serializable> currTask, boolean local) throws SemanticException
Throws:
SemanticException
public static void joinUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp, Task<? extends Serializable> currentUnionTask, Task<? extends Serializable> existingTask, boolean local) throws SemanticException
Throws:
SemanticException
public static void joinPlan(Task<? extends Serializable> currTask, Task<? extends Serializable> oldTask, GenMRProcContext opProcCtx) throws SemanticException
Parameters:
currTask - the current task for the current reducer
oldTask - the old task for the current reducer
opProcCtx - processing context
Throws:
SemanticException
public static void setTaskPlan(String alias_id, Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local, GenMRProcContext opProcCtx) throws SemanticException
Parameters:
alias_id - current alias
topOp - the top operator of the stack
task - the current task
local - whether you need to add to map-reduce or local work
opProcCtx - processing context
Throws:
SemanticException
public static void setTaskPlan(String alias_id, Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local, GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException
Parameters:
alias_id - current alias
topOp - the top operator of the stack
task - the current task
local - whether you need to add to map-reduce or local work
opProcCtx - processing context
pList - pruned partition list. If it is null, it will be computed on-the-fly.
Throws:
SemanticException
public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs, PrunedPartitionList partsList, Operator<? extends OperatorDesc> topOp, String alias_id, HiveConf conf, boolean local) throws SemanticException
Parameters:
alias_id - current alias
topOp - the top operator of the stack
plan - map work to initialize
local - whether you need to add to map-reduce or local work
partsList - pruned partition list. If it is null, it will be computed on-the-fly.
inputs - read entities for the map work
conf - current instance of HiveConf
Throws:
SemanticException
public static void setTaskPlan(String path, String alias, Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local, TableDesc tt_desc) throws SemanticException
Parameters:
alias - current alias
topOp - the top operator of the stack
plan - current plan
local - whether you need to add to map-reduce or local work
tt_desc - table descriptor
Throws:
SemanticException
public static void setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs)
Parameters:
work - ReduceWork
rs - ReduceSinkOperator

public static void setKeyAndValueDesc(ReduceWork plan, Operator<? extends OperatorDesc> topOp)
Parameters:
plan - current plan
topOp - current top operator in the path

public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task)
Parameters:
task

public static void internTableDesc(Task<?> task, com.google.common.collect.Interner<TableDesc> interner)
public static MapredWork getMapRedWork(ParseContext parseCtx)
public static MapredWork getMapRedWorkFromConf(HiveConf conf)
public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema)
public static TableScanOperator createTemporaryFile(Operator<? extends OperatorDesc> parent, Operator<? extends OperatorDesc> child, org.apache.hadoop.fs.Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx)
Parameters:
parent, child, taskTmpDir, tt_desc, parseCtx
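A hedged usage sketch for createTemporaryFile, assuming a caller that already holds the parent and child operators, a scratch directory, and a TableDesc for the intermediate data; all names here are illustrative:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;

public final class PipelineCutSketch {
  /**
   * Hypothetical: break the plan between parent and child. The parent now
   * writes to taskTmpDir, and the returned TableScanOperator re-reads that
   * data as the new root of the downstream operator tree.
   */
  public static TableScanOperator cut(Operator<? extends OperatorDesc> parent,
      Operator<? extends OperatorDesc> child, Path taskTmpDir,
      TableDesc ttDesc, ParseContext parseCtx) {
    return GenMapRedUtils.createTemporaryFile(parent, child, taskTmpDir,
        ttDesc, parseCtx);
  }
}
```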
public static void replaceMapWork(String sourceAlias, String targetAlias, MapWork source, MapWork target)
Parameters:
sourceAlias, targetAlias, source, target

public static void createMRWorkForMergingFiles(FileSinkOperator fsInput, org.apache.hadoop.fs.Path finalName, DependencyCollectionTask dependencyTask, List<Task<MoveWork>> mvTasks, HiveConf conf, Task<? extends Serializable> currTask) throws SemanticException
Create a Map-only merge job using CombineHiveInputFormat for all partitions, with the following operators:

    MR job J0:
      ...
      |
      v
    FileSinkOperator_1 (fsInput)
      |
      v
    Merge job J1:
      |
      v
    TableScan (using CombineHiveInputFormat) (tsMerge)
      |
      v
    FileSinkOperator (fsMerge)

Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths do not contain the dynamic partitions (their parent). So after the dynamic partitions are created (after the first job finishes, before the moveTask or ConditionalTask starts), we need to change the pathToPartitionInfo & pathToAlias to include the dynamic partition directories.

Parameters:
fsInput - the FileSink operator.
finalName - the final destination path the merge job should output.
dependencyTask, mvTasks, conf, currTask
Throws:
SemanticException
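A minimal sketch of invoking this method to attach the J1 merge job described above; the wrapper class and the null DependencyCollectionTask are assumptions for the simple case:

```java
import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MoveWork;

public final class MergeJobSketch {
  /**
   * Hypothetical: attach the map-only merge job (J1 above) behind the task
   * that owns fsInput. finalName is the destination the merged files should
   * land in.
   */
  public static void attachMergeJob(FileSinkOperator fsInput, Path finalName,
      List<Task<MoveWork>> mvTasks, HiveConf conf,
      Task<? extends Serializable> currTask) throws SemanticException {
    GenMapRedUtils.createMRWorkForMergingFiles(fsInput, finalName,
        null /* dependencyTask: assumed absent here */, mvTasks, conf, currTask);
  }
}
```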
public static void linkMoveTask(FileSinkOperator newOutput, ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf, DependencyCollectionTask dependencyTask)
Parameters:
newOutput, cndTsk, mvTasks, hconf, dependencyTask

public static void linkMoveTask(Task<MoveWork> mvTask, Task<? extends Serializable> task, HiveConf hconf, DependencyCollectionTask dependencyTask)
Parameters:
mvTask, task, hconf, dependencyTask
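The summary above describes a leaf-walking traversal. A sketch of that shape using the standard Task accessors; this is an illustration of the documented behavior, not the actual implementation:

```java
import java.io.Serializable;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.plan.MoveWork;

public final class LeafLinkSketch {
  /**
   * Illustrative restatement: walk to the leaves of the task tree rooted at
   * task and make each leaf a parent of mvTask, so mvTask runs only after
   * all of them have finished.
   */
  public static void linkAtLeaves(Task<MoveWork> mvTask,
      Task<? extends Serializable> task) {
    if (task.getChildTasks() == null || task.getChildTasks().isEmpty()) {
      task.addDependentTask(mvTask); // leaf: mvTask now depends on it
      return;
    }
    for (Task<? extends Serializable> child : task.getChildTasks()) {
      linkAtLeaves(mvTask, child);
    }
  }
}
```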
public static void addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf, Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask)
Parameters:
mvTask, hconf, parentTask, dependencyTask
public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask, Task<? extends Serializable> currTask, HiveConf hconf)
Parameters:
nd - the FileSinkOperator whose results are taken care of by the MoveTask.
mvTask - the MoveTask that moves the FileSinkOperator's results.
currTask - the MapRedTask that the FileSinkOperator belongs to.
hconf - HiveConf
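A hedged call-site sketch: once the MoveTask for a sink is known, chain stats collection behind it so table/partition metadata is updated only after the data is in place. The wrapper is hypothetical:

```java
import java.io.Serializable;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;

public final class StatsChainSketch {
  /**
   * Hypothetical call site: make the StatsTask a dependent of mvTask, per
   * the documented ordering (stats must run after the move).
   */
  public static void chainStats(FileSinkOperator fsOp, MoveTask mvTask,
      Task<? extends Serializable> currTask, HiveConf hconf) {
    GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
  }
}
```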
public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp)
Parameters:
parseCtx, fsOp

public static MapWork createMergeTask(FileSinkDesc fsInputDesc, org.apache.hadoop.fs.Path finalName, boolean hasDynamicPartitions) throws SemanticException
Parameters:
fsInputDesc, finalName, hasDynamicPartitions
Throws:
SemanticException
public static ConditionalTask createCondTask(HiveConf conf, Task<? extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork, String inputPath)
Parameters:
conf - HiveConf
currTask - current leaf task
mvWork - MoveWork for the move task
mergeWork - MapredWork for the merge task
inputPath - the input directory of the merge/move task
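A sketch of wrapping a move and a merge into a ConditionalTask via this method. Which branch runs at query time (plain move, merge, or both) is an assumption about the conditional resolver, not stated in this doc:

```java
import java.io.Serializable;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;

public final class CondTaskSketch {
  /**
   * Hypothetical: hang a conditional move/merge off the current leaf task.
   * mergeWork is passed as the Serializable mergeWork parameter; inputPath
   * is the directory the conditional task inspects at run time.
   */
  public static ConditionalTask wrap(HiveConf conf,
      Task<? extends Serializable> currTask, MoveWork mvWork,
      MapredWork mergeWork, String inputPath) {
    return GenMapRedUtils.createCondTask(conf, currTask, mvWork, mergeWork,
        inputPath);
  }
}
```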
public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc)
Parameters:
fsInputDesc

public static Task<MoveWork> findMoveTask(List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp)
public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp, Task<? extends Serializable> currTask, boolean isInsertTable)
Parameters:
mvTasks, hconf, fsOp, currTask, isInsertTable

public static org.apache.hadoop.fs.Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir, FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks, HiveConf hconf, DependencyCollectionTask dependencyTask)
Parameters:
currTask, chDir, fsOp, parseCtx, mvTasks, hconf, dependencyTask

public static Set<Partition> getConfirmedPartitionsForScan(TableScanOperator tableScanOp)
public static List<String> getPartitionColumns(TableScanOperator tableScanOp)
public static List<org.apache.hadoop.fs.Path> getInputPathsForPartialScan(TableScanOperator tableScanOp, StringBuffer aggregationKey) throws SemanticException
Throws:
SemanticException
Copyright © 2017 The Apache Software Foundation. All rights reserved.