// Start line:
// End line:

// Snippet Preview

// Snippet HTML Code

// Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 package com.facebook.presto.operator.index;
 
 
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;

 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Predicates.equalTo;
 import static com.google.common.base.Predicates.not;
 import static com.google.common.collect.Iterables.filter;
 public class IndexLoader
 {
     private final BlockingQueue<UpdateRequestupdateRequests = new LinkedBlockingQueue<>();
 
     private final List<TypeoutputTypes;
     private final int expectedPositions;
     private final DataSize maxIndexMemorySize;
     private final IndexJoinLookupStats stats;
 
     private final AtomicReference<TaskContexttaskContextReference = new AtomicReference<>();
     private final Set<IntegerlookupSourceInputChannels;
     private final List<IntegerkeyOutputChannels;
     private final Optional<IntegerkeyOutputHashChannel;
     private final List<TypekeyTypes;
 
     @GuardedBy("this")
     private IndexSnapshotLoader indexSnapshotLoader// Lazily initialized
 
     @GuardedBy("this")
     private PipelineContext pipelineContext// Lazily initialized
 
     @GuardedBy("this")
 
     public IndexLoader(
             Set<IntegerlookupSourceInputChannels,
             List<IntegerkeyOutputChannels,
             Optional<IntegerkeyOutputHashChannel,
             List<TypeoutputTypes,
             IndexBuildDriverFactoryProvider indexBuildDriverFactoryProvider,
             int expectedPositions,
             DataSize maxIndexMemorySize,
             IndexJoinLookupStats stats)
     {
         checkNotNull(lookupSourceInputChannels"lookupSourceInputChannels is null");
         checkArgument(!lookupSourceInputChannels.isEmpty(), "lookupSourceInputChannels must not be empty");
         checkNotNull(keyOutputChannels"keyOutputChannels is null");
         checkArgument(!keyOutputChannels.isEmpty(), "keyOutputChannels must not be empty");
         checkNotNull(keyOutputHashChannel"keyOutputHashChannel is null");
         checkArgument(lookupSourceInputChannels.size() <= keyOutputChannels.size(), "Lookup channels must supply a subset of the actual index columns");
         checkNotNull(outputTypes"outputTypes is null");
         checkNotNull(indexBuildDriverFactoryProvider"indexBuildDriverFactoryProvider is null");
         checkNotNull(expectedPositions"expectedPositions is null");
         checkNotNull(maxIndexMemorySize"maxIndexMemorySize is null");
        checkNotNull(stats"stats is null");
        this. = ImmutableSet.copyOf(lookupSourceInputChannels);
        this. = ImmutableList.copyOf(keyOutputChannels);
        this. = keyOutputHashChannel;
        this. = ImmutableList.copyOf(outputTypes);
        this. = indexBuildDriverFactoryProvider;
        this. = expectedPositions;
        this. = maxIndexMemorySize;
        this. = stats;
        ImmutableList.Builder<TypekeyTypeBuilder = ImmutableList.builder();
        for (int keyOutputChannel : keyOutputChannels) {
            keyTypeBuilder.add(outputTypes.get(keyOutputChannel));
        }
        this. = keyTypeBuilder.build();
        // start with an empty source
        this. = new AtomicReference<>(new IndexSnapshot(new EmptyLookupSource(outputTypes.size()), new EmptyLookupSource(keyOutputChannels.size())));
    }
    // This is a ghetto way to acquire a TaskContext at runtime (unavailable at planning)
    public void setContext(TaskContext taskContext)
    {
        .compareAndSet(nulltaskContext);
    }
    public int getChannelCount()
    {
        return .size();
    }
    public List<TypegetOutputTypes()
    {
        return ;
    }
    {
        return .get();
    }
    private static Block[] sliceBlocks(Block[] indexBlocksint startPositionint length)
    {
        Block[] slicedIndexBlocks = new Block[indexBlocks.length];
        for (int i = 0; i < indexBlocks.lengthi++) {
            slicedIndexBlocks[i] = indexBlocks[i].getRegion(startPositionlength);
        }
        return slicedIndexBlocks;
    }
    public IndexedData getIndexedDataForKeys(int positionBlock[] indexBlocks)
    {
        // Normalize the indexBlocks so that they only encompass the unloaded positions
        int totalPositions = indexBlocks[0].getPositionCount();
        int remainingPositions = totalPositions - position;
        return getIndexedDataForKeys(sliceBlocks(indexBlockspositionremainingPositions));
    }
    private IndexedData getIndexedDataForKeys(Block[] indexBlocks)
    {
        UpdateRequest myUpdateRequest = new UpdateRequest(indexBlocks);
        .add(myUpdateRequest);
        synchronized (this) {
            if (!myUpdateRequest.isFinished()) {
                .recordIndexJoinLookup();
                initializeStateIfNecessary();
                List<UpdateRequestrequests = new ArrayList<>();
                .drainTo(requests);
                long initialCacheSizeInBytes = .getCacheSizeInBytes();
                // TODO: add heuristic to jump to load strategy that is most likely to succeed
                // Try to load all the requests
                if (.load(requests)) {
                    return myUpdateRequest.getFinishedIndexSnapshot();
                }
                // Retry again if there was initial data (load failures will clear the cache automatically)
                if (initialCacheSizeInBytes > 0 && .load(requests)) {
                    .recordSuccessfulIndexJoinLookupByCacheReset();
                    return myUpdateRequest.getFinishedIndexSnapshot();
                }
                // Try loading just my request
                if (requests.size() > 1) {
                    // Add all other requests back into the queue
                    Iterables.addAll(filter(requestsnot(equalTo(myUpdateRequest))));
                    if (.load(ImmutableList.of(myUpdateRequest))) {
                        .recordSuccessfulIndexJoinLookupBySingleRequest();
                        return myUpdateRequest.getFinishedIndexSnapshot();
                    }
                }
                // Repeatedly decrease the number of rows to load by a factor of 10
                int totalPositions = indexBlocks[0].getPositionCount();
                int attemptedPositions = totalPositions / 10;
                while (attemptedPositions > 1) {
                    myUpdateRequest = new UpdateRequest(sliceBlocks(indexBlocks, 0, attemptedPositions));
                    if (.load(ImmutableList.of(myUpdateRequest))) {
                        .recordSuccessfulIndexJoinLookupByLimitedRequest();
                        return myUpdateRequest.getFinishedIndexSnapshot();
                    }
                    attemptedPositions /= 10;
                }
                // Just load the single index key in a streaming fashion (no caching)
                .recordStreamedIndexJoinLookup();
                return streamIndexDataForSingleKey(myUpdateRequest);
            }
        }
        // return the snapshot from the update request as another thread may have already flushed the request
        return myUpdateRequest.getFinishedIndexSnapshot();
    }
    {
        Page indexKeyTuple = new Page(sliceBlocks(updateRequest.getBlocks(), 0, 1));
        PageBuffer pageBuffer = new PageBuffer(100);
        DriverFactory driverFactory = .createStreaming(pageBufferindexKeyTuple);
        Driver driver = driverFactory.createDriver(.addDriverContext());
        PageRecordSet pageRecordSet = new PageRecordSet(indexKeyTuple);
        PlanNodeId planNodeId = Iterables.getOnlyElement(driverFactory.getSourceIds());
        driver.updateSource(new TaskSource(planNodeId, ImmutableSet.of(new ScheduledSplit(0, new Split("index"new IndexSplit(pageRecordSet)))), true));
        return new StreamingIndexedData(indexKeyTuplepageBufferdriver);
    }
    private synchronized void initializeStateIfNecessary()
    {
        if ( == null) {
            TaskContext taskContext = .get();
            checkState(taskContext != null"Task context must be set before index can be built");
             = taskContext.addPipelineContext(falsefalse);
        }
        if ( == null) {
             = new IndexSnapshotLoader(
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    );
        }
    }
    private static class IndexSnapshotLoader
    {
        private final DriverFactory driverFactory;
        private final PipelineContext pipelineContext;
        private final Set<IntegerlookupSourceInputChannels;
        private final Set<IntegerallInputChannels;
        private final List<TypeoutputTypes;
        private final List<TypeindexTypes;
        private final AtomicReference<IndexSnapshotindexSnapshotReference;
        private final IndexSnapshotBuilder indexSnapshotBuilder;
        private IndexSnapshotLoader(IndexBuildDriverFactoryProvider indexBuildDriverFactoryProvider,
                PipelineContext pipelineContext,
                AtomicReference<IndexSnapshotindexSnapshotReference,
                Set<IntegerlookupSourceInputChannels,
                List<TypeindexTypes,
                List<IntegerkeyOutputChannels,
                Optional<IntegerkeyOutputHashChannel,
                int expectedPositions,
                DataSize maxIndexMemorySize)
        {
            this. = pipelineContext;
            this. = indexSnapshotReference;
            this. = lookupSourceInputChannels;
            this. = indexBuildDriverFactoryProvider.getOutputTypes();
            this. = indexTypes;
            this. = new IndexSnapshotBuilder(
                    ,
                    keyOutputChannels,
                    keyOutputHashChannel,
                    pipelineContext.addDriverContext(),
                    maxIndexMemorySize,
                    expectedPositions);
            this. = indexBuildDriverFactoryProvider.createSnapshot(this.);
            ImmutableSet.Builder<Integerbuilder = ImmutableSet.builder();
            for (int i = 0; i < indexTypes.size(); i++) {
                builder.add(i);
            }
            this. = builder.build();
        }
        public long getCacheSizeInBytes()
        {
            return .getMemoryInBytes();
        }
        public boolean load(List<UpdateRequestrequests)
        {
            // Generate a RecordSet that only presents index keys that have not been cached and are deduped based on lookupSourceInputChannels
            UnloadedIndexKeyRecordSet recordSetForLookupSource = new UnloadedIndexKeyRecordSet(.get(), requests);
            // Drive index lookup to produce the output (landing in indexSnapshotBuilder)
            Driver driver = .createDriver(.addDriverContext());
            PlanNodeId sourcePlanNodeId = Iterables.getOnlyElement(.getSourceIds());
            driver.updateSource(new TaskSource(sourcePlanNodeId, ImmutableSet.of(new ScheduledSplit(0, new Split("index"new IndexSplit(recordSetForLookupSource)))), true));
            while (!driver.isFinished()) {
                ListenableFuture<?> process = driver.process();
                checkState(process.isDone(), "Driver should never block");
            }
            if (.isMemoryExceeded()) {
                clearCachedData();
                return false;
            }
            // Generate a RecordSet that presents unique index keys that have not been cached
            UnloadedIndexKeyRecordSet indexKeysRecordSet = (.equals())
                    ? recordSetForLookupSource
                    : new UnloadedIndexKeyRecordSet(.get(), requests);
            // Create lookup source with new data
            IndexSnapshot newValue = .createIndexSnapshot(indexKeysRecordSet);
            if (newValue == null) {
                clearCachedData();
                return false;
            }
            .set(newValue);
            for (UpdateRequest request : requests) {
                request.finished(newValue);
            }
            return true;
        }
        private void clearCachedData()
        {
            .reset();
        }
    }
    private static class EmptyLookupSource
            implements LookupSource
    {
        private final int channelCount;
        public EmptyLookupSource(int channelCount)
        {
            this. = channelCount;
        }
        @Override
        public int getChannelCount()
        {
            return ;
        }
        @Override
        public long getJoinPosition(int positionPage pageint rawHash)
        {
            return .;
        }
        @Override
        public long getJoinPosition(int positionPage page)
        {
            return .;
        }
        @Override
        public long getNextJoinPosition(long currentPosition)
        {
            return .;
        }
        @Override
        public void appendTo(long positionPageBuilder pageBuilderint outputChannelOffset)
        {
            throw new UnsupportedOperationException();
        }
        @Override
        public void close()
        {
        }
    }
 }
// New to GrepCode? Check out our FAQ X