Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 
 package org.apache.tez.dag.history.logging.ats;
 
 import java.util.List;
 
 
 
 
   private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
 
       new LinkedBlockingQueue<DAGHistoryEvent>();
 
   private Thread eventHandlingThread;
   private AtomicBoolean stopped = new AtomicBoolean(false);
   private int eventCounter = 0;
   private int eventsProcessed = 0;
   private final Object lock = new Object();
 
 
   private HashSet<TezDAGIDskippedDAGs = new HashSet<TezDAGID>();
   private long maxTimeToWaitOnShutdown;
   private boolean waitForeverOnShutdown = false;
 
   private int maxEventsPerBatch;
   private long maxPollingTimeMillis;
 
   public ATSHistoryLoggingService() {
     super(ATSHistoryLoggingService.class.getName());
   }
 
   @Override
   public void serviceInit(Configuration confthrows Exception {
     .info("Initializing ATSService");
      = TimelineClient.createTimelineClient();
     .init(conf);
      = conf.getInt(
      = conf.getInt(
     if ( < 0) {
        = true;
     }
   }
 
   @Override
   public void serviceStart() {
     .info("Starting ATSService");
     .start();
 
      = new Thread(new Runnable() {
       @Override
       public void run() {
         List<DAGHistoryEventevents = new LinkedList<DAGHistoryEvent>();
        boolean interrupted = false;
        while (!.get() && !Thread.currentThread().isInterrupted()
              && !interrupted) {
          // Log the size of the event-queue every so often.
          if ( != 0 &&  % 1000 == 0) {
            .info("Event queue stats"
                + ", eventsProcessedSinceLastUpdate=" + 
                + ", eventQueueSize=" + .size());
             = 0;
             = 0;
          } else {
            ++;
          }
          synchronized () {
            try {
              getEventBatch(events);
            } catch (InterruptedException e) {
              // Finish processing events and then return
              interrupted = true;
            }
            if (events.isEmpty()) {
              continue;
            }
             += events.size();
            try {
              handleEvents(events);
            } catch (Exception e) {
              .warn("Error handling events"e);
            }
          }
        }
      }
    }, "HistoryEventHandlingThread");
  }
  public void serviceStop() {
    .info("Stopping ATSService"
        + ", eventQueueBacklog=" + .size());
    .set(true);
    if ( != null) {
    }
    synchronized () {
      if (!.isEmpty()) {
        .warn("ATSService being stopped"
            + ", eventQueueBacklog=" + .size()
            + ", maxTimeLeftToFlush=" + 
            + ", waitForever=" + );
        long startTime = .getClock().getTime();
        long endTime = startTime + ;
        List<DAGHistoryEventevents = new LinkedList<DAGHistoryEvent>();
        while ( || (endTime >= .getClock().getTime())) {
          try {
            getEventBatch(events);
          } catch (InterruptedException e) {
            .info("ATSService interrupted while shutting down. Exiting."
                  + " EventQueueBacklog=" + .size());
          }
          if (events.isEmpty()) {
            .info("Event queue empty, stopping ATS Service");
            break;
          }
          try {
            handleEvents(events);
          } catch (Exception e) {
            .warn("Error handling event"e);
            break;
          }
        }
      }
    }
    if (!.isEmpty()) {
      .warn("Did not finish flushing eventQueue before stopping ATSService"
          + ", eventQueueBacklog=" + .size());
    }
  }
  private void getEventBatch(List<DAGHistoryEventeventsthrows InterruptedException {
    events.clear();
    int counter = 0;
    while (counter < ) {
      if (event == null) {
        break;
      }
      if (!isValidEvent(event)) {
        continue;
      }
      ++counter;
      events.add(event);
        // Special case this as it might be a large payload
        break;
      }
    }
  }
  public void handle(DAGHistoryEvent event) {
    .add(event);
  }
  private boolean isValidEvent(DAGHistoryEvent event) {
    HistoryEventType eventType = event.getHistoryEvent().getEventType();
    TezDAGID dagId = event.getDagID();
    if (eventType.equals(.)) {
      DAGSubmittedEvent dagSubmittedEvent =
          (DAGSubmittedEventevent.getHistoryEvent();
      String dagName = dagSubmittedEvent.getDAGName();
      if (dagName != null
          && dagName.startsWith(
        // Skip recording pre-warm DAG events
        .add(dagId);
        return false;
      }
    }
    if (eventType.equals(.)) {
      // Remove from set to keep size small
      // No more events should be seen after this point.
      if (.remove(dagId)) {
        return false;
      }
    }
    if (dagId != null && .contains(dagId)) {
      // Skip pre-warm DAGs
      return false;
    }
    return true;
  }
  private void handleEvents(List<DAGHistoryEventevents) {
    TimelineEntity[] entities = new TimelineEntity[events.size()];
    for (int i = 0; i < events.size(); ++i) {
      entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(
          events.get(i).getHistoryEvent());
    }
    if (.isDebugEnabled()) {
      .debug("Sending event batch to Timeline, batchSize=" + events.size());
    }
    try {
      TimelinePutResponse response =
          .putEntities(entities);
      if (response != null
        && !response.getErrors().isEmpty()) {
        int count = response.getErrors().size();
        for (int i = 0; i < count; ++i) {
          TimelinePutError err = response.getErrors().get(i);
          if (err.getErrorCode() != 0) {
            .warn("Could not post history event to ATS"
                + ", atsPutError=" + err.getErrorCode()
                + ", entityId=" + err.getEntityId());
          }
        }
      }
      // Do nothing additional, ATS client library should handle throttling
      // or auto-disable as needed
    } catch (Exception e) {
      .warn("Could not handle history events"e);
    }
  }
New to GrepCode? Check out our FAQ X