/**
 * AnalyzerBeans
 * Copyright (C) 2014 Neopost - Customer Information Management
 *
 * This copyrighted material is made available to anyone wishing to use, modify,
 * copy, or redistribute it subject to the terms and conditions of the GNU
 * Lesser General Public License, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this distribution; if not, write to:
 * Free Software Foundation, Inc.
 * 51 Franklin Street, Fifth Floor
 * Boston, MA 02110-1301 USA
 */
 
 package org.eobjects.analyzer.cluster;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
 import org.eobjects.analyzer.beans.filter.MaxRowsFilter;
 import org.eobjects.analyzer.configuration.AnalyzerBeansConfiguration;
 import org.eobjects.analyzer.job.AnalysisJob;
 import org.eobjects.analyzer.job.builder.AnalysisJobBuilder;
 import org.eobjects.analyzer.job.builder.FilterJobBuilder;
 import org.eobjects.analyzer.job.runner.AnalysisListener;
 import org.eobjects.analyzer.job.runner.AnalysisResultFuture;
 import org.eobjects.analyzer.job.runner.AnalysisRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 // (further imports elided in the original snippet)
 
An org.eobjects.analyzer.job.runner.AnalysisRunner which executes org.eobjects.analyzer.job.AnalysisJobs across a distributed set of slave nodes.
 
 public final class DistributedAnalysisRunner implements AnalysisRunner {
 
     private static final Logger logger = LoggerFactory.getLogger(DistributedAnalysisRunner.class);
 
     private final ClusterManager _clusterManager;
     private final AnalyzerBeansConfiguration _configuration;
     private final CompositeAnalysisListener _compositeAnalysisListener;
 
     public DistributedAnalysisRunner(AnalyzerBeansConfiguration configuration, ClusterManager clusterManager) {
         this(configuration, clusterManager, new AnalysisListener[0]);
     }
 
     public DistributedAnalysisRunner(AnalyzerBeansConfiguration configuration, ClusterManager clusterManager,
             AnalysisListener... listeners) {
         _configuration = configuration;
         _clusterManager = clusterManager;
         _compositeAnalysisListener = new CompositeAnalysisListener(listeners);
     }

    
Determines if an org.eobjects.analyzer.job.AnalysisJob is distributable or not. If this method returns false, calling run(org.eobjects.analyzer.job.AnalysisJob) with the job will typically throw a java.lang.UnsupportedOperationException.

Parameters:
job - the job to check
Returns:
true if the job can be distributed across slave nodes, false otherwise
     public boolean isDistributable(final AnalysisJob job) {
         try {
             failIfJobIsUnsupported(job);
             return true;
         } catch (Throwable e) {
             return false;
         }
     }
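
For context, a minimal usage sketch of this check-then-run contract. The configuration, cluster manager and job are assumed to be built elsewhere; AnalysisRunnerImpl is AnalyzerBeans' ordinary single-node runner, used here as the fallback:

    // Hypothetical call site (not part of DistributedAnalysisRunner itself).
    public static AnalysisResultFuture runDistributed(AnalyzerBeansConfiguration configuration,
            ClusterManager clusterManager, AnalysisJob job) {
        final DistributedAnalysisRunner runner = new DistributedAnalysisRunner(configuration, clusterManager);
        if (!runner.isDistributable(job)) {
            // fall back to an ordinary single-node runner rather than risking
            // an UnsupportedOperationException from run(...)
            return new AnalysisRunnerImpl(configuration).run(job);
        }
        return runner.run(job);
    }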

    

Runs the given job by dividing it into a number of slave jobs and dispatching them to the cluster's nodes.

Throws:
java.lang.UnsupportedOperationException if the job is not distributable (either because components are not distributable in their nature, or because some features are limited).
    @Override
    public AnalysisResultFuture run(final AnalysisJob job) throws UnsupportedOperationException {
        logger.info("Validating distributed job: {}", job);

        failIfJobIsUnsupported(job);

        final InjectionManager injectionManager = _configuration.getInjectionManager(job);
        final LifeCycleHelper lifeCycleHelper = new LifeCycleHelper(injectionManager, true);
        final RowProcessingPublishers publishers = getRowProcessingPublishers(job, lifeCycleHelper);
        final RowProcessingPublisher publisher = getRowProcessingPublisher(publishers);
        publisher.initializeConsumers(new TaskListener() {
            @Override
            public void onError(Task task, Throwable throwable) {
                logger.error("Failed to initialize consumers at master node!", throwable);
            }

            @Override
            public void onComplete(Task task) {
            }

            @Override
            public void onBegin(Task task) {
            }
        });

        logger.info("Validation passed! Chunking job for distribution amongst slaves: {}", job);

        // since we always use a SingleThreadedTaskRunner, the above operation
        // will be synchronized/blocking.

        final AnalysisJobMetrics analysisJobMetrics = publishers.getAnalysisJobMetrics();
        _compositeAnalysisListener.jobBegin(job, analysisJobMetrics);

        final RowProcessingMetrics rowProcessingMetrics = publisher.getRowProcessingMetrics();
        _compositeAnalysisListener.rowProcessingBegin(job, rowProcessingMetrics);

        final AnalysisResultFuture resultFuture;

        try {
            final int expectedRows = rowProcessingMetrics.getExpectedRows();

            if (expectedRows == 0) {
                logger.info("Expected rows of the job was zero. Job will run on a local virtual slave.");

                // when there are no expected rows we still need to build a
                // single slave job, but run it locally, since the job lifecycle
                // still needs to be guaranteed.
                final DistributedJobContext context = new DistributedJobContextImpl(job, 0, 1);

                // use a virtual cluster, which just runs the job locally.
                final VirtualClusterManager localCluster = new VirtualClusterManager(_configuration, 1);
                resultFuture = localCluster.dispatchJob(job, context);
            } else {
                final JobDivisionManager jobDivisionManager = _clusterManager.getJobDivisionManager();
                final int chunks = jobDivisionManager.calculateDivisionCount(job, expectedRows);
                final int rowsPerChunk = (expectedRows + 1) / chunks;

                logger.info(
                        "Expected rows was {}. A total number of {} slave jobs will be built, each of approx. {} rows.",
                        expectedRows, chunks, rowsPerChunk);

                final List<AnalysisResultFuture> results = dispatchJobs(job, chunks, rowsPerChunk, publisher);
                final DistributedAnalysisResultReducer reducer = new DistributedAnalysisResultReducer(job,
                        lifeCycleHelper, publisher);
                resultFuture = new DistributedAnalysisResultFuture(results, reducer);
            }
        } catch (RuntimeException e) {
            // note: "errorUknown" is the listener method's spelling in this codebase
            _compositeAnalysisListener.errorUknown(job, e);
            throw e;
        }

        if (!_compositeAnalysisListener.isEmpty()) {
            awaitAndInformListener(job, analysisJobMetrics, rowProcessingMetrics, resultFuture);
        }

        return resultFuture;
    }
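
The returned future can then be consumed like any other result future. A brief sketch: await() and isSuccessful() appear in this listing, while getResults() and the AnalyzerResult element type are assumed from the general AnalysisResult contract:

    // Sketch: consuming the reduced, cluster-wide result.
    public static void printResults(AnalysisResultFuture resultFuture) {
        resultFuture.await(); // blocks until all slave results have been reduced
        if (resultFuture.isSuccessful()) {
            for (final AnalyzerResult analyzerResult : resultFuture.getResults()) {
                System.out.println("Reduced result: " + analyzerResult);
            }
        }
    }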

    
Spawns a new thread for awaiting the result future (which will force the reducer to inform about the progress).

Parameters:
job
analysisJobMetrics
rowProcessingMetrics
resultFuture
    private void awaitAndInformListener(final AnalysisJob job, final AnalysisJobMetrics analysisJobMetrics,
            final RowProcessingMetrics rowProcessingMetrics, final AnalysisResultFuture resultFuture) {
        SharedExecutorService.get().execute(new Runnable() {
            @Override
            public void run() {
                resultFuture.await();
                if (resultFuture.isSuccessful()) {
                    _compositeAnalysisListener.jobSuccess(job, analysisJobMetrics);
                }
            }
        });
    }
    public List<AnalysisResultFuture> dispatchJobs(final AnalysisJob job, final int chunks, final int rowsPerChunk,
            final RowProcessingPublisher publisher) {
        final List<AnalysisResultFuture> results = new ArrayList<AnalysisResultFuture>();
        for (int i = 0; i < chunks; i++) {
            final int firstRow = (i * rowsPerChunk) + 1;
            final int maxRows;
            if (i == chunks - 1) {
                // the last chunk simply takes whatever remains of the data
                maxRows = Integer.MAX_VALUE - firstRow - 1;
            } else {
                maxRows = rowsPerChunk;
            }

            final AnalysisJob slaveJob = buildSlaveJob(job, i, firstRow, maxRows);
            final DistributedJobContext context = new DistributedJobContextImpl(job, i, chunks);

            try {
                logger.info("Dispatching slave job {} of {}", i + 1, chunks);
                final AnalysisResultFuture slaveResultFuture = _clusterManager.dispatchJob(slaveJob, context);
                results.add(slaveResultFuture);
            } catch (Exception e) {
                _compositeAnalysisListener.errorUknown(job, e);
                // exceptions due to dispatching jobs are added as the first of
                // the job's errors, and the rest of the execution is aborted.
                AnalysisResultFuture errorResult = new FailedAnalysisResultFuture(e);
                results.add(0, errorResult);
                break;
            }
        }
        return results;
    }

    
Creates a slave job by copying the original job and adding a org.eobjects.analyzer.beans.filter.MaxRowsFilter as a default requirement.

Parameters:
job
slaveJobIndex
firstRow
maxRows
Returns:
a copy of the job, restricted to the given row window
    private AnalysisJob buildSlaveJob(AnalysisJob job, int slaveJobIndex, int firstRow, int maxRows) {
        logger.info("Building slave job {} with firstRow={} and maxRow={}", slaveJobIndex + 1, firstRow, maxRows);

        try (final AnalysisJobBuilder jobBuilder = new AnalysisJobBuilder(_configuration, job)) {
            final FilterJobBuilder<MaxRowsFilter, MaxRowsFilter.Category> maxRowsFilter = jobBuilder
                    .addFilter(MaxRowsFilter.class);
            maxRowsFilter.getConfigurableBean().setFirstRow(firstRow);
            maxRowsFilter.getConfigurableBean().setMaxRows(maxRows);

            final boolean naturalRecordOrderConsistent = jobBuilder.getDatastore().getPerformanceCharacteristics()
                    .isNaturalRecordOrderConsistent();
            if (!naturalRecordOrderConsistent) {
                final InputColumn<?> orderColumn = findOrderByColumn(jobBuilder);
                maxRowsFilter.getConfigurableBean().setOrderColumn(orderColumn);
            }

            jobBuilder.setDefaultRequirement(maxRowsFilter, MaxRowsFilter.Category.VALID);

            // in assertion/test mode do an early validation
            assert jobBuilder.isConfigured(true);

            return jobBuilder.toAnalysisJob();
        }
    }

    
Finds a source column which is appropriate for an ORDER BY clause in the generated paginated queries.

Parameters:
jobBuilder
Returns:
a source column suitable for ordering, preferring the table's primary key
    private InputColumn<?> findOrderByColumn(AnalysisJobBuilder jobBuilder) {
        final Table sourceTable = jobBuilder.getSourceTables().get(0);

        // preferred strategy: Use the primary key
        final Column[] primaryKeys = sourceTable.getPrimaryKeys();
        if (primaryKeys.length == 1) {
            final Column primaryKey = primaryKeys[0];
            final InputColumn<?> sourceColumn = jobBuilder.getSourceColumnByName(primaryKey.getName());
            if (sourceColumn == null) {
                jobBuilder.addSourceColumn(primaryKey);
                logger.info("Added PK source column for ORDER BY clause on slave jobs: {}", primaryKey);
                return jobBuilder.getSourceColumnByName(primaryKey.getName());
            } else {
                logger.info("Using existing PK source column for ORDER BY clause on slave jobs: {}", sourceColumn);
                return sourceColumn;
            }
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Found {} primary keys, cannot select a single one for ORDER BY clause on slave jobs: {}",
                        primaryKeys.length, Arrays.toString(primaryKeys));
            }
        }

        // secondary strategy: See if there's a source column called something
        // like 'ID' or so, and use that.
        final List<MetaModelInputColumn> sourceColumns = jobBuilder.getSourceColumns();
        final String tableName = sourceTable.getName().toLowerCase();
        for (final MetaModelInputColumn sourceColumn : sourceColumns) {
            String name = sourceColumn.getName();
            if (name != null) {
                name = StringUtils.replaceWhitespaces(name, "");
                name = StringUtils.replaceAll(name, "_", "");
                name = StringUtils.replaceAll(name, "-", "");
                name = name.toLowerCase();
                if ("id".equals(name) || (tableName + "id").equals(name) || (tableName + "number").equals(name)
                        || (tableName + "key").equals(name)) {
                    logger.info("Using existing source column for ORDER BY clause on slave jobs: {}", sourceColumn);
                    return sourceColumn;
                }
            }
        }

        // last resort: Pick any source column and sort on that (might not work
        // if the column contains a lot of repeated values)
        final MetaModelInputColumn sourceColumn = sourceColumns.get(0);
        logger.warn("Couldn't pick a good source column for ORDER BY clause on slave jobs. Picking the first column: {}",
                sourceColumn);
        return sourceColumn;
    }
    private RowProcessingPublishers getRowProcessingPublishers(AnalysisJob job, LifeCycleHelper lifeCycleHelper) {
        final SourceColumnFinder sourceColumnFinder = new SourceColumnFinder();
        sourceColumnFinder.addSources(job);

        final SingleThreadedTaskRunner taskRunner = new SingleThreadedTaskRunner();

        final RowProcessingPublishers publishers = new RowProcessingPublishers(job, null, taskRunner, lifeCycleHelper,
                sourceColumnFinder);

        return publishers;
    }
    private RowProcessingPublisher getRowProcessingPublisher(RowProcessingPublishers publishers)
            throws UnsupportedOperationException {
        final Table[] tables = publishers.getTables();

        if (tables.length != 1) {
            throw new UnsupportedOperationException("Jobs with multiple source tables are not distributable");
        }

        final Table table = tables[0];
        final RowProcessingPublisher publisher = publishers.getRowProcessingPublisher(table);
        return publisher;
    }
    private void failIfJobIsUnsupported(AnalysisJob job) throws UnsupportedOperationException {
        // (body elided in the original snippet; reconstructed from the call
        // sites and the helper below - the original may perform further checks)
        failIfComponentsAreUnsupported(job.getTransformerJobs());
        failIfComponentsAreUnsupported(job.getFilterJobs());
        failIfComponentsAreUnsupported(job.getAnalyzerJobs());
    }
    private void failIfComponentsAreUnsupported(Collection<? extends ComponentJob> jobs)
            throws UnsupportedOperationException {
        for (ComponentJob job : jobs) {
            final ComponentDescriptor<?> descriptor = job.getDescriptor();
            if (descriptor instanceof BeanDescriptor) {
                final BeanDescriptor<?> beanDescriptor = (BeanDescriptor<?>) descriptor;
                final boolean distributable = beanDescriptor.isDistributable();
                if (!distributable) {
                    throw new UnsupportedOperationException("Component is not distributable: " + job);
                }
            } else {
                throw new UnsupportedOperationException("Unsupported component type: " + descriptor);
            }
        }
    }
}
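
Finally, the division count used in run(...) comes from the ClusterManager's JobDivisionManager. As an illustration of that extension point, a hypothetical implementation that targets a fixed number of rows per slave job; only calculateDivisionCount(job, expectedRows) is exercised by this listing, so treat the interface shape beyond that as an assumption:

    // Hypothetical JobDivisionManager: aims at ~preferredRowsPerChunk rows per slave job.
    public class FixedChunkSizeJobDivisionManager implements JobDivisionManager {

        private final int preferredRowsPerChunk;

        public FixedChunkSizeJobDivisionManager(int preferredRowsPerChunk) {
            this.preferredRowsPerChunk = preferredRowsPerChunk;
        }

        @Override
        public int calculateDivisionCount(AnalysisJob job, int expectedRows) {
            // ceiling division, but always at least one chunk
            return Math.max(1, (expectedRows + preferredRowsPerChunk - 1) / preferredRowsPerChunk);
        }
    }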