Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.util.MoreFutures.tryGetUnchecked;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 public class LookupJoinOperator
         implements OperatorCloseable
 {
 
     private final OperatorContext operatorContext;
     private final JoinProbeFactory joinProbeFactory;
     private final boolean enableOuterJoin;
     private final List<Typetypes;
     private final PageBuilder pageBuilder;
 
     private LookupSource lookupSource;
     private JoinProbe probe;
 
     private boolean finishing;
     private long joinPosition = -1;
 
     public LookupJoinOperator(
             OperatorContext operatorContext,
             LookupSourceSupplier lookupSourceSupplier,
             List<TypeprobeTypes,
             boolean enableOuterJoin,
             JoinProbeFactory joinProbeFactory)
     {
         this. = checkNotNull(operatorContext"operatorContext is null");
 
         // todo pass in desired projection
         checkNotNull(lookupSourceSupplier"lookupSourceSupplier is null");
         checkNotNull(probeTypes"probeTypes is null");
 
         this. = lookupSourceSupplier.getLookupSource(operatorContext);
         this. = joinProbeFactory;
         this. = enableOuterJoin;
 
         this. = ImmutableList.<Type>builder()
                 .addAll(probeTypes)
                 .addAll(lookupSourceSupplier.getTypes())
                 .build();
         this. = new PageBuilder();
     }
 
     @Override
     {
         return ;
     }
 
     @Override
     public List<TypegetTypes()
     {
         return ;
     }
 
     @Override
     public void finish()
     {
          = true;
     }
 
     @Override
     public boolean isFinished()
     {
         boolean finished =  &&  == null && .isEmpty();
 
         // if finished drop references so memory is freed early
         if (finished) {
             if ( != null) {
                 .close();
                  = null;
             }
              = null;
            .reset();
        }
        return finished;
    }
    @Override
    public ListenableFuture<?> isBlocked()
    {
        return ;
    }
    @Override
    public boolean needsInput()
    {
        if () {
            return false;
        }
        if ( == null) {
             = tryGetUnchecked();
        }
        return  != null &&  == null;
    }
    @Override
    public void addInput(Page page)
    {
        checkNotNull(page"page is null");
        checkState(!"Operator is finishing");
        checkState( != null"Lookup source has not been built yet");
        checkState( == null"Current page has not been completely processed yet");
        // create probe
        // initialize to invalid join position to force output code to advance the cursors
         = -1;
    }
    @Override
    public Page getOutput()
    {
        // join probe page with the lookup source
        if ( != null) {
            while (joinCurrentPosition()) {
                if (!advanceProbePosition()) {
                    break;
                }
                if (!outerJoinCurrentPosition()) {
                    break;
                }
            }
        }
        // only flush full pages unless we are done
        if (.isFull() || ( && !.isEmpty() &&  == null)) {
            Page page = .build();
            .reset();
            return page;
        }
        return null;
    }
    @Override
    public void close()
    {
        if ( != null) {
            .close();
             = null;
        }
    }
    private boolean joinCurrentPosition()
    {
        // while we have a position to join against...
        while ( >= 0) {
            .declarePosition();
            // write probe columns
            .appendTo();
            // write build columns
            // get next join position for this row
            if (.isFull()) {
                return false;
            }
        }
        return true;
    }
    private boolean advanceProbePosition()
    {
        if (!.advanceNextPosition()) {
             = null;
            return false;
        }
        // update join position
        return true;
    }
    private boolean outerJoinCurrentPosition()
    {
        if ( &&  < 0) {
            // write probe columns
            .declarePosition();
            .appendTo();
            // write nulls into build columns
            int outputIndex = .getChannelCount();
            for (int buildChannel = 0; buildChannel < .getChannelCount(); buildChannel++) {
                .getBlockBuilder(outputIndex).appendNull();
                outputIndex++;
            }
            if (.isFull()) {
                return false;
            }
        }
        return true;
    }
New to GrepCode? Check out our FAQ X