Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.client.operation;
  
  import java.util.*;
  
 
 public class PutOperationImpl<K extends EntryKey,
     CONFIG extends StoreClientConfig<K, CONFIG>
 >
     extends OperationBase<K,CONFIG>
     implements PutOperation
 {
     /*
     /**********************************************************************
     /* Timing constraints
     /**********************************************************************
      */

    
     /**
      * Timestamp at which the whole operation will fail.
      */
 
     protected final long _endOfTime;

    
     /**
      * Last possible time when we can start a call and have some hope
      * of it not failing for timeout.
      */
 
     protected final long _lastValidTime;
 
     /*
     /**********************************************************************
     /* PUT-specific config
     /**********************************************************************
      */
 
     protected final PutCallParameters _params;
     protected final PutContentProvider _content;
 
     protected final int _maxCallRetries;

    
     /**
      * Result object we will be using to pass information.
      */
 
     protected final PutOperationResult _result;
 
     /*
     /**********************************************************************
     /* State
     /**********************************************************************
      */
 
     protected boolean _released;
 
     protected long _roundStartTime;
 
     // // // State
 
    
     /**
      * Number of round(s) of calls completed: each round consists
      * of calling a subset of available nodes.
      */
 
     protected int _round;

    
     /**
      * We need to keep a list of active nodes with possible past and future
      * calls; ones that we may keep trying to call, and that have not yet
      * been added to result object as success or fail.
      */
 
     protected final List<PutCallState_activeNodes;
 
     protected Iterator<PutCallState_currentNodes;
     
     /*
     /**********************************************************************
     /* Life-cycle
     /**********************************************************************
      */
     
     /**
      * Sets up a PUT operation: passes the config, start time and key on to
      * the superclass and creates one {@link PutCallState} per node of
      * {@code serverNodes} (or an empty list when there are no nodes).
      *<p>
      * NOTE(review): this listing lost tokens during extraction. The commas
      * between parameters and arguments, and the left-hand side of every
      * assignment below, are missing, so the constructor does not compile as
      * shown. Restore from the upstream source before relying on it.
      */
     public PutOperationImpl(CONFIG configlong startTime,
             NodesForKey serverNodes, K key,
             PutCallParameters paramsPutContentProvider content)            
     {
         super(configstartTimekey);
 
         // may make configurable in future but for now is static (not very useful to change)
         // NOTE(review): no statement follows the comment above in this listing;
         // presumably the assignment of _maxCallRetries was lost -- confirm.
 
         // NOTE(review): assignment targets are missing; presumably _params,
         // _content and _result respectively (constructor arguments of
         // PutOperationResult, if any, cannot be told from here).
          = params;
          = content;
          = new PutOperationResult();
 
         final int serverCount = serverNodes.size();
         // NOTE(review): both branches assign to a target that was lost;
         // presumably _activeNodes, the only List field of this class.
         if (serverCount == 0) {
              = Collections.emptyList();
         } else {
             = new ArrayList<PutCallState>(serverCount);
            for (int i = 0; i < serverCount; ++i) {
                .add(new PutCallState(serverNodes.node(i)));
            }
        }
        // Then figure out how long we have for the whole operation
        // NOTE(review): nothing follows in this listing, so the final fields
        // _endOfTime and _lastValidTime are never assigned here.
    }
    @Override
    public PutOperationResult finish()
    {
        if (!) {
            .release();
             = true;
        }
        for (PutCallState state : ) {
            NodeFailure getFails = state.getFails();
            if (getFails == null) {
                .withIgnored(state.server());
            } else {
                .withFailed(getFails);
            }
        }
        return ;
    }
    /*
    /**********************************************************************
    /* Public API implementation
    /**********************************************************************
     */
    @Override
    public PutOperationResult result() {
        return ;
    }
    @Override
    public int completedRounds() {
        return ;
    }
    @Override
    public boolean hasRemainingHosts() {
        return remainingHostCount() > 0;
    }
    @Override
    public int remainingHostCount() {
        return .size();
    }
    
    // NOTE(review): the four overriding methods below lost their signature
    // lines (and parts of their bodies) during extraction. Their names,
    // parameters and declared exceptions cannot be determined from this
    // listing; restore them from the upstream source.

    // NOTE(review): signature and entire body missing.
    @Override
    }
    // NOTE(review): signature missing. Delegates to perform() with the
    // "optimal OKs" count read from an object whose name was lost.
    @Override
        return perform(.getOptimalOks());
    }
    // NOTE(review): signature missing; the left operand of "== 0" and the
    // body of the if-statement were lost as well.
    @Override
        /* Here we only want to proceed, if we get all done in first round
         * without issues; sort of bonus call.
         */
        if ( == 0) {
        }
        return this;
    }
    
    // NOTE(review): signature missing. Delegates to perform() with the
    // "max OKs" count read from an object whose name was lost.
    @Override
        return perform(.getMaxOks());
    }
    /*
    /**********************************************************************
    /* Main call handling method(s)
    /**********************************************************************
     */

    

    /**
     * @param result Result object to update, return
     * @param oksNeeded Number of success nodes we need, total
     */
    // Runs rounds of calls in a loop: the "== 0" branch goes through
    // _performPrimary(), the other branch through _performSecondary(). Returns
    // as soon as a round method reports completion; otherwise continues until
    // the loop condition fails or the in-loop break triggers. Always returns
    // this operation. Throws IllegalStateException if the guard at the top
    // holds.
    //
    // NOTE(review): tokens were lost during extraction -- the ")" closing the
    // parameter list, the guard condition (presumably _released), the first
    // argument of _shouldFinish() (presumably _result), both operands of the
    // loop condition, the operand of "== 0" and "++" (presumably _round), and
    // the break condition. The code does not compile as shown.
    protected PutOperation perform(int oksNeededthrows InterruptedException
    {
        if () {
            throw new IllegalStateException("Can not call 'complete' methods after content has been released");
        }
        // already done?
        if (_shouldFinish(oksNeeded)) {
            return this;
        }
        while ( < ) {
            if ( == 0) {
                if (_performPrimary(oksNeeded)) {
                    return this;
                }
            } else {
                if (_performSecondary(oksNeeded)) {
                    return this;
                }
            }
            ++;
            // NOTE(review): break condition lost; cannot tell from here what
            // ends the loop early.
            if () {
                break;
            }
            
        }
        return this;
    }
    // Runs at most one round of calls: unless _shouldFinish() already holds,
    // performs either _performPrimary() (the "== 0" branch) or
    // _performSecondary(), returning right away if that reports completion and
    // otherwise incrementing a counter. Always returns this operation. Throws
    // IllegalStateException if the guard at the top holds.
    //
    // NOTE(review): tokens were lost during extraction -- the ")" closing the
    // parameter list, the guard condition (presumably _released), the first
    // argument of _shouldFinish() (presumably _result), and the operand of
    // "== 0" and "++" (presumably _round). Confirm against upstream source.
    protected PutOperation performSingleRound(int oksNeededthrows InterruptedException
    {
        if () {
            throw new IllegalStateException("Can not call 'complete' methods after content has been released");
        }
        if (!_shouldFinish(oksNeeded)) {
            if ( == 0) {
                if (_performPrimary(oksNeeded)) {
                    return this;
                }
            } else {
                if (_performSecondary(oksNeeded)) {
                    return this;
                }
            }
            ++;
        }
        return this;
    }
    
    

    /**
     * @return True if processing is now complete; false if more work needed
     */
    // First-round handling: walks the nodes, skipping disabled servers unless
    // includeDisabled is set, and tries a PUT against each one. A success is
    // removed from iteration and recorded as succeeded, after which
    // _shouldFinish() decides whether to stop (return true). A retriable
    // failure is remembered on the node's PutCallState, and the node is not
    // removed; a non-retriable failure is removed and recorded as failed.
    // Returns false when all nodes were visited without finishing.
    //
    // NOTE(review): tokens were lost during extraction -- the ")" closing the
    // parameter list, all field references (iterator, start-time and result
    // receivers), the expression after "!" for includeDisabled, the arguments
    // of tryPut(), the first argument of _shouldFinish() and the comma in
    // "new NodeFailure(serverfail)". The code does not compile as shown.
    protected boolean _performPrimary(int oksNeededthrows InterruptedException
    {
        // NOTE(review): presumably "_currentNodes == null", followed by
        // assignments to _currentNodes and _roundStartTime -- confirm.
        if ( == null) { // for very first call
             = .iterator();
             = System.currentTimeMillis();
        }
        while (.hasNext()) {
            final boolean includeDisabled = !// only try disabled ones if no retries allowed
            final PutCallState call = .next();
            final ClusterServerNode server = call.server();
            if (!includeDisabled && server.isDisabled()) { // skip disabled during first round (unless no retries)
                continue;
            }
            // null return value from tryPut() signals success
            CallFailure fail = server.entryPutter().tryPut();
            if (fail == null) { // success
                .remove();
                .addSucceeded(server);
                if (_shouldFinish(oksNeeded)) {
                    return true;
                }
                continue;
            }
            // nope, failed. If retriable, keep; if not, add as failure, remove from active
            if (fail.isRetriable()) {
                call.addFailure(fail);
            } else {
                .remove();
                .withFailed(new NodeFailure(serverfail));
            }
        }
        return false;
    }
    // Later-round handling: when starting a new round, returns true at once if
    // the emptiness check holds; otherwise takes a fresh iterator, calls
    // _doDelay() with the current time and stores that time. Then tries a PUT
    // against each node (disabled ones included): a success is removed and
    // recorded as succeeded, followed by a _shouldFinish() check; a retriable
    // failure is remembered on the node's PutCallState; a non-retriable one is
    // removed and recorded as failed. Returns false if more work remains.
    //
    // NOTE(review): tokens were lost during extraction -- the ")" closing the
    // parameter list, all field references, the arguments of tryPut(), the
    // first argument of _shouldFinish(), the comma in
    // "new NodeFailure(serverfail)" and the ";" after the final return.
    // _doDelay() is not defined in this listing (presumably inherited, and it
    // may have lost arguments too). The code does not compile as shown.
    protected boolean _performSecondary(int oksNeededthrows InterruptedException
    {
        // Starting a new round? Will need bit of delay most likely
        if ( == null) {
            if (.isEmpty()) { // no more nodes? We are done
                return true;
            }
             = .iterator();
            
            // If we got this far, let's accept sub-optimal outcomes as well; or, if we timed out
            final long nextRoundStart = System.currentTimeMillis();
            _doDelay(nextRoundStart);
             = nextRoundStart;
        }
        while (.hasNext()) {
            final PutCallState call = .next();
            final ClusterServerNode server = call.server();
            // null return value from tryPut() signals success
            CallFailure fail = server.entryPutter().tryPut();
            if (fail == null) { // success
                .remove();
                .addSucceeded(server);
                if (_shouldFinish(oksNeeded)) {
                    return true;
                }
                continue;
            }
            // failed: keep node for retry if retriable, else record as failed
            if (fail.isRetriable()) {
                call.addFailure(fail);
            } else {
                .remove();
                .withFailed(new NodeFailure(serverfail));
            }
        }
        return false// still not done
    }
    /*
    /**********************************************************************
    /* Internal methods
    /**********************************************************************
     */
    // Decides whether the operation should stop: true when the result already
    // has at least oksNeeded successes, or when the current system time is
    // past a deadline; false otherwise.
    //
    // NOTE(review): the comma between the two parameters and the deadline
    // operand of "currTime >" were lost during extraction. The operand is
    // presumably one of the long fields _endOfTime or _lastValidTime; which
    // one cannot be told from this listing.
    protected boolean _shouldFinish(PutOperationResult resultint oksNeeded) {
        if (result.getSuccessCount() >= oksNeeded) {
            return true;
        }
        final long currTime = System.currentTimeMillis();
        if (currTime > ) {
            return true;
        }
        return false;
    }
    // Variant that takes the current time from the caller: true when the
    // result already has at least oksNeeded successes, or when currTime is
    // past a deadline; false otherwise.
    //
    // NOTE(review): the commas between the parameters and the deadline
    // operand of "currTime >" were lost during extraction. The operand is
    // presumably one of the long fields _endOfTime or _lastValidTime; which
    // one cannot be told from this listing.
    protected boolean _shouldFinish(PutOperationResult resultint oksNeededlong currTime) {
        if (result.getSuccessCount() >= oksNeeded) {
            return true;
        }
        if (currTime > ) {
            return true;
        }
        return false;
    }
    /*
    /**********************************************************************
    /* Helper class(es)
    /**********************************************************************
     */

    
    /**
     * Container used to hold in-flight information about calls to a single
     * applicable target node.
     */
    protected final static class PutCallState
    {
        protected final ClusterServerNode _node;
        protected NodeFailure _fails;
        
        public PutCallState(ClusterServerNode node)
        {
             = node;
        }
        public void addFailure(CallFailure fail) {
            if ( == null) {
                 = new NodeFailure(fail);
            } else {
                .addFailure(fail);
            }
        }
        
        public ClusterServerNode server() { return ; }
        public NodeFailure getFails() { return ; }
    }
New to GrepCode? Check out our FAQ X