Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/**
 * eobjects.org AnalyzerBeans
 * Copyright (C) 2010 eobjects.org
 *
 * This copyrighted material is made available to anyone wishing to use, modify,
 * copy, or redistribute it subject to the terms and conditions of the GNU
 * Lesser General Public License, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this distribution; if not, write to:
 * Free Software Foundation, Inc.
 * 51 Franklin Street, Fifth Floor
 * Boston, MA  02110-1301  USA
 */
 
 package org.eobjects.analyzer.cluster.http;
 
 import java.util.List;
 import java.util.UUID;
 
 import  org.apache.http.impl.conn.PoolingClientConnectionManager;
 /**
  * A cluster manager that uses HTTP servlet transport to communicate between
  * nodes.
  */
 
 public class HttpClusterManager implements ClusterManager {
 
     private static final Logger logger = LoggerFactory.getLogger(HttpClusterManager.class);
 
     public static final String HTTP_PARAM_SLAVE_JOB_ID = "slave-job-id";
     public static final String HTTP_PARAM_ACTION = "action";
     public static final String HTTP_PARAM_JOB_DEF = "job-def";
 
     public static final String ACTION_RUN = "run";
     public static final String ACTION_CANCEL = "cancel";
 
     private final HttpClient _httpClient;
     private final List<String_slaveEndpoints;

    
     /**
      * Creates a new HTTP cluster manager.
      *
      * @param slaveEndpoints
      *            the endpoint URLs of the slaves
      */
 
     public HttpClusterManager(List<StringslaveEndpoints) {
         this(new DefaultHttpClient(new PoolingClientConnectionManager()), slaveEndpoints);
     }

    
     /**
      * Creates a new HTTP cluster manager.
      *
      * @param httpClient
      *            HTTP client to use for invoking slave endpoints. Must be
      *            capable of executing multiple requests at the same time
      *            (see {@link PoolingClientConnectionManager}).
      * @param slaveEndpoints
      *            the endpoint URLs of the slaves
      */
 
     public HttpClusterManager(HttpClient httpClientList<StringslaveEndpoints) {
          = httpClient;
          = slaveEndpoints;
     }
 
     @Override
    }
    @Override
    public AnalysisResultFuture dispatchJob(AnalysisJob jobDistributedJobContext contextthrows Exception {
        // determine endpoint url
        final int index = context.getJobDivisionIndex();
        final String slaveEndpoint = .get(index);
        // write the job as XML
        final JaxbJobWriter jobWriter = new JaxbJobWriter(context.getMasterConfiguration());
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        jobWriter.write(jobbaos);
        final byte[] bytes = baos.toByteArray();
        // send the request in another thread
        final List<Throwableerrors = new LinkedList<Throwable>();
        final String slaveJobUuid = UUID.randomUUID().toString();
        final LazyRef<AnalysisResultresultRef = sendExecuteRequest(slaveEndpointbyteserrorsslaveJobUuid);
        resultRef.requestLoad(new Action<Throwable>() {
            @Override
            public void run(Throwable errorthrows Exception {
                errors.add(error);
            }
        });
        return new LazyRefAnalysisResultFuture(resultReferrors) {
            @Override
            public void cancel() {
                sendCancelRequest(slaveEndpointslaveJobUuid);
            }
        };
    }
    private LazyRef<AnalysisResultsendExecuteRequest(final String slaveEndpointfinal byte[] bytes,
            final List<Throwableerrorsfinal String slaveJobId) {
        return new LazyRef<AnalysisResult>() {
            @Override
            protected AnalysisResult fetch() throws Throwable {
                // send the HTTP request
                final HttpPost request = new HttpPost(slaveEndpoint);
                final List<NameValuePairparameters = new ArrayList<NameValuePair>();
                parameters.add(new BasicNameValuePair(slaveJobId));
                parameters.add(new BasicNameValuePair());
                parameters.add(new BasicNameValuePair(new String(bytes)));
                final UrlEncodedFormEntity entity = new UrlEncodedFormEntity(parameters);
                request.setEntity(entity);
                .info("Firing run request to slave server '{}' for job id '{}'"slaveEndpointslaveJobId);
                final HttpResponse response = .execute(request);
                // handle the response
                final StatusLine statusLine = response.getStatusLine();
                if (statusLine.getStatusCode() != 200) {
                    throw new IllegalStateException("Slave server '" + slaveEndpoint
                            + "' responded with an error to 'run' request: " + statusLine.getReasonPhrase() + " ("
                            + statusLine.getStatusCode() + ")");
                }
                final InputStream inputStream = response.getEntity().getContent();
                try {
                    AnalysisResult result = readResult(inputStreamerrors);
                    return result;
                } finally {
                    FileHelper.safeClose(inputStream);
                }
            }
        };
    }
    private void sendCancelRequest(String slaveEndpointString slaveJobId) {
        final HttpPost request = new HttpPost(slaveEndpoint);
        request.getParams().setParameter(slaveJobId);
        try {
            final HttpResponse response = .execute(request);
            // handle the response
            final StatusLine statusLine = response.getStatusLine();
            if (statusLine.getStatusCode() != 200) {
                throw new IllegalStateException("Slave server '" + slaveEndpoint
                        + "' responded with an error to 'cancel' request: " + statusLine.getReasonPhrase() + " ("
                        + statusLine.getStatusCode() + ")");
            }
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeExceptione;
            }
            throw new IllegalStateException("Failed to fire cancel request to slave server '" + slaveEndpoint
                    + "' for job id '" + slaveJobId + "'"e);
        }
    }
    protected AnalysisResult readResult(InputStream inputStreamList<Throwableerrorsthrows Exception {
        final ChangeAwareObjectInputStream changeAwareObjectInputStream = new ChangeAwareObjectInputStream(inputStream);
        final Object object = changeAwareObjectInputStream.readObject();
        changeAwareObjectInputStream.close();
        if (object instanceof AnalysisResult) {
            // response carries a result
            return (AnalysisResultobject;
        } else if (object instanceof List) {
            // response carries a list of errors
            @SuppressWarnings("unchecked")
            List<ThrowableslaveErrors = (List<Throwable>) object;
            errors.addAll(slaveErrors);
            return null;
        } else {
            throw new IllegalStateException("Unexpected response payload: " + object);
        }
    }
New to GrepCode? Check out our FAQ X