Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.operator.LookupJoinOperators.JoinType.FULL_OUTER;
 import static com.facebook.presto.operator.LookupJoinOperators.JoinType.LOOKUP_OUTER;
 import static com.facebook.presto.operator.LookupJoinOperators.JoinType.PROBE_OUTER;
 import static com.facebook.presto.util.MoreFutures.tryGetUnchecked;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 public class LookupJoinOperator
         implements OperatorCloseable
 {
 
     private final OperatorContext operatorContext;
     private final JoinProbeFactory joinProbeFactory;
     private final List<Typetypes;
     private final List<TypeprobeTypes;
     private final PageBuilder pageBuilder;
 
     private final boolean lookupOnOuterSide;
     private final boolean probeOnOuterSide;
 
     private LookupSource lookupSource;
     private JoinProbe probe;
 
     private boolean finishing;
     private long joinPosition = -1;
 
 
     public LookupJoinOperator(
             OperatorContext operatorContext,
             LookupSourceSupplier lookupSourceSupplier,
             List<TypeprobeTypes,
             JoinType joinType,
             JoinProbeFactory joinProbeFactory)
     {
         this. = checkNotNull(operatorContext"operatorContext is null");
 
         // todo pass in desired projection
         checkNotNull(lookupSourceSupplier"lookupSourceSupplier is null");
         checkNotNull(probeTypes"probeTypes is null");
 
         this. = lookupSourceSupplier.getLookupSource(operatorContext);
         this. = joinProbeFactory;
 
         // Cannot use switch case here, because javac will synthesize an inner class and cause IllegalAccessError
          = joinType ==  || joinType == ;
          = joinType ==  || joinType == ;
 
         this. = ImmutableList.<Type>builder()
                 .addAll(probeTypes)
                 .addAll(lookupSourceSupplier.getTypes())
                 .build();
         this. = probeTypes;
         this. = new PageBuilder();
     }
 
     @Override
     {
         return ;
     }
 
     @Override
     public List<TypegetTypes()
     {
         return ;
     }
 
     @Override
     public void finish()
     {
          = true;
    }
    @Override
    public boolean isFinished()
    {
        boolean finished =
                 &&
                 == null &&
                .isEmpty() &&
                (! || ( != null && !.hasNext()));
        // if finished drop references so memory is freed early
        if (finished) {
            if ( != null) {
                .close();
                 = null;
            }
             = null;
            .reset();
        }
        return finished;
    }
    @Override
    public ListenableFuture<?> isBlocked()
    {
        return ;
    }
    @Override
    public boolean needsInput()
    {
        if () {
            return false;
        }
        if ( == null) {
             = tryGetUnchecked();
        }
        return  != null &&  == null;
    }
    @Override
    public void addInput(Page page)
    {
        checkNotNull(page"page is null");
        checkState(!"Operator is finishing");
        checkState( != null"Lookup source has not been built yet");
        checkState( == null"Current page has not been completely processed yet");
        // create probe
        // initialize to invalid join position to force output code to advance the cursors
         = -1;
    }
    @Override
    public Page getOutput()
    {
        // If needsInput was never called, lookupSource has not been initialized so far.
        if ( == null) {
             = tryGetUnchecked();
            if ( == null) {
                return null;
            }
        }
        // join probe page with the lookup source
        if ( != null) {
            while (joinCurrentPosition()) {
                if (!advanceProbePosition()) {
                    break;
                }
                if (!outerJoinCurrentPosition()) {
                    break;
                }
            }
        }
        if ( &&  &&  == null) {
            buildSideOuterJoinUnvisitedPositions();
        }
        // only flush full pages unless we are done
        if (.isFull() || ( && !.isEmpty() &&  == null)) {
            Page page = .build();
            .reset();
            return page;
        }
        return null;
    }
    @Override
    public void close()
    {
        if ( != null) {
            .close();
             = null;
        }
    }
    private boolean joinCurrentPosition()
    {
        // while we have a position to join against...
        while ( >= 0) {
            .declarePosition();
            // write probe columns
            .appendTo();
            // write build columns
            // get next join position for this row
            if (.isFull()) {
                return false;
            }
        }
        return true;
    }
    private boolean advanceProbePosition()
    {
        if (!.advanceNextPosition()) {
             = null;
            return false;
        }
        // update join position
        return true;
    }
    private boolean outerJoinCurrentPosition()
    {
        if ( &&  < 0) {
            // write probe columns
            .declarePosition();
            .appendTo();
            // write nulls into build columns
            int outputIndex = .getChannelCount();
            for (int buildChannel = 0; buildChannel < .getChannelCount(); buildChannel++) {
                .getBlockBuilder(outputIndex).appendNull();
                outputIndex++;
            }
            if (.isFull()) {
                return false;
            }
        }
        return true;
    }
    {
        if ( == null) {
        }
        while (.hasNext()) {
            long buildSideOuterJoinPosition = .nextLong();
            .declarePosition();
            // write nulls into probe columns
            for (int probeChannel = 0; probeChannel < .size(); probeChannel++) {
                .getBlockBuilder(probeChannel).appendNull();
            }
            // write build columns
            .appendTo(buildSideOuterJoinPosition.size());
            if (.isFull()) {
                return;
            }
        }
    }
New to GrepCode? Check out our FAQ X