Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.spi.StandardErrorCode.INVALID_WINDOW_FRAME;
 import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_LAST;
 import static com.facebook.presto.sql.tree.FrameBound.Type.FOLLOWING;
 import static com.facebook.presto.sql.tree.FrameBound.Type.PRECEDING;
 import static com.facebook.presto.sql.tree.FrameBound.Type.UNBOUNDED_FOLLOWING;
 import static com.facebook.presto.sql.tree.FrameBound.Type.UNBOUNDED_PRECEDING;
 import static com.facebook.presto.util.Failures.checkCondition;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.concat;
 
 public class WindowOperator
         implements Operator
 {
     public static class WindowOperatorFactory
             implements OperatorFactory
     {
         private final int operatorId;
         private final List<TypesourceTypes;
         private final List<IntegeroutputChannels;
         private final List<WindowFunctionDefinitionwindowFunctionDefinitions;
         private final List<TypepartitionTypes;
         private final List<IntegerpartitionChannels;
         private final List<TypesortTypes;
         private final List<IntegersortChannels;
         private final List<SortOrdersortOrder;
         private final WindowFrame.Type frameType;
         private final FrameBound.Type frameStartType;
         private final Optional<IntegerframeStartChannel;
         private final FrameBound.Type frameEndType;
         private final Optional<IntegerframeEndChannel;
         private final int expectedPositions;
         private final List<Typetypes;
         private boolean closed;
 
         public WindowOperatorFactory(
                 int operatorId,
                 List<? extends TypesourceTypes,
                 List<IntegeroutputChannels,
                 List<WindowFunctionDefinitionwindowFunctionDefinitions,
                 List<IntegerpartitionChannels,
                 List<IntegersortChannels,
                 List<SortOrdersortOrder,
                 WindowFrame.Type frameType,
                 FrameBound.Type frameStartType,
                 Optional<IntegerframeStartChannel,
                 FrameBound.Type frameEndType,
                 Optional<IntegerframeEndChannel,
                 int expectedPositions)
         {
             this. = operatorId;
             this. = ImmutableList.copyOf(sourceTypes);
             this. = ImmutableList.copyOf(checkNotNull(outputChannels"outputChannels is null"));
             this. = windowFunctionDefinitions;
             ImmutableList.Builder<TypepartitionTypes = ImmutableList.builder();
             for (int channel : partitionChannels) {
                 partitionTypes.add(sourceTypes.get(channel));
             }
             this. = partitionTypes.build();
             this. = ImmutableList.copyOf(checkNotNull(partitionChannels"partitionChannels is null"));
             ImmutableList.Builder<TypesortTypes = ImmutableList.builder();
             for (int channel : sortChannels) {
                 sortTypes.add(sourceTypes.get(channel));
             }
             this. = sortTypes.build();
             this. = ImmutableList.copyOf(checkNotNull(sortChannels"sortChannels is null"));
             this. = ImmutableList.copyOf(checkNotNull(sortOrder"sortOrder is null"));
 
            this. = checkNotNull(frameType"frameType is null");
            this. = checkNotNull(frameStartType"frameStartType is null");
            this. = checkNotNull(frameStartChannel"frameStartChannel is null");
            this. = checkNotNull(frameEndType"frameEndType is null");
            this. = checkNotNull(frameEndChannel"frameEndChannel is null");
            this. = expectedPositions;
            this. = toTypes(sourceTypesoutputChannelstoWindowFunctions(windowFunctionDefinitions));
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            OperatorContext operatorContext = driverContext.addOperatorContext(WindowOperator.class.getSimpleName());
            return new WindowOperator(
                    operatorContext,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    );
        }
        @Override
        public void close()
        {
             = true;
        }
    }
    /**
     * Lifecycle phases of the operator.
     *
     * NOTE(review): the names suggest the order NEEDS_INPUT -> HAS_OUTPUT -> FINISHED
     * (accumulate pages, emit results, done), but the state comparisons in this copy of
     * the file lost their operands, so confirm the transitions against the original source.
     */
    private enum State
    {
        NEEDS_INPUT,
        HAS_OUTPUT,
        FINISHED
    }
    private final OperatorContext operatorContext;
    private final int[] outputChannels;
    private final List<WindowFunctionwindowFunctions;
    private final List<TypepartitionTypes;
    private final List<IntegerpartitionChannels;
    private final List<TypesortTypes;
    private final List<IntegersortChannels;
    private final List<SortOrdersortOrder;
    private final List<Typetypes;
    private final boolean frameRange;
    private final FrameBound.Type frameStartType;
    private final int frameStartChannel;
    private final FrameBound.Type frameEndType;
    private final int frameEndChannel;
    private final PagesIndex pagesIndex;
    private final PageBuilder pageBuilder;
    private State state = .;
    private int currentPosition;
    private int partitionStart;
    private int partitionEnd;
    private int peerGroupStart;
    private int peerGroupEnd;
    private int frameStart;
    private int frameEnd;
    public WindowOperator(
            OperatorContext operatorContext,
            List<TypesourceTypes,
            List<IntegeroutputChannels,
            List<WindowFunctionDefinitionwindowFunctionDefinitions,
            List<TypepartitionTypesList<IntegerpartitionChannels,
            List<TypesortTypesList<IntegersortChannels,
            List<SortOrdersortOrder,
            WindowFrame.Type frameType,
            FrameBound.Type frameStartType,
            Optional<IntegerframeStartChannel,
            FrameBound.Type frameEndType,
            Optional<IntegerframeEndChannel,
            int expectedPositions)
    {
        this. = checkNotNull(operatorContext"operatorContext is null");
        this. = Ints.toArray(checkNotNull(outputChannels"outputChannels is null"));
        this. = toWindowFunctions(checkNotNull(windowFunctionDefinitions"windowFunctionDefinitions is null"));
        this. = ImmutableList.copyOf(checkNotNull(partitionTypes"partitionTypes is null"));
        this. = ImmutableList.copyOf(checkNotNull(partitionChannels"partitionChannels is null"));
        this. = ImmutableList.copyOf(checkNotNull(sortTypes"sortTypes is null"));
        this. = ImmutableList.copyOf(checkNotNull(sortChannels"sortChannels is null"));
        this. = ImmutableList.copyOf(checkNotNull(sortOrder"sortOrder is null"));
        this. = (checkNotNull(frameType"frameType is null") == ..);
        this. = checkNotNull(frameStartType"frameStartType is null");
        this. = checkNotNull(frameStartChannel"frameStartChannel is null").orElse(-1);
        this. = checkNotNull(frameEndType"frameEndType is null");
        this. = checkNotNull(frameEndChannel"frameEndChannel is null").orElse(-1);
        this. = toTypes(sourceTypesoutputChannels);
        this. = new PagesIndex(sourceTypesexpectedPositions);
        this. = new PageBuilder(this.);
    }
    @Override
    {
        return ;
    }
    @Override
    public List<TypegetTypes()
    {
        return ;
    }
    /**
     * Ends the input phase: per the inline comments, builds one combined ordering
     * (partition channels first, then sort channels), sorts the index by it, and creates
     * the partition and order comparators.
     *
     * NOTE(review): the scrape this file came from dropped the field and constant
     * references in this method (state checks, concat() operands, comparator assignment
     * targets). The body is kept exactly as found; restore the operands from the
     * original source before relying on it.
     */
    @Override
    public void finish()
    {
        // guarded, so only the first call in the expected state does the work
        if ( == .) {
             = .;
            // we partition by ordering the values so partitions are sequential values
            List<SortOrderpartitionOrder = Collections.nCopies(.size(), );
            // sort everything by partition channels, then sort channels
            List<IntegerorderChannels = ImmutableList.copyOf(concat());
            List<SortOrderordering = ImmutableList.copyOf(concat(partitionOrder));
            List<TypeorderingTypes = ImmutableList.copyOf(concat());
            // sort the index
            .sort(orderingTypesorderChannelsordering);
            // create partition comparator
             = .createComparator(orderingTypespartitionOrder);
            // create order comparator
            // NOTE(review): both comparators receive the combined orderingTypes list. If
            // createComparator pairs types with channels by position, the order comparator
            // presumably needs only the sort-channel types -- confirm against PagesIndex.
             = .createComparator(orderingTypes);
        }
    }
    @Override
    public boolean isFinished()
    {
        return  == .;
    }
    @Override
    public boolean needsInput()
    {
        return  == .;
    }
    @Override
    public void addInput(Page page)
    {
        checkState( == ."Operator is already finishing");
        checkNotNull(page"page is null");
        .addPage(page);
    }
    /**
     * Produces the next output page, or {@code null} when nothing is emitted.
     *
     * Per the inline comments, positions are walked sequentially until a page is full.
     * For each position the method detects partition and peer-group boundaries with the
     * comparators, copies the output channels, recomputes the window frame via
     * {@link #updateFrame()}, and lets every window function process the row.
     *
     * NOTE(review): the scrape this file came from dropped the field references in this
     * method (state, index, page builder, position counters, comparator receivers, and
     * the WindowIndex constructor arguments). The body is kept exactly as found; restore
     * the operands from the original source before relying on it.
     */
    @Override
    public Page getOutput()
    {
        if ( != .) {
            return null;
        }
        if ( >= .getPositionCount()) {
             = .;
            return null;
        }
        // iterate through the positions sequentially until we have one full page
        .reset();
        while (!.isFull() &&  < .getPositionCount()) {
            // check for new partition
            boolean newPartition = ( == 0) || ( == );
            if (newPartition) {
                 = ;
                // find end of partition
                // (advance while adjacent rows compare equal under the comparator)
                ++;
                while (( < .getPositionCount()) &&
                        (.compare( - 1, ) == 0)) {
                    ++;
                }
                // reset functions for new partition
                WindowIndex windowIndex = new WindowIndex();
                for (WindowFunction function : ) {
                    function.reset(windowIndex);
                }
            }
            // copy output channels
            .declarePosition();
            int channel = 0;
            while (channel < .) {
                .appendTo([channel], .getBlockBuilder(channel));
                channel++;
            }
            // check for new peer group
            boolean newPeerGroup = newPartition || ( == );
            if (newPeerGroup) {
                 = ;
                // find end of peer group
                ++;
                while (( < ) &&
                        (.compare( - 1, ) == 0)) {
                    ++;
                }
            }
            // compute window frame
            updateFrame();
            // process window functions
            // `channel` keeps counting from the loop above, so each function writes to
            // the block builder that follows the copied output channels
            for (WindowFunction function : ) {
                function.processRow(
                        .getBlockBuilder(channel),
                         - ,
                         -  - 1,
                        ,
                        );
                channel++;
            }
            ++;
        }
        // output the page if we have any data
        if (.isEmpty()) {
             = .;
            return null;
        }
        Page page = .build();
        return page;
    }
    private void updateFrame()
    {
        int rowPosition =  - ;
        int endPosition =  -  - 1;
        // frame start
        if ( == ) {
             = 0;
        }
        else if ( == ) {
             = preceding(rowPositiongetStartValue());
        }
        else if ( == ) {
             = following(rowPositionendPositiongetStartValue());
        }
        else if () {
             =  - ;
        }
        else {
             = rowPosition;
        }
        // frame end
        if ( == ) {
             = endPosition;
        }
        else if ( == ) {
             = preceding(rowPositiongetEndValue());
        }
        else if ( == ) {
             = following(rowPositionendPositiongetEndValue());
        }
        else if () {
             =  -  - 1;
        }
        else {
             = rowPosition;
        }
        // handle empty frame
        if (emptyFrame(rowPositionendPosition)) {
             = -1;
             = -1;
        }
    }
    private boolean emptyFrame(int rowPositionint endPosition)
    {
        if ( != ) {
            return false;
        }
        FrameBound.Type type = ;
        if ((type != ) && (type != )) {
            return false;
        }
        long start = getStartValue();
        long end = getEndValue();
        if (type == ) {
            return (start < end) || ((start > rowPosition) && (end > rowPosition));
        }
        int positions = endPosition - rowPosition;
        return (start > end) || ((start > positions) && (end > positions));
    }
    private static int preceding(int rowPositionlong value)
    {
        if (value > rowPosition) {
            return 0;
        }
        return Ints.checkedCast(rowPosition - value);
    }
    private static int following(int rowPositionint endPositionlong value)
    {
        if (value > (endPosition - rowPosition)) {
            return endPosition;
        }
        return Ints.checkedCast(rowPosition + value);
    }
    private long getStartValue()
    {
        return getFrameValue("starting");
    }
    private long getEndValue()
    {
        return getFrameValue("ending");
    }
    private long getFrameValue(int channelString type)
    {
        checkCondition(!.isNull(channel), "Window frame %s offset must not be null"type);
        long value = .getLong(channel);
        checkCondition(value >= 0, "Window frame %s offset must not be negative");
        return value;
    }
    private static List<TypetoTypes(List<? extends TypesourceTypesList<IntegeroutputChannelsList<WindowFunctionwindowFunctions)
    {
        ImmutableList.Builder<Typetypes = ImmutableList.builder();
        for (int channel : outputChannels) {
            types.add(sourceTypes.get(channel));
        }
        for (WindowFunction function : windowFunctions) {
            types.add(function.getType());
        }
        return types.build();
    }
    private static List<WindowFunctiontoWindowFunctions(List<WindowFunctionDefinitionwindowFunctionDefinitions)
    {
        ImmutableList.Builder<WindowFunctionbuilder = ImmutableList.builder();
        for (WindowFunctionDefinition windowFunctionDefinition : windowFunctionDefinitions) {
            builder.add(windowFunctionDefinition.createWindowFunction());
        }
        return builder.build();
    }
}