Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2014, The OpenNMS Group
   * 
   * Licensed under the Apache License, Version 2.0 (the "License"); you may
   * not use this file except in compliance with the License. You may obtain
   * a copy of the License at
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   *     
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.opennms.newts.gsod;
 
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.opennms.newts.gsod.FileObservable.fileTreeWalker;
 import static org.opennms.newts.gsod.FileObservable.lines;
 import static rx.exceptions.Exceptions.propagate;
 
 import java.io.File;
 import java.net.URI;
 import java.util.List;
 
 
 import rx.Observable;
 import rx.Observer;
 import rx.Subscriber;
 
 
 
 public class ImportRunner {
     
     // Maximum number of samples per batch; defaults to 1000 (see the "-n" option usage text).
     private int m_samplesPerBatch = 1000;
     // Directory holding the GSOD data to import (required command-line argument).
     private File m_source;
     // URL of a Newts REST server; per the "-u" usage text, when unset the data is written
     // through direct access via the Newts API.
     private String m_restUrl = null;
     // Repository used for direct access. NOTE(review): appears to be created lazily by
     // repository(); the field reference there is missing from this listing - confirm.
     private SampleRepository m_repository;
     // Size of the thread pool used when posting directly; defaults to 1 (no parallelism).
     private int m_threadCount = 1;
     // Maximum work-queue size; per the "-q" usage text, 0 means "thread-count * 3".
     private int m_maxThreadQueueSize = 0;
     // Factor that sample time is divided by (see the "-f" usage text); 1.0 leaves time unscaled.
     private double m_timescaleFactor = 1.0;
     // Time offset in milliseconds; 0 means no offset.
     private long m_timeoffset = 0;
     
     private void checkArgument(boolean checkString failureMessage) {
         if (!checkthrow new IllegalArgumentException(failureMessage);
     }
 
 
    @Option(name="-n", aliases="--samples-per-batch", metaVar="sample-count", usage="the maxinum number of samples to include in each post to the repository (default: 1000)")
    public void setSamplesPerBatch(int samplesPerBatch) {
        checkArgument(samplesPerBatch > 0, "samples per batch must be greater than zero!");
         = samplesPerBatch;
    }
    
    @Option(name="-u", aliases="--url", metaVar="url", usage="publish data via a Newts REST server at the given url (default: use direct access via Newts API)")
    public void setURL(String url) {
        checkArgument(url != null && !url.isEmpty(), "the url must not be empty");
         = url;
    }
    
    @Option(name="-p", aliases="--parallelism", metaVar="thread-count", usage="when using direct the size of the thread pool that posts the results.  (defaults to 1 ie no parallelism)")
    public void setParallelism(int threadCount) {
        checkArgument(threadCount > 0, "thread count must be at least 1.");
         = threadCount;
    }
    
    @Option(name="-q", aliases="--max-work-queue-size", metaVar="batch-count", usage="when using direct the max size of the work-queue (defaults to thread-count * 3)")
    public void setMaxThreadQueueSize(int maxThreadQueueSize) {
        checkArgument(maxThreadQueueSize > 0, "max thread queue size must be at least 1.");
         = maxThreadQueueSize;
    }
    
    @Option(name="-f", aliases="--time-scale-factor", metaVar="long", usage="to scale down the date we compress time dividing time by this factor")
    public void setTimescaleFactor(double factor) {
         = factor;
    }
    
    @Option(name="-o", aliases="--time-offset", metaVar="timestamp", usage="adjust epoch time in seconds to be <time-offset>. defaults to no offset.  'now' is allowed.")
    public void setTimeoffset(String offset) {
        if (offset.equals("now")) {
            = System.currentTimeMillis();
        } else {
             = Long.valueOf(offset)*1000;
        }
    }
    
    @Argument(metaVar="sourceDir", required=true, usage="the source directory that contains gsod data to import. These must be gzip'd files")
    public void setSource(File source) {
        checkArgument(source.exists(), "the source directory "+source+" does not exist");
        checkArgument(source.isDirectory(), "the source directory must be a directory");
         = source;
    }
    // Logger for this class.
    private static final Logger LOG = LoggerFactory.getLogger(ImportRunner.class);
    public static void main(String... argsthrows Exception {
        new ImportRunner().execute(args);
    }
    
    public void execute(String... argsthrows Exception {
        CmdLineParser parser = new CmdLineParser(this);
        try {
            parser.parseArgument(args);
        } catch (CmdLineException e) {
            // handling of wrong arguments
            ..println(e.getMessage());
            parser.printUsage(.);
            return;
        }
        // Setup the slf4j metrics reporter
        MetricRegistry metrics = new MetricRegistry();
        
        final long start = System.currentTimeMillis();
        metrics.register("elapsed-seconds"new Gauge<Double>() {
            @Override
            public Double getValue() {
                return (System.currentTimeMillis() - start)/1000.0;
            }
            
        });
        
        final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
                .outputTo(.)
                .convertRatesTo()
                .convertDurationsTo()
                .build();
        reporter.start(10, );
        if ( == null) {
            // we are using a direct importer so use a NewtsReporter for storing metrics
            NewtsReporter newtsReporter = NewtsReporter.forRegistry(metrics)
                    .name("importer")
                    .convertRatesTo()
                    .convertDurationsTo()
                    .build(repository());
            
            newtsReporter.start(1, );
        }
        .debug("Scanning {} for GSOD data files...");
        
        // walk the files in the directory given
        Observable<Samplesamples = fileTreeWalker(.toPath())
             .subscribeOn(Schedulers.io())
                
             // set up a meter for each file processed
            .map(meter(metrics.meter("files"), Path.class))
            
            // report file
            .map(reportFile())
            // read all the files and convert them into lines
            .mergeMap(lines())
            // excluding the header lines
            .filter(exclude("YEARMODA"))
            
            // turn each line into a list of samples
            .mergeMap(samples())
            
            // adjust time on samples according to arguments
            .map(adjustTime())
            
            // meter the samples
            .map(meter(metrics.meter("samples"), Sample.class))            
            ;
        
        
        Observable<List<Sample>> batches = samples
            // create batches each second or of size m_samplesPerBatch whichever comes first
            .buffer()
            ;
        
        Observable<BooleandoImport =  != null ? restPoster(batchesmetrics) : directPoster(batchesmetrics);
        
        
        
        ..println("doImport = " + doImport);
        // GO!!!
        final AtomicReference<Subscriptionsubscription = new AtomicReference<>();
        final AtomicBoolean failed = new AtomicBoolean(false);
        
        final CountDownLatch latch = new CountDownLatch(1);
        
        Subscription s = doImport.subscribe(new Observer<Boolean>() {
            @Override
            public void onCompleted() {
                ..println("Finished Importing Everything!");
                reporter.report();
                latch.countDown();
                System.exit(0);
            }
            @Override
            public void onError(Throwable e) {
                failed.set(true);
                ..println("Error importing!");
                e.printStackTrace();
                try {
                    //latch.await();
                    Subscription s = subscription.get();
                    if (s != nulls.unsubscribe();
                } catch (Exception ex) {
                    ..println("Failed to close httpClient!");
                    ex.printStackTrace();
                } finally {
                    //dumpThreads();
                }
            }
            @Override
            public void onNext(Boolean t) {
                ..println("Received a boolen: " + t);
            }
        });
        
        subscription.set(s);
        if (failed.get()) {
            s.unsubscribe();
        }
        //latch.countDown();
        ..println("Return from Subscribe!");
        
        latch.await();
        
        //dumpThreads();
        
    }
    
    private Func1<? super Sample, ? extends SampleadjustTime() {
        return new Func1<SampleSample>() {
            @Override
            public Sample call(Sample s) {
                Timestamp oldTs = s.getTimestamp();
                Timestamp newTs = Timestamp.fromEpochMillis( + Math.round(oldTs.asMillis()/));
                return new Sample(newTss.getResource(), s.getName(), s.getType(), s.getValue());
            }
            
        };
    }
    private SampleRepository repository() {
        if ( == null) {
            Injector injector = Guice.createInjector(new Config());
             = injector.getInstance(SampleRepository.class);
        }
        return ;
    }
    private Observable<BooleandirectPoster(Observable<List<Sample>> samplesMetricRegistry metrics) {
        final SampleRepository repository = repository();
        final Timer timer = metrics.timer("writes");
        final Meter completions = metrics.meter("samples-completed");
        
        Func1<List<Sample>, Booleaninsert = new Func1<List<Sample>, Boolean>() {
            @Override
            public Boolean call(List<Samples) {
                int sz = s.size();
                try (Context timerCtx = timer.time()) {
                    repository.insert(s);
                    return true;
                } finally {
                    completions.mark(sz);
                }
            }
        };
        
        
        return ( == 1 ? samples.map(insert) : parMap(samplesmetricsinsert)).all(Functions.<Boolean>identity());
        
        
    }
    private Observable<BooleanparMap(Observable<List<Sample>> samplesMetricRegistry metricsFunc1<List<Sample>, Booleaninsert) {
        
        final Timer waitTime = metrics.timer("wait-time");
    
        
        @SuppressWarnings("serial")
        final BlockingQueue<RunnableworkQueue = new LinkedBlockingQueue<Runnable>( == 0 ?  * 3 : ) {
            @Override
            public boolean offer(Runnable r) {
                try (Context time = waitTime.time()) {
                    this.put(r);
                    return true;
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
            }
            @Override
            public boolean add(Runnable r) {
                try (Context time = waitTime.time()) {
                    this.put(r);
                    return true;
                } catch (InterruptedException e) {
                    throw Exceptions.propagate(e);
                }
            }
            
        };
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(,
                                                                   0L, .,
                                                                   workQueue);
        
        metrics.register("active-threads"new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return executor.getActiveCount();
            }
            
        });
        
        metrics.register("pool-size"new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return executor.getPoolSize();
            }
            
        });
        metrics.register("largest-pool-size"new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return executor.getLargestPoolSize();
            }
            
        });
        
        metrics.register("work-queue-size"new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return workQueue.size();
            }
            
        });
        
        
        return parMap(samplesexecutormetricsinsert);
    }
    
    private Observable<BooleanparMap(Observable<List<Sample>> samplesExecutorService executorSvcfinal MetricRegistry metricsfinal Func1<List<Sample>, Booleaninsert) {
        final ListeningExecutorService executor = MoreExecutors.listeningDecorator(executorSvc);
        
        Observable<Booleano = samples
                .lift(new Operator<ListenableFuture<Boolean>, List<Sample>>() {
            @Override
            public Subscriber<? super List<Sample>> call(final Subscriber<? super ListenableFuture<Boolean>> s) {
                return new Subscriber<List<Sample>>() {
                    @Override
                    public void onCompleted() {
                        if (!s.isUnsubscribed()) {
                            s.onCompleted();
                        }
                        executor.shutdown();
                    }
                    @Override
                    public void onError(Throwable e) {
                        if (!s.isUnsubscribed()) {
                            s.onError(e);
                        }
                    }
                    @Override
                    public void onNext(final List<Samplet) {
                        if (!s.isUnsubscribed()) {
                            try {
                                ListenableFuture<Booleanf = executor.submit(new Callable<Boolean>() {
                                    @Override
                                    public Boolean call() throws Exception {
                                        return insert.call(t);
                                    }
                                });
                                s.onNext(f);
                            } catch (Throwable ex) {
                                onError(ex);
                            }
                        
                        }
                    }
                };
            }
                
        })
        .observeOn(Schedulers.io())
        .map(new Func1<ListenableFuture<Boolean>, Boolean>() {
            @Override
            public Boolean call(ListenableFuture<Booleanf) {
                try {
                    return f.get();
                } catch (Throwable e) {
                    throw Exceptions.propagate(e);
                }
            }
            
        });
        return o;
    }
    
    
    private Observable<BooleanrestPoster(Observable<List<Sample>> samples,  MetricRegistry metrics) {
        final CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        httpClient.start();
        return samples
                // turn each batch into json
                .map(toJSON())
                // meter them as the go into the post code
                .map(meter(metrics.meter("posts"), String.class))
                // post the json to the REST server
                .mergeMap(postJSON(httpClient))
                // meter the responses
                .map(meter(metrics.meter("responses"), ObservableHttpResponse.class))
                
                // count sample completions
                .map(meter(metrics.meter("samples-completed"), ObservableHttpResponse.class))
                // make sure every request has a successful return code
                .all(successful())
                
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        try {
                            httpClient.close();
                        } catch (IOException e) {
                            ..println("Failed to close httpClient!");
                            e.printStackTrace();
                        }
                    }
                    
                });
    }
    private static Func1<? super Path, ? extends PathreportFile() {
        return new Func1<PathPath>() {
            @Override
            public Path call(Path file) {
                ..println("Begin Processing: " + file);
                return file;
            }
            
        };
    }
    public static Func1<StringObservable<Sample>> samples() {
        final LineParser parser = new LineParser();
        return new Func1<StringObservable<Sample>>() {
            @Override
            public Observable<Samplecall(String line) {
                try {
                    return Observable.from(parser.parseLine(line));
                } catch (ParseException e) {
                    throw propagate(e);
                }
            }
        };
    }
    private static boolean isNaN(Sample sample) {
        return (sample.getType() == .) && Double.isNaN(sample.getValue().doubleValue());
    }
    public static Func1<List<Sample>, StringtoJSON() {
        return new Func1<List<Sample>, String>() {
            @Override
            public String call(List<Samplesamples) {
                JSONBuilder bldr = new JSONBuilder();
                for(Sample sample : samples) {
                    if (isNaN(sample)) continue;
                    //System.err.println("Importing: " + sample);
                    bldr.newObject();
                    bldr.attr("timestamp"sample.getTimestamp().asMillis());
                    bldr.attr("resource"sample.getResource().getId());
                    bldr.attr("name"sample.getName());
                    bldr.attr("type"sample.getType().name());
                    if (sample.getType() == .) {
                        bldr.attr("value"sample.getValue().doubleValue());
                    } else {
                        bldr.attr("value"sample.getValue().longValue());
                    }
                }
                return bldr.toString();
            }
        };
    }
    
    private static Func1<ObservableHttpResponseBooleansuccessful() {
        return new Func1<ObservableHttpResponseBoolean>() {
            @Override
            public Boolean call(ObservableHttpResponse response) {
                if (response.getResponse().getStatusLine().getStatusCode() >= 400) {
                    throw new RuntimeException("Failed to post samples: " + response.getResponse().getStatusLine());
                }
                return true;
            }
            
        };
    }
    public static Func1<StringObservable<ObservableHttpResponse>> postJSON(final String baseURLfinal CloseableHttpAsyncClient httpClient) {
        final URI baseURI = URI.create(baseURL);
        return new Func1<StringObservable<ObservableHttpResponse>>() {
            @Override
            public Observable<ObservableHttpResponsecall(String json) {
                try {
                    return ObservableHttp.createRequest(HttpAsyncMethods.createPost(baseURIjson.), httpClient).toObservable();
                } catch (UnsupportedEncodingException e) {
                    throw Exceptions.propagate(e);
                }
            }
        };
    }
    
    public static Func1<StringBooleanexclude(final String pattern) {
        return new Func1<StringBoolean>() {
            @Override
            public Boolean call(String s) {
                return !s.contains(pattern);
            }
            
        };
    }
    
    public static <T> Func1<T, T> meter(final Meter meterClass<T> clazz) {
        return meter(meter, 1, clazz);
    }
    
    public static <T> Func1<T, T> meter(final Meter meterfinal int countClass<T> clazz) {
        return new Func1<T, T>() {
            @Override
            public T call(T t) {
                meter.mark(count);
                return t;
            }
            
        };
    }
    
New to GrepCode? Check out our FAQ X