Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* ************************************************************************
  #
  #  DivConq
  #
  #  http://divconq.com/
  #
  #  Copyright:
  #    Copyright 2014 eTimeline, LLC. All rights reserved.
  #
 #  License:
 #    See the license.txt file in the project's top-level directory for details.
 #
 #  Authors:
 #    * Andy White
 #
 ************************************************************************ */
 package divconq.scheduler;
 
 
 
Handles scheduling application tasks.

Author(s):
Andy White
 
 // TODO add tracing settings
 public class Scheduler {
 	// the first node in the list of scheduled nodes - the head of the list is moved
 	// forward as the items on the list get scheduled.  List is single linked list.
 	protected SchedulerNode first = null;
 
 	// how many are currently in the linked list?
 	protected long nodeCnt = 0;
 	
 	// lock during adding and removing of scheduled work.  all add and remove operations
 	// are thread safe
 	protected ReentrantLock lock = new ReentrantLock();
 	
 	protected ScheduledFuture<?> clock = null;
 	
 	
 	protected ISchedulerDriver driver = null;
 	
 	public LimitHelper getBatch(String name) {
 		return this..get(name);
 	}
 	
 	public void init(OperationResult orXElement config) {
 		// TODO
 
 		if (config != null) {
 			for (XElement el : config.selectAll("Batch")) {
 				XElement bel = el.find("Limits");
 				String name = el.getAttribute("Name");
 				
 				if (StringUtil.isNotEmpty(name) && (bel != null)) {
 					LimitHelper h = new LimitHelper();
 					h.init(bel);
 					this..put(nameh);
 				}				
 			}
 			
 			// setup the provider of the work queue
 			String classname = config.getAttribute("InterfaceClass");
 			
 			if (StringUtil.isEmpty(classname))
 				classname = "divconq.scheduler.LocalSchedulerDriver";
 			
 			if (StringUtil.isNotEmpty(classname)) {
 				Object impl =  ..getInstance(classname);		
 				
 				if ((impl == null) || !(impl instanceof ISchedulerDriver)) {
 					or.errorTr(227, classname);
					return;
				}
				this. = (ISchedulerDriver)impl;
				this..init(orconfig);
			}
		}
	}
	public void start(OperationResult or) {
			public void run(SysReporter reporter) {
			}
			public int period() {
				return 1;
			}
		});
			public void run(SysReporter reporter) {
				reporter.setStatus("before schedule update");
				// TODO check for updates to the schedule
				reporter.setStatus("after schedule update");
			}
			public int period() {
				return 5;
			}
		});
		if (this. != null) {
			or.copyMessages(loadres);
			if (loadres.isNotEmptyResult()) {				
				loadres.getResult().recordStream().forEach(rec -> {
					XElement schedule = rec.getFieldAsXml("Schedule");
					ISchedule sched = "CommonSchedule".equals(schedule.getName()) ? new CommonSchedule() : new SimpleSchedule();
					sched.init(schedule);
					sched.setTask(new Task()
						.withId(divconq.session.Session.nextTaskId("ScheduleLoader"))
						.withTitle("Scheduled Task Loader: " + rec.getFieldAsString("Title"))
						.withWork(trun -> {
								trun.copyMessages(loadres2);
								if (loadres2.isNotEmptyResult()) {
									ScheduleEntry entry = loadres2.getResult();
									entry.setSchedule(sched);
									entry.submit(trun);
								}
								// we are done, no need to wait 
								trun.complete();
						})
					);
					Scheduler.this.addNode(sched);
				});
			}
		}
	}
	public void stop(OperationResult or) {
		// TODO
		if (this. != null)
			this..cancel(false);
	}
	public long size() {
		return this.;
	}
	// the scheduler runs on its own thread, this is the code that starts and runs the scheduler
	private void execute() {
		OperationContext.useHubContext();
		long loadcnt = this.;
		this..lock();
		try {
			SchedulerNode curr = this.;
			long now = DateTimeUtils.currentTimeMillis();
			//System.out.println(new DateTime() +  " - scheduler - " + new DateTime(now) + " > " + new DateTime(curr.when));
			while ((curr != null) && (curr.when <= now)) {
				//System.out.println("Scheduled node: " + curr.scheduler.isCanceled());
				if (!curr.scheduler.isCanceled())
					p.submit(curr.taskcurr.scheduler);
				curr = curr.next;
				this. = curr;
				this.--;
			}
			loadcnt = this.;
		}
		catch(Exception x) {
			// TODO trace/log
		}		
		finally {
			this..unlock();
		}
		// it is possible due to race conditions to get a mis-ordered value in the counter
		// a) it doesn't matter 99.99999999% of the time, b) we cannot afford to do this in the lock
		..getCountManager().allocateSetNumberCounter("dcSchedulerLoad"loadcnt);
	}
	// add a work unit to run (almost) immediately - much the same as directly adding to the thread pool
	// any work unit submitted to the scheduler (or to any thread pool) will become owned by the
	// scheduler (or thread pool).
    public ISchedule runNow(Task work) {
    	return this.addNode(new SimpleSchedule(work, DateTimeUtils.currentTimeMillis(), 0));
    }
	// run the work unit once in Sec seconds from now
	public ISchedule runIn(Task workint secs) {
		return this.addNode(new SimpleSchedule(work, DateTimeUtils.currentTimeMillis() + (1000 * secs), 0));		
	}
	// run the work unit once at the specified time.  If less than now then submits immediately
	// if in the distant future, it may not be run if the process is terminated.  adding working
	// to the schedule is no guarantee work will be run.
	public ISchedule runAt(Task workReadableInstant time) {
		return this.addNode(new SimpleSchedule(worktime.getMillis(), 0));				
	}
	public ISchedule runAt(Task workLocalDate datePeriod period) {
		LocalDateTime ldt = date.toLocalDateTime(new LocalTime(0, 0).plus(period));
		return this.runAt(workldt.toDateTime());
	}
	// run the work unit repeatedly, every Secs seconds - note scheduler will not own work, you have to keep track of it
	public ISchedule runEvery(Task workint secs) {
		return this.addNode(new SimpleSchedule(work, DateTimeUtils.currentTimeMillis() + (1000 * secs), secs));		
	}
	public ISchedule addNode(ISchedule schedule) {
		long when = schedule.when();
		if (when < 0)
			return null;
		if ((schedule.task() == null) || (schedule.task().getContext() == null)) {
			Logger.warn("Schedule missing task or context: " + schedule.task());
			return null;
		}
		long loadcnt = this.;
		this..lock();
        try {
        	SchedulerNode snode = new SchedulerNode();
        	snode.task = schedule.task();
        	snode.when = when;
        	snode.scheduler = schedule;
			SchedulerNode curr = this.;
            SchedulerNode last = null;
			this.++;
			// loop through the scheduling linked list and find the right place to insert
			// the new TScheduleNode
            while (curr != null) {
				if (snode.when < curr.when) {
                    snode.next = curr;
					if (last == null
						this. = snode;
					else 
						last.next = snode;
                    return schedule;
                }
                last = curr;
				curr = curr.next;
            }
            // none found then add to end
			if (last == null
				this. = snode;
			else 
				last.next = snode;
			loadcnt = this.;
        }
        catch(Exception x) {
        	// TODO
            return null;
        }
        finally {
        	this..unlock();
        }
		// it is possible due to race conditions to get a mis-ordered value in the counter
		// a) it doesn't matter 99.99999999% of the time, b) we cannot afford to do this in the lock
		..getCountManager().allocateSetNumberCounter("dcSchedulerLoad"loadcnt);
        return schedule;
	}
	public class SchedulerNode {
		protected SchedulerNode next = null
		protected long when = 0;	
		protected Task task = null;
		protected ISchedule scheduler = null;
	}
	public void dump() {
		SchedulerNode curr = this.;
        while (curr != null) {
    		Logger.info("     + " + curr.task.getTitle());
        	
			curr = curr.next;
        }
	}
New to GrepCode? Check out our FAQ X