Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *
   *  Copyright 2011 Netflix, Inc.
   *
   *     Licensed under the Apache License, Version 2.0 (the "License");
   *     you may not use this file except in compliance with the License.
   *     You may obtain a copy of the License at
   *
   *         http://www.apache.org/licenses/LICENSE-2.0
  *
  *     Unless required by applicable law or agreed to in writing, software
  *     distributed under the License is distributed on an "AS IS" BASIS,
  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *     See the License for the specific language governing permissions and
  *     limitations under the License.
  *
  */
 package com.netflix.curator.framework.imps;
 
 
 public class CuratorFrameworkImpl implements CuratorFramework
 {
     // Instance state. Every field is final.
     // The logger is obtained for the runtime class (getClass()), so subclasses log under their own name.
     private final Logger                                                log = LoggerFactory.getLogger(getClass());
     private final CuratorZookeeperClient                                client;
     private final ListenerContainer<CuratorListener>                    listeners;
     private final ListenerContainer<UnhandledErrorListener>             unhandledErrorListeners;
     private final ExecutorService                                       executorService;
     // Queue of pending background operations; drained by backgroundOperationsLoop().
     private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
     private final String                                                namespace;
     private final EnsurePath                                            ensurePath;
     private final ConnectionStateManager                                connectionStateManager;
     // Starts out holding null; see AuthInfo below for the payload type.
     private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
     private final byte[]                                                defaultData;
     private final FailedDeleteManager                                   failedDeleteManager;
 
     // Lifecycle states of this instance.
     private enum State
     {
         LATENT,
         STARTED,
         STOPPED
     }
     // Holder for the current lifecycle state.
     // NOTE(review): the constructor argument below was stripped by text extraction (only "." remains);
     // presumably State.LATENT - confirm against the upstream source.
     private final AtomicReference<State>                    state = new AtomicReference<State>(.);
 
     private static class AuthInfo
     {
         final String    scheme;
         final byte[]    auth;
 
         private AuthInfo(String schemebyte[] auth)
         {
             this. = scheme;
             this. = auth;
         }
     }
 
     // NOTE(review): the constructor signature that should precede this brace is missing from this copy
     // of the file. The body reads from a variable named "builder", so this is presumably the
     // builder-based constructor - restore the signature from the upstream source.
     // NOTE(review): throughout this body the left-hand sides of assignments (and possibly some
     // arguments) were stripped by text extraction; do not infer targets from position alone.
     {
         this. = new CuratorZookeeperClient
         (
             builder.getConnectString(),
             builder.getSessionTimeoutMs(),
             builder.getConnectionTimeoutMs(),
             // Watcher handed to the client: wraps every WatchedEvent in a CuratorEventImpl
             // (state int value, path passed through unfixForNamespace) and forwards it to processEvent().
             new Watcher()
             {
                 @Override
                 public void process(WatchedEvent watchedEvent)
                 {
                    CuratorEvent event = new CuratorEventImpl
                    (
                        CuratorFrameworkImpl.this,
                        // NOTE(review): event-type constant stripped here.
                        .,
                        watchedEvent.getState().getIntValue(),
                        unfixForNamespace(watchedEvent.getPath()),
                        null,
                        null,
                        null,
                        null,
                        null,
                        watchedEvent,
                        null
                    );
                    processEvent(event);
                }
            },
            builder.getRetryPolicy()
        );
         = new ListenerContainer<CuratorListener>();
         = builder.getNamespace();
         // NOTE(review): the null-checked operand and probably an argument of makePath were stripped.
         = ( != null) ? new EnsurePath(ZKPaths.makePath("/")) : null;
         = Executors.newFixedThreadPool(2, getThreadFactory(builder));  // 1 for listeners, 1 for background ops
         = new ConnectionStateManager(thisbuilder.getThreadFactory());
        byte[]      builderDefaultData = builder.getDefaultData();
         // Copies the builder's default data when present; otherwise uses an empty array.
         = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultDatabuilderDefaultData.length) : new byte[0];
        // Auth info is only recorded when the builder supplies an auth scheme.
        if ( builder.getAuthScheme() != null )
        {
            .set(new AuthInfo(builder.getAuthScheme(), builder.getAuthValue()));
        }
         = new FailedDeleteManager(this);
    }
    // Returns the builder's thread factory, or - when the builder has none - a factory built with
    // the name format "CuratorFramework-%d".
    // NOTE(review): the method signature is missing from this copy; the call site
    // getThreadFactory(builder) suggests the name and parameter - restore from the upstream source.
    {
        ThreadFactory threadFactory = builder.getThreadFactory();
        if ( threadFactory == null )
        {
            threadFactory = new ThreadFactoryBuilder().setNameFormat("CuratorFramework-%d").build();
        }
        return threadFactory;
    }
    /**
     * Creates an instance that shares state with an existing one: eight values are taken directly
     * from {@code parent} (no copies are made) and two assignments are set to {@code null}.
     *
     * NOTE(review): every assignment target was stripped by text extraction. The right-hand sides
     * name the parent fields being shared; the two null assignments are presumably the remaining
     * final fields (namespace / ensurePath) - confirm against the upstream source.
     *
     * @param parent the instance whose state is shared
     */
    protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
    {
         = parent.client;
         = parent.listeners;
         = parent.unhandledErrorListeners;
         = parent.executorService;
         = parent.backgroundOperations;
         = parent.connectionStateManager;
         = parent.defaultData;
         = parent.failedDeleteManager;
         = null;
         = null;
    }
    @Override
    public boolean isStarted()
    {
        return .get() == .;
    }
    /**
     * Starts this instance.
     *
     * Logs "Starting", then performs a compare-and-set; if that fails it logs "Already started" and
     * throws the IllegalStateException. On success it calls start() on two collaborators and submits
     * a task that runs backgroundOperationsLoop(). Any Exception thrown while doing so is passed to
     * handleBackgroundOperationException with no associated operation.
     *
     * NOTE(review): the logger, the compare-and-set receiver and its two arguments, and the receivers
     * of the start()/submit() calls were stripped by text extraction - restore from the upstream source.
     *
     * @throws IllegalStateException if the compare-and-set fails
     */
    @Override
    public void     start()
    {
        .info("Starting");
        if ( !.compareAndSet(..) )
        {
            IllegalStateException error = new IllegalStateException();
            .error("Already started"error);
            throw error;
        }
        try
        {
            .start();
            .start();
            .submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        backgroundOperationsLoop();
                        return null;
                    }
                }
            );
        }
        catch ( Exception e )
        {
            handleBackgroundOperationException(nulle);
        }
    }
    /**
     * Closes this instance.
     *
     * Logs "Closing", then performs a compare-and-set; if that fails it logs "Already closed" and
     * throws the IllegalStateException. On success every CuratorListener is sent a freshly built
     * CuratorEventImpl; an exception from a listener is logged ("Exception while sending Closing
     * event") and does not stop the remaining work. Finally clear(), close(), close() and
     * shutdownNow() are invoked on collaborators.
     *
     * NOTE(review): the receivers of those calls, the compare-and-set arguments and the event-type
     * constant were stripped by text extraction, as were the commas between the event arguments -
     * restore from the upstream source.
     *
     * @throws IllegalStateException if the compare-and-set fails
     */
    @Override
    public void     close()
    {
        .debug("Closing");
        if ( !.compareAndSet(..) )
        {
            IllegalStateException error = new IllegalStateException();
            .error("Already closed"error);
            throw error;
        }
        .forEach
        (
            new Function<CuratorListenerVoid>()
            {
                @Override
                public Void apply(CuratorListener listener)
                {
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this., 0, nullnullnullnullnullnullnullnull);
                    try
                    {
                        listener.eventReceived(CuratorFrameworkImpl.thisevent);
                    }
                    catch ( Exception e )
                    {
                        .error("Exception while sending Closing event"e);
                    }
                    return null;
                }
            }
        );
        .clear();
        .close();
        .close();
        .shutdownNow();
    }
    // Checks a state precondition and returns a new NonNamespaceFacade wrapping this instance.
    // NOTE(review): the method signature and the operands of the state check were stripped by text
    // extraction - restore from the upstream source.
    @Override
    {
        Preconditions.checkState(.get() == .);
        return new NonNamespaceFacade(this);
    }
    @Override
    public CreateBuilder create()
    {
        Preconditions.checkState(.get() == .);
        return new CreateBuilderImpl(this);
    }
    @Override
    public DeleteBuilder delete()
    {
        Preconditions.checkState(.get() == .);
        return new DeleteBuilderImpl(this);
    }
    @Override
    public ExistsBuilder checkExists()
    {
        Preconditions.checkState(.get() == .);
        return new ExistsBuilderImpl(this);
    }
    @Override
    public GetDataBuilder getData()
    {
        Preconditions.checkState(.get() == .);
        return new GetDataBuilderImpl(this);
    }
    @Override
    public SetDataBuilder setData()
    {
        Preconditions.checkState(.get() == .);
        return new SetDataBuilderImpl(this);
    }
    // Checks a state precondition and returns a new GetChildrenBuilderImpl bound to this instance.
    // NOTE(review): the method signature and the operands of the state check were stripped by text
    // extraction - restore from the upstream source.
    @Override
    {
        Preconditions.checkState(.get() == .);
        return new GetChildrenBuilderImpl(this);
    }
    @Override
    public GetACLBuilder getACL()
    {
        Preconditions.checkState(.get() == .);
        return new GetACLBuilderImpl(this);
    }
    @Override
    public SetACLBuilder setACL()
    {
        Preconditions.checkState(.get() == .);
        return new SetACLBuilderImpl(this);
    }
    // Checks a state precondition and returns a new CuratorTransactionImpl bound to this instance.
    // NOTE(review): the method signature and the operands of the state check were stripped by text
    // extraction - restore from the upstream source.
    @Override
    {
        Preconditions.checkState(.get() == .);
        return new CuratorTransactionImpl(this);
    }
    // Returns the result of getListenable() on a collaborator.
    // NOTE(review): the method signature and the receiver of getListenable() were stripped by text
    // extraction - restore from the upstream source.
    @Override
    {
        return .getListenable();
    }
    // NOTE(review): both the method signature and the returned expression were lost in text
    // extraction; nothing about this accessor can be determined from this copy - restore from the
    // upstream source.
    @Override
    {
        return ;
    }
    // NOTE(review): both the method signature and the returned expression were lost in text
    // extraction; nothing about this accessor can be determined from this copy - restore from the
    // upstream source.
    @Override
    {
        return ;
    }
    @Override
    public void sync(String pathObject context)
    {
        Preconditions.checkState(.get() == .);
        path = fixForNamespace(path);
        internalSync(thispathcontext);
    }
    protected void internalSync(CuratorFrameworkImpl implString pathObject context)
    {
        BackgroundOperation<Stringoperation = new BackgroundSyncImpl(implcontext);
        .offer(new OperationAndData<String>(operationpathnullnull));
    }
    // NOTE(review): both the method signature and the returned expression were lost in text
    // extraction; nothing about this accessor can be determined from this copy - restore from the
    // upstream source.
    @Override
    {
        return ;
    }
    // Returns a new EnsurePath for the given path after it has been passed through fixForNamespace().
    // NOTE(review): the method signature (which declares "path") is missing from this copy -
    // restore from the upstream source.
    @Override
    {
        return new EnsurePath(fixForNamespace(path));
    }
    // NOTE(review): both the method signature and the returned expression were lost in text
    // extraction; nothing about this accessor can be determined from this copy - restore from the
    // upstream source.
    {
        return ;
    }
    // Returns the result of newRetryLoop() on a collaborator.
    // NOTE(review): the method signature and the receiver of newRetryLoop() were stripped by text
    // extraction - restore from the upstream source.
    {
        return .newRetryLoop();
    }
    ZooKeeper getZooKeeper() throws Exception
    {
        return .getZooKeeper();
    }
    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
    <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndDataCuratorEvent event)
    {
        boolean     queueOperation = false;
        do
        {
            if ( event == null )
            {
                queueOperation = true;
                break;
            }
            if ( RetryLoop.shouldRetry(event.getResultCode()) )
            {
                if ( .getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs()) )
                {
                    queueOperation = true;
                }
                else
                {
                    if ( operationAndData.getErrorCallback() != null )
                    {
                        operationAndData.getErrorCallback().retriesExhausted(operationAndData);
                    }
                    KeeperException.Code    code = KeeperException.Code.get(event.getResultCode());
                    Exception               e = null;
                    try
                    {
                        e = (code != null) ? KeeperException.create(code) : null;
                    }
                    catch ( Throwable ignore )
                    {
                    }
                    if ( e == null )
                    {
                        e = new Exception("Unknown result code: " + event.getResultCode());
                    }
                    logError("Background operation retry gave up"e);
                }
                break;
            }
            if ( operationAndData.getCallback() != null )
            {
                sendToBackgroundCallback(operationAndDataevent);
                break;
            }
            processEvent(event);
        } while ( false );
        if ( queueOperation )
        {
            .offer(operationAndData);
        }
    }
    /**
     * Logs an error and hands it to each UnhandledErrorListener via unhandledError().
     *
     * A null or empty reason is replaced with "n/a" before use.
     *
     * NOTE(review): this copy is damaged by text extraction - the comma between the parameters, the
     * logger receiver, the body of the ConnectionLossException branch (empty here), and the receiver
     * and method name of the call that takes the Function are all missing. Restore from the upstream
     * source.
     */
    void logError(String reasonfinal Throwable e)
    {
        if ( (reason == null) || (reason.length() == 0) )
        {
            reason = "n/a";
        }
        .error(reasone);
        if ( e instanceof KeeperException.ConnectionLossException )
        {
        }
        // Final copy so the anonymous class below can capture the (possibly replaced) reason.
        final String        localReason = reason;
        (
            new Function<UnhandledErrorListenerVoid>()
            {
                @Override
                public Void apply(UnhandledErrorListener listener)
                {
                    listener.unhandledError(localReasone);
                    return null;
                }
            }
        );
    }
    /**
     * Removes a leading namespace path from the given path.
     *
     * When the path starts with the namespace path, the remainder is returned, or "/" when nothing
     * remains after the prefix. In every other case, including a null path, the path is returned
     * unchanged.
     *
     * NOTE(review): the first null-checked operand and at least one argument of ZKPaths.makePath were
     * stripped by text extraction (presumably the namespace field) - restore from the upstream source.
     */
    String    unfixForNamespace(String path)
    {
        if ( ( != null) && (path != null) )
        {
            String      namespacePath = ZKPaths.makePath(null);
            if ( path.startsWith(namespacePath) )
            {
                path = (path.length() > namespacePath.length()) ? path.substring(namespacePath.length()) : "/";
            }
        }
        return path;
    }
    /**
     * Returns the path as adjusted by ZKPaths.fixForNamespace, or an empty string when ensurePath()
     * reports failure.
     *
     * NOTE(review): an argument of ZKPaths.fixForNamespace appears to have been stripped by text
     * extraction (presumably the namespace field) - restore from the upstream source.
     */
    String    fixForNamespace(String path)
    {
        if ( !ensurePath() )
        {
            return "";
        }
        return ZKPaths.fixForNamespace(path);
    }
    byte[] getDefaultData()
    {
        return ;
    }
    /**
     * Runs the ensure step when one is configured.
     *
     * Returns true when the null-checked value is null or ensure() completes normally; when ensure()
     * throws, the exception is reported through logError ("Ensure path threw exception") and false
     * is returned.
     *
     * NOTE(review): the null-checked operand and the receiver of ensure() were stripped by text
     * extraction (presumably the ensurePath field); arguments to ensure() may have been stripped
     * too - restore from the upstream source.
     */
    private boolean ensurePath()
    {
        if (  != null )
        {
            try
            {
                .ensure();
            }
            catch ( Exception e )
            {
                logError("Ensure path threw exception"e);
                return false;
            }
        }
        return true;
    }
    private <DATA_TYPE> void sendToBackgroundCallback(OperationAndData<DATA_TYPE> operationAndDataCuratorEvent event)
    {
        try
        {
            operationAndData.getCallback().processResult(thisevent);
        }
        catch ( Exception e )
        {
            handleBackgroundOperationException(operationAndDatae);
        }
    }
    private<DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndDataThrowable e)
    {
        do
        {
            if ( (operationAndData != null) && RetryLoop.isRetryException(e) )
            {
                .debug("Retry-able exception received"e);
                if ( .getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs()) )
                {
                    .debug("Retrying operation");
                    .offer(operationAndData);
                    break;
                }
                else
                {
                    .debug("Retry policy did not allow retry");
                    if ( operationAndData.getErrorCallback() != null )
                    {
                        operationAndData.getErrorCallback().retriesExhausted(operationAndData);
                    }
                }
            }
            logError("Background exception was not retry-able or retry gave up"e);
        } while ( false );
    }
    private void backgroundOperationsLoop()
    {
        AuthInfo    auth = .getAndSet(null);
        if ( auth != null )
        {
            try
            {
                .getZooKeeper().addAuthInfo(auth.schemeauth.auth);
            }
            catch ( Exception e )
            {
                logError("addAuthInfo for background operation threw exception"e);
                return;
            }
        }
        while ( !Thread.interrupted() )
        {
            OperationAndData<?>         operationAndData;
            try
            {
                operationAndData = .take();
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                break;
            }
            try
            {
                operationAndData.callPerformBackgroundOperation();
            }
            catch ( Throwable e )
            {
                handleBackgroundOperationException(operationAndDatae);
            }
        }
    }
    /**
     * Runs validateConnection() on the event and then delivers it to each CuratorListener via
     * eventReceived(), wrapping each delivery in a tracer named "EventListener". An exception from
     * a listener is reported through logError and does not propagate; trace.commit() is skipped for
     * that listener.
     *
     * NOTE(review): the receiver of forEach, the receiver of startTracer and the commas in the
     * generic/argument lists were stripped by text extraction - restore from the upstream source.
     */
    private void processEvent(final CuratorEvent curatorEvent)
    {
        validateConnection(curatorEvent);
        .forEach
        (
            new Function<CuratorListenerVoid>()
            {
                @Override
                public Void apply(CuratorListener listener)
                {
                    try
                    {
                        TimeTrace trace = .startTracer("EventListener");
                        listener.eventReceived(CuratorFrameworkImpl.thiscuratorEvent);
                        trace.commit();
                    }
                    catch ( Exception e )
                    {
                        logError("Event listener threw exception"e);
                    }
                    return null;
                }
            }
        );
    }
    /**
     * Reacts to connection-state information carried by an event.
     *
     * Only events of one particular type are examined; for those, the watched event's state is
     * compared against three constants in turn. The first match triggers an internal sync on "/".
     * The other two branches are empty in this copy.
     *
     * NOTE(review): the event-type constant, the three state constants, the bodies of the last two
     * branches and the commas in the internalSync call were stripped by text extraction - restore
     * from the upstream source.
     */
    private void validateConnection(CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == . )
        {
            if ( curatorEvent.getWatchedEvent().getState() == ... )
            {
                internalSync(this"/"null);  // we appear to have disconnected, force a new ZK event and see if we can connect to another server
            }
            else if ( curatorEvent.getWatchedEvent().getState() == ... )
            {
            }
            else if ( curatorEvent.getWatchedEvent().getState() == ... )
            {
            }
        }
    }
New to GrepCode? Check out our FAQ X