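// Start line: / End line: / Snippet Preview / Snippet HTML Code / Stack Overflow Questions
// (navigation text left over from the web page this file was scraped from; not part of the source)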
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.util.List;
 
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static it.unimi.dsi.fastutil.HashCommon.murmurHash3;
 import static java.util.Objects.requireNonNull;
 
 public class ParallelHashBuilder
 {
     private final List<IntegerhashChannels;
     private final Optional<IntegerhashChannel;
     private final int expectedPositions;
     private final List<SettableFuture<PagesIndex>> pagesIndexFutures;
     private final LookupSourceSupplier lookupSourceSupplier;
     private final List<Typetypes;
 
     public ParallelHashBuilder(
             List<Typetypes,
             List<IntegerhashChannels,
             Optional<IntegerhashChannel,
             int expectedPositions,
             int partitionCount)
     {
         this. = ImmutableList.copyOf(requireNonNull(types"types is null"));
         this. = ImmutableList.copyOf(requireNonNull(hashChannels"hashChannels is null"));
         this. = requireNonNull(hashChannel"hashChannel is null");
         checkArgument(expectedPositions >= 0, "expectedPositions is negative");
         this. = expectedPositions;
 
         checkArgument(Integer.bitCount(partitionCount) == 1, "partitionCount must be a power of 2");
         ImmutableList.Builder<SettableFuture<PagesIndex>> pagesIndexFutures = ImmutableList.builder();
         ImmutableList.Builder<SettableFuture<LookupSource>> lookupSourceFutures = ImmutableList.builder();
         for (int i = 0; i < partitionCounti++) {
             pagesIndexFutures.add(SettableFuture.create());
             lookupSourceFutures.add(SettableFuture.create());
         }
         this. = pagesIndexFutures.build();
         this. = lookupSourceFutures.build();
 
          = new ParallelLookupSourceSupplier(typeshashChannelsthis.);
     }
 
     public OperatorFactory getCollectOperatorFactory(int operatorId)
     {
         return new ParallelHashCollectOperatorFactory(
                 operatorId,
                 ,
                 ,
                 ,
                 ,
                 );
     }
 
     {
         return new ParallelHashBuilderOperatorFactory(
                 0,
                 ,
                 ,
                 ,
                 ,
                 );
     }
 
     {
        return ;
    }
    private static class ParallelHashCollectOperatorFactory
            implements OperatorFactory
    {
        private final int operatorId;
        private final List<SettableFuture<PagesIndex>> partitionFutures;
        private final List<Typetypes;
        private final List<IntegerhashChannels;
        private final Optional<IntegerhashChannel;
        private final int expectedPositions;
        private boolean closed;
        public ParallelHashCollectOperatorFactory(
                int operatorId,
                List<SettableFuture<PagesIndex>> partitionFutures,
                List<Typetypes,
                List<IntegerhashChannels,
                Optional<IntegerhashChannel,
                int expectedPositions)
        {
            this. = operatorId;
            this. = partitionFutures;
            this. = types;
            this. = hashChannels;
            this. = hashChannel;
            this. = checkNotNull(expectedPositions"expectedPositions is null");
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            OperatorContext operatorContext = driverContext.addOperatorContext(ParallelHashBuilder.class.getSimpleName());
            return new ParallelHashCollectOperator(
                    operatorContext,
                    ,
                    ,
                    ,
                    ,
                    );
        }
        @Override
        public void close()
        {
             = true;
        }
    }
    private static class ParallelHashCollectOperator
            implements Operator
    {
        private final OperatorContext operatorContext;
        private final List<SettableFuture<PagesIndex>> partitionFutures;
        private final HashGenerator hashGenerator;
        private final int parallelStreamMask;
        private final PagesIndex[] partitions;
        private final List<Typetypes;
        private boolean finished;
        public ParallelHashCollectOperator(
                OperatorContext operatorContext,
                List<SettableFuture<PagesIndex>> partitionFutures,
                List<Typetypes,
                List<IntegerhashChannels,
                Optional<IntegerhashChannel,
                int expectedPositions)
        {
            this. = operatorContext;
            this. = partitionFutures;
            this. = types;
            if (hashChannel.isPresent()) {
                this. = new PrecomputedHashGenerator(hashChannel.get());
            }
            else {
                ImmutableList.Builder<TypehashChannelTypes = ImmutableList.builder();
                for (int channel : hashChannels) {
                    hashChannelTypes.add(types.get(channel));
                }
                this. = new InterpretedHashGenerator(hashChannelTypes.build(), Ints.toArray(hashChannels));
            }
             = partitionFutures.size() - 1;
             = new PagesIndex[partitionFutures.size()];
            for (int partition = 0; partition < .partition++) {
                this.[partition] = new PagesIndex(typesexpectedPositions);
            }
        }
        @Override
        public OperatorContext getOperatorContext()
        {
            return ;
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public void finish()
        {
            if () {
                return;
            }
            for (int partition = 0; partition < .partition++) {
                .get(partition).set([partition]);
            }
             = true;
        }
        @Override
        public boolean isFinished()
        {
            return ;
        }
        @Override
        public boolean needsInput()
        {
            return !;
        }
        @Override
        public void addInput(Page page)
        {
            checkNotNull(page"page is null");
            checkState(!isFinished(), "Operator is already finished");
            // build a block containing the partition id of each position
            BlockBuilder blockBuilder = .createBlockBuilder(new BlockBuilderStatus(), page.getPositionCount());
            for (int position = 0; position < page.getPositionCount(); position++) {
                int rawHash = .hashPosition(positionpage);
                int partition = murmurHash3(rawHash) & ;
                .writeLong(blockBuilderpartition);
            }
            Block partitionIds = blockBuilder.build();
            long size = 0;
            for (int partition = 0; partition < .partition++) {
                PagesIndex index = [partition];
                index.addPage(pagepartitionpartitionIds);
                size += index.getEstimatedSize().toBytes();
            }
            .setMemoryReservation(size);
        }
        @Override
        public Page getOutput()
        {
            return null;
        }
    }
    private static class ParallelHashBuilderOperatorFactory
            implements OperatorFactory
    {
        private final int operatorId;
        private final List<Typetypes;
        private final List<ListenableFuture<PagesIndex>> partitionFutures;
        private final List<SettableFuture<LookupSource>> lookupSourceFutures;
        private final List<IntegerhashChannels;
        private final Optional<IntegerhashChannel;
        private int partition;
        private boolean closed;
        public ParallelHashBuilderOperatorFactory(
                int operatorId,
                List<Typetypes,
                List<? extends ListenableFuture<PagesIndex>> partitionFutures,
                List<SettableFuture<LookupSource>> lookupSourceFutures,
                List<IntegerhashChannels,
                Optional<IntegerhashChannel)
        {
            this. = operatorId;
            this. = types;
            this. = ImmutableList.copyOf(partitionFutures);
            this. = lookupSourceFutures;
            this. = hashChannels;
            this. = hashChannel;
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            checkState( < .size(), "All operators already created");
            OperatorContext operatorContext = driverContext.addOperatorContext(ParallelHashBuilder.class.getSimpleName());
            ParallelHashBuilderOperator parallelHashBuilderOperator = new ParallelHashBuilderOperator(
                    operatorContext,
                    ,
                    .get(),
                    .get(),
                    ,
                    );
            ++;
            return parallelHashBuilderOperator;
        }
        @Override
        public void close()
        {
             = true;
        }
    }
    private static class ParallelHashBuilderOperator
            implements Operator
    {
        private final OperatorContext operatorContext;
        private final List<Typetypes;
        private final ListenableFuture<PagesIndexpagesIndexFuture;
        private final SettableFuture<LookupSourcelookupSourceFuture;
        private final List<IntegerhashChannels;
        private final Optional<IntegerhashChannel;
        private boolean finished;
        public ParallelHashBuilderOperator(
                OperatorContext operatorContext,
                List<Typetypes,
                ListenableFuture<PagesIndexpagesIndexFuture,
                SettableFuture<LookupSourcelookupSourceFuture,
                List<IntegerhashChannels,
                Optional<IntegerhashChannel)
        {
            this. = operatorContext;
            this. = types;
            this. = pagesIndexFuture;
            this. = lookupSourceFuture;
            this. = hashChannels;
            this. = hashChannel;
        }
        @Override
        public OperatorContext getOperatorContext()
        {
            return ;
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public ListenableFuture<?> isBlocked()
        {
            if (.isDone()) {
                return ;
            }
            return ;
        }
        @Override
        public void finish()
        {
            if () {
                return;
            }
            PagesIndex pagesIndex = Futures.getUnchecked();
            LookupSource lookupSource = pagesIndex.createLookupSource();
            .setMemoryReservation(pagesIndex.getEstimatedSize().toBytes() + lookupSource.getInMemorySizeInBytes());
            .set(lookupSource);
             = true;
        }
        @Override
        public boolean isFinished()
        {
            return ;
        }
        @Override
        public boolean needsInput()
        {
            return false;
        }
        @Override
        public void addInput(Page page)
        {
            throw new UnsupportedOperationException(getClass().getName() + " can not take input");
        }
        @Override
        public Page getOutput()
        {
            return null;
        }
    }
 }
 // Page footer left over from the web scrape: "New to GrepCode? Check out our FAQ X"