Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder;
 import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
 import static com.facebook.presto.operator.OperatorAssertion.assertOperatorEquals;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.google.common.collect.Iterables.concat;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.airlift.units.DataSize.Unit.BYTE;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 
 @Test(singleThreaded = true)
 public class TestHashJoinOperator
 {
     private ExecutorService executor;
     private TaskContext taskContext;
 
     @BeforeMethod
     public void setUp()
     {
          = newCachedThreadPool(daemonThreadsNamed("test-%s"));
          = new TaskContext(new TaskId("query""stage""task"), );
     }
 
     @AfterMethod
     public void tearDown()
     {
         .shutdownNow();
     }
 
     @DataProvider(name = "hashEnabledValues")
     public static Object[][] hashEnabledValuesProvider()
     {
         return new Object[][] { { true }, { false } };
     }
 
     @Test(dataProvider = "hashEnabledValues")
     public void testInnerJoin(boolean hashEnabled)
             throws Exception
     {
         DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
 
         // build
         OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
         List<TypebuildTypes = ImmutableList.<Type>of();
         RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
         Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                 .addSequencePage(10, 20, 30, 40)
                 .build());
         HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
         Operator sourceHashProvider = hashBuilderOperatorFactory.createOperator(driverContext);
 
         Driver driver = new Driver(driverContextbuildOperatorsourceHashProvider);
         while (!driver.isFinished()) {
             driver.process();
         }
 
         // probe
         List<TypeprobeTypes = ImmutableList.<Type>of();
         RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(hashEnabled, Ints.asList(0), probeTypes);
         List<PageprobeInput = rowPagesBuilderProbe
                 .addSequencePage(1000, 0, 1000, 2000)
                 .build();
         OperatorFactory joinOperatorFactory = LookupJoinOperators.innerJoin(
                 0,
                 hashBuilderOperatorFactory.getLookupSourceSupplier(),
                 rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("20", 1020, 2020, "20", 30, 40)
                .row("21", 1021, 2021, "21", 31, 41)
                .row("22", 1022, 2022, "22", 32, 42)
                .row("23", 1023, 2023, "23", 33, 43)
                .row("24", 1024, 2024, "24", 34, 44)
                .row("25", 1025, 2025, "25", 35, 45)
                .row("26", 1026, 2026, "26", 36, 46)
                .row("27", 1027, 2027, "27", 37, 47)
                .row("28", 1028, 2028, "28", 38, 48)
                .row("29", 1029, 2029, "29", 39, 49)
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size(), buildTypes.size() + probeTypes.size() + 1));
    }
    @Test(dataProvider = "hashEnabledValues")
    public void testInnerJoinWithNullProbe(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
        // build
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .row("a")
                .row("b")
                .row("c")
                .build());
        HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
        Operator sourceHashProvider = hashBuilderOperatorFactory.createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorsourceHashProvider);
        while (!driver.isFinished()) {
            driver.process();
        }
        // probe
        List<TypeprobeTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(probeTypes);
        List<PageprobeInput = rowPagesBuilderProbe
                .row("a")
                .row((Stringnull)
                .row((Stringnull)
                .row("a")
                .row("b")
                .build();
        OperatorFactory joinOperatorFactory = LookupJoinOperators.innerJoin(
                0,
                hashBuilderOperatorFactory.getLookupSourceSupplier(),
                rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("a""a")
                .row("a""a")
                .row("b""b")
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size() + probeTypes.size()));
    }
    @Test(dataProvider = "hashEnabledValues")
    public void testInnerJoinWithNullBuild(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
        // build
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .row("a")
                .row((Stringnull)
                .row((Stringnull)
                .row("a")
                .row("b")
                .build());
        HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
        Operator sourceHashProvider = hashBuilderOperatorFactory.createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorsourceHashProvider);
        while (!driver.isFinished()) {
            driver.process();
        }
        // probe
        List<TypeprobeTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(hashEnabled, Ints.asList(0), probeTypes);
        List<PageprobeInput = rowPagesBuilderProbe
                .row("a")
                .row("b")
                .row("c")
                .build();
        OperatorFactory joinOperatorFactory = LookupJoinOperators.innerJoin(
                0,
                hashBuilderOperatorFactory.getLookupSourceSupplier(),
                rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                 rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("a""a")
                .row("a""a")
                .row("b""b")
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size(), buildTypes.size() + probeTypes.size() + 1));
    }
    @Test(dataProvider = "hashEnabledValues")
    public void testInnerJoinWithNullOnBothSides(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
        // build
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .row("a")
                .row((Stringnull)
                .row((Stringnull)
                .row("a")
                .row("b")
                .build());
        HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
        Operator sourceHashProvider = hashBuilderOperatorFactory.createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorsourceHashProvider);
        while (!driver.isFinished()) {
            driver.process();
        }
        // probe
        List<TypeprobeTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(probeTypes);
        List<PageprobeInput = rowPagesBuilderProbe
                .row("a")
                .row("b")
                .row((Stringnull)
                .row("c")
                .build();
        OperatorFactory joinOperatorFactory = LookupJoinOperators.innerJoin(
                0,
                hashBuilderOperatorFactory.getLookupSourceSupplier(),
                rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("a""a")
                .row("a""a")
                .row("b""b")
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size() + probeTypes.size()));
    }
    @Test(dataProvider = "hashEnabledValues")
    public void testProbeOuterJoin(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
        // build
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .addSequencePage(10, 20, 30, 40)
                .build());
        HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
        Operator hashBuilderOperator = hashBuilderOperatorFactory.createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorhashBuilderOperator);
        while (!driver.isFinished()) {
            driver.process();
        }
        // probe
        List<TypeprobeTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(hashEnabled, Ints.asList(0), probeTypes);
        List<PageprobeInput = rowPagesBuilderProbe
                .addSequencePage(15, 20, 1020, 2020)
                .build();
        OperatorFactory joinOperatorFactory = LookupJoinOperators.outerJoin(
                0,
                hashBuilderOperatorFactory.getLookupSourceSupplier(),
                rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("20", 1020, 2020, "20", 30, 40)
                .row("21", 1021, 2021, "21", 31, 41)
                .row("22", 1022, 2022, "22", 32, 42)
                .row("23", 1023, 2023, "23", 33, 43)
                .row("24", 1024, 2024, "24", 34, 44)
                .row("25", 1025, 2025, "25", 35, 45)
                .row("26", 1026, 2026, "26", 36, 46)
                .row("27", 1027, 2027, "27", 37, 47)
                .row("28", 1028, 2028, "28", 38, 48)
                .row("29", 1029, 2029, "29", 39, 49)
                .row("30", 1030, 2030, nullnullnull)
                .row("31", 1031, 2031, nullnullnull)
                .row("32", 1032, 2032, nullnullnull)
                .row("33", 1033, 2033, nullnullnull)
                .row("34", 1034, 2034, nullnullnull)
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size(), buildTypes.size() + probeTypes.size() + 1));
    }
    @Test(dataProvider = "hashEnabledValues")
    public void testOuterJoinWithNullProbe(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
        // build
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .row("a")
                .row("b")
                .row("c")
                .build());
        HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
        Operator sourceHashProvider = hashBuilderOperatorFactory.createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorsourceHashProvider);
        while (!driver.isFinished()) {
            driver.process();
        }
        // probe
        List<TypeprobeTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(probeTypes);
        List<PageprobeInput = rowPagesBuilderProbe
                .row("a")
                .row((Stringnull)
                .row((Stringnull)
                .row("a")
                .row("b")
                .build();
        OperatorFactory joinOperatorFactory = LookupJoinOperators.outerJoin(
                0,
                hashBuilderOperatorFactory.getLookupSourceSupplier(),
                rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("a""a")
                .row(nullnull)
                .row(nullnull)
                .row("a""a")
                .row("b""b")
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size() + probeTypes.size()));
    }
    @Test(dataProvider = "hashEnabledValues")
    public void testOuterJoinWithNullBuild(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
        // build
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .row("a")
                .row((Stringnull)
                .row((Stringnull)
                .row("a")
                .row("b")
                .build());
        HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
        Operator sourceHashProvider = hashBuilderOperatorFactory.createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorsourceHashProvider);
        while (!driver.isFinished()) {
            driver.process();
        }
        // probe
        List<TypeprobeTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(probeTypes);
        List<PageprobeInput = rowPagesBuilderProbe
                .row("a")
                .row("b")
                .row("c")
                .build();
        OperatorFactory joinOperatorFactory = LookupJoinOperators.outerJoin(
                0,
                hashBuilderOperatorFactory.getLookupSourceSupplier(),
                rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("a""a")
                .row("a""a")
                .row("b""b")
                .row("c"null)
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size() + probeTypes.size()));
    }
    @Test(dataProvider = "hashEnabledValues")
    public void testOuterJoinWithNullOnBothSides(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = .addPipelineContext(truetrue).addDriverContext();
        // build
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .row("a")
                .row((Stringnull)
                .row((Stringnull)
                .row("a")
                .row("b")
                .build());
        HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 100);
        Operator sourceHashProvider = hashBuilderOperatorFactory.createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorsourceHashProvider);
        while (!driver.isFinished()) {
            driver.process();
        }
        // probe
        List<TypeprobeTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilderProbe = rowPagesBuilder(probeTypes);
        List<PageprobeInput = rowPagesBuilderProbe
                .row("a")
                .row("b")
                .row((Stringnull)
                .row("c")
                .build();
        OperatorFactory joinOperatorFactory = LookupJoinOperators.outerJoin(
                0,
                hashBuilderOperatorFactory.getLookupSourceSupplier(),
                rowPagesBuilderProbe.getTypes(),
                Ints.asList(0),
                rowPagesBuilderProbe.getHashChannel());
        Operator joinOperator = joinOperatorFactory.createOperator(.addPipelineContext(truetrue).addDriverContext());
        // expected
        MaterializedResult expected = MaterializedResult.resultBuilder(.getSession(), concat(probeTypesbuildTypes))
                .row("a""a")
                .row("a""a")
                .row("b""b")
                .row(nullnull)
                .row("c"null)
                .build();
        assertOperatorEquals(joinOperatorprobeInputexpectedhashEnabled, ImmutableList.of(buildTypes.size() + probeTypes.size()));
    }
    @Test(expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Task exceeded max memory size.*", dataProvider = "hashEnabledValues")
    public void testMemoryLimit(boolean hashEnabled)
            throws Exception
    {
        DriverContext driverContext = new TaskContext(new TaskId("query""stage""task"), new DataSize(100, ))
                .addPipelineContext(truetrue)
                .addDriverContext();
        OperatorContext operatorContext = driverContext.addOperatorContext(0, ValuesOperator.class.getSimpleName());
        List<TypebuildTypes = ImmutableList.<Type>of();
        RowPagesBuilder rowPagesBuilder = rowPagesBuilder(hashEnabled, Ints.asList(0), buildTypes);
        Operator buildOperator = new ValuesOperator(operatorContextbuildTypesrowPagesBuilder
                .addSequencePage(10, 20, 30, 40)
                .build());
        Operator hashBuilderOperator = new HashBuilderOperatorFactory(1, rowPagesBuilder.getTypes(), Ints.asList(0), rowPagesBuilder.getHashChannel(), 1_500_000).createOperator(driverContext);
        Driver driver = new Driver(driverContextbuildOperatorhashBuilderOperator);
        while (!driver.isFinished()) {
            driver.process();
        }
    }
New to GrepCode? Check out our FAQ X