package votorola.a.trust; // Copyright 2008, 2010-2012, Michael Allan. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE. import java.io.*; import java.sql.*; import java.util.*; import votorola.a.*; import votorola.a.voter.*; import votorola.g.*; import votorola.g.lang.*; import votorola.g.sql.*; /** A writable node in a network trace. */ public @ThreadRestricted( "single writer, readers touch" ) class TraceNodeW implements TraceNode { // cf. a/count/CountNodeW private static final long serialVersionUID = 0L; /** Creates a TraceNodeW with all variables at default values. */ TraceNodeW( IDPair registrant ) { this( registrant, null, null, null, null ); } /** Creates a TraceNodeW. */ TraceNodeW( IDPair registrant, String area, String countryCode, String geohandle, String otherProperties ) { this( registrant, area, countryCode, geohandle, otherProperties, new int[0] ); isChanged = true; } private TraceNodeW( IDPair _registrant, String _area, String _countryCode, String _geohandle, String _otherProperties, int[] _trustEdgeCountArray ) { if( _registrant == null ) throw new NullPointerException(); // fail fast // Adding fields here? Increment serialVersionUID and NetworkTrace.serialVersionUID. registrant = _registrant; area = _area; countryCode = _countryCode; geohandle = _geohandle; otherProperties = _otherProperties; trustEdgeCountArray = _trustEdgeCountArray; } // ------------------------------------------------------------------------------------ /** Attaches a trust edge to this node. The change may affect the trust level of this * node, but is not automatically propagated to other nodes. * * @param trustLevel0 trust level of source node, * or Integer.MAX_VALUE for infinity * * @see #detachTrustEdge(int) * @see #trustLevel() */ final void attachTrustEdge( int trustLevel0 ) { if( trustLevel0 == 0 ) { assert false: "attach only from traced source"; return; } if( trustLevel0 == Integer.MAX_VALUE ) trustLevel0 = 0; if( trustLevel0 + 1 > trustEdgeCountArray.length ) { trustEdgeCountArray = Arrays.copyOf( trustEdgeCountArray, trustLevel0 + 1 + GROWTH_PADDING ); } ++trustEdgeCountArray[trustLevel0]; isChanged = true; } /** Count of attached edges, by source trust level. Index 0 is reserved * for the level of infinity (from the root node). */ private int[] trustEdgeCountArray; private static final int GROWTH_PADDING = 0; // none, because it can grow once only per trace (because cycles are impossible), and is never subject to more than one trace (because nodes are not cached) /** Detaches a trust edge from this node. The change may affect the {@linkplain * #trustLevel() trust level} of this node, but is not automatically propagated * to other nodes. * * @param trustLevel0 trust level of source node * * @see #attachTrustEdge(int) */ final void detachTrustEdge( int trustLevel0 ) { if( trustLevel0 == 0 ) { assert false: "detach only when attached"; return; } if( trustLevel0 == Integer.MAX_VALUE ) trustLevel0 = 0; if( trustEdgeCountArray[trustLevel0] <= 0 ) { assert false: "detach never causes negative trust"; return; } --trustEdgeCountArray[trustLevel0]; // no need to shrink array, it's done in write() isChanged = true; } /** Serializes this node in XML format. * * @throws BadInputException */ void toXML( final Set divisions, final Appendable a ) throws IOException { a.append( "\t\n" ); if( otherProperties != null ) { a.append( "\t\t" ); a.append( otherProperties ); a.append( '\n' ); } Registration.toCommonXMLElements( divisions, aC ); a.append( "\t\t\n" ); } /** Writes this node to the table if it has unwritten changes. */ final void write( final Table table ) throws SQLException { if( !isChanged ) return; final int[] trustEdgeCountArrayTrimmed; { int length = trustEdgeCountArray.length; // same as before, till proven otherwise for(; length > 0; --length ) { if( trustEdgeCountArray[length-1] > 0 ) break; // found a positive count } if( length == trustEdgeCountArray.length ) { trustEdgeCountArrayTrimmed = trustEdgeCountArray; } else trustEdgeCountArrayTrimmed = Arrays.copyOf( trustEdgeCountArray, length ); } table.put( TraceNodeW.this, trustEdgeCountArrayTrimmed ); isChanged = false; } // - T r a c e - N o d e -------------------------------------------------------------- public final String getArea() { return area; } private final String area; public final String getCountryCode() { return countryCode; } private final String countryCode; public final String getGeohandle() { return geohandle; } private final String geohandle; public final String getOtherProperties() { return otherProperties; } private final String otherProperties; public final int primaryTrustEdgeCount() { int count = 0; if( trustEdgeCountArray.length > 0 ) count = trustEdgeCountArray[0]; return count; } public IDPair registrant() { return registrant; } private final IDPair registrant; /** @see #attachTrustEdge(int) */ public final int trustLevel() { int level = primaryTrustEdgeCount(); // sources of infinite trust for( int l = trustEdgeCountArray.length - 1; l > level; --l ) // only sources at levels greater than the current level can contribute an increase { level = Math.min( level + trustEdgeCountArray[l], l ); // cannot reduce level, because l > level (per loop guard) } return level; } // - O b j e c t ---------------------------------------------------------------------- /** Returns the registrant's username. */ public @Override final String toString() { return registrant().username(); } // ==================================================================================== /** A routine that runs in the context of a trace node. */ public interface Runner { // - T r a c e - N o d e - C . R u n n e r ---------------------------------------- /** Runs this routine in the context of the specified trace node. */ public void run( final TraceNodeW node ); } // ==================================================================================== /** The relational store of nodes that (in part) backs a {@linkplain NetworkTrace * network trace}. * * @see votorola.a.trust.TrustEdge.Table * @see votorola.a.trust.Membership.Table */ public static @ThreadSafe final class Table { // cf. a/count/CountTable /** Constructs a Table. */ public Table( final ReadyDirectory readyDirectory, final Database database ) throws IOException, SQLException { this.readyDirectory = readyDirectory; this.database = database; synchronized( database ) { database.ensureSchema( SCHEMA_NAME ); } final String snapSuffix = OutputStore.suffix( readyDirectory.snapDirectory().getName() ); if( !OutputStore.isY4MDS( snapSuffix )) throw new VotorolaRuntimeException( "improperly suffixed snap directory parent of ready directory: " + readyDirectory ); tableName = snapSuffix.substring(1) + OutputStore.SUFFIX_DELIMITER + "trace_node" + OutputStore.suffix(readyDirectory.getName()); statementKeyBase = getClass().getName() + ":" + SCHEMA_NAME + "/" + tableName + "."; } private final String statementKeyBase; // -------------------------------------------------------------------------------- /** Creates this table in the database. */ void create() throws SQLException { final String sKey = statementKeyBase + "create"; synchronized( database ) { PreparedStatement s = database.statementCache().get( sKey ); if( s == null ) { s = database.connection().prepareStatement( "CREATE TABLE \"" + SCHEMA_NAME + "\".\"" + tableName + "\"" + " (registrantEmail character varying PRIMARY KEY," + " area character varying," + " countryCode character varying," + " geohandle character varying," + " otherProperties character varying," + " trustEdgeCountArray character varying)" ); // Changing table structure? Then also increment NetworkTrace.serialVersionUID. database.statementCache().put( sKey, s ); } s.execute(); } } /** The database in which this table is stored. */ @Warning("thread restricted object") Database database() { return database; } private final Database database; /** Drops this table from the database if it exists. * * @return true if any rows were actually removed as a result, false otherwise. */ final boolean drop() throws SQLException { final String sKey = statementKeyBase + "drop"; synchronized( database ) { PreparedStatement s = database.statementCache().get( sKey ); if( s == null ) { s = database.connection().prepareStatement( "DROP TABLE IF EXISTS \"" + SCHEMA_NAME + "\".\"" + tableName + "\"" ); database.statementCache().put( sKey, s ); } final int updatedRows = s.executeUpdate(); return updatedRows > 0; } } /** Retrieves a node from this table. * * @return node as stored in the table; or null, if none is stored */ public TraceNodeW get( final IDPair registrant ) throws SQLException { if( registrant == null ) throw new NullPointerException(); // fail fast final String sKey = statementKeyBase + "get"; synchronized( database ) { PreparedStatement s = database.statementCache().get( sKey ); if( s == null ) { s = database.connection().prepareStatement( "SELECT area,countryCode,geohandle,otherProperties,trustEdgeCountArray" + " FROM \"" + SCHEMA_NAME + "\".\"" + tableName + "\"" + " WHERE registrantEmail = ?" ); database.statementCache().put( sKey, s ); } s.setString( 1, registrant.email() ); final ResultSet r = s.executeQuery(); try { if( !r.next() ) return null; return new TraceNodeW( registrant, r.getString(1), r.getString(2), r.getString(3), r.getString(4), ArrayX.stringToInts(r.getString(5)) ); } finally{ r.close(); } } } /** Retrieves a node from this table; or if none is stored, a default node. * * @return TraceNodeW as stored in the table; or, if none is stored, * a {@linkplain TraceNodeIC TraceNodeIC} with default values */ public TraceNodeW getOrCreate( final IDPair registrant ) throws SQLException { TraceNodeW traceNode = get( registrant ); if( traceNode == null ) traceNode = new TraceNodeIC( registrant ); return traceNode; } /** Stores a node. */ void put( final TraceNodeW node, final int[] trustEdgeCountArray ) throws SQLException { final String qTable = "\"" + SCHEMA_NAME + "\".\"" + tableName + "\""; synchronized( database ) { // effect an "upsert" in PostgreSQL // http://stackoverflow.com/questions/1109061/insert-on-duplicate-update-postgresql/6527838#6527838 final Connection c = database.connection(); { final String sKey = statementKeyBase + "putU"; PreparedStatement s = database.statementCache().get( sKey ); if( s == null ) { s = c.prepareStatement( "UPDATE " + qTable + " SET area = ?, countryCode = ?, geohandle = ?, otherProperties = ?," + " trustEdgeCountArray = ?" + " WHERE registrantEmail = ?" ); database.statementCache().put( sKey, s ); } s.setString( 1, node.getArea() ); s.setString( 2, node.getCountryCode() ); s.setString( 3, node.getGeohandle() ); s.setString( 4, node.getOtherProperties() ); s.setString( 5, ArrayX.intsToString( trustEdgeCountArray )); s.setString( 6, node.registrant().email() ); final int updatedRows = s.executeUpdate(); if( updatedRows > 0 ) { assert updatedRows == 1; return; } } { final String sKey = statementKeyBase + "putI"; PreparedStatement s = database.statementCache().get( sKey ); if( s == null ) { s = c.prepareStatement( "INSERT INTO " + qTable + " (registrantEmail, area, countryCode, geohandle, otherProperties," + " trustEdgeCountArray)" + " SELECT ?, ?, ?, ?, ?, ? WHERE NOT EXISTS" + " (SELECT 1 FROM " + qTable + " WHERE registrantEmail = ?)" ); database.statementCache().put( sKey, s ); } s.setString( 1, node.registrant().email() ); s.setString( 2, node.getArea() ); s.setString( 3, node.getCountryCode() ); s.setString( 4, node.getGeohandle() ); s.setString( 5, node.getOtherProperties() ); s.setString( 6, ArrayX.intsToString( trustEdgeCountArray )); s.setString( 7, node.registrant().email() ); s.executeUpdate(); } } } /** The file-based counterpart to this table. */ ReadyDirectory readyDirectory() { return readyDirectory; } private final ReadyDirectory readyDirectory; // final after init /** Pass through the specified runner all nodes of this table. The nodes are * sorted by area. */ public void run( final TraceNodeW.Runner runner ) throws SQLException { final String sKey = statementKeyBase + "run"; synchronized( database ) { PreparedStatement s = database.statementCache().get( sKey ); if( s == null ) { s = database.connection().prepareStatement( "SELECT registrantEmail, area, countryCode, geohandle, otherProperties," + " trustEdgeCountArray" + " FROM \"" + SCHEMA_NAME + "\".\"" + tableName + "\"" + " ORDER BY area" ); database.statementCache().put( sKey, s ); } final ResultSet r = s.executeQuery(); try { while( r.next() ) { runner.run( new TraceNodeW( IDPair.fromEmail(r.getString(1)), r.getString(2), r.getString(3), r.getString(4), r.getString(5), ArrayX.stringToInts(r.getString(6)) )); } } finally{ r.close(); } } } /** The name of the table's schema. */ static final String SCHEMA_NAME = "out_trace"; /** The name of this table. */ String tableName() { return tableName; } private final String tableName; } //// P r i v a t e /////////////////////////////////////////////////////////////////////// private boolean isChanged; }