Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.service.sync;
  
  import java.io.*;
  import java.net.*;
  
  import org.slf4j.Logger;
 
 
 
 
 public class SyncListAccessor implements StartAndStoppable
 {
     // Per-instance logger, named after the runtime class of this object.
     private final Logger LOG = LoggerFactory.getLogger(getClass());
     
     // public just because tests need it
     // Comma-separated concatenation of two toString() values.
     // NOTE(review): both operands appear to have been lost in extraction
     // (presumably two content-type constants) -- restore from the original file.
     public final static String ACCEPTED_CONTENT_TYPES
         = ..toString() + ", " + ..toString();
 
     // Shared service object handed to the constructor.
     protected final SharedServiceStuff _stuff;
     
 //    protected final AsyncHttpClient _asyncHttpClient;
 //    protected final HttpClient _blockingHttpClient;
     
     // Reader for sync list responses (presumably the Smile reader for
     // SyncListResponse created in the constructor -- confirm).
     protected final ObjectReader _syncListReader;
 
     // Reader for individual pull entries (presumably the Smile reader for
     // SyncPullEntry created in the constructor -- confirm).
     protected final ObjectReader _syncEntryReader;
     
     // Writer for pull requests (presumably the JSON writer for
     // SyncPullRequest created in the constructor -- confirm).
     protected final ObjectWriter _syncPullRequestWriter;
 
     // Closed-state flag; starts out as false.
     protected final AtomicBoolean _closed = new AtomicBoolean(false);
     
     public SyncListAccessor(SharedServiceStuff stuff)
     {
          = stuff;
          = stuff.smileReader(SyncListResponse.class);
          = stuff.smileReader(SyncPullEntry.class);
          = stuff.jsonWriter(SyncPullRequest.class);
 
 //      _asyncHttpClient = new AsyncHttpClient();
 // important: if not using pooled conn manager, must use local instance:
 //        _blockingHttpClient = new DefaultHttpClient();
     }
 
     @Override public void start() { }
     
     @Override
     public void stop()
     {
         .set(true);
 //        _asyncHttpClient.close();
 //        _blockingHttpClient.getConnectionManager().shutdown();
     }
      
         // quick note: errors are in JSON, data as Smile.
         
         // Old code that uses AHC
         /*
     public SyncListResponse fetchSyncList(IpAndPort endpoint,
             long syncedUpTo, KeyRange syncRange,
             TimeSpan timeout)
         throws InterruptedException
     {
         String url = endpoint.getEndpoint() + Constants.PATH_SYNC_LIST + "/" + syncedUpTo;
         
         Request req = _asyncHttpClient.prepareGet(url)
                 .addQueryParameter(Constants.HTTP_QUERY_PARAM_KEYRANGE_START, String.valueOf(syncRange.getStart()))
                 .addQueryParameter(Constants.HTTP_QUERY_PARAM_KEYRANGE_LENGTH, String.valueOf(syncRange.getLength()))
                 .addHeader(HttpHeaders.ACCEPT, ACCEPTED_CONTENT_TYPES)
                 .build();
         // small responses; let's simply buffer, simpler error handling:
         try {
             Future<Response> future = _asyncHttpClient.executeRequest(req);
             Response resp = future.get(timeout.getMillis(), TimeUnit.MILLISECONDS);
             // check status code first:
             int statusCode = resp.getStatusCode();
             byte[] stuff = resp.getResponseBodyAsBytes();
             if (HttpUtil.isSuccess(statusCode)) {
                 try {
                     return _syncListReader.readValue(stuff);
                 } catch (IOException e) {
                     throw new IOException("Invalid sync list returned by '"+url+"', failed to parse Smile: "+e.getMessage());
                }
            }
            String msg = HttpUtil.getExcerpt(stuff);
            LOG.warn("Failed to send syncList request to '{}': status code {}, response excerpt: {}",
                    new Object[] { url, statusCode, msg});
        } catch (InterruptedException e) {
            throw e;
        } catch (TimeoutException e) {
            LOG.warn("syncList request to {} failed with timeout (of {})", url, timeout);
        } catch (Exception e) {
            LOG.warn("syncList request to {} failed with Exception ({}): {}",
                    new Object[] { url, e.getClass().getName(), e.getMessage()});
        }
        return null;
        */
    // And then the Real Thing, with basic JDK stuff:
    
    /*
            _syncState.getAddress(), 
            _syncState.getSyncedUpTo(), _syncState.getRangeSync());
     */
    
            TimeSpan timeoutNodeState remotelong lastClusterHash)
        throws InterruptedException
    {
        final String urlStr = _buildSyncListUrl(clusterremotelastClusterHash);
        HttpURLConnection conn;
        try {
            conn = prepareGet(urlStrtimeout);
            conn.connect();
        } catch (Exception e) {
            .warn("fetchSyncList request to {} failed on send with Exception ({}): {}",
                    new Object[] { urlStre.getClass().getName(), e.getMessage()});
            return null;
        }
        
        // and if we are good, deal with status code, handle headers/response:
        try {
            int statusCode = conn.getResponseCode();
            if (IOUtil.isHTTPSuccess(statusCode)) {
                InputStream in = conn.getInputStream();
                SyncListResponse<?> resp;
                try {
                    resp = .readValue(in);
                } catch (IOException e) {
                    throw new IOException("Invalid sync list returned by '"+urlStr+"', failed to parse Smile: "+e.getMessage());
                } finally {
                    try {
                        in.close();
                    } catch (Exception e) { }
                }
                // Bit of validation, as unknown props are allowed:
                if (resp.hasUnknownProperties()) {
                    .warn("Unrecognized properties in SyncListResponse: "+resp.unknownProperties());
                }
                return resp;
            }
            handleHTTPFailure(connurlStrstatusCode"fetchSyncList");
        } catch (Exception e) {
            .warn("syncList request to {} failed on response with Exception ({}): {}",
                    new Object[] { urlStre.getClass().getName(), e.getMessage()});
        }            
        return null;
    }
    // Old version with Apache HC:
    /*
    public InputStream readSyncPullResponse(SyncPullRequest request,
            IpAndPort endpoint, AtomicInteger statusCodeWrapper,
            int expectedPayloadSize)
        throws IOException
    {
        String url = endpoint.getEndpoint() + Constants.PATH_SYNC_PULL;
        byte[] reqPayload = _syncPullRequestWriter.writeValueAsBytes(request);
        
        HttpPost post = new HttpPost(url);
        
//        post.setEntity(new ByteArrayEntity(reqPayload, ContentType.APPLICATION_JSON));
        post.setEntity(new ByteArrayEntity(reqPayload));
        HttpResponse response = _blockingHttpClient.execute(post);
        int statusCode = response.getStatusLine().getStatusCode();
        statusCodeWrapper.set(statusCode);
        HttpEntity entity = response.getEntity();
        
        InputStream in = entity.getContent();
        
        if (!HttpUtil.isSuccess(statusCode)) {
            // try fetching error message?
            String msg = IOUtil.readExcerpt(in, 500);
            in.close();
            LOG.warn("Sync pull failure when requesting {} entries (of about {} mB total payload). Error code {}, response: {}",
                    new Object[] { request.size(), expectedPayloadSize, statusCode, msg});
            return null;
        }
        return in;
    }
    */
    
    public InputStream readSyncPullResponse(SyncPullRequest requestTimeSpan timeout,
            IpAndPort endpointAtomicInteger statusCodeWrapper,
            int expectedPayloadSize)
        throws IOException
    {
        final String urlStr = _buildSyncPullUrl(endpoint);
        byte[] reqPayload = .writeValueAsBytes(request);
        final int reqLength = reqPayload.length;
        
        HttpURLConnection conn;
        OutputStream out = null;
        try {
            conn = preparePost(urlStrtimeout.);
            // since we do know length in advance, let's just do this:
            conn.setFixedLengthStreamingMode(reqLength);
            conn.connect();
            out = conn.getOutputStream();
            out.write(reqPayload);
            out.close();
        } catch (Exception e) {
            .warn("readSyncPullResponse request to {} failed on send with Exception ({}): {}",
                    new Object[] { urlStre.getClass().getName(), e.getMessage()});
            return null;
        } finally {
            if (out != null) {
                try { out.close(); } catch (IOException e) { }
            }
        }
        try {
            int statusCode = conn.getResponseCode();
            if (IOUtil.isHTTPSuccess(statusCode)) {
                try {
                	return conn.getInputStream();
                } catch (IOException e) {
                    throw new IOException("readSyncPullResponse from '"+urlStr+"' failed: "
                    		+e.getMessage(), e);
                }
            }
            handleHTTPFailure(connurlStrstatusCode,
            		"readSyncPullResponse (requesting "+request.size()+" entries (of about "+expectedPayloadSize+" mB total payload)"
            		);
        } catch (Exception e) {
            .warn("syncList request to {} failed on response with Exception ({}): {}",
                    new Object[] { urlStre.getClass().getName(), e.getMessage()});
        }            
        return null;
    }

    
    // Helper method used for sending simple status update message, usually done when service starts up or shuts down.
    public boolean sendStatusUpdate(ClusterViewByServerUpdatable cluster,
            TimeSpan timeoutIpAndPort remoteString newStatus)
    {
        final String urlStr = _buildNodeStatusUpdateUrl(clusterremotenewStatus);
        HttpURLConnection conn;
        try {
            conn = preparePost(urlStrtimeout.);
            conn.setDoOutput(false);
            conn.connect();
        } catch (Exception e) {
            .warn("sendStatusUpdate request to {} failed on send with Exception ({}): {}",
                    urlStre.getClass().getName(), e.getMessage());
            return false;
        }
        int statusCode;
        try {
            statusCode = conn.getResponseCode();
        } catch (IOException e) {
            .warn("sendStatusUpdate request to {} failed with Exception ({}): {}",
                    urlStre.getClass().getName(), e.getMessage());
            return false;
        }
        if (IOUtil.isHTTPSuccess(statusCode)) {
            return true;
        }
        handleHTTPFailure(connurlStrstatusCode"sendStatusUpdate");
        return false;
    }
    
    /*
    /**********************************************************************
    /* Helper methods
    /**********************************************************************
     */
    
    protected HttpURLConnection preparePost(String urlStrTimeSpan timeout,
            ContentType contentType)
        throws IOException
    {
        return prepareHttpMethod(urlStrtimeout"POST"truecontentType);
    }
    protected HttpURLConnection prepareGet(String urlStrTimeSpan timeout)
            throws IOException
    {
        return prepareHttpMethod(urlStrtimeout"GET"falsenull);
    }
    
    protected HttpURLConnection prepareHttpMethod(String urlStrTimeSpan timeout,
            String methodNameboolean sendInputContentType contentType)
        throws IOException
    {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnectionurl.openConnection();
        conn.setRequestMethod(methodName);
        conn.setAllowUserInteraction(false);
        conn.setUseCaches(false);
        conn.setDoOutput(sendInput);
        conn.setDoInput(true); // we always read response
        if (contentType != null) { // should also indicate content type...
            conn.setRequestProperty(.contentType.toString());
        }
        
        // how about timeouts... JDK one does not give us whole-operation granularity but:
        int timeoutMs = (inttimeout.getMillis();
        // let's give only half to connect; more likely we detect down servers on connect
        conn.setConnectTimeout(timeoutMs/2);
        conn.setReadTimeout(timeoutMs);
        return conn;
    }
    // public as it's accessed from outside the package
    public SyncPullEntry decodePullEntry(byte[] datathrows IOException
    {
        return .readValue(data);
    }
    protected void handleHTTPFailure(HttpURLConnection connString urlStrint statusCode,
    		String operation)
    {
        String msg = "N/A";
        try {
            InputStream e = conn.getErrorStream();
            msg = IOUtil.getExcerpt(e);
        } catch (Exception e) {
            .warn("Problem reading ErrorStream for failed {} request to '{}': {}",
                    operationurlStre.getMessage());
        }
        .warn("Failed to process {} response from '{}': status code {}, response excerpt: {}",
                operationurlStrstatusCodemsg);
    }
    /**
     * Builds the URL for a sync list request to a remote node: service root path,
     * sync list path, then query parameters for the synced-up-to value, the start
     * and length of the local node's total key range, cluster state info, and
     * the last cluster hash.
     *
     * NOTE(review): this block is damaged by extraction -- parameter separators,
     * call receivers and the parameter-name constants passed to addParameter
     * are missing and need to be restored from the original file.
     */
    protected String _buildSyncListUrl(ClusterViewByServerUpdatable clusterNodeState remote,
            long lastClusterHash)
    {
        final NodeState local = cluster.getLocalState();
        final long syncedUpTo = remote.getSyncedUpTo();
        /* Need to be sure to pass the full range; remote end can do filtering,
         * (to reduce range if need be), but it needs to know full range
         * for initial auto-registration. Although ideally maybe we should
         * pass active and passive separately... has to do, for now.
         */
        final KeyRange syncRange = local.totalRange();
        // NOTE(review): receiver missing here and below; presumably _stuff -- confirm
        final ServiceConfig config = .getServiceConfig();
        RequestPathBuilder pathBuilder = new JdkHttpClientPathBuilder(remote.getAddress())
            .addPathSegments(config.servicePathRoot);
        pathBuilder = .getPathStrategy().appendSyncListPath(pathBuilder);
        // NOTE(review): first argument (parameter name constant) missing in each
        // addParameter call below
        pathBuilder = pathBuilder.addParameter(.,
                String.valueOf(syncedUpTo));
        pathBuilder = pathBuilder.addParameter(., String.valueOf(syncRange.getStart()));
        pathBuilder = pathBuilder.addParameter(., String.valueOf(syncRange.getLength()));
        // this will include 'caller' param:
        pathBuilder = cluster.addClusterStateInfo(pathBuilder);
        pathBuilder = pathBuilder.addParameter(.,
                String.valueOf(lastClusterHash));
        return pathBuilder.toString();
    }
    
            String state)
    {
        // Builds the URL for a node status update request: node status path, then
        // query parameters for the start and length of the local node's total key
        // range, a current-time value, the given state, and cluster state info.
        //
        // NOTE(review): this block is damaged by extraction. The first line of the
        // method header is missing (the call site passes a cluster view, a remote
        // address and a status String -- presumably the method is named
        // _buildNodeStatusUpdateUrl). The path builder construction below has lost
        // its continuation/terminator, and call receivers plus the parameter-name
        // constants passed to addParameter are missing as well. Restore from the
        // original file.
        final KeyRange syncRange = cluster.getLocalState().totalRange();
        RequestPathBuilder pathBuilder = new JdkHttpClientPathBuilder(remote)
        pathBuilder = .getPathStrategy().appendNodeStatusPath(pathBuilder);
        pathBuilder = pathBuilder.addParameter(., String.valueOf(syncRange.getStart()));
        pathBuilder = pathBuilder.addParameter(., String.valueOf(syncRange.getLength()));
        pathBuilder = pathBuilder.addParameter(.,
                String.valueOf(.getTimeMaster().currentTimeMillis()));
        pathBuilder = pathBuilder.addParameter(.state);
        // this will include 'caller' param:
        pathBuilder = cluster.addClusterStateInfo(pathBuilder);
        return pathBuilder.toString();
    }
    
    protected String _buildSyncPullUrl(IpAndPort endpoint)
    {
        final ServiceConfig config = .getServiceConfig();
        RequestPathBuilder pathBuilder = new JdkHttpClientPathBuilder(endpoint)
            .addPathSegments(config.servicePathRoot);
        pathBuilder = .getPathStrategy().appendSyncPullPath(pathBuilder);
        return pathBuilder.toString();
    }
New to GrepCode? Check out our FAQ X