Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.sql.planner.optimizations;
 
 
 import java.util.List;
 import java.util.Map;
 
 import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
 import static com.facebook.presto.sql.planner.plan.AggregationNode.Step.FINAL;
 import static com.facebook.presto.sql.planner.plan.AggregationNode.Step.PARTIAL;
 import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange;
 import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange;
 import static com.google.common.base.Preconditions.checkArgument;
 
 public class AddExchanges
         extends PlanOptimizer
 {
     private final Metadata metadata;
     private final boolean distributedIndexJoins;
     private final boolean distributedJoins;
 
     public AddExchanges(Metadata metadataboolean distributedIndexJoinsboolean distributedJoins)
     {
         this. = metadata;
         this. = distributedIndexJoins;
         this. = distributedJoins;
     }
 
     @Override
     public PlanNode optimize(PlanNode planSession sessionMap<SymbolTypetypesSymbolAllocator symbolAllocatorPlanNodeIdAllocator idAllocator)
     {
         boolean distributedJoinEnabled = SystemSessionProperties.isDistributedJoinEnabled(session);
         PlanWithProperties result = plan.accept(new Rewriter(symbolAllocatoridAllocatorsessiondistributedJoinEnabled), null);
         return result.getNode();
     }
 
     private class Rewriter
             extends PlanVisitor<VoidPlanWithProperties>
     {
         private final SymbolAllocator allocator;
         private final PlanNodeIdAllocator idAllocator;
         private final Session session;
         private final boolean distributedIndexJoins;
         private final boolean distributedJoins;
        public Rewriter(SymbolAllocator allocatorPlanNodeIdAllocator idAllocatorSession sessionboolean distributedIndexJoinsboolean distributedJoins)
        {
            this. = allocator;
            this. = idAllocator;
            this. = session;
            this. = distributedIndexJoins;
            this. = distributedJoins;
        }
        @Override
        protected PlanWithProperties visitPlan(PlanNode nodeVoid context)
        {
            // default behavior for nodes that have a single child and propagate child properties verbatim
            PlanWithProperties source = Iterables.getOnlyElement(node.getSources()).accept(thiscontext);
            return propagateChildProperties(nodesource);
        }
        @Override
        public PlanWithProperties visitOutput(OutputNode nodeVoid context)
        {
            return pushRequirementsToChild(node, Requirements.of(PartitioningProperties.unpartitioned()));
        }
        @Override
        public PlanWithProperties visitAggregation(final AggregationNode nodeVoid context)
        {
            boolean decomposable = node.getFunctions()
                    .values().stream()
                    .map(::getExactFunction)
                    .map(FunctionInfo::getAggregationFunction)
                    .allMatch(InternalAggregationFunction::isDecomposable);
            // if child is unpartitioned or already partitioned on this node's group by keys
            // keep the current structure
            PlanWithProperties source = node.getSource().accept(thiscontext);
            if (source.getProperties().isUnpartitioned()) {
                return propagateChildProperties(nodesource);
            }
            if (!node.getGroupBy().isEmpty() && source.getProperties().isPartitionedOnKeys(node.getGroupBy())) {
                return propagateChildProperties(nodesource);
            }
            if (!decomposable) {
                if (node.getGroupBy().isEmpty()) {
                    return pushRequirementsToChild(node, Requirements.of(PartitioningProperties.unpartitioned()));
                }
                else {
                    return pushRequirementsToChild(node, Requirements.of(PartitioningProperties.partitioned(node.getGroupBy(), node.getHashSymbol())));
                }
            }
            // otherwise, add a partial and final with an exchange in between
            Map<SymbolSymbolmasks = node.getMasks();
            Map<SymbolFunctionCallfinalCalls = new HashMap<>();
            Map<SymbolFunctionCallintermediateCalls = new HashMap<>();
            Map<SymbolSignatureintermediateFunctions = new HashMap<>();
            Map<SymbolSymbolintermediateMask = new HashMap<>();
            for (Map.Entry<SymbolFunctionCallentry : node.getAggregations().entrySet()) {
                Signature signature = node.getFunctions().get(entry.getKey());
                FunctionInfo function = .getExactFunction(signature);
                Symbol intermediateSymbol = .newSymbol(function.getName().getSuffix(), .getType(function.getIntermediateType()));
                intermediateCalls.put(intermediateSymbolentry.getValue());
                intermediateFunctions.put(intermediateSymbolsignature);
                if (masks.containsKey(entry.getKey())) {
                    intermediateMask.put(intermediateSymbolmasks.get(entry.getKey()));
                }
                // rewrite final aggregation in terms of intermediate function
                finalCalls.put(entry.getKey(), new FunctionCall(function.getName(), ImmutableList.<Expression>of(new QualifiedNameReference(intermediateSymbol.toQualifiedName()))));
            }
            return enforceWithPartial(
                    source,
                    computePartitioningRequirements(node.getGroupBy(), node.getHashSymbol()),
                    child -> new AggregationNode(
                            .getNextId(),
                            child,
                            node.getGroupBy(),
                            intermediateCalls,
                            intermediateFunctions,
                            intermediateMask,
                            ,
                            node.getSampleWeight(),
                            node.getConfidence(),
                            node.getHashSymbol()),
                    child -> new AggregationNode(
                            node.getId(),
                            child,
                            node.getGroupBy(),
                            finalCalls,
                            node.getFunctions(),
                            ImmutableMap.of(),
                            ,
                            Optional.empty(),
                            node.getConfidence(),
                            node.getHashSymbol()));
        }
        @Override
        public PlanWithProperties visitMarkDistinct(MarkDistinctNode nodeVoid context)
        {
            PlanWithProperties child = node.getSource().accept(thiscontext);
            if (child.getProperties().isPartitioned() || isBigQueryEnabled(false)) {
                child = enforce(child, Requirements.of(PartitioningProperties.partitioned(node.getDistinctSymbols(), node.getHashSymbol())));
            }
            return propagateChildProperties(nodechild);
        }
        @Override
        public PlanWithProperties visitWindow(WindowNode nodeVoid context)
        {
            return pushRequirementsToChild(node,
                    computePartitioningRequirements(node.getPartitionBy(), node.getHashSymbol()));
        }
        @Override
        public PlanWithProperties visitRowNumber(RowNumberNode nodeVoid context)
        {
            return pushRequirementsToChild(node,
                    computePartitioningRequirements(node.getPartitionBy(), node.getHashSymbol()));
        }
        @Override
        public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode nodeVoid context)
        {
            return pushRequirementsToChildWithPartial(node,
                    computePartitioningRequirements(node.getPartitionBy(), node.getHashSymbol()),
                    child -> new TopNRowNumberNode(
                            .getNextId(),
                            child,
                            node.getPartitionBy(),
                            node.getOrderBy(),
                            node.getOrderings(),
                            node.getRowNumberSymbol(),
                            node.getMaxRowCountPerPartition(),
                            true,
                            node.getHashSymbol()),
                    child -> new TopNRowNumberNode(
                            node.getId(),
                            child,
                            node.getPartitionBy(),
                            node.getOrderBy(),
                            node.getOrderings(),
                            node.getRowNumberSymbol(),
                            node.getMaxRowCountPerPartition(),
                            false,
                            node.getHashSymbol()));
        }
        @Override
        public PlanWithProperties visitTopN(TopNNode nodeVoid context)
        {
            return pushRequirementsToChildWithPartial(node,
                    Requirements.of(PartitioningProperties.unpartitioned()),
                    child -> new TopNNode(.getNextId(), childnode.getCount(), node.getOrderBy(), node.getOrderings(), true),
                    child -> new TopNNode(node.getId(), childnode.getCount(), node.getOrderBy(), node.getOrderings(), false));
        }
        @Override
        public PlanWithProperties visitSort(SortNode nodeVoid context)
        {
            return pushRequirementsToChild(node, Requirements.of(PartitioningProperties.unpartitioned()));
        }
        @Override
        public PlanWithProperties visitLimit(LimitNode nodeVoid context)
        {
            return pushRequirementsToChildWithPartial(node,
                    Requirements.of(PartitioningProperties.unpartitioned()),
                    child -> new LimitNode(.getNextId(), childnode.getCount()),
                    child -> ChildReplacer.replaceChildren(node, ImmutableList.of(child)));
        }
        @Override
        public PlanWithProperties visitDistinctLimit(DistinctLimitNode nodeVoid context)
        {
            return pushRequirementsToChildWithPartial(node,
                    Requirements.of(PartitioningProperties.unpartitioned()),
                    child -> new DistinctLimitNode(.getNextId(), childnode.getLimit(), node.getHashSymbol()),
                    child -> ChildReplacer.replaceChildren(node, ImmutableList.of(child)));
        }
        @Override
        public PlanWithProperties visitTableScan(TableScanNode nodeVoid context)
        {
            return new PlanWithProperties(node, ActualProperties.of(PartitioningProperties.arbitrary(), PlacementProperties.source()));
        }
        @Override
        public PlanWithProperties visitValues(ValuesNode nodeVoid context)
        {
            return new PlanWithProperties(node, ActualProperties.of(PartitioningProperties.unpartitioned(), PlacementProperties.anywhere()));
        }
        @Override
        public PlanWithProperties visitTableCommit(TableCommitNode nodeVoid context)
        {
            return pushRequirementsToChild(node, Requirements.of(PartitioningProperties.unpartitioned(), PlacementProperties.coordinatorOnly()));
        }
        @Override
        public PlanWithProperties visitJoin(JoinNode nodeVoid context)
        {
            checkArgument(node.getType() != .."Expected RIGHT joins to be normalized to LEFT joins");
            PlanWithProperties left = node.getLeft().accept(thiscontext);
            PlanWithProperties right = node.getRight().accept(thiscontext);
            Optional<SymbolleftHashSymbol = node.getLeftHashSymbol();
            Optional<SymbolrightHashSymbol = node.getRightHashSymbol();
            List<SymbolleftSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getLeft);
            List<SymbolrightSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getRight);
            PlanNode rightNode;
            if () {
                left = enforce(left, Requirements.of(PartitioningProperties.partitioned(leftSymbolsleftHashSymbol)));
                rightNode = enforce(right, Requirements.of(PartitioningProperties.partitioned(rightSymbolsrightHashSymbol))).getNode();
            }
            else {
                rightNode = new ExchangeNode(
                        .getNextId(),
                        ..,
                        ImmutableList.of(),
                        Optional.<Symbol>empty(),
                        ImmutableList.of(right.getNode()),
                        right.getNode().getOutputSymbols(),
                        ImmutableList.of(right.getNode().getOutputSymbols()));
            }
            return new PlanWithProperties(
                    new JoinNode(node.getId(),
                            node.getType(),
                            left.getNode(),
                            rightNode,
                            node.getCriteria(),
                            node.getLeftHashSymbol(),
                            node.getRightHashSymbol()),
                    left.getProperties());
        }
        @Override
        public PlanWithProperties visitSemiJoin(SemiJoinNode nodeVoid context)
        {
            PlanWithProperties source = node.getSource().accept(thiscontext);
            PlanWithProperties filteringSource = node.getFilteringSource().accept(thiscontext);
            // make filtering source match requirements of source
            PlanNode filteringSourceNode;
            if (source.getProperties().isPartitioned()) {
                filteringSourceNode = new ExchangeNode(
                        .getNextId(),
                        ..,
                        ImmutableList.of(),
                        Optional.<Symbol>empty(),
                        ImmutableList.of(filteringSource.getNode()),
                        filteringSource.getNode().getOutputSymbols(),
                        ImmutableList.of(filteringSource.getNode().getOutputSymbols()));
            }
            else {
                filteringSourceNode = enforce(filteringSource, Requirements.of(PartitioningProperties.unpartitioned()))
                        .getNode();
            }
            // TODO: add support for hash-partitioned semijoins
            return withNewChildren(nodesource.getProperties(), ImmutableList.of(source.getNode(), filteringSourceNode));
        }
        @Override
        public PlanWithProperties visitIndexJoin(IndexJoinNode nodeVoid context)
        {
            PlanWithProperties probeSource = node.getProbeSource().accept(thiscontext);
            if () {
                probeSource = enforce(probeSource, Requirements.of(PartitioningProperties.partitioned(Lists.transform(node.getCriteria(), IndexJoinNode.EquiJoinClause::getProbe), node.getProbeHashSymbol())));
            }
            // index side runs with the same partitioning/distribution strategy as the probe side, so don't insert exchanges
            return withNewChildren(nodeprobeSource.getProperties(), ImmutableList.of(probeSource.getNode(), node.getIndexSource()));
        }
        @Override
        public PlanWithProperties visitUnion(UnionNode nodeVoid context)
        {
            // first, classify children into partitioned and unpartitioned
            List<PlanNodeunpartitionedChildren = new ArrayList<>();
            List<List<Symbol>> unpartitionedOutputLayouts = new ArrayList<>();
            List<PlanNodepartitionedChildren = new ArrayList<>();
            List<List<Symbol>> partitionedOutputLayouts = new ArrayList<>();
            List<PlanNodesources = node.getSources();
            for (int i = 0; i < sources.size(); i++) {
                PlanWithProperties child = sources.get(i).accept(thiscontext);
                if (child.getProperties().isUnpartitioned()) {
                    unpartitionedChildren.add(child.getNode());
                    unpartitionedOutputLayouts.add(node.sourceOutputLayout(i));
                }
                else {
                    partitionedChildren.add(child.getNode());
                    partitionedOutputLayouts.add(node.sourceOutputLayout(i));
                }
            }
            PlanNode result = null;
            if (!partitionedChildren.isEmpty()) {
                // add an exchange above partitioned inputs and fold it into the
                // set of unpartitioned inputs
                result = new ExchangeNode(
                        .getNextId(),
                        ..,
                        ImmutableList.of(),
                        Optional.<Symbol>empty(),
                        partitionedChildren,
                        node.getOutputSymbols(),
                        partitionedOutputLayouts);
                unpartitionedChildren.add(result);
                unpartitionedOutputLayouts.add(result.getOutputSymbols());
            }
            // if there's at least one unpartitioned input (including the exchange that might have been added in the
            // previous step), add a local union
            if (unpartitionedChildren.size() > 1) {
                ImmutableListMultimap.Builder<SymbolSymbolmappings = ImmutableListMultimap.builder();
                for (int i = 0; i < node.getOutputSymbols().size(); i++) {
                    for (List<SymboloutputLayout : unpartitionedOutputLayouts) {
                        mappings.put(node.getOutputSymbols().get(i), outputLayout.get(i));
                    }
                }
                result = new UnionNode(node.getId(), unpartitionedChildrenmappings.build());
            }
            return new PlanWithProperties(result, ActualProperties.of(PartitioningProperties.unpartitioned(), PlacementProperties.anywhere()));
        }
        private Requirements computePartitioningRequirements(List<SymbolpartitionKeysOptional<SymbolhashSymbol)
        {
            if (partitionKeys.isEmpty()) {
                return Requirements.of(PartitioningProperties.unpartitioned());
            }
            return Requirements.of(PartitioningProperties.partitioned(partitionKeyshashSymbol));
        }

        
Require the child to produce the give properties. If an exchange is added to enforce them, add a partial underneath it. So, a plan that looks like this A -> B will be rewritten either as F -> X -> P -> B or F -> B, where F and P are computed by calling the provided functions makeFinal and makePartial.
        private PlanWithProperties pushRequirementsToChildWithPartial(PlanNode nodeRequirements requirementsFunction<PlanNodePlanNodemakePartialFunction<PlanNodePlanNodemakeFinal)
        {
            PlanWithProperties child = Iterables.getOnlyElement(node.getSources())
                    .accept(thisnull);
            return enforceWithPartial(childrequirementsmakePartialmakeFinal);
        }
        private PlanWithProperties pushRequirementsToChild(PlanNode nodeRequirements requirements)
        {
            PlanWithProperties source = Iterables.getOnlyElement(node.getSources())
                    .accept(thisnull);
            return enforceWithPartial(
                    source,
                    requirements,
                    child -> child,
                    child -> ChildReplacer.replaceChildren(node, ImmutableList.of(child)));
        }

        
If the child is partitioned, push a partial computation, followed by an exchange and a final
        private PlanWithProperties enforceWithPartial(
                PlanWithProperties child,
                Requirements requirements,
                Function<PlanNodePlanNodemakePartial,
                Function<PlanNodePlanNodemakeFinal)
        {
            PlanWithProperties enforced = enforce(child.getNode(), child.getProperties(), requirementsmakePartial);
            return new PlanWithProperties(makeFinal.apply(enforced.getNode()), enforced.getProperties());
        }
        private PlanWithProperties enforce(PlanWithProperties planRequirements requirements)
        {
            return enforce(plan.getNode(), plan.getProperties(), requirementschild -> child);
        }
        private PlanWithProperties enforce(
                PlanNode node,
                ActualProperties properties,
                Requirements requirements,
                Function<PlanNodePlanNodemakePartial)
        {
            if (requirements.isCoordinatorOnly() && !properties.isCoordinatorOnly()) {
                return new PlanWithProperties(
                        gatheringExchange(.getNextId(), makePartial.apply(node)),
                        ActualProperties.of(PartitioningProperties.unpartitioned(), PlacementProperties.coordinatorOnly()));
            }
            // req: unpartitioned, actual: unpartitioned
            if (requirements.isUnpartitioned() && properties.isUnpartitioned()) {
                return new PlanWithProperties(nodeproperties);
            }
            // req: partitioned, actual: partitioned on same keys or arbitrary
            if (requirements.isPartitioned() &&
                    properties.isPartitioned() &&
                    properties.getPartitioning().getKeys().equals(requirements.getPartitioning().get().getKeys())) {
                return new PlanWithProperties(nodeproperties);
            }
            // req: unpartitioned, actual: partitioned
            if (properties.isPartitioned() && requirements.isUnpartitioned()) {
                return new PlanWithProperties(
                        gatheringExchange(.getNextId(), makePartial.apply(node)),
                        ActualProperties.of(PartitioningProperties.unpartitioned(), PlacementProperties.anywhere()));
            }
            // req: partitioned[k], actual: partitioned[?] or unpartitioned
            if (requirements.isPartitionedOnKeys() &&
                    (properties.isUnpartitioned() || (properties.isPartitioned() && !properties.getPartitioning().getKeys().equals(requirements.getPartitioning().get().getKeys())))) {
                return new PlanWithProperties(
                        partitionedExchange(
                                .getNextId(),
                                makePartial.apply(node),
                                requirements.getPartitioning().get().getKeys().get(),
                                requirements.getPartitioning().get().getHashSymbol()),
                        ActualProperties.of(requirements.getPartitioning().get(), PlacementProperties.anywhere()));
            }
            throw new UnsupportedOperationException(String.format("not supported: required %s, current %s"requirementsproperties));
        }
        {
            PlanNode result = ChildReplacer.replaceChildren(node, ImmutableList.of(child.getNode()));
            return new PlanWithProperties(resultchild.getProperties());
        }
        private PlanWithProperties withNewChildren(PlanNode nodeActualProperties propertiesList<PlanNodechildren)
        {
            PlanNode result = ChildReplacer.replaceChildren(nodechildren);
            return new PlanWithProperties(resultproperties);
        }
    }
    private static class PlanWithProperties
    {
        private final PlanNode node;
        private final ActualProperties properties;
        public PlanWithProperties(PlanNode nodeActualProperties properties)
        {
            this. = node;
            this. = properties;
        }
        public PlanNode getNode()
        {
            return ;
        }
        public ActualProperties getProperties()
        {
            return ;
        }
    }
    private static class Requirements
    {
        private final Optional<PartitioningPropertiespartitioning;
        private final Optional<PlacementPropertiesplacement;
        private Requirements(Optional<PartitioningPropertiespartitioningOptional<PlacementPropertiesplacement)
        {
            this. = partitioning;
            this. = placement;
        }
        public static Requirements of(PartitioningProperties partitioning)
        {
            return new Requirements(Optional.of(partitioning), Optional.empty());
        }
        public static Requirements of(PartitioningProperties partitioningPlacementProperties placement)
        {
            return new Requirements(Optional.of(partitioning), Optional.of(placement));
        }
        {
            return ;
        }
        @Override
        public String toString()
        {
            return "partitioning: " + (.isPresent() ? .get() : "*") +
                    "," +
                    "placement: " + (.isPresent() ? .get() : "*");
        }
        public boolean isCoordinatorOnly()
        {
            return .isPresent() && .get().getType() == ..;
        }
        public boolean isUnpartitioned()
        {
        }
        public boolean isPartitionedOnKeys()
        {
            return isPartitioned() && .get().getKeys().isPresent();
        }
        public boolean isPartitioned()
        {
            return .isPresent() && .get().getType() == ..;
        }
    }
    private static class ActualProperties
    {
        // partitioning:
        //   partitioned: *, {k_i}
        //   unpartitioned
        // placement
        //   coordinator-only (=> unpartitioned)
        //   source
        //   anywhere
        private final PartitioningProperties partitioning;
        private final PlacementProperties placement;
        public ActualProperties(PartitioningProperties partitioningPlacementProperties placement)
        {
            this. = partitioning;
            this. = placement;
        }
        public static ActualProperties of(PartitioningProperties partitioningPlacementProperties placement)
        {
            return new ActualProperties(partitioningplacement);
        }
        public PartitioningProperties getPartitioning()
        {
            return ;
        }
        public boolean isCoordinatorOnly()
        {
            return .getType() == ..;
        }
        public boolean isPartitioned()
        {
            return .getType() == ..;
        }
        public boolean isPartitionedOnKeys(List<Symbolkeys)
        {
            return isPartitioned() &&
                    .getKeys().isPresent() &&
                    .getKeys().get().equals(keys);
        }
        public boolean isUnpartitioned()
        {
            return .getType() == ..;
        }
        @Override
        public String toString()
        {
            return "partitioning: " +  + ", placement: " + ;
        }
    }
    private static class PartitioningProperties
    {
        public enum Type
        {
            UNPARTITIONED,
            PARTITIONED
        }
        private final Type type;
        private final Optional<SymbolhashSymbol;
        private final Optional<List<Symbol>> keys;
        public static PartitioningProperties arbitrary()
        {
            return new PartitioningProperties(.);
        }
        public static PartitioningProperties unpartitioned()
        {
            return new PartitioningProperties(.);
        }
        public static PartitioningProperties partitioned(List<SymbolsymbolsOptional<SymbolhashSymbol)
        {
            return new PartitioningProperties(.symbolshashSymbol);
        }
        private PartitioningProperties(Type type)
        {
            this. = type;
            this. = Optional.empty();
            this. = Optional.empty();
        }
        private PartitioningProperties(Type typeList<SymbolkeysOptional<SymbolhashSymbol)
        {
            this. = type;
            this. = Optional.of(keys);
            this. = hashSymbol;
        }
        public Type getType()
        {
            return ;
        }
        public Optional<List<Symbol>> getKeys()
        {
            return ;
        }
        public Optional<SymbolgetHashSymbol()
        {
            return ;
        }
        @Override
        public String toString()
        {
            if ( == .) {
                return .toString() + ": " + (.isPresent() ? .get() : "*");
            }
            return .toString();
        }
    }
    private static class PlacementProperties
    {
        public enum Type
        {
            COORDINATOR_ONLY,
            SOURCE,
            ANY
        }
        private final Type type;
        public static PlacementProperties anywhere()
        {
            return new PlacementProperties(.);
        }
        public static PlacementProperties source()
        {
            return new PlacementProperties(.);
        }
        public static PlacementProperties coordinatorOnly()
        {
            return new PlacementProperties(.);
        }
        private PlacementProperties(Type type)
        {
            this. = type;
        }
        public Type getType()
        {
            return ;
        }
        public String toString()
        {
            return .toString();
        }
    }