Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.testing;
 
 
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.sql.testing.TreeAssertions.assertFormattedSql;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.concurrent.Executors.newCachedThreadPool;
public class LocalQueryRunner
    implements QueryRunner
    private final Session defaultSession;
    private final ExecutorService executor;
    private final SqlParser sqlParser;
    private final InMemoryNodeManager nodeManager;
    private final TypeRegistry typeRegistry;
    private final MetadataManager metadata;
    private final SplitManager splitManager;
    private final PageSourceManager pageSourceManager;
    private final IndexManager indexManager;
    private final PageSinkManager pageSinkManager;
    private final ExpressionCompiler compiler;
    private final ConnectorManager connectorManager;
    private final boolean hashEnabled;
    private boolean printPlan;
    public LocalQueryRunner(Session defaultSession)
    {
        this. = checkNotNull(defaultSession"defaultSession is null");
        this. = SystemSessionProperties.isOptimizeHashGenerationEnabled(defaultSessionfalse);
        this. = newCachedThreadPool(daemonThreadsNamed("local-query-runner-%s"));
        this. = new SqlParser();
        this. = new InMemoryNodeManager();
        this. = new TypeRegistry();
        this. = new IndexManager();
        this. = new PageSinkManager();
        // sys schema
        SystemTablesMetadata systemTablesMetadata = new SystemTablesMetadata();
        SystemSplitManager systemSplitManager = new SystemSplitManager();
        SystemRecordSetProvider systemRecordSetProvider = new SystemRecordSetProvider();
        this. = new MetadataManager(new FeaturesConfig().setExperimentalSyntaxEnabled(true), systemTablesMetadata);
        this. = new SplitManager(systemSplitManager);
        this. = new PageSourceManager(systemRecordSetProvider);
        SystemTablesManager systemTablesManager = new SystemTablesManager(systemTablesMetadatasystemSplitManagersystemRecordSetProvider, ImmutableSet.<SystemTable>of());
        // sys.node
        systemTablesManager.addTable(new NodesSystemTable());
        // sys.catalog
        systemTablesManager.addTable(new CatalogSystemTable());
        this. = new ExpressionCompiler();
        this. = new ConnectorManager(
                ,
                ,
                ,
                ,
                ,
                new HandleResolver(),
                ImmutableMap.<StringConnectorFactory>of(),
                
        );
    }
    public static LocalQueryRunner createHashEnabledQueryRunner(LocalQueryRunner localQueryRunner)
    {
        Session session = localQueryRunner.getDefaultSession();
        Session.SessionBuilder builder = Session.builder()
                .setUser(session.getUser())
                .setSource(session.getSource())
                .setCatalog(session.getCatalog())
                .setTimeZoneKey(session.getTimeZoneKey())
                .setLocale(session.getLocale())
                .setSystemProperties(ImmutableMap.of("optimizer.optimize_hash_generation""true"));
        return new LocalQueryRunner(builder.build());
    }
    @Override
    public void close()
    {
        .shutdownNow();
    }
    // A local runner always represents a single in-process node.
    @Override
    public int getNodeCount()
    {
        return 1;
    }
    {
        return ;
    }
    public TypeRegistry getTypeManager()
    {
        return ;
    }
    public Metadata getMetadata()
    {
        return ;
    }
    public ExecutorService getExecutor()
    {
        return ;
    }
    @Override
    public Session getDefaultSession()
    {
        return ;
    }
    public void createCatalog(String catalogNameConnectorFactory connectorFactoryMap<StringStringproperties)
    {
        .addCurrentNodeDatasource(catalogName);
        .createConnection(catalogNameconnectorFactoryproperties);
    }
    // Plugin installation is not supported by the local runner; register connectors
    // directly via createCatalog(String, ConnectorFactory, Map) instead.
    @Override
    public void installPlugin(Plugin plugin)
    {
        throw new UnsupportedOperationException();
    }
    @Override
    public void createCatalog(String catalogNameString connectorNameMap<StringStringproperties)
    {
        throw new UnsupportedOperationException();
    }
    public LocalQueryRunner printPlan()
    {
         = true;
        return this;
    }
    public boolean isHashEnabled()
    {
        return ;
    }
    private static class MaterializedOutputFactory
            implements OutputFactory
    {
        private final AtomicReference<MaterializingOperatormaterializingOperator = new AtomicReference<>();
        {
            MaterializingOperator operator = .get();
            checkState(operator != null"Output not created");
            return operator;
        }
        @Override
        public OperatorFactory createOutputOperator(final int operatorIdfinal List<TypesourceType)
        {
            checkNotNull(sourceType"sourceType is null");
            return new OperatorFactory()
            {
                @Override
                public List<TypegetTypes()
                {
                    return ImmutableList.of();
                }
                @Override
                public Operator createOperator(DriverContext driverContext)
                {
                    OperatorContext operatorContext = driverContext.addOperatorContext(operatorIdMaterializingOperator.class.getSimpleName());
                    MaterializingOperator operator = new MaterializingOperator(operatorContextsourceType);
                    if (!.compareAndSet(nulloperator)) {
                        throw new IllegalArgumentException("Output already created");
                    }
                    return operator;
                }
                @Override
                public void close()
                {
                }
            };
        }
    }
    @Override
    public List<QualifiedTableNamelistTables(Session sessionString catalogString schema)
    {
        return getMetadata().listTables(sessionnew QualifiedTablePrefix(catalogschema));
    }
    @Override
    public boolean tableExists(Session sessionString table)
    {
        QualifiedTableName name =  new QualifiedTableName(session.getCatalog(), session.getSchema(), table);
        return getMetadata().getTableHandle(sessionname).isPresent();
    }
    @Override
    public MaterializedResult execute(@Language("SQL"String sql)
    {
        return execute(sql);
    }
    @Override
    public MaterializedResult execute(Session session, @Language("SQL"String sql)
    {
        MaterializedOutputFactory outputFactory = new MaterializedOutputFactory();
        TaskContext taskContext = new TaskContext(new TaskId("query""stage""task"), session);
        List<Driverdrivers = createDrivers(sessionsqloutputFactorytaskContext);
        boolean done = false;
        while (!done) {
            boolean processed = false;
            for (Driver driver : drivers) {
                if (!driver.isFinished()) {
                    driver.process();
                    processed = true;
                }
            }
            done = !processed;
        }
        return outputFactory.getMaterializingOperator().getMaterializedResult();
    }
    public List<DrivercreateDrivers(@Language("SQL"String sqlOutputFactory outputFactoryTaskContext taskContext)
    {
        return createDrivers(sqloutputFactorytaskContext);
    }
    public List<DrivercreateDrivers(Session session, @Language("SQL"String sqlOutputFactory outputFactoryTaskContext taskContext)
    {
        Statement statement = .createStatement(sql);
        assertFormattedSql(statement);
        PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
        FeaturesConfig featuresConfig = new FeaturesConfig()
                .setExperimentalSyntaxEnabled(true)
                .setDistributedIndexJoinsEnabled(false)
                .setOptimizeHashGeneration(true);
        PlanOptimizersFactory planOptimizersFactory = new PlanOptimizersFactory(featuresConfigtrue);
        QueryExplainer queryExplainer = new QueryExplainer(sessionplanOptimizersFactory.get(), featuresConfig.isExperimentalSyntaxEnabled());
        Analyzer analyzer = new Analyzer(session, Optional.of(queryExplainer), featuresConfig.isExperimentalSyntaxEnabled());
        Analysis analysis = analyzer.analyze(statement);
        Plan plan = new LogicalPlanner(sessionplanOptimizersFactory.get(), idAllocator).plan(analysis);
        if () {
            ..println(PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes(), ));
        }
        SubPlan subplan = new PlanFragmenter().createSubPlans(plan);
        if (!subplan.getChildren().isEmpty()) {
            throw new AssertionError("Expected subplan to have no children");
        }
        LocalExecutionPlanner executionPlanner = new LocalExecutionPlanner(
                ,
                ,
                ,
                ,
                ,
                null,
                ,
                new IndexJoinLookupStats(),
                new CompilerConfig().setInterpreterEnabled(false), // make sure tests fail if compiler breaks
                new TaskManagerConfig()
        );
        // plan query
        LocalExecutionPlan localExecutionPlan = executionPlanner.plan(session,
                subplan.getFragment().getRoot(),
                subplan.getFragment().getOutputLayout(),
                plan.getTypes(),
                outputFactory);
        // generate sources
        List<TaskSourcesources = new ArrayList<>();
        long sequenceId = 0;
        for (PlanNode sourceNode : subplan.getFragment().getSources()) {
            if (sourceNode instanceof ValuesNode) {
                continue;
            }
            TableScanNode tableScan = (TableScanNodesourceNode;
            SplitSource splitSource = .getPartitionSplits(tableScan.getTable(), getPartitions(tableScan));
            ImmutableSet.Builder<ScheduledSplitscheduledSplits = ImmutableSet.builder();
            while (!splitSource.isFinished()) {
                try {
                    for (Split split : splitSource.getNextBatch(1000)) {
                        scheduledSplits.add(new ScheduledSplit(sequenceId++, split));
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
            }
            sources.add(new TaskSource(tableScan.getId(), scheduledSplits.build(), true));
        }
        // create drivers
        List<Driverdrivers = new ArrayList<>();
        Map<PlanNodeIdDriverdriversBySource = new HashMap<>();
        for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
            DriverContext driverContext = taskContext.addPipelineContext(driverFactory.isInputDriver(), driverFactory.isOutputDriver()).addDriverContext();
            Driver driver = driverFactory.createDriver(driverContext);
            drivers.add(driver);
            for (PlanNodeId sourceId : driver.getSourceIds()) {
                driversBySource.put(sourceIddriver);
            }
            driverFactory.close();
        }
        // add sources to the drivers
        for (TaskSource source : sources) {
            for (Driver driver : driversBySource.values()) {
                driver.updateSource(source);
            }
        }
        return ImmutableList.copyOf(drivers);
    }
    private List<PartitiongetPartitions(TableScanNode node)
    {
        if (node.getGeneratedPartitions().isPresent()) {
            return node.getGeneratedPartitions().get().getPartitions();
        }
        // Otherwise return all partitions
        PartitionResult matchingPartitions = .getPartitions(node.getTable(), Optional.empty());
        return matchingPartitions.getPartitions();
    }
    public OperatorFactory createTableScanOperator(int operatorIdString tableNameString... columnNames)
    {
        return createTableScanOperator(operatorIdtableNamecolumnNames);
    }
            Session session,
            final int operatorId,
            String tableName,
            String... columnNames)
    {
        // look up the table
        TableHandle tableHandle = .getTableHandle(sessionnew QualifiedTableName(session.getCatalog(), session.getSchema(), tableName)).orElse(null);
        checkArgument(tableHandle != null"Table %s does not exist"tableName);
        // lookup the columns
        Map<StringColumnHandleallColumnHandles = .getColumnHandles(tableHandle);
        ImmutableList.Builder<ColumnHandlecolumnHandlesBuilder = ImmutableList.builder();
        ImmutableList.Builder<TypecolumnTypesBuilder = ImmutableList.builder();
        for (String columnName : columnNames) {
            ColumnHandle columnHandle = allColumnHandles.get(columnName);
            checkArgument(columnHandle != null"Table %s does not have a column %s"tableNamecolumnName);
            columnHandlesBuilder.add(columnHandle);
            ColumnMetadata columnMetadata = .getColumnMetadata(tableHandlecolumnHandle);
            columnTypesBuilder.add(columnMetadata.getType());
        }
        final List<ColumnHandlecolumnHandles = columnHandlesBuilder.build();
        final List<TypecolumnTypes = columnTypesBuilder.build();
        // get the split for this table
        final Split split = getLocalQuerySplit(tableHandle);
        return new OperatorFactory()
        {
            @Override
            public List<TypegetTypes()
            {
                return columnTypes;
            }
            @Override
            public Operator createOperator(DriverContext driverContext)
            {
                OperatorContext operatorContext = driverContext.addOperatorContext(operatorId"BenchmarkSource");
                ConnectorPageSource pageSource = .createPageSource(splitcolumnHandles);
                return new PageSourceOperator(pageSourcecolumnTypesoperatorContext);
            }
            @Override
            public void close()
            {
            }
        };
    }
    public OperatorFactory createHashProjectOperator(int operatorIdList<TypecolumnTypesList<IntegerchannelsToHash)
    {
        ImmutableList.Builder<ProjectionFunctionprojectionFunctions = ImmutableList.builder();
        for (int i = 0; i < columnTypes.size(); i++) {
            projectionFunctions.add(ProjectionFunctions.singleColumn(columnTypes.get(i), i));
        }
        projectionFunctions.add(new HashProjectionFunction(columnTypeschannelsToHash));
                operatorId,
                new GenericPageProcessor(.projectionFunctions.build()),
                ImmutableList.copyOf(Iterables.concat(columnTypes, ImmutableList.of())));
    }
    private Split getLocalQuerySplit(TableHandle tableHandle)
    {
        try {
            List<Partitionpartitions = .getPartitions(tableHandle, Optional.empty()).getPartitions();
            SplitSource splitSource = .getPartitionSplits(tableHandlepartitions);
            Split split = Iterables.getOnlyElement(splitSource.getNextBatch(1000));
            while (!splitSource.isFinished()) {
                checkState(splitSource.getNextBatch(1000).isEmpty(), "Expected only one split for a local query");
            }
            return split;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        }
    }
    private static class HashProjectionFunction
            implements ProjectionFunction
    {
        private final List<TypecolumnTypes;
        private final List<IntegerhashChannels;
        public HashProjectionFunction(List<TypecolumnTypesList<IntegerhashChannels)
        {
            this. = columnTypes;
            this. = hashChannels;
        }
        @Override
        public Type getType()
        {
            return ;
        }
        @Override
        public void project(int positionBlock[] blocksBlockBuilder output)
        {
            .writeLong(output, TypeUtils.getHashPosition(blocksposition));
        }
        @Override
        public void project(RecordCursor cursorBlockBuilder output)
        {
            throw new UnsupportedOperationException("Operation not supported");
        }
    }
New to GrepCode? Check out our FAQ X