Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * (C) 2007-2012 Alibaba Group Holding Limited.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * Authors:
  *   wuhua <wq163@163.com> , boyan <killme2008@gmail.com>
  */
 package com.taobao.metamorphosis.tools.shell;
 
 import java.util.List;
 import java.util.Map;
 
 
 usage:
      CopyOffsetInZk -topic xxtopic -src 1 -target 2 -start 5 -end 10
      CopyOffsetInZk -topic xxtopic -src 1 -target 2 -start 5 -end 10 -targetStart 2
 

Author(s):
�޻�
Since:
2011-8-24 ����10:19:30
 
 
 public class CopyOffsetInZk extends ShellTool {
     private final Query query;
     private final ZkManager zkManager;
 
 
     public static void main(String[] argsthrows Exception {
         new CopyOffsetInZk(.).doMain(args);
         System.exit(0);
     }
 
 
     CopyOffsetInZk(PrintStream outthrows InitException {
         super(out);
         this. = new Query();
         this..init("zk.properties"null);
         this. = new ZkManager(this.);
     }
 
 
     public CopyOffsetInZk(String configFilePrintWriter outthrows InitException {
         super(out);
         this. = new Query();
         this..init(configFilenull);
         this. = new ZkManager();
     }
 
 
     @Override
     public void doMain(String[] argsthrows Exception {
 
         CommandLine commandLine = this.getCommandLine(args);
         String topic = commandLine.getOptionValue("topic");
         int src = Integer.parseInt(commandLine.getOptionValue("src"));
         int target = Integer.parseInt(commandLine.getOptionValue("target"));
         int start = Integer.parseInt(commandLine.getOptionValue("start"));
         int end = Integer.parseInt(commandLine.getOptionValue("end"));
         int targetStart = Integer.parseInt(commandLine.getOptionValue("targetStart""0"));
 
         this.checkArg(topicsrctargetstartend);
 
         List<Stringgroups = this..getConsumerGroups(.);
         Map<PartitionPartitionsrcTargetPartitionMap =
                 this.getSrcTargetPartitionMap(this.getSourcePartitions(srcstartend), targettargetStart);
         for (String group : groups) {
 
             for (Map.Entry<PartitionPartitionentry : srcTargetPartitionMap.entrySet()) {
                 Partition oldPartition = entry.getKey();
                 Partition newPartition = entry.getValue();
 
                String srcOffset = this..queryOffset(new OffsetQueryDO(topicgroupoldPartition.toString(), "zk"));
                if (!StringUtils.isBlank(srcOffset)) {
                    if (!StringUtils.isBlank(this..queryOffset(new OffsetQueryDO(topicgroupnewPartition.toString(), "zk")))) {
                        this.println("topic=" + topic + ",group=" + group + ",partition[" + newPartition
                                + "] offset �Ѿ�����");
                        continue;
                    }
                    this..setOffset(topicgroupnewPartitionsrcOffset);
                    this.println("copy offset successed for topic[" + topic + "], group=" + group + ", " + oldPartition
                            + "-->" + newPartition + ", offset=" + srcOffset);
                }
                else {
                    this.println("topic=" + topic + ",group=" + group + ",partition[" + oldPartition
                            + "] offset �����ڻ��ѯ����");
                }
            }
        }
    }
    //
    // private Map<String, String> getOffsetPathMap(String group, String topic,
    // Map<Partition, Partition> srcTargetPartitionMap) {
    //
    // Map<String, String> map = new LinkedHashMap<String, String>();
    //
    // for (Map.Entry<Partition, Partition> entry :
    // srcTargetPartitionMap.entrySet()) {
    // map.put(Query.getOffsetPath(group, topic, entry.getKey()),
    // Query.getOffsetPath(group, topic, entry.getValue()));
    // }
    // return map;
    // }
    private List<PartitiongetSourcePartitions(int brokerIdint startPartitionNoint endPartitionNo) {
        List<Partitionlist = new LinkedList<Partition>();
        for (int i = startPartitionNoi <= endPartitionNoi++) {
            list.add(new Partition(brokerIdi));
        }
        return list;
    }
    private Map<PartitionPartitiongetSrcTargetPartitionMap(List<PartitionsrcPartitionsint targetint start) {
        Map<PartitionPartitionmap = new TreeMap<PartitionPartition>();
        for (int i = 0; i < srcPartitions.size(); i++) {
            map.put(srcPartitions.get(i), new Partition(targeti + start));
        }
        return map;
    }
    private CommandLine getCommandLine(String[] args) {
        Option topicOption = new Option("topic"true"topic");
        topicOption.setRequired(true);
        Option fromOption = new Option("src"true"source broker id");
        fromOption.setRequired(true);
        Option toOption = new Option("target"true"target broker id");
        toOption.setRequired(true);
        Option startOption = new Option("start"true"start partition number");
        startOption.setRequired(true);
        Option endOption = new Option("end"true"end partition number");
        endOption.setRequired(true);
        Option targetStartOption = new Option("targetStart"true"Ŀ��������ʼ���");
        return CommandLineUtils.parseCmdLine(argsnew Options().addOption(topicOption).addOption(fromOption)
            .addOption(toOption).addOption(startOption).addOption(endOption).addOption(targetStartOption));
    }
    private void checkArg(String topicint srcint targetint startint end) {
        if (StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("topic is blank");
        }
        if (src < 0 || target < 0) {
            throw new IllegalArgumentException("src and target must not less than 0");
        }
        if (src == target) {
            throw new IllegalArgumentException("src equals target");
        }
        if (start > end) {
            throw new IllegalArgumentException("start less then end");
        }
    }
    Query getQuery() {
        return this.;
    }
New to GrepCode? Check out our FAQ X