Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * (C) 2007-2012 Alibaba Group Holding Limited.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * Authors:
  *   wuhua <wq163@163.com> , boyan <killme2008@gmail.com>
  */
 package com.taobao.metamorphosis.tools.monitor.offsetcompareprob;
 
 import java.util.List;
 
 
 
 public class OffsetCompareProber extends AbstractProber {
 	private final static Logger logger = Logger
 	private Query query;
 	private final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>();
 //	private Map<String, Long> serverMaxOffsetMap = new HashMap<String, Long>();
 	private final static String serverBigerFormat = "group:[%s]�ϵ�topic:[%S]��partition[%S]��metaServer:[%s]��offset[%S]��ZK�ϵ�offset:[%S]��[%S]";
 	private final static String zkBigerFormat = "group:[%s]�ϵ�topic:[%S]��partition[%S]��ZK�ϵ�offset:[%S]��metaServer:[%s]��offset[%S]��[%s]";
 	
 	public OffsetCompareProber(CoreManager coreManager) {
 		super(coreManager);
 	}
 
 	public void init() throws InitException {
 		this. = new Query();
 		
 		this..init(this.getMonitorConfig().getConfigPath(), null);
 
 	}
 
 	protected void doStopProb() {
 	}
 
 	protected void doProb() throws InterruptedException {
 				new ProbTask() {
 
 					protected void doExecute() throws Exception {
 						if (.isDebugEnabled()) {
 							.debug("offset prob...");
 						}
 					}
 
 					protected void handleException(Throwable e) {
 						.error("unexpected error in offset prob thread.",
 								e);
 					}
 
 //		 }, 0, this.getMonitorConfig().getOffsetProbCycleTime()*1000*5,
 //		 TimeUnit.MILLISECONDS));
 		.debug("offset prob started");
 
 	}
 
 	protected void probOnce() {
 //		serverMaxOffsetMap.clear();
 		List<StringconsumerGroups = this.
		for (String group : consumerGroups) {
			List<StringtopicsList = this..getTopicsExistOffset(group,
			for (String topic : topicsList) {
				List<Stringpartitions = this..getPartitionsOf(group,
						topic.);
				for (String partition : partitions) {
					// brokeId+topic��ɵ�key
					String key = partition+"_"+topic;
					String serverUrl = getServerUrl(partition);
					long zkOffset = ZkOffsetStorageQuery.parseOffsetAsLong(this..queryOffset(new OffsetQueryDO(topicgrouppartition..toString())));
					long serverMaxOffset;
//					if (null == serverMaxOffsetMap.get(key)) {
//						��server�˻�ȡoffset��ֵ
//						serverMaxOffset = getServerMaxOffset(serverUrl,topic,partition,serverMaxOffsetMap);
						serverMaxOffset = getServerMaxOffset(serverUrl,topic,partition);
//						serverMaxOffsetMap.put(key, serverMaxOffset);
//					} else {
//						serverMaxOffset = serverMaxOffsetMap.get(key);
//					}
					// �Ա�offset��������
					int offsetMaxGap = getOffsetMaxGap(topic);
					int zkMaxOverStep = this.getMonitorConfig().getZkMaxOverStep();
					String serverInfo = getServerInfo(partition);
					if (serverMaxOffset!=-1&&zkOffset-serverMaxOffset>zkMaxOverStep) {
						String msg = String.format(group,topic,partition,zkOffset,serverInfo,serverMaxOffset,zkOffset-serverMaxOffset);
						alertMsg(grouptopicmsg);
					}else if (serverMaxOffset!=-1&&serverMaxOffset - zkOffset > offsetMaxGap) {
						String msg = String.format(group,topic,partition,serverInfo,serverMaxOffset,zkOffset,serverMaxOffset - zkOffset);
						alertMsg(grouptopicmsg);
					}
				}
			}
		}
	}
	private void alertMsg(String groupString topicString msg) {
		//ʵ��topic��Ī��group�²������Ĺ��ܣ�ignoreTopic
		List<StringwwList = new ArrayList<String>();
		List<StringmobileList = new ArrayList<String>();
		List<StringdefaultWWList = this.getMonitorConfig()
		for (String ww : defaultWWList) {
			if (!wwList.contains(ww)) {
				wwList.add(ww);
			}
		}
		this.findAlertList(this
				topic,group,wwList);
		this.findAlertList(this
				topic,group,mobileList);
		.warn("alart to[" + wwList + "]mobiles["
mobileList + "]");
		Alarm.start().wangwangs(wwList).mobiles(mobileList)
				.alert(msg);
	}
	private int getOffsetMaxGap(String topic) {
		}else{
		}
	}
	private long getServerMaxOffset(String serverUrlfinal String topic,String partition) {
		final List<LongserverMaxOffsetList = new ArrayList<Long>();
//		String key=partition+"_"+topic;
		final String partitionId = partition.substring(partition.indexOf("-")+1, partition.length());
		MsgSender msgSender = this..getSender(serverUrl);
//		final String brokerId = partition.substring(0,partition.indexOf("-"));
        StatsResult ret = msgSender.getStats("offsets", 5000);
        if (ret.isSuccess()) {
        	try {
                Utils.processEachLine(ret.getStatsInfo(), new Action() {
                	@Override
                    public void process(String line) {
                        String[] tmp = StringUtils.splitByWholeSeparator(line" ");
                        if (tmp != null && tmp.length == 7) {
                            if(topic.equals(tmp[0])&&partitionId.equals(tmp[2])){
                        	  	serverMaxOffsetList.add(Long.parseLong(tmp[6]));
                            	return;
                            }
                        }
                    }
                });
            }
            catch (IOException e) {
                .error("IOException",e);
            }
        }
        if(null!=serverMaxOffsetList&&serverMaxOffsetList.size()>0){
        	return serverMaxOffsetList.get(0);
        }else{
        	return -1;
        }
        
	}
	private String getServerInfo(String partition) {
		int brokeId = Integer.parseInt(partition.substring(0,
				partition.indexOf("-")));
		List<MetaServermetaServerList = this.getMonitorConfig()
		for (MetaServer metaServer : metaServerList) {
			if (metaServer.getBrokeId() == brokeId) {
				return metaServer.getHostIp() + ","metaServer.getHostName();
			}
		}
		return "δ֪brokeId";
	}
	private String getServerUrl(String partition) {
		int brokeId = Integer.parseInt(partition.substring(0,
				partition.indexOf("-")));
		List<MetaServermetaServerList = this.getMonitorConfig()
		for (MetaServer metaServer : metaServerList) {
			if (metaServer.getBrokeId() == brokeId) {
				return metaServer.getUrl();
			}
		}
		return "δ֪brokeId";
	}
	public void findAlertList(List<GroupgroupListString alertKind,
			String topicString groupStrList<String>alertList) {
		if (groupList == null || groupList.isEmpty()) {
			return ;
		}
		for (Group group : groupList) {
			if(group.getGroup().equals(groupStr)){
				for(String ignoreTopic : group.getIgnoreTopicList()){
					if(ignoreTopic.equals(topic)){
						alertList.clear();
						return;
					}
				}
				for(String filterTopic : group.getTopicList()){
					if(filterTopic.equals(topic)){
						if("ww".equals(alertKind)){
							for(String ww:group.getWwList()){
								if(!alertList.contains(ww)){
									alertList.add(ww);
								}
							}
						}else{
							for(String mobile:group.getMobileList()){
								if(!alertList.contains(mobile)){
									alertList.add(mobile);
								}
							}
						}
						return;
					}
				}
			}
		}
	}
New to GrepCode? Check out our FAQ X