Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_LAST;
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkPositionIndex;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.concat;
 import static java.util.Collections.nCopies;
 import static java.util.Objects.requireNonNull;
 
 public class WindowOperator
         implements Operator
 {
     public static class WindowOperatorFactory
             implements OperatorFactory
     {
         private final int operatorId;
         private final List<TypesourceTypes;
         private final List<IntegeroutputChannels;
         private final List<WindowFunctionDefinitionwindowFunctionDefinitions;
         private final List<IntegerpartitionChannels;
         private final List<IntegerpreGroupedChannels;
         private final List<IntegersortChannels;
         private final List<SortOrdersortOrder;
         private final int preSortedChannelPrefix;
         private final FrameInfo frameInfo;
         private final int expectedPositions;
         private final List<Typetypes;
         private boolean closed;
 
         public WindowOperatorFactory(
                 int operatorId,
                 List<? extends TypesourceTypes,
                 List<IntegeroutputChannels,
                 List<WindowFunctionDefinitionwindowFunctionDefinitions,
                 List<IntegerpartitionChannels,
                 List<IntegerpreGroupedChannels,
                 List<IntegersortChannels,
                 List<SortOrdersortOrder,
                 int preSortedChannelPrefix,
                 FrameInfo frameInfo,
                 int expectedPositions)
         {
             requireNonNull(sourceTypes"sourceTypes is null");
             requireNonNull(outputChannels"outputChannels is null");
             requireNonNull(windowFunctionDefinitions"windowFunctionDefinitions is null");
             requireNonNull(partitionChannels"partitionChannels is null");
             requireNonNull(preGroupedChannels"preGroupedChannels is null");
             checkArgument(partitionChannels.containsAll(preGroupedChannels), "preGroupedChannels must be a subset of partitionChannels");
             requireNonNull(sortChannels"sortChannels is null");
             requireNonNull(sortOrder"sortOrder is null");
             checkArgument(sortChannels.size() == sortOrder.size(), "Must have same number of sort channels as sort orders");
             checkArgument(preSortedChannelPrefix <= sortChannels.size(), "Cannot have more pre-sorted channels than specified sorted channels");
             checkArgument(preSortedChannelPrefix == 0 || ImmutableSet.copyOf(preGroupedChannels).equals(ImmutableSet.copyOf(partitionChannels)), "preSortedChannelPrefix can only be greater than zero if all partition channels are pre-grouped");
             requireNonNull(frameInfo"frameInfo is null");
 
             this. = operatorId;
             this. = ImmutableList.copyOf(sourceTypes);
             this. = ImmutableList.copyOf(outputChannels);
             this. = ImmutableList.copyOf(windowFunctionDefinitions);
             this. = ImmutableList.copyOf(partitionChannels);
             this. = ImmutableList.copyOf(preGroupedChannels);
             this. = ImmutableList.copyOf(sortChannels);
             this. = ImmutableList.copyOf(sortOrder);
             this. = preSortedChannelPrefix;
             this. = frameInfo;
             this. = expectedPositions;
             this. = Stream.concat(
                    outputChannels.stream()
                            .map(sourceTypes::get),
                    windowFunctionDefinitions.stream()
                            .map(WindowFunctionDefinition::getType))
                    .collect(toImmutableList());
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            OperatorContext operatorContext = driverContext.addOperatorContext(WindowOperator.class.getSimpleName());
            return new WindowOperator(
                    operatorContext,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    );
        }
        @Override
        public void close()
        {
             = true;
        }
    }
    private enum State
    {
        NEEDS_INPUT,
        HAS_OUTPUT,
        FINISHING,
        FINISHED
    }
    private final OperatorContext operatorContext;
    private final int[] outputChannels;
    private final List<WindowFunctionwindowFunctions;
    private final List<IntegerorderChannels;
    private final List<SortOrderordering;
    private final List<Typetypes;
    private final int[] preGroupedChannels;
    private final FrameInfo frameInfo;
    private final PagesIndex pagesIndex;
    private final PageBuilder pageBuilder;
    private State state = .;
    private WindowPartition partition;
    private Page pendingInput;
    public WindowOperator(
            OperatorContext operatorContext,
            List<TypesourceTypes,
            List<IntegeroutputChannels,
            List<WindowFunctionDefinitionwindowFunctionDefinitions,
            List<IntegerpartitionChannels,
            List<IntegerpreGroupedChannels,
            List<IntegersortChannels,
            List<SortOrdersortOrder,
            int preSortedChannelPrefix,
            FrameInfo frameInfo,
            int expectedPositions)
    {
        requireNonNull(operatorContext"operatorContext is null");
        requireNonNull(outputChannels"outputChannels is null");
        requireNonNull(windowFunctionDefinitions"windowFunctionDefinitions is null");
        requireNonNull(partitionChannels"partitionChannels is null");
        requireNonNull(preGroupedChannels"preGroupedChannels is null");
        checkArgument(partitionChannels.containsAll(preGroupedChannels), "preGroupedChannels must be a subset of partitionChannels");
        requireNonNull(sortChannels"sortChannels is null");
        requireNonNull(sortOrder"sortOrder is null");
        checkArgument(sortChannels.size() == sortOrder.size(), "Must have same number of sort channels as sort orders");
        checkArgument(preSortedChannelPrefix <= sortChannels.size(), "Cannot have more pre-sorted channels than specified sorted channels");
        checkArgument(preSortedChannelPrefix == 0 || ImmutableSet.copyOf(preGroupedChannels).equals(ImmutableSet.copyOf(partitionChannels)), "preSortedChannelPrefix can only be greater than zero if all partition channels are pre-grouped");
        requireNonNull(frameInfo"frameInfo is null");
        this. = operatorContext;
        this. = Ints.toArray(outputChannels);
        this. = windowFunctionDefinitions.stream()
                .map(WindowFunctionDefinition::createWindowFunction)
                .collect(toImmutableList());
        this. = frameInfo;
        this. = Stream.concat(
                outputChannels.stream()
                        .map(sourceTypes::get),
                .stream()
                        .map(WindowFunction::getType))
                .collect(toImmutableList());
        this. = new PagesIndex(sourceTypesexpectedPositions);
        this. = Ints.toArray(preGroupedChannels);
        this. = .createPagesHashStrategy(preGroupedChannels, Optional.<Integer>empty());
        List<IntegerunGroupedPartitionChannels = partitionChannels.stream()
                .filter(channel -> !preGroupedChannels.contains(channel))
                .collect(toImmutableList());
        this. = .createPagesHashStrategy(unGroupedPartitionChannels, Optional.empty());
        List<IntegerpreSortedChannels = sortChannels.stream()
                .limit(preSortedChannelPrefix)
                .collect(toImmutableList());
        this. = .createPagesHashStrategy(preSortedChannels, Optional.<Integer>empty());
        this. = .createPagesHashStrategy(sortChannels, Optional.empty());
        this. = new PageBuilder(this.);
        if (preSortedChannelPrefix > 0) {
            // This already implies that set(preGroupedChannels) == set(partitionChannels) (enforced with checkArgument)
            this. = ImmutableList.copyOf(Iterables.skip(sortChannelspreSortedChannelPrefix));
            this. = ImmutableList.copyOf(Iterables.skip(sortOrderpreSortedChannelPrefix));
        }
        else {
            // Otherwise, we need to sort by the unGroupedPartitionChannels and all original sort channels
            this. = ImmutableList.copyOf(concat(unGroupedPartitionChannelssortChannels));
            this. = ImmutableList.copyOf(concat(nCopies(unGroupedPartitionChannels.size(), ), sortOrder));
        }
    }
    @Override
    {
        return ;
    }
    @Override
    public List<TypegetTypes()
    {
        return ;
    }
    @Override
    public void finish()
    {
        if ( == . ||  == .) {
            return;
        }
        if ( == .) {
            // Since was waiting for more input, prepare what we have for output since we will not be getting any more input
            sortPagesIndexIfNecessary();
        }
         = .;
    }
    @Override
    public boolean isFinished()
    {
        return  == .;
    }
    @Override
    public boolean needsInput()
    {
        return  == .;
    }
    @Override
    public void addInput(Page page)
    {
        checkState( == ."Operator can not take input at this time");
        requireNonNull(page"page is null");
        checkState( == null"Operator already has pending input");
        if (page.getPositionCount() == 0) {
            return;
        }
         = page;
        if (processPendingInput()) {
             = .;
        }
    }

    

Returns:
true if a full group has been buffered after processing the pendingInput, false otherwise
    private boolean processPendingInput()
    {
        checkState( != null);
        // If we have unused input or are finishing, then we have buffered a full group
        if ( != null ||  == .) {
            sortPagesIndexIfNecessary();
            return true;
        }
        else {
            return false;
        }
    }

    

Returns:
the unused section of the page, or null if fully applied. pagesIndex guaranteed to have at least one row after this method returns
    private Page updatePagesIndex(Page page)
    {
        checkArgument(page.getPositionCount() > 0);
        // TODO: Fix pagesHashStrategy to allow specifying channels for comparison, it currently requires us to rearrange the right side blocks in consecutive channel order
        Page preGroupedPage = rearrangePage(page);
        if (.getPositionCount() == 0 || .positionEqualsRow(, 0, 0, preGroupedPage.getBlocks())) {
            // Find the position where the pre-grouped columns change
            int groupEnd = findGroupEnd(preGroupedPage, 0);
            // Add the section of the page that contains values for the current group
            .addPage(page.getRegion(0, groupEnd));
            if (page.getPositionCount() - groupEnd > 0) {
                // Save the remaining page, which may contain multiple partitions
                return page.getRegion(groupEndpage.getPositionCount() - groupEnd);
            }
            else {
                // Page fully consumed
                return null;
            }
        }
        else {
            // We had previous results buffered, but the new page starts with new group values
            return page;
        }
    }
    private static Page rearrangePage(Page pageint[] channels)
    {
        Block[] newBlocks = new Block[channels.length];
        for (int i = 0; i < channels.lengthi++) {
            newBlocks[i] = page.getBlock(channels[i]);
        }
        return new Page(page.getPositionCount(), newBlocks);
    }
    @Override
    public Page getOutput()
    {
        if ( == . ||  == .) {
            return null;
        }
        Page page = extractOutput();
        return page;
    }
    private Page extractOutput()
    {
        // INVARIANT: pagesIndex contains the full grouped & sorted data for one or more partitions
        // Iterate through the positions sequentially until we have one full page
        while (!.isFull()) {
            if ( == null || !.hasNext()) {
                int partitionStart =  == null ? 0 : .getPartitionEnd();
                if (partitionStart >= .getPositionCount()) {
                    // Finished all of the partitions in the current pagesIndex
                     = null;
                    .clear();
                    // Try to extract more partitions from the pendingInput
                    if ( != null && processPendingInput()) {
                        partitionStart = 0;
                    }
                    else if ( == .) {
                         = .;
                        // Output the remaining page if we have anything buffered
                        if (!.isEmpty()) {
                            Page page = .build();
                            .reset();
                            return page;
                        }
                        return null;
                    }
                    else {
                         = .;
                        return null;
                    }
                }
                int partitionEnd = findGroupEnd(partitionStart);
                 = new WindowPartition(partitionStartpartitionEnd);
            }
            .processNextRow();
        }
        Page page = .build();
        .reset();
        return page;
    }
    private void sortPagesIndexIfNecessary()
    {
        if (.getPositionCount() > 1 && !.isEmpty()) {
            int startPosition = 0;
            while (startPosition < .getPositionCount()) {
                int endPosition = findGroupEnd(startPosition);
                .sort(startPositionendPosition);
                startPosition = endPosition;
            }
        }
    }
    // Assumes input grouped on relevant pagesHashStrategy columns
    private static int findGroupEnd(Page pagePagesHashStrategy pagesHashStrategyint startPosition)
    {
        checkArgument(page.getPositionCount() > 0, "Must have at least one position");
        checkPositionIndex(startPositionpage.getPositionCount(), "startPosition out of bounds");
        // Short circuit if the whole page has the same value
        if (pagesHashStrategy.rowEqualsRow(startPositionpage.getBlocks(), page.getPositionCount() - 1, page.getBlocks())) {
            return page.getPositionCount();
        }
        // TODO: do position binary search
        int endPosition = startPosition + 1;
        while (endPosition < page.getPositionCount() &&
                pagesHashStrategy.rowEqualsRow(endPosition - 1, page.getBlocks(), endPositionpage.getBlocks())) {
            endPosition++;
        }
        return endPosition;
    }
    // Assumes input grouped on relevant pagesHashStrategy columns
    private static int findGroupEnd(PagesIndex pagesIndexPagesHashStrategy pagesHashStrategyint startPosition)
    {
        checkArgument(pagesIndex.getPositionCount() > 0, "Must have at least one position");
        checkPositionIndex(startPositionpagesIndex.getPositionCount(), "startPosition out of bounds");
        // Short circuit if the whole page has the same value
        if (pagesIndex.positionEqualsPosition(pagesHashStrategystartPositionpagesIndex.getPositionCount() - 1)) {
            return pagesIndex.getPositionCount();
        }
        // TODO: do position binary search
        int endPosition = startPosition + 1;
        while ((endPosition < pagesIndex.getPositionCount()) &&
                pagesIndex.positionEqualsPosition(pagesHashStrategyendPosition - 1, endPosition)) {
            endPosition++;
        }
        return endPosition;
    }
New to GrepCode? Check out our FAQ X