Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.sql.planner;
 
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Predicates.in;

Splits a logical plan into fragments that can be shipped and executed on distributed nodes
 
 public class PlanFragmenter
 {
     public SubPlan createSubPlans(Plan plan)
     {
         Fragmenter fragmenter = new Fragmenter(plan.getSymbolAllocator().getTypes());
 
         FragmentProperties properties = new FragmentProperties();
         PlanNode root = PlanRewriter.rewriteWith(fragmenterplan.getRoot(), properties);
 
         SubPlan result = fragmenter.buildFragment(rootproperties);
         result.sanityCheck();
 
         return result;
     }
 
     private static class Fragmenter
             extends PlanRewriter<FragmentProperties>
     {
         private final Map<SymbolTypetypes;
         private int nextFragmentId;
 
         public Fragmenter(Map<SymbolTypetypes)
         {
             this. = types;
         }
 
         private PlanFragmentId nextFragmentId()
         {
             return new PlanFragmentId(String.valueOf(++));
         }
 
         private SubPlan buildFragment(PlanNode rootFragmentProperties properties)
         {
             Set<Symboldependencies = SymbolExtractor.extract(root);
 
             PlanFragment fragment = new PlanFragment(
                     nextFragmentId(),
                     root,
                     Maps.filterKeys(in(dependencies)),
                     properties.getOutputLayout(),
                     properties.getDistribution(),
                     properties.getDistributeBy(),
                     properties.getOutputPartitioning(),
                     properties.getPartitionBy(),
                     properties.getHash());
 
             return new SubPlan(fragmentproperties.getChildren());
         }
 
         @Override
         public PlanNode visitOutput(OutputNode nodeRewriteContext<FragmentPropertiescontext)
         {
             context.get()
                     .setSingleNodeDistribution() // TODO: add support for distributed output
                    .setOutputLayout(node.getOutputSymbols())
                    .setUnpartitionedOutput();
            return context.defaultRewrite(nodecontext.get());
        }
        @Override
        {
            context.get().setCoordinatorOnlyDistribution();
            return context.defaultRewrite(nodecontext.get());
        }
        @Override
        public PlanNode visitTableScan(TableScanNode nodeRewriteContext<FragmentPropertiescontext)
        {
            context.get().setSourceDistribution(node.getId());
            return context.defaultRewrite(nodecontext.get());
        }
        @Override
        public PlanNode visitValues(ValuesNode nodeRewriteContext<FragmentPropertiescontext)
        {
            context.get().setSingleNodeDistribution();
            return context.defaultRewrite(nodecontext.get());
        }
        @Override
        public PlanNode visitExchange(ExchangeNode exchangeRewriteContext<FragmentPropertiescontext)
        {
            ImmutableList.Builder<SubPlanbuilder = ImmutableList.builder();
            if (exchange.getType() == ..) {
                context.get().setSingleNodeDistribution();
                for (int i = 0; i < exchange.getSources().size(); i++) {
                    FragmentProperties childProperties = new FragmentProperties();
                    childProperties.setUnpartitionedOutput();
                    childProperties.setOutputLayout(exchange.getInputs().get(i));
                    builder.add(buildSubPlan(exchange.getSources().get(i), childPropertiescontext));
                }
            }
            else if (exchange.getType() == ..) {
                context.get().setFixedDistribution();
                FragmentProperties childProperties = new FragmentProperties()
                        .setHashPartitionedOutput(exchange.getPartitionKeys(), exchange.getHashSymbol())
                        .setOutputLayout(Iterables.getOnlyElement(exchange.getInputs()));
                builder.add(buildSubPlan(Iterables.getOnlyElement(exchange.getSources()), childPropertiescontext));
            }
            else if (exchange.getType() == ..) {
                FragmentProperties childProperties = new FragmentProperties();
                childProperties.setUnpartitionedOutput();
                childProperties.setOutputLayout(Iterables.getOnlyElement(exchange.getInputs()));
                builder.add(buildSubPlan(Iterables.getOnlyElement(exchange.getSources()), childPropertiescontext));
            }
            List<SubPlanchildren = builder.build();
            context.get().addChildren(children);
            List<PlanFragmentIdchildrenIds = children.stream()
                    .map(SubPlan::getFragment)
                    .map(PlanFragment::getId)
                    .collect(toImmutableList());
            return new RemoteSourceNode(exchange.getId(), childrenIdsexchange.getOutputSymbols());
        }
        private SubPlan buildSubPlan(PlanNode nodeFragmentProperties propertiesRewriteContext<FragmentPropertiescontext)
        {
            PlanNode child = context.rewrite(nodeproperties);
            return buildFragment(childproperties);
        }
    }
    private static class FragmentProperties
    {
        private final List<SubPlanchildren = new ArrayList<>();
        private Optional<List<Symbol>> outputLayout = Optional.empty();
        private Optional<OutputPartitioningoutputPartitioning = Optional.empty();
        private List<SymbolpartitionBy = ImmutableList.of();
        private Optional<Symbolhash = Optional.empty();
        private Optional<PlanDistributiondistribution = Optional.empty();
        private PlanNodeId distributeBy;
        public List<SubPlangetChildren()
        {
            return ;
        }
        {
            if (.isPresent()) {
                PlanDistribution value = .get();
                checkState(value == . || value == .,
                        "Cannot overwrite distribution with %s (currently set to %s)".value);
            }
            else {
                 = Optional.of(.);
            }
            return this;
        }
        public FragmentProperties setFixedDistribution()
        {
            .ifPresent(current -> checkState(current == .,
                    "Cannot set distribution to %s. Already set to %s",
                    .,
                    current));
             = Optional.of(.);
            return this;
        }
        {
            // only SINGLE can be upgraded to COORDINATOR_ONLY
            .ifPresent(current -> checkState(.get() == .,
                    "Cannot overwrite distribution with %s (currently set to %s)",
                    .,
                    .get()));
             = Optional.of(.);
            return this;
        }
        public FragmentProperties setSourceDistribution(PlanNodeId source)
        {
            if (.isPresent()) {
                // If already SINGLE or COORDINATOR_ONLY, leave it as is (this is for single-node execution)
                        "Cannot overwrite distribution with %s (currently set to %s)",
                        .,
                        .get());
            }
            else {
                 = Optional.of(.);
                this. = source;
            }
            return this;
        }
        {
            .ifPresent(current -> {
                throw new IllegalStateException(String.format("Output overwrite partitioning with %s (currently set to %s)".current));
            });
             = Optional.of(.);
            return this;
        }
        public FragmentProperties setOutputLayout(List<Symbollayout)
        {
            .ifPresent(current -> {
                throw new IllegalStateException(String.format("Cannot overwrite output layout with %s (currently set to %s)"layoutcurrent));
            });
             = Optional.of(layout);
            return this;
        }
        public FragmentProperties setHashPartitionedOutput(List<SymbolpartitionKeysOptional<Symbolhash)
        {
            .ifPresent(current -> {
                throw new IllegalStateException(String.format("Cannot overwrite output partitioning with %s (currently set to %s)".current));
            });
            this. = Optional.of(.);
            this. = ImmutableList.copyOf(partitionKeys);
            this. = hash;
            return this;
        }
        public FragmentProperties addChildren(List<SubPlanchildren)
        {
            this..addAll(children);
            return this;
        }
        public List<SymbolgetOutputLayout()
        {
            return .get();
        }
        public OutputPartitioning getOutputPartitioning()
        {
            return .get();
        }
        public PlanDistribution getDistribution()
        {
            return .get();
        }
        public List<SymbolgetPartitionBy()
        {
            return ;
        }
        public Optional<SymbolgetHash()
        {
            return ;
        }
        public PlanNodeId getDistributeBy()
        {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X