amass - web crawling made easy
Copyright (c) 2011-2013, Sandeep Gupta
http://www.sangupta/projects/amass

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy
of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
 
 
	package com.sangupta.amass.impl;
	
	import java.util.Enumeration;
	import java.util.concurrent.BlockingQueue;
	import java.util.concurrent.ConcurrentHashMap;
	import java.util.concurrent.PriorityBlockingQueue;
	
	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	
	import com.sangupta.amass.domain.AmassSignal;
	import com.sangupta.amass.domain.CrawlJob;
	import com.sangupta.amass.domain.CrawlableURL;
	import com.sangupta.amass.domain.DefaultCrawlableURL;
	import com.sangupta.amass.domain.QueueMessageConverter;
	
A priority-based queue that collects all URLs which need to be crawled. All crawler threads pull their work from this single queue instance. The queue sorts items by an integer priority: the higher the priority value, the earlier the URL is crawled.

Author(s):
sangupta
 
 public class CrawlingQueue {
 	
 	private static final Logger LOGGER = LoggerFactory.getLogger(CrawlingQueue.class);

The default priority assigned to an item when it is added to the internal queue.
 
 	private static final int DEFAULT_PRIORITY = 1;

A map from each URL to its crawling job, ensuring that the same URL does not get crawled by two different threads at the same time.
 
	private final ConcurrentHashMap<String, CrawlJob> jobs;

The embedded priority queue that serves worker threads.
 
	private final BlockingQueue<CrawlJob> internalQueue;

A blocking queue that is provided from outside.
 
	private final BlockingQueue<Object> externalQueue;

Converter to use to read from the externalQueue.
 
	@SuppressWarnings("rawtypes")
	private final QueueMessageConverter queueMessageConverter;
 
The signal object that lets workers (and everyone else) know whether the jobs are paused, resumed, or stopped.
 
 	private final AmassSignal amassSignal;

Indicates whether closure of this queue has been requested.
 
 	private volatile boolean closureSeeked;

Constructor that creates an object of the crawling queue.

Parameters:
externalQueue the backing queue from which we read crawling jobs
queueMessageConverter the converter used to read an object from the queue and convert it into a com.sangupta.amass.domain.CrawlableURL instance
amassSignal the com.sangupta.amass.domain.AmassSignal object that will be sending us the signals
	public CrawlingQueue(BlockingQueue<Object> externalQueue, QueueMessageConverter<? extends Object> queueMessageConverter, AmassSignal amassSignal) {
		this.amassSignal = amassSignal;
		
		if(externalQueue != null) {
			if(queueMessageConverter == null) {
				throw new IllegalArgumentException("QueueMessageConverter cannot be null when specifying an external queue.");
			}
			
			this.internalQueue = null;
			this.externalQueue = externalQueue;
			this.queueMessageConverter = queueMessageConverter;
			this.jobs = null;
		} else {
			if(queueMessageConverter != null) {
				throw new IllegalArgumentException("QueueMessageConverter must be null when using an internal queue.");
			}
			
			this.internalQueue = new PriorityBlockingQueue<CrawlJob>();
			this.jobs = new ConcurrentHashMap<String, CrawlJob>();
			this.externalQueue = null;
			this.queueMessageConverter = null;
		}
	}
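
A usage sketch (not part of the original source) showing both construction modes. The no-arg AmassSignal constructor and the shape of the QueueMessageConverter interface (a single convert() method returning a CrawlableURL) are assumptions inferred from the code on this page.

	import java.util.concurrent.BlockingQueue;
	import java.util.concurrent.LinkedBlockingQueue;
	
	import com.sangupta.amass.domain.AmassSignal;
	import com.sangupta.amass.domain.CrawlableURL;
	import com.sangupta.amass.domain.DefaultCrawlableURL;
	import com.sangupta.amass.domain.QueueMessageConverter;
	import com.sangupta.amass.impl.CrawlingQueue;
	
	public class CrawlingQueueSetupExample {
	
		public static void main(String[] args) {
			AmassSignal signal = new AmassSignal(); // assumed no-arg constructor
	
			// Internal mode: pass null for both the external queue and the
			// converter; CrawlingQueue builds its own priority queue.
			CrawlingQueue internal = new CrawlingQueue(null, null, signal);
			internal.submitURL("http://example.com");
	
			// External mode: jobs arrive as messages on a caller-supplied
			// queue; a converter turns each message into a CrawlableURL.
			// The convert(T) signature here is inferred from take() below.
			BlockingQueue<Object> messages = new LinkedBlockingQueue<Object>();
			QueueMessageConverter<String> converter = new QueueMessageConverter<String>() {
				@Override
				public CrawlableURL convert(String message) {
					return new DefaultCrawlableURL(message);
				}
			};
			CrawlingQueue external = new CrawlingQueue(messages, converter, signal);
		}
	}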

Submit the given URL to the crawling queue with default priority.

Parameters:
url the URL that needs to be added to crawling queue
Returns:
true if the URL was added to the queue, false otherwise
	public boolean submitURL(String url) {
		return this.submitURL(url, DEFAULT_PRIORITY);
	}

Submit the given URL to the crawling queue with the given priority.

Parameters:
url the URL that needs to be added to crawling queue
priority the priority to use when adding the URL
Returns:
true if the URL was added to the queue, false otherwise
	public boolean submitURL(final String url, final int priority) {
		if(url == null) {
			return false;
		}
		return submitURL(new DefaultCrawlableURL(url), priority);
	}

Submit the given instance of com.sangupta.amass.domain.CrawlableURL object with default priority to the crawling queue

Parameters:
crawlableURL the object that needs to be added
Returns:
true if the URL was added to the queue, false otherwise
	public boolean submitURL(CrawlableURL crawlableURL) {
		return this.submitURL(crawlableURL, DEFAULT_PRIORITY);
	}

Submit the given instance of com.sangupta.amass.domain.CrawlableURL object with the given priority to the crawling queue

Parameters:
crawlableURL the object that needs to be added
priority the priority with which to add the instance
Returns:
true if the URL was added to the queue, false otherwise
	public boolean submitURL(final CrawlableURL crawlableURL, final int priority) {
		if(this.internalQueue == null) {
			throw new IllegalArgumentException("Jobs can only be submitted to internal queue implementations.");
		}
		
		if(crawlableURL == null) {
			return false;
		}
		
		CrawlJob job = this.jobs.get(crawlableURL.getURL());
		if(job == null) {
			job = new CrawlJob(crawlableURL, priority);
		}
		
		CrawlJob previous = this.jobs.putIfAbsent(crawlableURL.getURL(), job);
		if(previous == null) {
			// no previous job; submit this one to the queue
			this.internalQueue.offer(job);
		} else {
			// there seems to be a job previously submitted;
			// let's increase its priority instead of queueing a duplicate
			previous.incrementPriority(priority);
		}
		
		return true;
	}
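
A short sketch (not in the original source) of the de-duplication this method provides. It assumes, as the method name and the putIfAbsent flow above suggest, that incrementPriority adds the given value to the job's existing priority, and it reuses the assumed no-arg AmassSignal constructor from the earlier example.

	import com.sangupta.amass.domain.AmassSignal;
	import com.sangupta.amass.impl.CrawlingQueue;
	
	public class PriorityMergeExample {
	
		public static void main(String[] args) {
			CrawlingQueue queue = new CrawlingQueue(null, null, new AmassSignal());
	
			// The same URL submitted twice yields a single job whose
			// priority accumulates: 5 + 3 = 8 (assuming incrementPriority
			// adds the given value to the current one).
			queue.submitURL("http://example.com", 5);
			queue.submitURL("http://example.com", 3);
	
			// Prints one line for the URL with its accumulated priority.
			queue.debugJobInfo();
		}
	}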

Get a crawling job out of this CrawlingQueue. If no element is available, this method waits until one becomes available. If the queue is shutting down, it returns null.

Returns:
an instance of the com.sangupta.amass.domain.CrawlJob once it is available in the queue
	@SuppressWarnings("unchecked")
	public CrawlJob take() {
		CrawlJob job = null;
		do {
			if(this.internalQueue != null) {
				// read from the internal queue if we are using one
				job = this.internalQueue.poll();
			} else {
				// else read from the external queue
				try {
					Object message = this.externalQueue.take();
					LOGGER.debug("Message received from external queue: {}", message);
					if(message != null) {
						CrawlableURL crawlableURL = null;
						
						// convert the message
						try {
							crawlableURL = this.queueMessageConverter.convert(message);
						} catch(Exception e) {
							LOGGER.error("Unable to convert message to crawlable url: " + message, e);
						}
						
						// if the obtained URL is not null, wrap it in
						// a crawling job
						if(crawlableURL != null) {
							job = new CrawlJob(crawlableURL);
						} else {
							LOGGER.error("NULL translated as message over the queue: {}", message);
						}
					}
				} catch (InterruptedException e) {
					LOGGER.error("Interrupted while waiting to read message", e);
				}
			}
			
			// see if we are stopping
			if(this.amassSignal.isStopping()) {
				LOGGER.debug("Skipping message because stopping signal sent: {}", job);
				return null;
			}
			
			// a null job indicates that there is nothing in
			// the queue and we might need to wait more
			if(job != null) {
				break;
			}
			
			if(this.closureSeeked) {
				LOGGER.debug("Skipping message because closure seeked: {}", job);
				return null;
			}
			
			// wait for 100 millis before retrying
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				// eat up
			}
		} while(true);
		
		// remove from the jobs map
		if(this.jobs != null) {
			this.jobs.remove(job.getCrawlableURL().getURL());
		}
		
		return job;
	}
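
A minimal worker sketch (not in the original source) built on this contract: take() blocks until a job is available and returns null only on a stop signal or closure, so a null return doubles as the worker's shutdown signal. Only getCrawlableURL() is used from CrawlJob, a method that also appears in debugJobInfo() below.

	import com.sangupta.amass.domain.CrawlJob;
	import com.sangupta.amass.impl.CrawlingQueue;
	
	public class WorkerLoopExample implements Runnable {
	
		private final CrawlingQueue queue;
	
		public WorkerLoopExample(CrawlingQueue queue) {
			this.queue = queue;
		}
	
		@Override
		public void run() {
			CrawlJob job;
			// a null job means the queue is stopping or has been closed
			while((job = this.queue.take()) != null) {
				System.out.println("Crawling " + job.getCrawlableURL().getURL());
				// ... fetch and process the URL here ...
			}
		}
	}

Several such workers can safely share one CrawlingQueue instance; the jobs map above ensures no two of them pick up the same URL concurrently.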

Output debug information on all pending jobs. This works only when the queue is backed by an internal queue.
	public void debugJobInfo() {
		if(this.jobs == null) {
			return;
		}
		
		for (Enumeration<CrawlJob> myJobs = this.jobs.elements(); myJobs.hasMoreElements(); ) {
			CrawlJob myJob = myJobs.nextElement();
			System.out.println("URL " + myJob.getCrawlableURL().getURL() + " with priority of " + myJob.getPriority().get());
		}
	}

Specifies if we are running using an internal queue backed implementation.

Returns:
true if we are running over an internal queue, false if we are running over an external queue supplied by the calling code
	public boolean isInternalQueueBacked() {
		return this.internalQueue != null;
	}

Clear all pending internal jobs. We do not clean up an externally provided queue; that responsibility lies with the calling application.
	private void clearAllJobs() {
		if(this.jobs != null) {
			this.jobs.clear();
		}
		
		if(this.internalQueue != null) {
			this.internalQueue.clear();
		}
	}

Wait for the closure of this queue. Closure completes once all jobs have been read off this queue.

Parameters:
clearJobs if set to true all pending jobs are deleted before we wait for completion of currently running jobs
	public void waitForClosure(boolean clearJobs) {
		if(clearJobs) {
			clearAllJobs();
		}
		
		// we need not wait for any external queue to close
		// and thus we can shut down immediately when using
		// such a queue
		if(this.internalQueue == null) {
			this.closureSeeked = true;
			return;
		}
		
		// we are using an internal queue; we must wait
		// till it gets cleared up
		CrawlJob job = null;
		do {
			job = this.internalQueue.peek();
			if(job == null) {
				break;
			}
			
			// wait for 250ms
			try {
				Thread.sleep(250);
			} catch (InterruptedException e) {
				// eat up
			}
		} while(true);
		
		this.closureSeeked = true;
	}
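
A shutdown sketch (not in the original source), reusing the assumed no-arg AmassSignal constructor from the earlier examples. Passing true discards pending jobs so the wait returns promptly even without running workers; passing false instead blocks until workers have drained the queue.

	import com.sangupta.amass.domain.AmassSignal;
	import com.sangupta.amass.impl.CrawlingQueue;
	
	public class ShutdownExample {
	
		public static void main(String[] args) {
			CrawlingQueue queue = new CrawlingQueue(null, null, new AmassSignal());
			queue.submitURL("http://example.com");
	
			// Discard pending jobs, then mark the queue as closed; from
			// this point on, take() returns null and workers wind down.
			queue.waitForClosure(true);
		}
	}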

Check if we have a job available in the actual queue over which this CrawlingQueue instance is based.

Returns:
true if the queue has elements, false otherwise
	public boolean hasJob() {
		if(this.internalQueue != null) {
			return !this.internalQueue.isEmpty();
		}
		
		return !this.externalQueue.isEmpty();
	}