package votorola.a.diff.harvest.run; // Copyright 2012. Christian Weilbach. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE. import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.concurrent.FutureCallback; import org.apache.http.impl.nio.client.DefaultHttpAsyncClient; import org.apache.http.nio.client.HttpAsyncClient; import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.params.CoreConnectionPNames; import org.apache.http.params.CoreProtocolPNames; import org.apache.http.params.HttpParams; import org.apache.http.params.SyncBasicHttpParams; import votorola.g.lang.ThreadSafe; import votorola.g.logging.LoggerX; /** * Service to schedule HTTP traffic asynchronously and gracefully on a per * archive basis. The stepping is one second, fetchs are appended at the end of * the future queue sorted by each individual base-url of the forum. */ public @ThreadSafe class HarvestRunner { /** * Manage queues for each host and forum. * */ private final class Queue { public final List queue = Collections .synchronizedList(new LinkedList()); } /** * Internal runtime counter through the stepping. */ private final static AtomicLong counter = new AtomicLong(0L); /** * Number of available processors on runtime. */ private final static int CPUS = Runtime.getRuntime().availableProcessors(); private static HarvestRunner instance; final static Logger LOGGER = LoggerX.i(HarvestRunner.class); /** * Version advocated by the HTTP-client. */ public final static String VERSION = "0.0.1"; /** * Sane default settings for the TCP sockets. Taken from Apache example. */ private final static HttpParams params = new SyncBasicHttpParams() .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000) .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000) .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false) .setParameter(CoreProtocolPNames.USER_AGENT, "Votorola-HarvestBot/" + VERSION); /** * Stepping is one second. */ public final static int STEPPING = 1; /** * Factor for number of worker threads. Total number of threads is * *
     * THREADFACTOR * Runtime.getRuntime().availableProcessors()
     * 
*/ private final static int THREADFACTOR = 4; /** * Default singleton access. * * @return instance of HarvestRunner */ public static HarvestRunner i() { if (instance == null) { synchronized(HarvestRunner.class) { instance = new HarvestRunner(); } } return instance; } private final HttpAsyncClient client; /** * Queues for each base url. */ private final Map queues = Collections .synchronizedMap(new HashMap()); private final ScheduledExecutorService scheduler; private HarvestRunner() { scheduler = Executors.newScheduledThreadPool(THREADFACTOR * CPUS); HttpAsyncClient tempClient = null; try { final DefaultHttpAsyncClient defaultClient = new DefaultHttpAsyncClient(); defaultClient.setParams(params); tempClient = defaultClient; } catch (IOReactorException e) { LOGGER.log(Level.SEVERE, "Cannot start async http client.", e); System.exit(1); } client = tempClient; client.start(); initScheduler(); } protected void finalize() { try { client.shutdown(); } catch (InterruptedException e) { // ignore } } /** * Directly fetch and run a fetch in the thread pool. The scheduler * runs this every second for the first fetch per archive. * * @param fetch */ private void get(final Fetcher fetch) { final String url = fetch.url(); LOGGER.fine("GETting: " + url); client.execute(new HttpGet(url), new FutureCallback() { public void cancelled() { LOGGER.info("Request cancelled(?):" + url); } public void completed(final HttpResponse res) { LOGGER.finest("Got response for: " + url); final HttpEntity entity = res.getEntity(); if (entity == null) { return; } if (res.getStatusLine().getStatusCode() > 400) { LOGGER.info("HTTP error: " + res.getStatusLine()); } try { fetch.setStatusCode(res.getStatusLine().getStatusCode()); fetch.setInputStream(entity.getContent()); } catch (IOException e) { LOGGER.log(Level.WARNING, "Cannot get response' content.", e); return; } scheduler.execute(new Runnable() { public void run() { fetch.run(); } }); } public void failed(final Exception e) { LOGGER.log(Level.INFO, "Request failed:" + url, e); } }); } private synchronized Queue getOrInitQueue(final String archiveUrl) { Queue queue = queues.get(archiveUrl); if (queue == null) { queue = new Queue(); } queues.put(archiveUrl, queue); return queue; } /** * Initialize the stepping scheduler. */ private synchronized void initScheduler() { final Runnable fetcher = new Runnable() { public void run() { LOGGER.fine("Running step " + counter.getAndIncrement()); // O(n), n number of archives for (final Queue queue : queues.values()) { if (queue.queue.isEmpty()) { continue; } // run one fetch from the regular queue final Fetcher fetch = queue.queue.remove(0); get(fetch); } } }; scheduler.scheduleAtFixedRate(fetcher, 0, STEPPING, SECONDS); } /** * Schedule fetchs for fetching and execution. The fetch gets the next empty * slot in a queue for its archive. The archive is determined by base-url. * Once the data has arrived, {@linkplain Runnable#run()} is executed in the * thread pool. */ public void scheduleFirst(final Fetcher fetch) { final Queue queue = getOrInitQueue(fetch.archiveUrl()); queue.queue.add(0, fetch); } public void scheduleLast(final Fetcher fetch) { final Queue queue = getOrInitQueue(fetch.archiveUrl()); queue.queue.add(fetch); } }