/*
 * Copyright 2009-2010 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 package edu.uci.ics.hyracks.examples.tpch.client;
 
import java.io.File;
import java.util.EnumSet;

import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;

// Hyracks imports reconstructed for the 0.2.x API; later releases relocate some classes.
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;

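/**
 * Hyracks example client that runs a TPC-H style CUSTOMER/ORDERS join as a
 * Hyracks job. The join algorithm (nested-loop, grace hash, hybrid hash, or
 * in-memory hash) is selected on the command line, and the job optionally
 * appends a hash group-by with a count aggregate before writing the result.
 */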
 public class Main {
     private static class Options {
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
         public int port = 1098;
 
         @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
         public String inFileCustomerSplits;
 
         @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
         public String inFileOrderSplits;
 
         @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
         public String outFileSplits;
 
         @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false)
         public int numJoinPartitions = 1;
 
         @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
         public boolean profile = true;
 
         @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
         public int tableSize = 8191;
 
         @Option(name = "-algo", usage = "Join types", required = true)
         public String algo;
 
         // For grace/hybrid hash join only
         @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
         public int memSize;
 
        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
        public int graceInputSize = 10;
        @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
        public int graceRecordsPerFrame = 200;
        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
        public double graceFactor = 1.2;
        // Whether group-by is processed after the join
        @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
        public boolean hasGroupBy = false;
    }
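
    /*
     * Example invocation (host, node names, and paths are hypothetical; substitute
     * your own cluster-controller host and NC-local files):
     *
     *   ... Main -host cc.example.com -port 1098 \
     *       -infile-customer-splits nc1:/data/tpch/customer.tbl \
     *       -infile-order-splits nc1:/data/tpch/orders.tbl \
     *       -outfile-splits nc1:/data/tpch/custorder.out \
     *       -algo hybridhash -mem-size 64
     */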
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        CmdLineParser parser = new CmdLineParser(options);
        parser.parseArgument(args);
        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
                options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
        long start = System.currentTimeMillis();
        JobId jobId = hcc.startJob(job,
                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
        hcc.waitForCompletion(jobId);
        long end = System.currentTimeMillis();
        System.err.println(start + " " + end + " " + (end - start));
    }
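
    // Parses a comma-separated list of "<node-name>:<path>" splits, e.g.
    // "nc1:/data/tpch/customer.tbl" (hypothetical node name and path).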
    private static FileSplit[] parseFileSplits(String fileSplits) {
        String[] splits = fileSplits.split(",");
        FileSplit[] fSplits = new FileSplit[splits.length];
        for (int i = 0; i < splits.length; ++i) {
            String s = splits[i].trim();
            int idx = s.indexOf(':');
            if (idx < 0) {
                throw new IllegalArgumentException("File split " + s + " not well formed");
            }
            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
        }
        return fSplits;
    }
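
    // Assembles the dataflow DAG: scan CUSTOMER and ORDERS, hash-partition both on
    // the customer key, join them, optionally group-and-count, and write the
    // resulting frames to the output splits.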
    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
            double graceFactor, int memSize, int tableSize, boolean hasGroupBy) throws HyracksDataException {
        JobSpecification spec = new JobSpecification();
        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
        // CUSTOMER has 8 fields; this example treats every field as a UTF-8 string.
        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });

        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
        // ORDERS has 9 fields.
        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE });

        // The join result carries all 17 fields: 8 from CUSTOMER followed by 9 from ORDERS.
        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
                UTF8StringSerializerDeserializer.INSTANCE });
        // Scan the ORDERS splits, parsing '|'-delimited lines into 9 string fields.
        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
        createPartitionConstraint(spec, ordScanner, orderSplits);
        // Scan the CUSTOMER splits, parsing '|'-delimited lines into 8 string fields.
        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
        createPartitionConstraint(spec, custScanner, customerSplits);
        IOperatorDescriptor join;
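        // Select the join implementation named by -algo; any unrecognized value falls
        // through to the in-memory hash join. All variants match customer field 0
        // (C_CUSTKEY) against orders field 1 (O_CUSTKEY).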
        if ("nestedloop".equalsIgnoreCase(algo)) {
            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc,
                    memSize, false, null);
        } else if ("gracehash".equalsIgnoreCase(algo)) {
            join = new GraceHashJoinOperatorDescriptor(
                    spec,
                    memSize,
                    graceInputSize,
                    graceRecordsPerFrame,
                    graceFactor,
                    new int[] { 0 },
                    new int[] { 1 },
                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                            .of(UTF8StringPointable.FACTORY) },
                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                    custOrderJoinDesc);
        } else if ("hybridhash".equalsIgnoreCase(algo)) {
            join = new HybridHashJoinOperatorDescriptor(
                    spec,
                    memSize,
                    graceInputSize,
                    graceRecordsPerFrame,
                    graceFactor,
                    new int[] { 0 },
                    new int[] { 1 },
                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                            .of(UTF8StringPointable.FACTORY) },
                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                    custOrderJoinDesc);
        } else {
            join = new InMemoryHashJoinOperatorDescriptor(
                    spec,
                    new int[] { 0 },
                    new int[] { 1 },
                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                            .of(UTF8StringPointable.FACTORY) },
                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                    custOrderJoinDesc, 6000000);
        }
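
        // Run the join at the requested parallelism and hash-partition each input on
        // its customer-key field so matching tuples reach the same join partition.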
        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                new FieldHashPartitionComputerFactory(new int[] { 1 },
                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                .of(UTF8StringPointable.FACTORY) }));
        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                new FieldHashPartitionComputerFactory(new int[] { 0 },
                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                .of(UTF8StringPointable.FACTORY) }));
        spec.connect(custJoinConn, custScanner, 0, join, 0);
        IOperatorDescriptor endingOp = join;
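
        // Optionally append a hash group-by on field 6 of the join output with a count
        // aggregate; the group-by's partitions are pinned to the output-split nodes.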
        if (hasGroupBy) {
            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });

            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
                    spec,
                    new int[] { 6 },
                    new FieldHashPartitionComputerFactory(new int[] { 6 },
                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                    .of(UTF8StringPointable.FACTORY) }),
                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                    new MultiFieldsAggregatorFactory(
                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                    groupResultDesc, 16);
            createPartitionConstraint(spec, gby, resultSplits);
            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
                    new FieldHashPartitionComputerFactory(new int[] { 6 },
                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                    .of(UTF8StringPointable.FACTORY) }));
            spec.connect(joinGroupConn, join, 0, gby, 0);
            endingOp = gby;
        }
        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
        createPartitionConstraint(spec, writer, resultSplits);
        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
        spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
        spec.addRoot(writer);
        return spec;
    }
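
    // Pins partition i of the operator to the node named in split i, so each file is
    // read or written on the machine that hosts it.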
    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
        String[] parts = new String[splits.length];
        for (int i = 0; i < splits.length; ++i) {
            parts[i] = splits[i].getNodeName();
        }
        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
    }
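
    // Factory for the tuple-pair comparator the nested-loop join uses: compares field
    // pos0 of the left input against field pos1 of the right input.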
    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
        private static final long serialVersionUID = 1L;
        private final IBinaryComparatorFactory bFactory;
        private final int pos0;
        private final int pos1;
        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
            this.bFactory = bFactory;
            this.pos0 = pos0;
            this.pos1 = pos1;
        }
        @Override
        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
        }
    }
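
    // Byte-wise comparison of one field from each of two frame-resident tuples using
    // the wrapped IBinaryComparator.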
    static class JoinComparator implements ITuplePairComparator {
        private final IBinaryComparator bComparator;
        private final int field0;
        private final int field1;
        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
            this.bComparator = bComparator;
            this.field0 = field0;
            this.field1 = field1;
        }
        @Override
        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
            int fLen0 = fEnd0 - fStart0;
            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
            int fLen1 = fEnd1 - fStart1;
            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
            if (c != 0) {
                return c;
            }
            return 0;
        }
    }
}