Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * (C) 2007-2012 Alibaba Group Holding Limited.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * Authors:
  *   wuhua <wq163@163.com> , boyan <killme2008@gmail.com>
  */
 package com.taobao.metamorphosis.tools.monitor.msgprobe;
 
 import java.util.List;
 

Author(s):
�޻�
Since:
2011-5-24 ����01:52:50
 
 
 public class MsgProber extends AbstractProber {
 
     // private static Logger logger = Logger.getLogger(MsgProber.class);
 
     private final List<ProbeListenerprobeListeners = new ArrayList<ProbeListener>();
 
 
     private volatile AtomicBoolean isInited = new AtomicBoolean(false);
 
 
     public MsgProber(CoreManager coreManager) {
         super(coreManager);
     }
 
 
     public void init() throws InitException {
         if (this..compareAndSet(falsetrue)) {
             try {
                 this. = new InnerProbeListener();
 
                 this..info("publish topics...");
                 this.publishTopics(this.getSenders());
                 this.addListener(new DefaultProbeListener());
                 this.addListener(new AlarmProbeListener(this.getMonitorConfig()));
             }
             catch (Exception e) {
                 throw new InitException("unexpected errer at init"e);
             }
         }
     }
 
     private final class ProbOneBrokerTask extends ProbTask {
         MsgSender sender;
         MsgReceiver receiver;
         MsgProber prober;
 
 
         ProbOneBrokerTask(MsgSender senderMsgReceiver receiverMsgProber prober) {
             this. = sender;
             this. = receiver;
             this. = prober;
         }
 
 
         @Override
         protected void doExecute() throws Exception {
             if (MsgProber.this.getLogger().isDebugEnabled()) {
                 MsgProber.this.getLogger().debug("msg prob...");
             }
             this..prob(this.this.);
         }
 
 
         @Override
         protected void handleException(Throwable e) {
             MsgProber.this..error(
                 "unexpected error in msg prob thread. broker server: " + this..getServerUrl(), e);
         }
    }
    private final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>();
    @Override
    public void doProb() throws InterruptedException {
        // ��֤һ��sender��reveicerͬһʱ����ֻ����һ���߳���ʹ��
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < this.getSenders().lengthi++) {
            this..add(this.getProberExecutor().scheduleWithFixedDelay(
                new ProbOneBrokerTask(this.getSenders()[i], this.getReveicers()[i], this), 0,
                this.getMonitorConfig().getMsgProbeCycleTime(), .));
            sb.append("msg probe started:").append(this.getSenders()[i].getServerUrl()).append("\n");
        }
        this..info(sb.toString());
    }
    @Override
    protected void doStopProb() {
        cancelFutures(this.);
    }
    public void addListener(ProbeListener listener) {
        this..add(listener);
    }
    private void prob(MsgSender senderMsgReceiver receiverthrows InterruptedException {
        SendResultWrapper sendResult = sender.sendMessage(this.getMonitorConfig().getSendTimeout());
        ProbContext probContext = new ProbContext();
        probContext.lastSendTime = System.currentTimeMillis();
        probContext.sendResult = sendResult;
        if (!sendResult.isSuccess()) {
            this..onSendFail(sendersendResult);
            return;
        }
        Thread.sleep(this.getMonitorConfig().getProbeInterval());
        // receive����ֱ������,.
        // ֱ������ʱ���ص�patitionΪbrokerId=-1,patition=��ʵ��patition
        ReveiceResult reveiceResult = receiver.get(sender.getTopic(), sendResult.getSendResult().getPartition());
        probContext.lastRevTime = System.currentTimeMillis();
        probContext.reveiceResult = reveiceResult;
        if (!reveiceResult.isSuccess()) {
            this..onReceiveFail(probContext);
        }
        // ��һ�ν�����Ϣ��offset����һ�η��͵�offsetλ�ÿ�ʼ
        receiver.setOffset(sendResult.getSendResult().getPartition(), sendResult.getSendResult().getOffset());
    }
    private void publishTopics(MsgSender[] senders) {
        for (int i = 0; i < senders.lengthi++) {
            senders[i].publish();
        }
    }
    final private class InnerProbeListener extends ProbeListener {
        @Override
        public void onSendFail(MsgSender senderSendResultWrapper result) {
            for (ProbeListener probeListener : MsgProber.this.) {
                try {
                    probeListener.onSendFail(senderresult);
                }
                catch (Exception e) {
                    MsgProber.this..error("���?��ʧ���¼�ʱ�������," + probeListener.getClass(), e);
                }
            }
        }
        @Override
        public void onReceiveFail(ProbContext probContext) {
            for (ProbeListener probeListener : MsgProber.this.) {
                try {
                    probeListener.onReceiveFail(probContext);
                }
                catch (Exception e) {
                    MsgProber.this..error("�������ʧ���¼�ʱ�������," + probeListener.getClass(), e);
                }
            }
        }
    }
    static public class ProbContext {
        long lastSendTime;// ��һ�η�����Ϣ��ʱ��,����
        long lastRevTime// ���ν��յ���Ϣ��ʱ��,����
        public long getSendRevInterval() {
            return (this. - this.) / 1000;
        }
        public SendResultWrapper getSendResult() {
            return this.;
        }
        public void setSendResult(SendResultWrapper sendResult) {
            this. = sendResult;
        }
        public ReveiceResult getReveiceResult() {
            return this.;
        }
        public void setReveiceResult(ReveiceResult reveiceResult) {
            this. = reveiceResult;
        }
        public long getLastSendTime() {
            return this.;
        }
        public void setLastSendTime(long lastSendTime) {
            this. = lastSendTime;
        }
        public long getLastRevTime() {
            return this.;
        }
        public void setLastRevTime(long lastRevTime) {
            this. = lastRevTime;
        }
    }
New to GrepCode? Check out our FAQ X