package votorola.a.diff.harvest; // Copyright 2012. Christian Weilbach. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE. import java.util.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.util.logging.Level; import java.util.logging.Logger; import votorola.a.diff.harvest.run.HarvestRunner; import votorola.g.lang.ThreadSafe; import votorola.g.lang.Warning; import votorola.g.logging.LoggerX; import votorola.g.sql.Database; /** * Database table to store harvester state for PipermailHarvester only atm. This * is not part of the public API (yet). * */ @Warning("non-Api") @ThreadSafe final public class StateTable { static enum Type { TEMP, FIN, PERM } private final transient Database db; private final static String SCHEMA_NAME = "harvest"; private final transient String statementKeyBase; private final static String TABLE_NAME = "pipermail_state"; // SQL table string private final static String TABLE = "\"" + SCHEMA_NAME + "\".\"" + TABLE_NAME + "\""; private final static Logger LOGGER = LoggerX.i(StateTable.class); // TODO do not synchronize around db, this will be a bottleneck /** * Table which saves harvested urls for {@linkplain HarvestRunner}. * * @param db */ @Warning("non-Api") public StateTable(final Database db) { this.db = db; statementKeyBase = getClass().getName() + ":" + SCHEMA_NAME + "/" + TABLE_NAME + "."; } /** * Creates this table in the database. * * @throws SQLException */ void create() throws SQLException { try { synchronized (db) { db.ensureSchema(SCHEMA_NAME); } final String key = statementKeyBase + "create"; synchronized (db) { PreparedStatement prepStatem = db.statementCache().get(key); if (prepStatem == null) { // this also creates a b-tree on the hash column prepStatem = db .connection() .prepareStatement( "CREATE TABLE IF NOT EXISTS " + TABLE + " (archive_url character varying NOT NULL" + ", type character varying NOT NULL" + ", marker character varying NOT NULL" + ", sent_ts timestamp with time zone NOT NULL" + ", PRIMARY KEY(archive_url, type, marker))"); db.statementCache().put(key, prepStatem); } prepStatem.execute(); } } catch (SQLException e) { LOGGER.log(Level.SEVERE, "Cannot create StateTable.", e); System.exit(1); } } /** * Drops this table from the database. * * @throws SQLException */ void drop() throws SQLException { final String key = statementKeyBase + "drop"; synchronized (db) { PreparedStatement prepStatem = db.statementCache().get(key); if (prepStatem == null) { prepStatem = db.connection().prepareStatement( "DROP TABLE \"" + SCHEMA_NAME + "\".\"" + TABLE_NAME + "\""); db.statementCache().put(key, prepStatem); } prepStatem.execute(); } } /** * Returns true if this table exists in the database; false otherwise. * * @return whether database already exists or not. * @throws SQLException */ public boolean exists() throws SQLException { final String key = statementKeyBase + "exists"; synchronized (db) { PreparedStatement prepStatem = db.statementCache().get(key); if (prepStatem == null) { prepStatem = db.connection().prepareStatement( "SELECT * FROM " + TABLE ); prepStatem.setMaxRows(1); db.statementCache().put(key, prepStatem); } try { prepStatem.execute(); } catch (SQLException x) { final String sqlState = x.getSQLState(); if ("3F000".equals(sqlState)) { return false; // 3F000 = [missing schema] } if ("42P01".equals(sqlState)) { return false; // 42P01 = UNDEFINED TABLE } throw x; } } return true; } /** * Get newest marker of any type for a forum. If there is none, a dummy * value with empty string and date from beginning at unix time is used. * * @param archiveUrl * @throws SQLException */ public Marker getNewest(final String archiveUrl) throws SQLException { synchronized (db) { Marker temp = Marker.dummy(); final String getNewest1Key = statementKeyBase + "getNewest1"; PreparedStatement s = db.statementCache().get(getNewest1Key); if (s == null) { s = db.connection().prepareStatement( "SELECT marker, sent_ts FROM " + TABLE + " where archive_url ~ ?" + " ORDER BY sent_ts DESC LIMIT 1"); db.statementCache().put(getNewest1Key, s); } s.setString(1, archiveUrl); ResultSet rs = s.executeQuery(); if (rs.next()) { temp = Marker.create(rs.getString(1), new Date(rs.getTimestamp(2) .getTime())); } return temp; } } /** * Get either permanent or temporary marker, the newest known marker. * * @param archiveUrl * @param type * @return newest marker of type * @throws SQLException */ public Marker getNewest(final String archiveUrl, final Type type) throws SQLException { synchronized (db) { Marker temp = Marker.dummy(); final String getNewest2Key = statementKeyBase + "getNewest2"; PreparedStatement s = db.statementCache().get(getNewest2Key); if (s == null) { s = db.connection().prepareStatement( "SELECT marker, sent_ts FROM " + TABLE + " WHERE archive_url ~ ? AND type = ?" + "ORDER BY sent_ts DESC LIMIT 1"); db.statementCache().put(getNewest2Key, s); } s.setString(1, archiveUrl); s.setString(2, type.name()); ResultSet rs = s.executeQuery(); if (rs.next()) { temp = Marker.create( rs.getString(1), new Date(rs.getTimestamp(2) .getTime())); } return temp; } } /** * Put a temporary marker. * * @throws SQLException */ public boolean put(final String archiveUrl, final Marker marker) throws SQLException { synchronized (db) { // insert if update was not successful final String insertKey = statementKeyBase + "insertMarker"; PreparedStatement s = db.statementCache().get(insertKey); if (s == null) { s = db.connection().prepareStatement( "INSERT INTO " + TABLE + "(archive_url, type, marker" + ", sent_ts)" // silently drop stale TEMP markers // http://stackoverflow.com/questions/1109061/insert-on-duplicate-update-postgresql + " SELECT ?, ?, ?, ? WHERE NOT EXISTS" + " (SELECT 1 FROM " + TABLE // primary key constraint + " WHERE archive_url = ? AND type = ?" + " AND sent_ts = ?)"); db.statementCache().put(insertKey, s); } s.setString(1, archiveUrl); s.setString(2, Type.TEMP.name()); s.setString(3, marker.path()); s.setTimestamp(4, new Timestamp(marker.date().getTime())); // repeat for EXCEPT constraint avoidance s.setString(5, archiveUrl); s.setString(6, Type.TEMP.name()); s.setTimestamp(7, new Timestamp(marker.date().getTime())); // success? return (s.executeUpdate()==0); } } /** * Remove all temporary markers in finished range and mark start marker as * finished. The end marker is not removed and its type is not changed. It * can still be temporary. * * @param archiveUrl * @param start * @param end * @throws SQLException */ public void finish(final String archiveUrl, final Marker start, final Marker end) throws SQLException { synchronized (db) { // cleanup covered temp markers final String cleanupKey = statementKeyBase + "cleanup"; PreparedStatement clStat = db.statementCache().get(cleanupKey); if (clStat == null) { clStat = db.connection().prepareStatement( "DELETE FROM " + TABLE + " WHERE archive_url ~ ? AND type = ?" + " AND ( (sent_ts < ? AND sent_ts > ?)" // stale fin markers, about to be reset next // otherwise UPDATE fails on a stale marker + " OR (sent_ts = ? AND type = ?) )"); db.statementCache().put(cleanupKey, clStat); } clStat.setString(1, archiveUrl); clStat.setString(2, Type.TEMP.name()); clStat.setTimestamp(3, new Timestamp(start.date().getTime())); clStat.setTimestamp(4, new Timestamp(end.date().getTime())); clStat.setTimestamp(5, new Timestamp(start.date().getTime())); clStat.setString(6, Type.FIN.name()); int count = clStat.executeUpdate(); LOGGER.fine("Removed " + count + " stale entries."); // mark our marker as finished final String finishedKey = statementKeyBase + "finished"; PreparedStatement finStat = db.statementCache().get(finishedKey); if (finStat == null) { finStat = db.connection().prepareStatement( "UPDATE " + TABLE + " SET type = ?" + " WHERE archive_url ~ ? AND type = ?" + " AND sent_ts = ? AND marker = ?"); db.statementCache().put(finishedKey, finStat); } finStat.setString(1, Type.FIN.name()); finStat.setString(2, archiveUrl); finStat.setString(3, Type.TEMP.name()); finStat.setTimestamp(4, new Timestamp(start.date().getTime())); finStat.setString(5, start.path()); finStat.executeUpdate(); } } /** * Recalculate the PERM marker by setting the newest FIN marked Marker, * which is not newer than any temporary marker to PERM and deleting all * older markers. * * @param archiveUrl * @throws SQLException */ public void update(final String archiveUrl) throws SQLException { synchronized (db) { final String updateKey = statementKeyBase + "updateMarker"; PreparedStatement upStat = db.statementCache().get(updateKey); if (upStat == null) { upStat = db.connection().prepareStatement( "UPDATE " + TABLE + " set type = ?" + " where type = ? and archive_url = ?" + " and (sent_ts < " + "(select min(sent_ts) from " + TABLE + " where type = ? and archive_url = ?)" + " or (select count(*) from " + TABLE + "where type = ? and archive_url = ?) = 0)"); db.statementCache().put(updateKey, upStat); } upStat.setString(1, Type.PERM.name()); // new upStat.setString(2, Type.FIN.name()); // old upStat.setString(3, archiveUrl); upStat.setString(4, Type.TEMP.name()); // active upStat.setString(5, archiveUrl); upStat.setString(6, Type.TEMP.name()); // active upStat.setString(7, archiveUrl); if (upStat.executeUpdate() != 0) { // success LOGGER.fine("Updated PERM marker for " + archiveUrl); } // remove stale markers final String deleteStaleKey = statementKeyBase + "deleteStale"; PreparedStatement delStat = db.statementCache().get(deleteStaleKey); if (delStat == null) { delStat = db.connection().prepareStatement( "DELETE FROM " + TABLE + " WHERE archive_url = ?" + " AND (type = ? or type = ?)" + " AND sent_ts <" + " (SELECT max(sent_ts) FROM " + TABLE + " WHERE archive_url = ? AND type = ?)"); db.statementCache().put(deleteStaleKey, delStat); } delStat.setString(1, archiveUrl); // don't remove TEMP markers as they should not occur, if they // do this is a bug delStat.setString(2, Type.PERM.name()); delStat.setString(3, Type.FIN.name()); delStat.setString(4, archiveUrl); delStat.setString(5, Type.PERM.name()); delStat.executeUpdate(); } } /** * Remove all states of an archive. * @param archiveUrl * @return number of removed entries * @throws SQLException */ public int removeArchive(final String archiveUrl) throws SQLException { // remove an archive final String deleteArchiveKey = statementKeyBase + "deleteArchive"; synchronized (db) { PreparedStatement delStat = db.statementCache().get(deleteArchiveKey); if (delStat == null) { delStat = db.connection().prepareStatement( "DELETE FROM " + TABLE + " where archive_url = ?"); db.statementCache().put(deleteArchiveKey, delStat); } delStat.setString(1, archiveUrl); return delStat.executeUpdate(); } } }