Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.sql.planner.optimizations;
 
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static com.facebook.presto.sql.ExpressionUtils.combineConjuncts;
 import static com.facebook.presto.sql.tree.BooleanLiteral.TRUE_LITERAL;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Predicates.in;
 import static java.util.Objects.requireNonNull;
 
 public class IndexJoinOptimizer
         extends PlanOptimizer
 {
     private final IndexManager indexManager;
     private final Metadata metadata;
 
     public IndexJoinOptimizer(Metadata metadataIndexManager indexManager)
     {
         this. = requireNonNull(metadata"metadata is null");
         this. = requireNonNull(indexManager"indexManager is null");
     }
 
     @Override
     public PlanNode optimize(PlanNode planSession sessionMap<Symbolcom.facebook.presto.spi.type.TypetypesSymbolAllocator symbolAllocatorPlanNodeIdAllocator idAllocator)
     {
         checkNotNull(plan"plan is null");
         checkNotNull(session"session is null");
         checkNotNull(types"types is null");
         checkNotNull(symbolAllocator"symbolAllocator is null");
         checkNotNull(idAllocator"idAllocator is null");
 
         return PlanRewriter.rewriteWith(new Rewriter(symbolAllocatoridAllocatorsession), plannull);
     }
 
     private static class Rewriter
             extends PlanRewriter<Void>
     {
         private final IndexManager indexManager;
         private final SymbolAllocator symbolAllocator;
         private final PlanNodeIdAllocator idAllocator;
         private final Metadata metadata;
         private final Session session;
 
         private Rewriter(SymbolAllocator symbolAllocatorPlanNodeIdAllocator idAllocatorIndexManager indexManagerMetadata metadataSession session)
         {
             this. = requireNonNull(symbolAllocator"symbolAllocator is null");
            this. = requireNonNull(idAllocator"idAllocator is null");
            this. = requireNonNull(indexManager"indexManager is null");
            this. = requireNonNull(metadata"metadata is null");
            this. = requireNonNull(session"session is null");
        }
        @Override
        public PlanNode visitJoin(JoinNode nodeRewriteContext<Voidcontext)
        {
            PlanNode leftRewritten = context.rewrite(node.getLeft());
            PlanNode rightRewritten = context.rewrite(node.getRight());
            if (!node.getCriteria().isEmpty()) { // Index join only possible with JOIN criteria
                List<SymbolleftJoinSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getLeft);
                List<SymbolrightJoinSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getRight);
                Optional<PlanNodeleftIndexCandidate = IndexSourceRewriter.rewriteWithIndex(
                        leftRewritten,
                        ImmutableSet.copyOf(leftJoinSymbols),
                        ,
                        ,
                        ,
                        ,
                        );
                if (leftIndexCandidate.isPresent()) {
                    // Sanity check that we can trace the path for the index lookup key
                    Map<SymbolSymboltrace = IndexKeyTracer.trace(leftIndexCandidate.get(), ImmutableSet.copyOf(leftJoinSymbols));
                    checkState(!trace.isEmpty() && leftJoinSymbols.containsAll(trace.keySet()));
                }
                Optional<PlanNoderightIndexCandidate = IndexSourceRewriter.rewriteWithIndex(
                        rightRewritten,
                        ImmutableSet.copyOf(rightJoinSymbols),
                        ,
                        ,
                        ,
                        ,
                        );
                if (rightIndexCandidate.isPresent()) {
                    // Sanity check that we can trace the path for the index lookup key
                    Map<SymbolSymboltrace = IndexKeyTracer.trace(rightIndexCandidate.get(), ImmutableSet.copyOf(rightJoinSymbols));
                    checkState(!trace.isEmpty() && rightJoinSymbols.containsAll(trace.keySet()));
                }
                switch (node.getType()) {
                    case :
                        // Prefer the right candidate over the left candidate
                        if (rightIndexCandidate.isPresent()) {
                            return new IndexJoinNode(.getNextId(), ..leftRewrittenrightIndexCandidate.get(), createEquiJoinClause(leftJoinSymbolsrightJoinSymbols), Optional.empty(), Optional.empty());
                        }
                        else if (leftIndexCandidate.isPresent()) {
                            return new IndexJoinNode(.getNextId(), ..rightRewrittenleftIndexCandidate.get(), createEquiJoinClause(rightJoinSymbolsleftJoinSymbols), Optional.empty(), Optional.empty());
                        }
                        break;
                    case :
                        if (rightIndexCandidate.isPresent()) {
                            return new IndexJoinNode(.getNextId(), ..leftRewrittenrightIndexCandidate.get(), createEquiJoinClause(leftJoinSymbolsrightJoinSymbols), Optional.empty(), Optional.empty());
                        }
                        break;
                    case :
                        if (leftIndexCandidate.isPresent()) {
                            return new IndexJoinNode(.getNextId(), ..rightRewrittenleftIndexCandidate.get(), createEquiJoinClause(rightJoinSymbolsleftJoinSymbols), Optional.empty(), Optional.empty());
                        }
                        break;
                    case :
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown type: " + node.getType());
                }
            }
            if (leftRewritten != node.getLeft() || rightRewritten != node.getRight()) {
                return new JoinNode(node.getId(), node.getType(), leftRewrittenrightRewrittennode.getCriteria(), node.getLeftHashSymbol(), node.getRightHashSymbol());
            }
            return node;
        }
        private static List<IndexJoinNode.EquiJoinClausecreateEquiJoinClause(List<SymbolprobeSymbolsList<SymbolindexSymbols)
        {
            checkArgument(probeSymbols.size() == indexSymbols.size());
            ImmutableList.Builder<IndexJoinNode.EquiJoinClausebuilder = ImmutableList.builder();
            for (int i = 0; i < probeSymbols.size(); i++) {
                builder.add(new IndexJoinNode.EquiJoinClause(probeSymbols.get(i), indexSymbols.get(i)));
            }
            return builder.build();
        }
    }
    private static Symbol referenceToSymbol(Expression expression)
    {
        checkArgument(expression instanceof QualifiedNameReference);
        return Symbol.fromQualifiedName(((QualifiedNameReferenceexpression).getName());
    }

    
    /**
     * Tries to rewrite a PlanNode tree with an IndexSource instead of a TableScan
     */
    private static class IndexSourceRewriter
            extends PlanRewriter<IndexSourceRewriter.Context>
    {
        private final IndexManager indexManager;
        private final SymbolAllocator symbolAllocator;
        private final PlanNodeIdAllocator idAllocator;
        private final Metadata metadata;
        private final Session session;
        private IndexSourceRewriter(IndexManager indexManagerSymbolAllocator symbolAllocatorPlanNodeIdAllocator idAllocatorMetadata metadataSession session)
        {
            this. = requireNonNull(metadata"metadata is null");
            this. = checkNotNull(symbolAllocator"symbolAllocator is null");
            this. = checkNotNull(idAllocator"idAllocator is null");
            this. = checkNotNull(indexManager"indexManager is null");
            this. = requireNonNull(session"session is null");
        }
        public static Optional<PlanNoderewriteWithIndex(
                PlanNode planNode,
                Set<SymbollookupSymbols,
                IndexManager indexManager,
                SymbolAllocator symbolAllocator,
                PlanNodeIdAllocator idAllocator,
                Metadata metadata,
                Session session)
        {
            AtomicBoolean success = new AtomicBoolean();
            IndexSourceRewriter indexSourceRewriter = new IndexSourceRewriter(indexManagersymbolAllocatoridAllocatormetadatasession);
            PlanNode rewritten = PlanRewriter.rewriteWith(indexSourceRewriterplanNodenew Context(lookupSymbolssuccess));
            if (success.get()) {
                return Optional.of(rewritten);
            }
            return Optional.empty();
        }
        @Override
        public PlanNode visitPlan(PlanNode nodeRewriteContext<Contextcontext)
        {
            // We don't know how to process this PlanNode in the context of an IndexJoin, so just give up by returning something
            return node;
        }
        @Override
        public PlanNode visitTableScan(TableScanNode nodeRewriteContext<Contextcontext)
        {
            return planTableScan(node.context.get());
        }
        @NotNull
        private PlanNode planTableScan(TableScanNode nodeExpression predicateContext context)
        {
            DomainTranslator.ExtractionResult decomposedPredicate = DomainTranslator.fromPredicate(
                    ,
                    ,
                    predicate,
                    .getTypes());
            TupleDomain<ColumnHandlesimplifiedConstraint = decomposedPredicate.getTupleDomain()
                    .transform(node.getAssignments()::get)
                    .intersect(node.getCurrentConstraint());
            checkState(node.getOutputSymbols().containsAll(context.getLookupSymbols()));
            Set<ColumnHandlelookupColumns = FluentIterable.from(context.getLookupSymbols())
                    .transform(Functions.forMap(node.getAssignments()))
                    .toSet();
            Optional<ResolvedIndexoptionalResolvedIndex = .resolveIndex(node.getTable(), lookupColumnssimplifiedConstraint);
            if (!optionalResolvedIndex.isPresent()) {
                // No index available, so give up by returning something
                return node;
            }
            ResolvedIndex resolvedIndex = optionalResolvedIndex.get();
            Map<ColumnHandleSymbolinverseAssignments = ImmutableBiMap.copyOf(node.getAssignments()).inverse();
            PlanNode source = new IndexSourceNode(
                    .getNextId(),
                    resolvedIndex.getIndexHandle(),
                    node.getTable(),
                    context.getLookupSymbols(),
                    node.getOutputSymbols(),
                    node.getAssignments(),
                    simplifiedConstraint);
            Expression resultingPredicate = combineConjuncts(
                    DomainTranslator.toPredicate(
                            resolvedIndex.getUnresolvedTupleDomain().transform(inverseAssignments::get),
                            .getTypes()),
                    decomposedPredicate.getRemainingExpression());
            if (!resultingPredicate.equals()) {
                // todo it is likely we end up with redundant filters here because the predicate push down has already been run... the fix is to run predicate push down again
                source = new FilterNode(.getNextId(), sourceresultingPredicate);
            }
            context.markSuccess();
            return source;
        }
        @Override
        public PlanNode visitProject(ProjectNode nodeRewriteContext<Contextcontext)
        {
            // Rewrite the lookup symbols in terms of only the pre-projected symbols that have direct translations
            Set<SymbolnewLookupSymbols = FluentIterable.from(context.get().getLookupSymbols())
                    .transform(Functions.forMap(node.getAssignments()))
                    .filter(QualifiedNameReference.class::isInstance)
                    .transform(IndexJoinOptimizer::referenceToSymbol)
                    .toSet();
            if (newLookupSymbols.isEmpty()) {
                return node;
            }
            return context.defaultRewrite(nodenew Context(newLookupSymbolscontext.get().getSuccess()));
        }
        @Override
        public PlanNode visitFilter(FilterNode nodeRewriteContext<Contextcontext)
        {
            if (node.getSource() instanceof TableScanNode) {
                return planTableScan((TableScanNodenode.getSource(), node.getPredicate(), context.get());
            }
            return context.defaultRewrite(nodenew Context(context.get().getLookupSymbols(), context.get().getSuccess()));
        }
        @Override
        public PlanNode visitIndexSource(IndexSourceNode nodeRewriteContext<Contextcontext)
        {
            throw new IllegalStateException("Should not be trying to generate an Index on something that has already been determined to use an Index");
        }
        @Override
        public PlanNode visitIndexJoin(IndexJoinNode nodeRewriteContext<Contextcontext)
        {
            // Lookup symbols can only be passed through the probe side of an index join
            Set<SymbolprobeLookupSymbols = FluentIterable.from(context.get().getLookupSymbols())
                    .filter(in(node.getProbeSource().getOutputSymbols()))
                    .toSet();
            if (probeLookupSymbols.isEmpty()) {
                return node;
            }
            PlanNode rewrittenProbeSource = context.rewrite(node.getProbeSource(), new Context(probeLookupSymbolscontext.get().getSuccess()));
            PlanNode source = node;
            if (rewrittenProbeSource != node.getProbeSource()) {
                source = new IndexJoinNode(node.getId(), node.getType(), rewrittenProbeSourcenode.getIndexSource(), node.getCriteria(), node.getProbeHashSymbol(), node.getIndexHashSymbol());
            }
            return source;
        }
        @Override
        public PlanNode visitAggregation(AggregationNode nodeRewriteContext<Contextcontext)
        {
            // Lookup symbols can only be passed through if they are part of the group by columns
            Set<SymbolgroupByLookupSymbols = FluentIterable.from(context.get().getLookupSymbols())
                    .filter(in(node.getGroupBy()))
                    .toSet();
            if (groupByLookupSymbols.isEmpty()) {
                return node;
            }
            return context.defaultRewrite(nodenew Context(groupByLookupSymbolscontext.get().getSuccess()));
        }
        @Override
        public PlanNode visitSort(SortNode nodeRewriteContext<Contextcontext)
        {
            // Sort has no bearing when building an index, so just ignore the sort
            return context.rewrite(node.getSource(), context.get());
        }
        public static class Context
        {
            private final Set<SymbollookupSymbols;
            private final AtomicBoolean success;
            public Context(Set<SymbollookupSymbolsAtomicBoolean success)
            {
                checkArgument(!lookupSymbols.isEmpty(), "lookupSymbols can not be empty");
                this. = ImmutableSet.copyOf(checkNotNull(lookupSymbols"lookupSymbols is null"));
                this. = checkNotNull(success"success is null");
            }
            public Set<SymbolgetLookupSymbols()
            {
                return ;
            }
            public AtomicBoolean getSuccess()
            {
                return ;
            }
            public void markSuccess()
            {
                checkState(.compareAndSet(falsetrue), "Can only have one success per context");
            }
        }
    }

    
    /**
     * Identify the mapping from the lookup symbols used at the top of the index plan to
     * the actual symbols produced by the IndexSource. Note that multiple top-level lookup
     * symbols may share the same underlying IndexSource symbol. Also note that lookup symbols
     * that do not correspond to underlying index source symbols will be omitted from the
     * returned Map.
     */
    public static class IndexKeyTracer
    {
        public static Map<SymbolSymboltrace(PlanNode nodeSet<SymbollookupSymbols)
        {
            return node.accept(new Visitor(), lookupSymbols);
        }
        private static class Visitor
                extends PlanVisitor<Set<Symbol>, Map<SymbolSymbol>>
        {
            @Override
            protected Map<SymbolSymbolvisitPlan(PlanNode nodeSet<SymbollookupSymbols)
            {
                throw new UnsupportedOperationException("Node not expected to be part of Index pipeline: " + node);
            }
            @Override
            public Map<SymbolSymbolvisitProject(ProjectNode nodeSet<SymbollookupSymbols)
            {
                // Map from output Symbols to source Symbols
                Map<SymbolSymboldirectSymbolTranslationOutputMap = Maps.transformValues(Maps.filterValues(node.getAssignments(), QualifiedNameReference.class::isInstance), IndexJoinOptimizer::referenceToSymbol);
                Map<SymbolSymboloutputToSourceMap = FluentIterable.from(lookupSymbols)
                        .filter(in(directSymbolTranslationOutputMap.keySet()))
                        .toMap(Functions.forMap(directSymbolTranslationOutputMap));
                checkState(!outputToSourceMap.isEmpty(), "No lookup symbols were able to pass through the projection");
                // Map from source Symbols to underlying index source Symbols
                Map<SymbolSymbolsourceToIndexMap = node.getSource().accept(this, ImmutableSet.copyOf(outputToSourceMap.values()));
                // Generate the Map the connects lookup symbols to underlying index source symbols
                Map<SymbolSymboloutputToIndexMap = Maps.transformValues(Maps.filterValues(outputToSourceMapin(sourceToIndexMap.keySet())), Functions.forMap(sourceToIndexMap));
                return ImmutableMap.copyOf(outputToIndexMap);
            }
            @Override
            public Map<SymbolSymbolvisitFilter(FilterNode nodeSet<SymbollookupSymbols)
            {
                return node.getSource().accept(thislookupSymbols);
            }
            @Override
            public Map<SymbolSymbolvisitIndexJoin(IndexJoinNode nodeSet<SymbollookupSymbols)
            {
                Set<SymbolprobeLookupSymbols = FluentIterable.from(lookupSymbols)
                        .filter(in(node.getProbeSource().getOutputSymbols()))
                        .toSet();
                checkState(!probeLookupSymbols.isEmpty(), "No lookup symbols were able to pass through the index join probe source");
                return node.getProbeSource().accept(thisprobeLookupSymbols);
            }
            @Override
            public Map<SymbolSymbolvisitAggregation(AggregationNode nodeSet<SymbollookupSymbols)
            {
                Set<SymbolgroupByLookupSymbols = FluentIterable.from(lookupSymbols)
                        .filter(in(node.getGroupBy()))
                        .toSet();
                checkState(!groupByLookupSymbols.isEmpty(), "No lookup symbols were able to pass through the aggregation group by");
                return node.getSource().accept(thisgroupByLookupSymbols);
            }
            @Override
            public Map<SymbolSymbolvisitSort(SortNode nodeSet<SymbollookupSymbols)
            {
                return node.getSource().accept(thislookupSymbols);
            }
            @Override
            public Map<SymbolSymbolvisitIndexSource(IndexSourceNode nodeSet<SymbollookupSymbols)
            {
                checkState(node.getLookupSymbols().equals(lookupSymbols), "lookupSymbols must be the same as IndexSource lookup symbols");
                return FluentIterable.from(lookupSymbols)
                        .toMap(Functions.<Symbol>identity());
            }
        }
    }
New to GrepCode? Check out our FAQ X