Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 
 import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static io.airlift.units.DataSize.Unit.BYTE;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class OperatorContext
 {
     private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
 
     private final int operatorId;
     private final String operatorType;
     private final DriverContext driverContext;
     private final Executor executor;
 
     private final AtomicLong intervalWallStart = new AtomicLong();
     private final AtomicLong intervalCpuStart = new AtomicLong();
     private final AtomicLong intervalUserStart = new AtomicLong();
 
     private final AtomicLong addInputCalls = new AtomicLong();
     private final AtomicLong addInputWallNanos = new AtomicLong();
     private final AtomicLong addInputCpuNanos = new AtomicLong();
     private final AtomicLong addInputUserNanos = new AtomicLong();
     private final CounterStat inputDataSize = new CounterStat();
     private final CounterStat inputPositions = new CounterStat();
 
     private final AtomicLong getOutputCalls = new AtomicLong();
     private final AtomicLong getOutputWallNanos = new AtomicLong();
     private final AtomicLong getOutputCpuNanos = new AtomicLong();
     private final AtomicLong getOutputUserNanos = new AtomicLong();
     private final CounterStat outputDataSize = new CounterStat();
     private final CounterStat outputPositions = new CounterStat();
 
     private final AtomicReference<SettableFuture<?>> memoryFuture = new AtomicReference<>();
     private final AtomicReference<BlockedMonitorblockedMonitor = new AtomicReference<>();
     private final AtomicLong blockedWallNanos = new AtomicLong();
 
     private final AtomicLong finishCalls = new AtomicLong();
     private final AtomicLong finishWallNanos = new AtomicLong();
     private final AtomicLong finishCpuNanos = new AtomicLong();
     private final AtomicLong finishUserNanos = new AtomicLong();
 
     private final AtomicLong memoryReservation = new AtomicLong();
     private final long maxMemoryReservation;
 
     private final AtomicReference<Supplier<Object>> infoSupplier = new AtomicReference<>();
     private final boolean collectTimings;
 
     public OperatorContext(int operatorIdString operatorTypeDriverContext driverContextExecutor executorlong maxMemoryReservation)
     {
         checkArgument(operatorId >= 0, "operatorId is negative");
         this. = operatorId;
         this. = maxMemoryReservation;
         this. = checkNotNull(operatorType"operatorType is null");
         this. = checkNotNull(driverContext"driverContext is null");
         this. = checkNotNull(executor"executor is null");
         SettableFuture<Objectfuture = SettableFuture.create();
         future.set(null);
         this..set(future);
 
          = driverContext.isVerboseStats() && driverContext.isCpuTimerEnabled();
     }
    public int getOperatorId()
    {
        return ;
    }
    public String getOperatorType()
    {
        return ;
    }
    {
        return ;
    }
    public Session getSession()
    {
        return .getSession();
    }
    public boolean isDone()
    {
        return .isDone();
    }
    public void startIntervalTimer()
    {
        .set(System.nanoTime());
    }
    public void recordAddInput(Page page)
    {
        if (page != null) {
            .update(page.getSizeInBytes());
            .update(page.getPositionCount());
        }
    }
    public void recordGeneratedInput(long sizeInByteslong positions)
    {
        recordGeneratedInput(sizeInBytespositions, 0);
    }
    public void recordGeneratedInput(long sizeInByteslong positionslong readNanos)
    {
        .update(sizeInBytes);
        .update(positions);
        recordInputWallNanos(readNanos);
    }
    public long recordInputWallNanos(long readNanos)
    {
        return .getAndAdd(readNanos);
    }
    public void recordGetOutput(Page page)
    {
        if (page != null) {
            .update(page.getSizeInBytes());
            .update(page.getPositionCount());
        }
    }
    public void recordGeneratedOutput(long sizeInByteslong positions)
    {
        .update(sizeInBytes);
        .update(positions);
    }
    public void recordBlocked(ListenableFuture<?> blocked)
    {
        checkNotNull(blocked"blocked is null");
        BlockedMonitor monitor = new BlockedMonitor();
        BlockedMonitor oldMonitor = .getAndSet(monitor);
        if (oldMonitor != null) {
            oldMonitor.run();
        }
        blocked.addListener(monitor);
        // Do not register blocked with driver context.  The driver handles this directly.
    }
    public void recordFinish()
    {
        .incrementAndGet();
    }
    {
        return .get();
    }
    public DataSize getMaxMemorySize()
    {
        return .getMaxMemorySize();
    }
    {
    }
    public void reserveMemory(long bytes)
    {
        ListenableFuture<?> future = .reserveMemory(bytes);
        if (!future.isDone()) {
            SettableFuture<?> currentMemoryFuture = .get();
            while (currentMemoryFuture.isDone()) {
                SettableFuture<?> settableFuture = SettableFuture.create();
                // We can't replace one that's not done, because the task may be blocked on that future
                if (.compareAndSet(currentMemoryFuturesettableFuture)) {
                    currentMemoryFuture = settableFuture;
                }
                else {
                    currentMemoryFuture = .get();
                }
            }
            SettableFuture<?> finalMemoryFuture = currentMemoryFuture;
            // Create a new future, so that this operator can un-block before the pool does, if it's moved to a new pool
            Futures.addCallback(futurenew FutureCallback<Object>()
            {
                @Override
                public void onSuccess(Object result)
                {
                    finalMemoryFuture.set(null);
                }
                @Override
                public void onFailure(Throwable t)
                {
                    finalMemoryFuture.set(null);
                }
            });
        }
        long newReservation = .getAndAdd(bytes);
        if (newReservation > ) {
            .getAndAdd(-bytes);
            throw new ExceededMemoryLimitException(getMaxMemorySize());
        }
    }
    public boolean tryReserveMemory(long bytes)
    {
        if (!.tryReserveMemory(bytes)) {
            return false;
        }
        long newReservation = .getAndAdd(bytes);
        if (newReservation > ) {
            .getAndAdd(-bytes);
            return false;
        }
        return true;
    }
    public void freeMemory(long bytes)
    {
        checkArgument(bytes >= 0, "bytes is negative");
        checkArgument(bytes <= .get(), "tried to free more memory than is reserved");
        .freeMemory(bytes);
        .getAndAdd(-bytes);
    }
    public void moreMemoryAvailable()
    {
        .get().set(null);
    }
    public void setMemoryReservation(long newMemoryReservation)
    {
        checkArgument(newMemoryReservation >= 0, "newMemoryReservation is negative");
        long delta = newMemoryReservation - .get();
        if (delta > 0) {
            reserveMemory(delta);
        }
        else {
            freeMemory(-delta);
        }
    }
    public boolean trySetMemoryReservation(long newMemoryReservation)
    {
        checkArgument(newMemoryReservation >= 0, "newMemoryReservation is negative");
        long delta = newMemoryReservation - .get();
        if (delta > 0) {
            return tryReserveMemory(delta);
        }
        else {
            freeMemory(-delta);
            return true;
        }
    }
    public void setInfoSupplier(Supplier<ObjectinfoSupplier)
    {
        checkNotNull(infoSupplier"infoProvider is null");
        this..set(infoSupplier);
    }
    public CounterStat getInputDataSize()
    {
        return ;
    }
    {
        return ;
    }
    {
        return ;
    }
    {
        return ;
    }
    {
        Supplier<ObjectinfoSupplier = this..get();
        Object info = null;
        if (infoSupplier != null) {
            info = infoSupplier.get();
        }
        return new OperatorStats(
                ,
                ,
                .get(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new DataSize(.getTotalCount(), ).convertToMostSuccinctDataSize(),
                .getTotalCount(),
                .get(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new DataSize(.getTotalCount(), ).convertToMostSuccinctDataSize(),
                .getTotalCount(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                .get(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new Duration(.get(), ).convertToMostSuccinctTimeUnit(),
                new DataSize(.get(), ).convertToMostSuccinctDataSize(),
                .get().isDone() ? Optional.empty() : Optional.of(),
                info);
    }
    private long currentThreadUserTime()
    {
        if (!) {
            return 0;
        }
        return .getCurrentThreadUserTime();
    }
    private long currentThreadCpuTime()
    {
        if (!) {
            return 0;
        }
        return .getCurrentThreadCpuTime();
    }
    private static long nanosBetween(long startlong end)
    {
        return Math.abs(end - start);
    }
    private class BlockedMonitor
            implements Runnable
    {
        private final long start = System.nanoTime();
        private boolean finished;
        @Override
        public synchronized void run()
        {
            synchronized (this) {
                if () {
                    return;
                }
                 = true;
                .compareAndSet(thisnull);
                .getAndAdd(getBlockedTime());
            }
        }
        public long getBlockedTime()
        {
            return nanosBetween(, System.nanoTime());
        }
    }
New to GrepCode? Check out our FAQ X