package votorola.a.count; // Copyright 2007-2013, Michael Allan. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE. import com.google.gson.stream.*; import com.sun.jersey.api.uri.*; import java.io.*; import java.net.*; import java.nio.charset.*; import java.sql.*; import java.text.*; import java.util.*; import java.util.logging.*; import votorola.g.logging.*; import java.util.regex.*; import javax.mail.internet.AddressException; import javax.script.*; import javax.xml.stream.*; import javax.xml.ws.Holder; import votorola.a.*; import votorola.a.position.*; import votorola.a.trust.*; import votorola.a.voter.*; import votorola.g.*; import votorola.g.hold.*; import votorola.g.io.*; import votorola.g.lang.*; import votorola.g.mail.*; import votorola.g.sql.*; import votorola.g.text.*; import votorola.g.xml.stream.*; import votorola.s.wap.PollspaceWAP; import static votorola.a.count.CountNode.DART_SECTOR_MAX; /** The path to a snap/readyCount record. It is the file part of the backing for a poll * count. It is guaranteed to be in canonical form at the time of construction. * * @see snap/readyCount record * @see Count */ public @ThreadSafe final class ReadyDirectory extends OutputStore.ReadyDirectory { // cf. a/trust/ReadyDirectory private static final long serialVersionUID = 3L; /** Creates a new ReadyDirectory. * * @param d the abstract pathname. It will be converted to canonical form if * necessary. * @param readyTracePathname the pathname of the ready directory of the trust * trace to use. * @see #pipeRecognizer() */ public static ReadyDirectory createReadyDirectory( final File d, final String readyTracePathname, final PipeRecognizer pipeRecognizer ) throws IOException { FileX.writeObject( pipeRecognizer, new File(d,pipeRecognizerSerialName) ); final ReadyDirectory r = new ReadyDirectory( d ); FileX.symlink( readyTracePathname, r.readyTraceLink().getPath() ); return r; } /** Constructs a ReadyDirectory from an abstract (File) pathname. * * @param _d the abstract pathname. It will be converted to canonical form if * necessary. * * @throws FileNotFoundException if no directory exists at the specified pathname. * @see #createReadyDirectory(File,String,PipeRecognizer) */ public ReadyDirectory( File _d ) throws IOException { super( _d ); final File f = new File( ReadyDirectory.this, pipeRecognizerSerialName ); if( f.exists() ) { PipeRecognizer p = null; try{ p = (PipeRecognizer)FileX.readObject( f ); } catch( final ClassNotFoundException x ) // class names have changed { logger.log( LoggerX.WARNING, /*message*/"defaulting to a null pipe recognizer, unable to deserialize from file: " + f, x ); isInitWarned = true; p = new PipeRecognizer0(); } pipeRecognizer = p; } else pipeRecognizer = new PipeRecognizer0(); // directory apparently readied prior to addition of pipe support in May 2013 } /** Constructs a ReadyDirectory from a string pathname. * * @param pathname the string pathname, per {@linkplain File#File(String) * File}(pathname). It will be converted to canonical form if necessary. * * @throws FileNotFoundException if no directory exists at the specified pathname. * @see #createReadyDirectory(File,String,PipeRecognizer) */ public ReadyDirectory( String pathname ) throws IOException { this( new File( pathname )); } // ------------------------------------------------------------------------------------ /** Answers whether any warnings were logged during the construction of this ready * directory. */ public boolean isInitWarned() { return isInitWarned; } private boolean isInitWarned; /** Tallies the results and posts them to the database and filebase. * * @param networkTrace the previously mounted trace of the trust network. * @param isVerbose if true, additional details are printed to standard output * as the mount progresses. * * @return count of polls tallied. * * @see #isMounted() * @see #unmount(VoteServer.Run) */ public int mount( final VoteServer.Run run, final NetworkTrace networkTrace, final boolean isVerbose ) throws VotorolaException { // Currently the count depends directly on the trustserver's mounted trace, which // it uses as a voter list. This works because the mounted trace is already an // aggregate of the registration lists for all areas of the reference streetwiki. // In future, we will interpose a 'volist' command that allows for aggregation // from multiple remote trustservers (list lengthening), as well as merging from // non-residential registration lists (widening). See // http://reluk.ca/w/User:Mike-ZeleaCom/G/p/edor final int nPruneTrigger = PollspaceWAP.MAX_RESPONSE_SIZE * 2; final ArrayList pollspace0List = new ArrayList<>( nPruneTrigger + /*spare*/1 ); final Holder pollCountH = new Holder( 0 ); // for write access from inner class final Database database = run.database(); try { final Membership.Table membershipTable = new Membership.Table( networkTrace.readyDirectory(), database ); final TraceNodeW.Table traceNodeTable = new TraceNodeW.Table( networkTrace.readyDirectory(), database ); final CountTable countTable = new CountTable( ReadyDirectory.this, database ); countTable.drop(); countTable.create(); mountedDirectory.mkdir(); final URI inVoteDirectoryURI = inVoteDirectory.toURI(); final Holder xH = new Holder(); // " synchronized( database ){ database.connection().setAutoCommit( false ); } // bulk write // otherwise too slow on Linux 3 kernels, which use stricter ext3 file synchronization try { // Tally all polls. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - FileX.traverseDown( inVoteDirectory, new FileFilter() { public boolean accept( final File inVoteFile ) { if( !inVoteFile.isFile() ) return true; final String suffix = ".xml"; if( !inVoteFile.getName().endsWith( suffix )) return true; final String pollName; { final URI inVoteFileURI = inVoteFile.toURI(); final String relativePath = inVoteDirectoryURI.relativize( inVoteFileURI ).getPath(); pollName = relativePath.substring( 0, relativePath.length() - suffix.length() ); } try { final PollService poll = run.scopePoll().ensurePoll( pollName ); logger.fine( "counting votes for poll: " + pollName ); // after constructing, because it logs its own message if( isVerbose ) { if( pollCountH.value > 0 ) System.out.print( ", " ); System.out.print( pollName ); } final Count count = tally( poll, networkTrace, inVoteFile, countTable, membershipTable, traceNodeTable, isVerbose ); count.writeObjectToSerialFile(); ++pollCountH.value; pollspace0List.add( poll ); if( pollspace0List.size() >= nPruneTrigger ) sortPrune( pollspace0List ); return true; } catch( IOException|ParseException|ScriptException|SQLException|XMLStreamException x ) { xH.value = new VotorolaException( "Unable to count votes for poll: " + pollName, x ); return false; // escape from here and throw it } } }); if( xH.value != null ) throw xH.value; // Create indices in count table (last, as it slows down table updates). // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - countTable.createIndices(); } finally{ synchronized( database ){ database.connection().setAutoCommit( true ); }} // does a commit, too // Write the cache file for PollspaceWAP. (temporary hack, per pollspace0Cache) // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - sortPrune( pollspace0List ); // sort at least is almost always needed try ( final JsonWriter out = new JsonWriter( new BufferedWriter( new OutputStreamWriter( new FileOutputStream(pollspace0Cache), StandardCharsets.UTF_8 ))); // see g.web GSONCLOSE final JsonWriter outP = new JsonWriter( new BufferedWriter( new OutputStreamWriter( new FileOutputStream(pollspace0PCache), StandardCharsets.UTF_8 )) ); // " ){ // outPJ.setIndent( votorola.a.web.wap.Responding.PRETTY_INDENT ); /// gives NoClassDefFoundError ServletRequest, so just do this for now: outP.setIndent( " " ); // otherwise it would be packed out.beginArray(); outP.beginArray(); for( int p = 0, pN = pollspace0List.size(); p < pN; ++p ) { final PollService poll = pollspace0List.get( p ); out.beginObject(); outP.beginObject(); String s; s = poll.name(); out.name( "name" ).value( s ); outP.name( "name" ).value( s ); s = poll.displayTitle(); if( s != null ) { out.name( "displayTitle" ).value( s ); outP.name( "displayTitle" ).value( s ); } s = poll.issueType(); out.name( "issueType" ).value( s ); outP.name( "issueType" ).value( s ); out.endObject(); outP.endObject(); } out.endArray(); outP.endArray(); } } catch( VotorolaException x ) { throw x; } catch( IOException|SQLException x ) { throw new VotorolaException( "Unable to mount:" + ReadyDirectory.this, x ); } return pollCountH.value; } /** The directory to which the count is serialized when mounted. */ public File mountedDirectory() { return mountedDirectory; } private final File mountedDirectory = new File( ReadyDirectory.this, "_mountedCount" ); /** Constructs the formal file to which the count for the specified poll service is * serialized when mounted. */ public File newSerialFile( final String serviceName ) { return new File( mountedDirectory(), serviceName + ".serial" ); } /** The pipe recognizer for this ready directory. */ public PipeRecognizer pipeRecognizer() { return pipeRecognizer; } private final PipeRecognizer pipeRecognizer; /** File to which JSON content for the first block of a {@linkplain PollspaceWAP * PollspaceWAP} response is written, when the count is mounted. */ public File pollspace0Cache() { return pollspace0Cache; } // These files are a temporary hack. PollspaceWAP will probably need a poll table // later, and that table will be what the mount creates. Each row will be a // compiled poll. It will be kept separate from the count summary (currently // serialized as a file) because the two are conceptually distinct, and because // polls might be compiled separately from counts in future, as trust traces are. private final File pollspace0Cache = new File( mountedDirectory, "pollspace0.json" ); /** File to which JSON content for the first block of a {@linkplain PollspaceWAP * PollspaceWAP} response is written in prettified form, when the count is mounted. */ public File pollspace0PCache() { return pollspace0PCache; } private final File pollspace0PCache = new File( mountedDirectory, "pollspace0P.json" ); /** A file filter that accepts only apparent ready directories. */ public static final FileFilter READY_DIRECTORY_FILTER = new FileFilter() { public boolean accept( final File file ) { final String name = file.getName(); return file.isDirectory() && name.startsWith( "readyCount-" ) && OutputStore.isS( OutputStore.suffix( name )); } }; /** The symbolic link to the ready directory of the trust network trace. */ public File readyTraceLink() { return readyTraceLink; } private final File readyTraceLink = new File( ReadyDirectory.this, "readyTrace" ); /** Reverses a previous mount, erasing the results from the database and filebase. * * @return true if the count was unmounted, false if it was not mounted to begin * with, either in whole or part. * * @see #isMounted() * @see #mount(VoteServer.Run,NetworkTrace,boolean) */ public boolean unmount( final VoteServer.Run run ) throws IOException, SQLException { boolean unmounted = false; // thus far if( mountedDirectory.isDirectory() ) { FileX.deleteRecursiveSure( mountedDirectory ); unmounted = true; } if( new CountTable(ReadyDirectory.this,run.database()).drop() ) unmounted = true; return unmounted; } // - O u t p u t - S t o r e . R e a d y - D i r e c t o r y -------------------------- /** Answers whether the count is nominally mounted. */ public boolean isMounted() { return mountedDirectory.isDirectory(); } //// P r i v a t e /////////////////////////////////////////////////////////////////////// /** @param r the reader having just read the 'pos' start element. */ private void imagePos( final File mirrorSourceDirectory, final PollService poll, final CountTable.PollView countTablePV, final XMLStreamReader r, final SimpleDateFormat iso8601Formatter ) throws ParseException, ScriptException, SQLException, XMLStreamException { final LocationI rLocPos = new LocationI( r.getLocation() ); // of position element in stream final CountNodeW node = new CountNodeW( countTablePV ); while( r.hasNext() ) { r.next(); if( r.isEndElement() && "pos".equals( r.getLocalName() )) break; if( r.isStartElement() ) { if( "doc".equals( r.getLocalName() )) { final String url = r.getAttributeValue( /*namespaceURI*/null, "url" ); if( url != null ) node.setLocation( url ); else logger.warning( "unlocated position doc, missing 'url' attribute: " + LocationX.toString(r.getLocation()) ); } else if( "subj".equals( r.getLocalName() )) { try { node.init( IDPair.fromEmail( InternetAddressX.newValidAddress( r.getAttributeValue( /*namespaceURI*/null, "loc" ), r.getAttributeValue( /*namespaceURI*/null, "dom" )))); } catch( AddressException x ) { logger.log( LoggerX.WARNING, /*message*/LocationX.toString(r.getLocation()), x ); } } else if( "vote".equals( r.getLocalName() )) inputVote( node, r, iso8601Formatter ); } } final String email = node.email(); if( email == null ) { logger.warning( "ignoring vote, unidentified subject: " + rLocPos ); return; } final CountNodeW existingNode = countTablePV.get( email ); if( existingNode != null ) { if( node.getTime() < existingNode.getTime() ) return; // image is stale node.setDartSector( existingNode.dartSector() ); // retain voter's dart sector } node.setSource( mirrorSourceDirectory.getName() ); if( node.isBarrable() ) { node.setBar( (String)poll.configurationScript().invokeKnownFunction( "voterBarUnregistered", node.person() )); // till proven otherwise, in next pass } node.write(); } /** @param r the reader having just read the 'vote' start element. */ private void inputVote( final CountNodeW node, final XMLStreamReader r, final SimpleDateFormat iso8601Formatter ) throws ParseException, XMLStreamException { final String tISO = r.getAttributeValue( /*namespaceURI*/null, "t" ); if( tISO != null ) { final long time = iso8601Formatter.parse( SimpleDateFormatX.simplifiedISO8601( tISO )).getTime(); if( time <= System.currentTimeMillis() ) node.setTime( time ); else logger.warning( "ignoring 't' attribute in future: " + LocationX.toString(r.getLocation()) ); } else logger.warning( "vote likely to be ignored, missing 't' attribute: " + LocationX.toString(r.getLocation()) ); final String dS = r.getAttributeValue( /*namespaceURI*/null, "dS" ); if( dS != null ) node.setDartSector( Integer.parseInt( dS )); while( r.hasNext() ) { r.next(); if( r.isEndElement() && "vote".equals( r.getLocalName() )) break; if( r.isStartElement() && "obj".equals( r.getLocalName() )) { try { node.setCandidateEmail( InternetAddressX.newValidAddress( r.getAttributeValue( /*namespaceURI*/null, "loc" ), r.getAttributeValue( /*namespaceURI*/null, "dom" ))); } catch( AddressException x ) { logger.log( LoggerX.WARNING, /*message*/LocationX.toString(r.getLocation()), x ); } } } return; } private final File inVoteDirectory = new File( snapDirectory(), "_in_vote" ); private static final Logger logger = LoggerX.i( ReadyDirectory.class ); /** @param r the reader having just read the 'pos' start element. */ private void originatePos( final PollService poll, final CountTable.PollView countTablePV, final XMLStreamReader r, final SimpleDateFormat iso8601Formatter ) throws ParseException, ScriptException, SQLException, XMLStreamException { final LocationI rLocPos = new LocationI( r.getLocation() ); // of position element in stream final CountNodeW node = new CountNodeW( countTablePV ); while( r.hasNext() ) { r.next(); if( r.isEndElement() && "pos".equals( r.getLocalName() )) break; if( r.isStartElement() ) { if( "subj".equals( r.getLocalName() )) { try { node.init( IDPair.fromEmail( InternetAddressX.newValidAddress( r.getAttributeValue( /*namespaceURI*/null, "loc" ), r.getAttributeValue( /*namespaceURI*/null, "dom" )))); } catch( final AddressException x ) { logger.log( LoggerX.WARNING, /*message*/LocationX.toString(r.getLocation()), x ); } } else if( "vote".equals( r.getLocalName() )) inputVote( node, r, iso8601Formatter ); } } final IDPair person = node.person(); if( person == null ) { logger.warning( "ignoring vote, unidentified subject: " + rLocPos ); return; } if( node.isBarrable() ) { node.setBar( (String)poll.configurationScript().invokeKnownFunction( "voterBarUnregistered", person )); // till proven otherwise in next pass } node.write(); } private static final String pipeRecognizerSerialName = "pipeRecognizer.serial"; /** Sorts the list and removes any trailing elements over MAX_RESPONSE_SIZE. */ private static void sortPrune( final ArrayList pollspace0List ) { Collections.sort( pollspace0List ); int p = pollspace0List.size(); if( p <= PollspaceWAP.MAX_RESPONSE_SIZE ) return; // pollspace0List.removeRange( PollspaceWAP.MAX_RESPONSE_SIZE, p ); /// has protected access for some reason, so do it the hard way: --p; for( ;; ) { pollspace0List.remove( p ); --p; if( p < PollspaceWAP.MAX_RESPONSE_SIZE ) break; } assert pollspace0List.size() == PollspaceWAP.MAX_RESPONSE_SIZE; } private Count tally( final PollService poll, final NetworkTrace networkTrace, final File inVoteFile, final CountTable countTable, final Membership.Table membershipTable, final TraceNodeW.Table traceNodeTable, final boolean isVerbose ) throws IOException, ParseException, ScriptException, SQLException, XMLStreamException { final String pollName = poll.name(); final CountTable.PollView countTablePV = countTable.new PollView( pollName ); final SimpleDateFormat iso8601Formatter = new SimpleDateFormat( SimpleDateFormatX.ISO_8601_PATTERN ); // Pass 1. Read snapshot of voter input and create one node per voter. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if( isVerbose ) System.out.print( " 1" ); // This pass pre-creates subject nodes (voters), thus allowing duplicate image // votes to be reconciled in pass 2. // It also pre-creates intermediate object nodes (delegates), thus allowing // their bars to be recorded in pass 3 prior to casting in pass 4. The bars are // here initialized to (configurationScript) barUnregistered. // Finally, pre-creation enables end-to-end cascading of single votes in pass 4. // End-to-end cascading is simpler (and more robust) than multiple partial // cascades. The latter would involve carries that are complicated by cycles. // See CountNodeW.castCyclic(trace). { final InputStream in = new BufferedInputStream( new FileInputStream( inVoteFile )); // one stream for each read through, as using mark/reset on a single one // would defeat buffering try { final XMLStreamReader r = XMLColumnAppender.newStreamReader( /*systemId*/inVoteFile.toString(), in ); try { while( r.hasNext() ) { r.next(); if( r.isStartElement() ) { if( "in".equals( r.getLocalName() )) verifyServiceName( poll, r ); else if( "pos".equals( r.getLocalName() )) { originatePos( poll, countTablePV, r, iso8601Formatter ); } } } } finally{ r.close(); } } catch( final XMLStreamException x ) { throw new RuntimeException( x ); } finally{ in.close(); } } // Pass 2. Image votes from external engines. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if( isVerbose ) System.out.print( "2" ); final VoteServer vS = poll.vsRun().voteServer(); { for( final File mirrorSourceDirectory: InputStore.U.mirrorSourceDirectories( vS )) { final File file = new File( mirrorSourceDirectory, // ought to be stored in output snap directory, for later verification (not yet implemented) "_snap_current" + File.separator + pollName + ".xml" ).getCanonicalFile(); if( !file.isFile() ) continue; final InputStream in = new BufferedInputStream( new FileInputStream( file )); try { final XMLStreamReader r = XMLColumnAppender.newStreamReader( /*systemId*/file.toString(), in ); try { while( r.hasNext() ) { r.next(); if( r.isStartElement() ) { if( "in".equals( r.getLocalName() )) verifyServiceName( poll, r ); else if( "pos".equals( r.getLocalName() )) { imagePos( mirrorSourceDirectory, poll, countTablePV, r, iso8601Formatter ); } } } } finally{ r.close(); } } catch( final XMLStreamException x ) { throw new RuntimeException( x ); } finally{ in.close(); } } } // Pass 3. Read reported trace of network and mark eligible casters. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if( isVerbose ) System.out.print( "3" ); // This pass will be slow for large trust networks, especially where poll // turnout is low. OPT Speed it by traversing the poll instead of the network. traceNodeTable.run( new TraceNodeW.Runner() { public void run( final TraceNodeW node ) { try { final IDPair registrant = node.registrant(); if( !CountNodeW.isBarrable( registrant.username(), ReadyDirectory.this )) return; // though it's unlikely a non-barrable would register final CountNodeW countNode = countTablePV.get( registrant.email() ); if( countNode == null ) return; // registrant has no input to this poll if( !countNode.isVoter() ) { countNode.setBar( (String)poll.configurationScript().invokeKnownFunction( "voterBarUnknown", registrant )); /* not voting, so save time by bypassing the bar test */ countNode.write(); return; } // Poll eligibility bar? // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` final Set divisions = Collections.unmodifiableSet( membershipTable.divisionSet( node.registrant().email() )); final VoteCastingContext vCC = new VoteCastingContext( /*isRealCount*/true, node, divisions ); poll.configurationScript().invokeKnownFunction( "castingVote", vCC ); countNode.setBar( vCC.getBar() ); countNode.write(); } catch( final Exception x ) { throw VotorolaRuntimeException.castOrWrapped( x ); } } }); // Pass 4. For each eligible caster, trace the route and effect the cast. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if( isVerbose ) System.out.print( "4" ); // after this pass, all participating nodes will have been created synchronized( countTable.database() ) { final ResultSet r = countTablePV.getByIndeces(); try { final CountTablePVC countTablePVC = new CountTablePVC( countTable, pollName ); while( r.next() ) { countTablePVC.skimFlush(); final String voterEmail = r.getString( 1 ); final CountNodeW node = countTablePVC.getOrCreate( voterEmail ); node.castSolo(); } countTablePVC.writeAll(); } finally{ r.close(); } } // Pass 5. Assign dart sectors. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if( isVerbose ) System.out.print( "5" ); final Random randomizer = new Random(); { // assign dart sectors to base candidates // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` final NodeThrower nT = new NodeThrower( poll, randomizer ); countTablePV.run( CountTable.BASE_CANDIDATE_TAIL + " " + CountNodeW.RECEIVE_COUNT_ORDER_CLAUSE, nT ); nT.writeDartBoard(); } { // assign dart sectors to voters // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` final class Thrower extends NodeThrower { Thrower() { super( poll, randomizer ); } @Override boolean wasSectoredAsBase( final CountNodeW cN ) { return cN.isBaseCandidate(); // whether already sectored above } } final class Runner implements CountNodeW.Runner { String candidateEmailLast; Thrower nT; void flush() { if( nT == null ) return; try{ nT.writeDartBoard(); } catch( final SQLException x ) { throw new RuntimeException( x ); } } public void run( final CountNodeW node ) { final String candidateEmail = node.getCandidateEmail(); if( !candidateEmail.equals( candidateEmailLast )) { flush(); nT = new Thrower(); candidateEmailLast = candidateEmail; } nT.run( node ); } } final Runner r = new Runner(); // adding email for stability: countTablePV.run( "AND isCast ORDER BY candidateEmail, carryVolume DESC, email", r ); r.flush(); } // Pass 6. Transfer position properties from pollwiki to nodes. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if( isVerbose ) System.out.print( "6" ); // This pass does not contribute to the tally proper, and therefore does not // violate its repeatability. It merely decorates the nodes with non-tally // properties, effectively using the nodes as a cache onto the wiki. pass6: { final StringBuilder qB = new StringBuilder(); qB.append( vS.pollwiki().scriptURI() ); qB.append( "/api.php?action=ask&query=" ); // http://semantic-mediawiki.org/wiki/Ask_API qB.append( UriComponent.encode( "[[Category:Position]][[Display title::+]][[Poll::", UriComponent.Type.QUERY_PARAM )); qB.append( pollName ); final int limit = 500; qB.append( UriComponent.encode( "]]|?Display title|limit=", UriComponent.Type.QUERY_PARAM )); qB.append( limit ); qB.append( "&format=json" ); // JSON as XML element names are malformed (SMW 1.7.1) final URL queryURL = new URL( qB.toString() ); logger.fine( "querying pollwiki for position properties: " + queryURL ); final Spool spool = new Spool1(); try { final JsonReader in = MediaWiki.requestJSON( queryURL.openConnection(), spool ); if( in.peek() != JsonToken.BEGIN_OBJECT ) break pass6; in.beginObject(); if( in.peek() != JsonToken.NAME || !in.nextName().equals("query") || in.peek() != JsonToken.BEGIN_OBJECT ) break pass6; in.beginObject(); for( ;; ) // find the results { if( in.peek() != JsonToken.NAME ) break pass6; if( in.nextName().equals( "results" )) break; in.skipValue(); } if( in.peek() != JsonToken.BEGIN_OBJECT ) break pass6; in.beginObject(); results: while( in.peek() != JsonToken.END_OBJECT ) { if( in.peek() != JsonToken.NAME ) { in.skipValue(); continue; } final String pageName = in.nextName(); if( in.peek() != JsonToken.BEGIN_OBJECT ) break results; in.beginObject(); page: while( in.peek() != JsonToken.END_OBJECT ) { if( in.peek() != JsonToken.NAME ) { in.skipValue(); continue; } String n = in.nextName(); if( "printouts".equals(n) && in.peek() == JsonToken.BEGIN_OBJECT ) { in.beginObject(); printouts: while( in.peek() != JsonToken.END_OBJECT ) { if( in.peek() != JsonToken.NAME ) { in.skipValue(); continue; } n = in.nextName(); if( "Display title".equals(n) && in.peek() == JsonToken.BEGIN_ARRAY ) { in.beginArray(); if( in.peek() == JsonToken.STRING ) { final String displayTitle = in.nextString(); final MatchResult m = MediaWiki.parsePageNameS( pageName ); if( m != null ) { CountNodeW node = null; try { node = countTablePV.get( IDPair.toInternetAddress(m.group(2)).getAddress() ); } catch( final AddressException x ) { logger.config( "skipping position page with malformed username: " + pageName ); } if( node != null ) // if a participant in this poll { node.setDisplayTitle( displayTitle ); node.write(); } } } while( in.peek() != JsonToken.END_ARRAY ) in.skipValue(); // just the one value expected, ignore any others in.endArray(); } else in.skipValue(); } in.endObject(); // printouts } else in.skipValue(); } in.endObject(); // page } in.endObject(); // results while( in.peek() != JsonToken.END_OBJECT ) in.skipValue(); in.endObject(); // query while( in.peek() != JsonToken.END_OBJECT ) { if( in.peek() == JsonToken.NAME ) { if( in.nextName().startsWith( "query-continue" )) { // expect "query-continue" if continuation not supported // (e.g. SMW 1.7.1), or "query-continue-offset" if it is // supported (1.8+ I think, which we don't have yet) logger.warning( "continuation code needed, exceeded SMW query limit: " + limit ); break; } } else in.skipValue(); } } finally{ spool.unwind(); } } // Pass 7. Rank candidates and construct the formal count. // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if( isVerbose ) System.out.print( "7" ); final class Finalizer implements CountNodeW.Runner { long baseCandidateCount; long candidateCount; long holdVolume; long rankCount; long castVolume; private long n; // node count private long receiveVolumeLast = Long.MAX_VALUE; private long rank; public void run( final CountNodeW node ) { // rank candidates // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` if( node.receiveVolume() < receiveVolumeLast ) { ++rankCount; rank = n + 1; assert !(n == 0L && rank != 1L): "top rank is 1"; receiveVolumeLast = node.receiveVolume(); } else assert node.receiveVolume() == receiveVolumeLast: "nodes are sorted"; node.setRank( rank ); node.setRankIndex( n ); // accumulate other count data // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` if( node.isCandidate() ) { ++candidateCount; if( node.isCycler() || !node.isCast() ) { assert node.isBaseCandidate(); ++baseCandidateCount; } } castVolume += node.castVolume(); holdVolume += node.holdVolume(); // - - - ++n; try{ node.write(); } catch( final SQLException x ) { throw new RuntimeException( x ); } } }; final Finalizer f = new Finalizer(); countTablePV.run( CountNodeW.RECEIVE_COUNT_ORDER_CLAUSE, f ); assert f.castVolume == f.holdVolume; return new Count( pollName, ReadyDirectory.this, f.baseCandidateCount, f.candidateCount, f.castVolume, f.rankCount ); } /** @param r the reader having just read the 'in' start element. */ private void verifyServiceName( final PollService poll, final XMLStreamReader r ) throws XMLStreamException { final String serviceName = r.getAttributeValue( /*namespaceURI*/null, "serviceName" ); if( !poll.name().equals( serviceName )) { throw new VotorolaRuntimeException( "wrong service name '" + serviceName + "' at " + LocationX.toString(r.getLocation()) ); } } // ==================================================================================== /** A cached view of a count table, restricted to a particular poll. */ private static @ThreadRestricted final class CountTablePVC extends CountTable.PollView implements CacheControlledTable { // cf. CR_Vote.CountTablePVC private CountTablePVC( CountTable t, String serviceName ) throws IOException { t.super( serviceName ); } private final LinkedHashMap cache; { final float loadFactor = 0.75f; final int maximumOvergrow = 100; // in capacity, guess cache = new LinkedHashMap( /*initial capacity*/(int)((skimmedSize + maximumOvergrow) / loadFactor), loadFactor, /*accessOrder*/true ); } private static final int skimmedSize = 1000; // to which it returns after skimming // -------------------------------------------------------------------------------- public @Override CountNodeW get( final String voterEmail ) throws SQLException, XMLStreamException { CountNodeW node = cache.get( voterEmail ); // if( node == null && !cache.containsKey( voterEmail )) //// no need of null values if( node == null ) { node = super.get( voterEmail ); if( node != null ) cache.put( voterEmail, node ); } return node; } public @Override CountNodeW getOrCreate( final String voterEmail ) throws SQLException, XMLStreamException { final CountNodeW node = super.getOrCreate( voterEmail ); if( node instanceof CountNodeIC ) cache.put( voterEmail, node ); return node; } // - C a c h e - C o n t r o l l e d - T a b l e ---------------------------------- public void skimFlush() throws SQLException { if( cache.size() <= skimmedSize ) return; final Iterator nodeIterator = cache.values().iterator(); for( ;; ) { CountNodeW node = nodeIterator.next(); if( node != null ) node.write(); // null guard probably redundant nodeIterator.remove(); if( cache.size() <= skimmedSize ) break; } } public void writeAll() throws SQLException { for( CountNodeW node: cache.values() ) { if( node != null ) node.write(); // null guard probably redundant } } } // ==================================================================================== private static class NodeThrower implements CountNodeW.Runner { NodeThrower( PollService _poll, Random _randomizer ) { poll = _poll; randomizer = _randomizer; } private final CountNodeW[] dartBoard = new CountNodeW[DART_SECTOR_MAX]; private int nodeCount; private final PollService poll; private final void set( final CountNodeW node, final int newSector ) throws SQLException { node.setDartSector( newSector ); node.write(); final Vote vote = new Vote( node.person(), poll.voterInputTable() ); vote.setDartSector( newSector ); // persist try{ vote.write( poll.voterInputTable(), /*userSession*/null, /*toForce*/true ); } catch( final VoterInputTable.BadInputException x ) { throw new RuntimeException( x ); } } private final Random randomizer; // -------------------------------------------------------------------------------- /** Answers whether the node was already sectored as a base candidate. */ boolean wasSectoredAsBase( final CountNodeW node ) { return false; } final void writeDartBoard() throws SQLException { for( int s = 0; s < DART_SECTOR_MAX; ++s ) { final CountNodeW node = dartBoard[s]; if( node == null ) continue; final int sector = s + 1; if( node.dartSector() == sector ) continue; set( node, sector ); } } // - C o u n t - N o d e . R u n n e r -------------------------------------------- public final void run( final CountNodeW newNode ) { final int sector = newNode.dartSector(); // 0, 1..20 if( nodeCount >= DART_SECTOR_MAX ) // board is full { if( sector != 0 ) { if( wasSectoredAsBase( newNode )) throw new IllegalStateException(); // if in top 20 on end board, then in top 20 on all upstream boards try { set( newNode, 0 ); } catch( final Exception x ) { throw VotorolaRuntimeException.castOrWrapped( x ); } } } else // throw onto board { int s; // 0..19 if( sector == 0 ) { if( wasSectoredAsBase( newNode )) return; // per CountNodeW.dartSector(), a cycler who does not // fit on end board will appear nowhere s = randomizer.nextInt( DART_SECTOR_MAX ); } else s = sector - 1; CountNodeW oldNode = dartBoard[s]; if( oldNode == null ) dartBoard[s] = newNode; else // collision, one of the nodes must be displaced { final CountNodeW nodeToDisplace; if( wasSectoredAsBase( newNode )) { assert !wasSectoredAsBase( oldNode ); // cyclers cannot be co-voters nodeToDisplace = oldNode; } else if( wasSectoredAsBase( oldNode )) { assert !wasSectoredAsBase( newNode ); // " nodeToDisplace = newNode; } else if( newNode.getTime() > oldNode.getTime() ) nodeToDisplace = newNode; else { dartBoard[s] = newNode; nodeToDisplace = oldNode; } s = randomizer.nextInt( DART_SECTOR_MAX ); while( dartBoard[s] != null ) // find next open sector { ++s; if( s == DART_SECTOR_MAX ) s = 0; } dartBoard[s] = nodeToDisplace; } } ++nodeCount; } } }