Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package com.facebook.presto.sql.planner.optimizations;
  
  
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  
  import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
  import static com.facebook.presto.sql.ExpressionUtils.combineConjuncts;
  import static com.facebook.presto.sql.ExpressionUtils.extractConjuncts;
  import static com.facebook.presto.sql.ExpressionUtils.stripDeterministicConjuncts;
  import static com.facebook.presto.sql.ExpressionUtils.stripNonDeterministicConjuncts;
  import static com.facebook.presto.sql.analyzer.ExpressionAnalyzer.getExpressionTypes;
  import static com.facebook.presto.sql.planner.optimizations.LocalProperties.grouped;
  import static com.facebook.presto.sql.planner.plan.AggregationNode.Step.FINAL;
 import static com.facebook.presto.sql.planner.plan.AggregationNode.Step.PARTIAL;
 import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange;
 import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange;
 import static com.facebook.presto.sql.planner.plan.JoinNode.Type.FULL;
 import static com.facebook.presto.sql.planner.plan.JoinNode.Type.RIGHT;
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static java.util.stream.Collectors.toList;
 
 public class AddExchanges
         extends PlanOptimizer
 {
     private final SqlParser parser;
     private final Metadata metadata;
     private final boolean distributedIndexJoins;
     private final boolean distributedJoins;
 
     public AddExchanges(Metadata metadataSqlParser parserboolean distributedIndexJoinsboolean distributedJoins)
     {
         this. = metadata;
         this. = parser;
         this. = distributedIndexJoins;
         this. = distributedJoins;
     }
 
     @Override
     public PlanNode optimize(PlanNode planSession sessionMap<SymbolTypetypesSymbolAllocator symbolAllocatorPlanNodeIdAllocator idAllocator)
     {
         boolean distributedJoinEnabled = SystemSessionProperties.isDistributedJoinEnabled(session);
         PlanWithProperties result = plan.accept(new Rewriter(symbolAllocatoridAllocatorsymbolAllocatorsessiondistributedJoinEnabled), PreferredProperties.any());
         return result.getNode();
     }
 
     private class Rewriter
             extends PlanVisitor<PreferredPropertiesPlanWithProperties>
     {
         private final SymbolAllocator allocator;
         private final PlanNodeIdAllocator idAllocator;
         private final SymbolAllocator symbolAllocator;
         private final Session session;
         private final boolean distributedIndexJoins;
         private final boolean distributedJoins;
 
         public Rewriter(SymbolAllocator allocatorPlanNodeIdAllocator idAllocatorSymbolAllocator symbolAllocatorSession sessionboolean distributedIndexJoinsboolean distributedJoins)
         {
             this. = allocator;
             this. = idAllocator;
             this. = symbolAllocator;
             this. = session;
             this. = distributedIndexJoins;
             this. = distributedJoins;
         }
 
         @Override
         protected PlanWithProperties visitPlan(PlanNode nodePreferredProperties preferred)
         {
             return rebaseAndDeriveProperties(nodeplanChild(nodepreferred));
         }
 
         @Override
         public PlanWithProperties visitProject(ProjectNode nodePreferredProperties preferred)
         {
             Map<SymbolSymbolidentities = computeIdentityTranslations(node.getAssignments());
             PreferredProperties translatedPreferred = PreferredProperties.translate(preferredidentities);
 
             return rebaseAndDeriveProperties(nodeplanChild(nodetranslatedPreferred));
         }
 
         @Override
         public PlanWithProperties visitOutput(OutputNode nodePreferredProperties preferred)
         {
             PlanWithProperties child = planChild(node, PreferredProperties.any());
 
             if (child.getProperties().isPartitioned()) {
                 child = withDerivedProperties(
                         gatheringExchange(.getNextId(), child.getNode()),
                         child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitAggregation(AggregationNode nodePreferredProperties preferred)
         {
             boolean decomposable = node.getFunctions()
                     .values().stream()
                     .map(::getExactFunction)
                     .map(FunctionInfo::getAggregationFunction)
                     .allMatch(InternalAggregationFunction::isDecomposable);
 
             PreferredProperties preferredProperties = node.getGroupBy().isEmpty()
                     ? PreferredProperties.any()
                     : PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getGroupBy()), Optional.of(node.getGroupBy()), grouped(node.getGroupBy()));
 
             PlanWithProperties child = planChild(nodepreferredProperties);
 
             if (!child.getProperties().isPartitioned()) {
                 // If already unpartitioned, just drop the single aggregation back on
                 return rebaseAndDeriveProperties(nodechild);
             }
 
             if (node.getGroupBy().isEmpty()) {
                 if (decomposable) {
                     return splitAggregation(nodechildpartial -> gatheringExchange(.getNextId(), partial));
                 }
                 else {
                     child = withDerivedProperties(
                             gatheringExchange(.getNextId(), child.getNode()),
                             child.getProperties());
 
                     return rebaseAndDeriveProperties(nodechild);
                 }
             }
             else {
                 if (child.getProperties().isPartitionedOn(node.getGroupBy())) {
                     return rebaseAndDeriveProperties(nodechild);
                 }
                 else {
                     if (decomposable) {
                         return splitAggregation(nodechildpartial -> partitionedExchange(.getNextId(), partialnode.getGroupBy(), node.getHashSymbol()));
                     }
                     else {
                         child = withDerivedProperties(
                                 partitionedExchange(.getNextId(), child.getNode(), node.getGroupBy(), node.getHashSymbol()),
                                 child.getProperties());
                         return rebaseAndDeriveProperties(nodechild);
                     }
                 }
             }
         }
 
         @NotNull
         private PlanWithProperties splitAggregation(AggregationNode nodePlanWithProperties newChildFunction<PlanNodePlanNodeexchanger)
         {
             // otherwise, add a partial and final with an exchange in between
             Map<SymbolSymbolmasks = node.getMasks();
 
             Map<SymbolFunctionCallfinalCalls = new HashMap<>();
             Map<SymbolFunctionCallintermediateCalls = new HashMap<>();
             Map<SymbolSignatureintermediateFunctions = new HashMap<>();
             Map<SymbolSymbolintermediateMask = new HashMap<>();
             for (Map.Entry<SymbolFunctionCallentry : node.getAggregations().entrySet()) {
                 Signature signature = node.getFunctions().get(entry.getKey());
                 FunctionInfo function = .getExactFunction(signature);
 
                 Symbol intermediateSymbol = .newSymbol(function.getName().getSuffix(), .getType(function.getIntermediateType()));
                 intermediateCalls.put(intermediateSymbolentry.getValue());
                 intermediateFunctions.put(intermediateSymbolsignature);
                 if (masks.containsKey(entry.getKey())) {
                     intermediateMask.put(intermediateSymbolmasks.get(entry.getKey()));
                 }
 
                 // rewrite final aggregation in terms of intermediate function
                 finalCalls.put(entry.getKey(), new FunctionCall(function.getName(), ImmutableList.<Expression>of(new QualifiedNameReference(intermediateSymbol.toQualifiedName()))));
             }
 
             PlanWithProperties partial = withDerivedProperties(
                     new AggregationNode(
                             .getNextId(),
                             newChild.getNode(),
                             node.getGroupBy(),
                             intermediateCalls,
                             intermediateFunctions,
                             intermediateMask,
                             ,
                             node.getSampleWeight(),
                             node.getConfidence(),
                             node.getHashSymbol()),
                     newChild.getProperties());
 
             PlanNode exchange = exchanger.apply(partial.getNode());
 
             return withDerivedProperties(
                     new AggregationNode(
                             node.getId(),
                             exchange,
                             node.getGroupBy(),
                             finalCalls,
                             node.getFunctions(),
                             ImmutableMap.of(),
                             ,
                             Optional.empty(),
                             node.getConfidence(),
                             node.getHashSymbol()),
                     deriveProperties(exchangepartial.getProperties()));
         }
 
         @Override
         public PlanWithProperties visitMarkDistinct(MarkDistinctNode nodePreferredProperties preferred)
         {
             PreferredProperties preferredChildProperties = PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getDistinctSymbols()), Optional.of(node.getDistinctSymbols()), grouped(node.getDistinctSymbols()));
             PlanWithProperties child = node.getSource().accept(thispreferredChildProperties);
 
             if ((!child.getProperties().isPartitioned() && isBigQueryEnabled(false)) ||
                     !child.getProperties().isPartitionedOn(node.getDistinctSymbols())) {
                 child = withDerivedProperties(
                         partitionedExchange(
                                 .getNextId(),
                                 child.getNode(),
                                 node.getDistinctSymbols(),
                                 node.getHashSymbol()),
                         child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitWindow(WindowNode nodePreferredProperties preferred)
         {
             List<LocalProperty<Symbol>> desiredProperties = new ArrayList<>();
             if (!node.getPartitionBy().isEmpty()) {
                 desiredProperties.add(new GroupingProperty<>(node.getPartitionBy()));
             }
             for (Symbol symbol : node.getOrderBy()) {
                 desiredProperties.add(new SortingProperty<>(symbolnode.getOrderings().get(symbol)));
             }
 
             PlanWithProperties child = planChild(node, PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getPartitionBy()), desiredProperties));
 
             if (!child.getProperties().isPartitionedOn(node.getPartitionBy())) {
                 if (node.getPartitionBy().isEmpty()) {
                     child = withDerivedProperties(
                             gatheringExchange(.getNextId(), child.getNode()),
                             child.getProperties());
                 }
                 else {
                     child = withDerivedProperties(
                             partitionedExchange(.getNextId(), child.getNode(), node.getPartitionBy(), node.getHashSymbol()),
                             child.getProperties());
                 }
             }
 
             Iterator<Optional<LocalProperty<Symbol>>> matchIterator = LocalProperties.match(child.getProperties().getLocalProperties(), desiredProperties).iterator();
 
             Set<SymbolprePartitionedInputs = ImmutableSet.of();
             if (!node.getPartitionBy().isEmpty()) {
                 Optional<LocalProperty<Symbol>> groupingRequirement = matchIterator.next();
                 Set<SymbolunPartitionedInputs = groupingRequirement.map(LocalProperty::getColumns).orElse(ImmutableSet.of());
                 prePartitionedInputs = node.getPartitionBy().stream()
                         .filter(symbol -> !unPartitionedInputs.contains(symbol))
                         .collect(toImmutableSet());
             }
 
             int preSortedOrderPrefix = 0;
             if (prePartitionedInputs.equals(ImmutableSet.copyOf(node.getPartitionBy()))) {
                 while (matchIterator.hasNext() && !matchIterator.next().isPresent()) {
                     preSortedOrderPrefix++;
                 }
             }
 
             return withDerivedProperties(
                     new WindowNode(
                             node.getId(),
                             child.getNode(),
                             node.getPartitionBy(),
                             node.getOrderBy(),
                             node.getOrderings(),
                             node.getFrame(),
                             node.getWindowFunctions(),
                             node.getSignatures(),
                             node.getHashSymbol(),
                             prePartitionedInputs,
                             preSortedOrderPrefix),
                     child.getProperties());
         }
 
         @Override
         public PlanWithProperties visitRowNumber(RowNumberNode nodePreferredProperties preferred)
         {
             if (node.getPartitionBy().isEmpty()) {
                 PlanWithProperties child = planChild(node, PreferredProperties.unpartitioned());
 
                 if (child.getProperties().isPartitioned()) {
                     child = withDerivedProperties(
                             gatheringExchange(.getNextId(), child.getNode()),
                             child.getProperties());
                 }
 
                 return rebaseAndDeriveProperties(nodechild);
             }
 
             PlanWithProperties child = planChild(node, PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy())));
 
             // TODO: add config option/session property to force parallel plan if child is unpartitioned and window has a PARTITION BY clause
             if (!child.getProperties().isPartitionedOn(node.getPartitionBy())) {
                 child = withDerivedProperties(
                         partitionedExchange(
                                 .getNextId(),
                                 child.getNode(),
                                 node.getPartitionBy(),
                                 node.getHashSymbol()),
                         child.getProperties());
             }
 
             // TODO: streaming
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode nodePreferredProperties preferred)
         {
             PreferredProperties preferredChildProperties;
             Function<PlanNodePlanNodeaddExchange;
 
             if (node.getPartitionBy().isEmpty()) {
                 preferredChildProperties = PreferredProperties.any();
                 addExchange = partial -> gatheringExchange(.getNextId(), partial);
             }
             else {
                 preferredChildProperties = PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy()));
                 addExchange = partial -> partitionedExchange(.getNextId(), partialnode.getPartitionBy(), node.getHashSymbol());
             }
 
             PlanWithProperties child = planChild(nodepreferredChildProperties);
             if (!child.getProperties().isPartitionedOn(node.getPartitionBy())) {
                 // add exchange + push function to child
                 child = withDerivedProperties(
                         new TopNRowNumberNode(
                                 .getNextId(),
                                 child.getNode(),
                                 node.getPartitionBy(),
                                 node.getOrderBy(),
                                 node.getOrderings(),
                                 node.getRowNumberSymbol(),
                                 node.getMaxRowCountPerPartition(),
                                 true,
                                 node.getHashSymbol()),
                         child.getProperties());
 
                 child = withDerivedProperties(addExchange.apply(child.getNode()), child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitTopN(TopNNode nodePreferredProperties preferred)
         {
             PlanWithProperties child = planChild(node, PreferredProperties.any());
 
             if (child.getProperties().isPartitioned()) {
                 child = withDerivedProperties(
                         new TopNNode(.getNextId(), child.getNode(), node.getCount(), node.getOrderBy(), node.getOrderings(), true),
                         child.getProperties());
 
                 child = withDerivedProperties(
                         gatheringExchange(.getNextId(), child.getNode()),
                         child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitSort(SortNode nodePreferredProperties preferred)
         {
             PlanWithProperties child = planChild(node, PreferredProperties.unpartitioned());
 
             if (child.getProperties().isPartitioned()) {
                 child = withDerivedProperties(
                         gatheringExchange(.getNextId(), child.getNode()),
                         child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitLimit(LimitNode nodePreferredProperties preferred)
         {
             PlanWithProperties child = planChild(node, PreferredProperties.any());
 
             if (child.getProperties().isPartitioned()) {
                 child = withDerivedProperties(
                         new LimitNode(.getNextId(), child.getNode(), node.getCount()),
                         child.getProperties());
 
                 child = withDerivedProperties(
                         gatheringExchange(.getNextId(), child.getNode()),
                         child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitDistinctLimit(DistinctLimitNode nodePreferredProperties preferred)
         {
             PlanWithProperties child = planChild(node, PreferredProperties.any());
 
             if (child.getProperties().isPartitioned()) {
                 child = withDerivedProperties(
                         new DistinctLimitNode(.getNextId(), child.getNode(), node.getLimit(), node.getHashSymbol()),
                         child.getProperties());
 
                 child = withDerivedProperties(
                         gatheringExchange(
                                 .getNextId(),
                                 new DistinctLimitNode(.getNextId(), child.getNode(), node.getLimit(), node.getHashSymbol())),
                         child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitFilter(FilterNode nodePreferredProperties preferred)
         {
             if (node.getSource() instanceof TableScanNode) {
                 return planTableScan((TableScanNodenode.getSource(), node.getPredicate(), preferred);
             }
 
             return rebaseAndDeriveProperties(nodeplanChild(nodepreferred));
         }
 
         @Override
         public PlanWithProperties visitTableScan(TableScanNode nodePreferredProperties preferred)
         {
             return planTableScan(node.preferred);
         }
 
         private PlanWithProperties planTableScan(TableScanNode nodeExpression predicatePreferredProperties preferred)
         {
             // don't include non-deterministic predicates
             Expression deterministicPredicate = stripNonDeterministicConjuncts(predicate);
 
             DomainTranslator.ExtractionResult decomposedPredicate = DomainTranslator.fromPredicate(
                     ,
                     ,
                     deterministicPredicate,
                     .getTypes());
 
             TupleDomain<ColumnHandlesimplifiedConstraint = decomposedPredicate.getTupleDomain()
                     .transform(node.getAssignments()::get)
                     .intersect(node.getCurrentConstraint());
 
             Map<ColumnHandleSymbolassignments = ImmutableBiMap.copyOf(node.getAssignments()).inverse();
 
             Expression constraint = combineConjuncts(
                     deterministicPredicate,
                     DomainTranslator.toPredicate(
                             node.getCurrentConstraint().transform(assignments::get),
                             .getTypes()));
 
             // Layouts will be returned in order of the connector's preference
             List<TableLayoutResultlayouts = .getLayouts(
                     node.getTable(),
                     new Constraint<>(simplifiedConstraintbindings -> !shouldPrune(constraintnode.getAssignments(), bindings)),
                     Optional.of(node.getOutputSymbols().stream()
                             .map(node.getAssignments()::get)
                             .collect(toImmutableSet())));
 
             if (layouts.isEmpty()) {
                 return new PlanWithProperties(
                         new ValuesNode(.getNextId(), node.getOutputSymbols(), ImmutableList.of()),
                         ActualProperties.unpartitioned());
             }
 
             // Filter out layouts that cannot supply all the required columns
             layouts = layouts.stream()
                     .filter(layoutHasAllNeededOutputs(node))
                     .collect(toList());
             checkState(!layouts.isEmpty(), "No usable layouts for %s"node);
 
             List<PlanWithPropertiespossiblePlans = layouts.stream()
                     .map(layout -> {
                         TableScanNode tableScan = new TableScanNode(
                                 node.getId(),
                                 node.getTable(),
                                 node.getOutputSymbols(),
                                 node.getAssignments(),
                                 Optional.of(layout.getLayout().getHandle()),
                                 simplifiedConstraint.intersect(layout.getLayout().getPredicate()),
                                 Optional.ofNullable(node.getOriginalConstraint()).orElse(predicate));
 
                         PlanWithProperties result = new PlanWithProperties(tableScanderiveProperties(tableScan, ImmutableList.of()));
 
                         Expression resultingPredicate = combineConjuncts(
                                 DomainTranslator.toPredicate(
                                         layout.getUnenforcedConstraint().transform(assignments::get),
                                         .getTypes()),
                                 stripDeterministicConjuncts(predicate),
                                 decomposedPredicate.getRemainingExpression());
 
                         if (!..equals(resultingPredicate)) {
                             return withDerivedProperties(
                                     new FilterNode(.getNextId(), result.getNode(), resultingPredicate),
                                     deriveProperties(tableScan, ImmutableList.of()));
                         }
 
                         return result;
                     })
                     .collect(toList());
 
             return pickPlan(possiblePlanspreferred);
         }
 
         {
             return layout -> !layout.getLayout().getColumns().isPresent()
                     || layout.getLayout().getColumns().get().containsAll(Lists.transform(node.getOutputSymbols(), node.getAssignments()::get));
         }

        
         // possiblePlans should be provided in layout preference order
 
         private PlanWithProperties pickPlan(List<PlanWithPropertiespossiblePlansPreferredProperties preferred)
         {
             checkArgument(!possiblePlans.isEmpty());
 
             if (SystemSessionProperties.preferStreamingOperators(false)) {
                 possiblePlans = new ArrayList<>(possiblePlans);
                 Collections.sort(possiblePlans, Comparator.comparing(PlanWithProperties::getPropertiesstreamingExecutionPreference(preferred))); // stable sort; is Collections.min() guaranteed to be stable?
             }
 
             return possiblePlans.get(0);
         }
 
         private boolean shouldPrune(Expression predicateMap<SymbolColumnHandleassignmentsMap<ColumnHandle, ?> bindings)
         {
             List<Expressionconjuncts = extractConjuncts(predicate);
             IdentityHashMap<ExpressionTypeexpressionTypes = getExpressionTypes(.getTypes(), predicate);
 
             LookupSymbolResolver inputs = new LookupSymbolResolver(assignmentsbindings);
 
             // If any conjuncts evaluate to FALSE or null, then the whole predicate will never be true and so the partition should be pruned
             for (Expression expression : conjuncts) {
                 ExpressionInterpreter optimizer = ExpressionInterpreter.expressionOptimizer(expressionexpressionTypes);
                 Object optimized = optimizer.optimize(inputs);
                 if (..equals(optimized) || optimized == null || optimized instanceof NullLiteral) {
                     return true;
                 }
             }
             return false;
         }
 
         @Override
         public PlanWithProperties visitValues(ValuesNode nodePreferredProperties preferred)
         {
             return new PlanWithProperties(node, ActualProperties.unpartitioned());
         }
 
         @Override
         public PlanWithProperties visitTableCommit(TableCommitNode nodePreferredProperties preferred)
         {
             PlanWithProperties child = planChild(node, PreferredProperties.any());
             if (child.getProperties().isPartitioned() || !child.getProperties().isCoordinatorOnly()) {
                 child = withDerivedProperties(
                         gatheringExchange(.getNextId(), child.getNode()),
                         child.getProperties());
             }
 
             return rebaseAndDeriveProperties(nodechild);
         }
 
         @Override
         public PlanWithProperties visitJoin(JoinNode nodePreferredProperties preferred)
         {
             List<SymbolleftSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getLeft);
             List<SymbolrightSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getRight);
 
             PlanWithProperties left;
             PlanWithProperties right;
 
             if ( || node.getType() ==  || node.getType() == ) {
                 // The implementation of full outer join only works if the data is hash partitioned. See LookupJoinOperators#buildSideOuterJoinUnvisitedPositions
 
                 left  = node.getLeft().accept(this, PreferredProperties.hashPartitioned(leftSymbols));
                 right = node.getRight().accept(this, PreferredProperties.hashPartitioned(rightSymbols));
 
                 // force partitioning
                 if (!left.getProperties().isHashPartitionedOn(leftSymbols)) {
                     left = withDerivedProperties(
                             partitionedExchange(.getNextId(), left.getNode(), leftSymbolsnode.getLeftHashSymbol()),
                             left.getProperties());
                 }
 
                 if (!right.getProperties().isHashPartitionedOn(rightSymbols)) {
                     right = withDerivedProperties(
                             partitionedExchange(.getNextId(), right.getNode(), rightSymbolsnode.getRightHashSymbol()),
                             right.getProperties());
                 }
             }
             else {
                 // It can only be INNER or LEFT here. Therefore, no flipping is necessary even though the below code assumes the node is not RIGHT.
 
                 left = node.getLeft().accept(this, PreferredProperties.any());
                 right = node.getRight().accept(this, PreferredProperties.any());
 
                 if (!left.getProperties().isPartitioned() && right.getProperties().isPartitioned()) {
                     // force single-node join
                     // TODO: if inner join, flip order and do a broadcast join
                     right = withDerivedProperties(gatheringExchange(.getNextId(), right.getNode()), right.getProperties());
                 }
                 else if (left.getProperties().isPartitioned() && !(left.getProperties().isHashPartitionedOn(leftSymbols) && right.getProperties().isHashPartitionedOn(rightSymbols))) {
                     right = withDerivedProperties(new ExchangeNode(
                                     .getNextId(),
                                     ..,
                                     ImmutableList.of(),
                                     Optional.<Symbol>empty(),
                                     ImmutableList.of(right.getNode()),
                                     right.getNode().getOutputSymbols(),
                                     ImmutableList.of(right.getNode().getOutputSymbols())),
                             right.getProperties());
                 }
             }
 
             JoinNode result = new JoinNode(node.getId(),
                     node.getType(),
                     left.getNode(),
                     right.getNode(),
                     node.getCriteria(),
                     node.getLeftHashSymbol(),
                     node.getRightHashSymbol());
 
             return new PlanWithProperties(resultderiveProperties(result, ImmutableList.of(left.getProperties(), right.getProperties())));
         }
 
         /**
          * Plans a semi-join: visits both inputs, then wraps the filtering source in
          * an exchange that is chosen from the partitioning of the (probe) source.
          *
          * <p>Both children are visited with {@code PreferredProperties.any()}; the
          * {@code preferred} argument is not consulted anywhere in this method.
          * When the planned source reports {@code isPartitioned()}, the filtering
          * source is wrapped in a directly constructed {@code ExchangeNode};
          * otherwise it is wrapped in {@code gatheringExchange(...)}. In both cases
          * the wrapper is passed through {@code withDerivedProperties} together with
          * the filtering source's properties from before the exchange was added.
          *
          * <p>NOTE(review): this text appears to have lost tokens during extraction:
          * the separator between the two parameters, the receiver of
          * {@code getNextId()}, the second {@code ExchangeNode} constructor argument
          * (only {@code ..} remains), and the separator inside the final
          * {@code ImmutableList.of(...)}. It will not compile as shown; restore the
          * missing tokens from the upstream file before building.
          *
          * @param node the semi-join plan node being planned
          * @param preferred properties requested by the parent; unused in this method
          * @return the result of {@code rebaseAndDeriveProperties} for {@code node}
          *         and the planned children (presumably source first, then filtering
          *         source; confirm once the argument separator is restored)
          */
         @Override
         public PlanWithProperties visitSemiJoin(SemiJoinNode nodePreferredProperties preferred)
         {
             PlanWithProperties source = node.getSource().accept(this, PreferredProperties.any());
             PlanWithProperties filteringSource = node.getFilteringSource().accept(this, PreferredProperties.any());
 
             // make filtering source match requirements of source
             if (source.getProperties().isPartitioned()) {
                 // Source is partitioned: build the exchange explicitly instead of using a factory helper.
                 // NOTE(review): the exchange type on the second argument line is missing from this text;
                 // presumably it sends the filtering rows to every partition of the source -- TODO confirm.
                 // The empty list and empty Optional<Symbol> are presumably the partition keys and the
                 // hash symbol -- verify against the ExchangeNode constructor.
                 filteringSource = withDerivedProperties(
                         new ExchangeNode(
                                 .getNextId(),
                                 ..,
                                 ImmutableList.of(),
                                 Optional.<Symbol>empty(),
                                 ImmutableList.of(filteringSource.getNode()),
                                 filteringSource.getNode().getOutputSymbols(),
                                 ImmutableList.of(filteringSource.getNode().getOutputSymbols())),
                         filteringSource.getProperties());
             }
             else {
                 // Source is not partitioned: put a gathering exchange above the filtering source.
                 // NOTE(review): the receiver of getNextId() is missing from this text.
                 filteringSource = withDerivedProperties(
                         gatheringExchange(.getNextId(), filteringSource.getNode()),
                         filteringSource.getProperties());
             }
 
             // TODO: add support for hash-partitioned semijoins
 
             // NOTE(review): the two list elements below have run together; a separator is missing.
             return rebaseAndDeriveProperties(node, ImmutableList.of(sourcefilteringSource));
         }
 
         /**
          * Plans an index join: visits only the probe side and optionally
          * repartitions it on the probe-side join columns; the index side is reused
          * from the original node untouched.
          *
          * <p>The probe-side join columns are taken from each criteria clause via
          * {@code IndexJoinNode.EquiJoinClause::getProbe}. The probe source is
          * visited with preferences built by
          * {@code PreferredProperties.derivePreferences} from the parent's
          * preferences, the set of join columns, and {@code grouped(joinColumns)}.
          * A {@code partitionedExchange} on the join columns is added only when the
          * outer guard holds and the probe is either not partitioned on the join
          * columns, or is a single stream for which repartitioning is reported as
          * effective.
          *
          * <p>NOTE(review): this text appears to have lost tokens during extraction:
          * the separator between the two parameters, the space/closing bracket in
          * {@code List<Symbol> joinColumns}, the first operand of the outer
          * {@code if}, the receiver of {@code getNextId()}, and argument separators
          * in the {@code partitionedExchange(...)}, {@code PlanWithProperties(...)}
          * and {@code deriveProperties(...)} calls. It will not compile as shown;
          * restore the missing tokens from the upstream file before building.
          *
          * @param node the index-join plan node being planned
          * @param preferredProperties properties requested by the parent, used as the
          *        base for the probe side's preferences
          * @return the node with its probe child replaced, paired with properties
          *         derived from the probe side's properties
          */
         @Override
         public PlanWithProperties visitIndexJoin(IndexJoinNode nodePreferredProperties preferredProperties)
         {
             // Probe-side symbol of every equi-join clause, in criteria order.
             List<SymboljoinColumns = Lists.transform(node.getCriteria(), IndexJoinNode.EquiJoinClause::getProbe);
             PlanWithProperties probeSource = node.getProbeSource().accept(this, PreferredProperties.derivePreferences(preferredProperties, ImmutableSet.copyOf(joinColumns), grouped(joinColumns)));
             // Snapshot of the probe's properties before any exchange is added; reused below as the
             // input properties handed to withDerivedProperties.
             ActualProperties probeProperties = probeSource.getProperties();
 
             // TODO: allow repartitioning if unpartitioned to increase parallelism
             // NOTE(review): the first operand of this condition is missing from this text; presumably a
             // flag that enables distributed index joins -- TODO confirm against upstream.
             if ( && probeProperties.isPartitioned()) {
                 // Force partitioned exchange if we are not effectively partitioned on the join keys, or if the probe is currently executing as a single stream
                 // and the repartitioning will make a difference.
                 if (!probeProperties.isPartitionedOn(joinColumns) || (probeProperties.isSingleStream() && probeProperties.isRepartitionEffective(joinColumns))) {
                     // NOTE(review): the receiver of getNextId() and the separator between joinColumns and
                     // node.getProbeHashSymbol() are missing from this text.
                     probeSource = withDerivedProperties(
                             partitionedExchange(.getNextId(), probeSource.getNode(), joinColumnsnode.getProbeHashSymbol()),
                             probeProperties);
                 }
             }
 
             // TODO: if input is grouped, create streaming join
 
             // index side is really a nested-loops plan, so don't add exchanges
             PlanNode result = ChildReplacer.replaceChildren(node, ImmutableList.of(probeSource.getNode(), node.getIndexSource()));
             // NOTE(review): argument separators are missing on the next line (after both uses of "result").
             return new PlanWithProperties(resultderiveProperties(resultprobeSource.getProperties()));
         }
 
         @Override
         public PlanWithProperties visitUnion(UnionNode nodePreferredProperties preferred)
         {
             if (!preferred.getPartitioningProperties().isPresent() || !preferred.getPartitioningProperties().get().isHashPartitioned()) {
                 // first, classify children into partitioned and unpartitioned
                 List<PlanNodeunpartitionedChildren = new ArrayList<>();
                 List<List<Symbol>> unpartitionedOutputLayouts = new ArrayList<>();
 
                 List<PlanNodepartitionedChildren = new ArrayList<>();
                 List<List<Symbol>> partitionedOutputLayouts = new ArrayList<>();
 
                 List<PlanNodesources = node.getSources();
                 for (int i = 0; i < sources.size(); i++) {
                     PlanWithProperties child = sources.get(i).accept(this, PreferredProperties.any());
                     if (!child.getProperties().isPartitioned()) {
                         unpartitionedChildren.add(child.getNode());
                         unpartitionedOutputLayouts.add(node.sourceOutputLayout(i));
                     }
                     else {
                         partitionedChildren.add(child.getNode());
                         partitionedOutputLayouts.add(node.sourceOutputLayout(i));
                     }
                 }
 
                 PlanNode result = null;
                 if (!partitionedChildren.isEmpty()) {
                     // add an exchange above partitioned inputs and fold it into the
                     // set of unpartitioned inputs
                     result = new ExchangeNode(
                             .getNextId(),
                             ..,
                             ImmutableList.of(),
                             Optional.<Symbol>empty(),
                             partitionedChildren,
                             node.getOutputSymbols(),
                             partitionedOutputLayouts);
 
                     unpartitionedChildren.add(result);
                     unpartitionedOutputLayouts.add(result.getOutputSymbols());
                 }
 
                 // if there is more than one unpartitioned input (counting the exchange that might have been added in the
                 // previous step), add a local union
                 if (unpartitionedChildren.size() > 1) {
                     ImmutableListMultimap.Builder<SymbolSymbolmappings = ImmutableListMultimap.builder();
                     for (int i = 0; i < node.getOutputSymbols().size(); i++) {
                         for (List<SymboloutputLayout : unpartitionedOutputLayouts) {
                             mappings.put(node.getOutputSymbols().get(i), outputLayout.get(i));
                         }
                     }
 
                     result = new UnionNode(node.getId(), unpartitionedChildrenmappings.build());
                 }
 
                 return new PlanWithProperties(result, ActualProperties.unpartitioned());
             }
 
             // hash partition the sources
             List<SymbolhashingColumns = preferred.getPartitioningProperties().get().getHashPartitioningColumns().get();
 
             ImmutableList.Builder<PlanNodepartitionedSources = ImmutableList.builder();
             ImmutableListMultimap.Builder<SymbolSymboloutputToSourcesMapping = ImmutableListMultimap.builder();
 
             for (int sourceIndex = 0; sourceIndex < node.getSources().size(); sourceIndex++) {
                 ImmutableList.Builder<SymbolhashColumnsBuilder = ImmutableList.builder();
                 for (Symbol column : hashingColumns) {
                     hashColumnsBuilder.add(node.getSymbolMapping().get(column).get(sourceIndex));
                 }
                 List<SymbolsourceHashColumns = hashColumnsBuilder.build();