File Source: ReferrerQueueManagerImpl.java

     1  /*
     2   * Licensed to the Apache Software Foundation (ASF) under one or more
     3   *  contributor license agreements.  The ASF licenses this file to You
     4   * under the Apache License, Version 2.0 (the "License"); you may not
     5   * use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.  For additional information regarding
    15   * copyright in this work, please see the NOTICE file in the top level
    16   * directory of this distribution.
    17   */
    18  
    19  package org.apache.roller.weblogger.business.referrers;
    20  
    21  import java.util.ArrayList;
    22  import java.util.Collections;
    23  import java.util.HashMap;
    24  import java.util.Iterator;
    25  import java.util.List;
    26  import org.apache.commons.logging.Log;
    27  import org.apache.commons.logging.LogFactory;
    28  import org.apache.roller.weblogger.WebloggerException;
    29  import org.apache.roller.weblogger.business.Weblogger;
    30  import org.apache.roller.weblogger.business.runnable.ContinuousWorkerThread;
    31  import org.apache.roller.weblogger.business.runnable.WorkerThread;
    32  import org.apache.roller.weblogger.config.WebloggerConfig;
    33  
    34  
    35  /**
    36   * The base implementation of the ReferrerQueueManager.
    37   * 
    38   * This class is implemented using the singleton pattern to ensure that only
    39   * one instance exists at any given time.
    40   *
    41   * This implementation can be configured to handle referrers in 2 ways ...
    42   *  1. synchronously.  referrers are processed immediately.
    43   *  2. asynchronously.  referrers are queued for later processing.
    44   *
    45   * Users can control the referrer queue mode via properties in the static
    46   * roller.properties configuration file.
    47   *
    48   * In asynchronous processing mode we start some number of worker threads which
    49   * run continously to process any referrers that have been queued.  Each worker
    50   * processes queued referrers until the queue is empty, then sleeps for a given
    51   * amount of time.  The number of workers used and their sleep time can be set
    52   * via properties of the static roller.properties file.
    53   *
    54   * @author Allen Gilliland
    55   */
    56  @com.google.inject.Singleton
    57  public class ReferrerQueueManagerImpl implements ReferrerQueueManager {
    58      
             /* 
    P/P       *  Method: org.apache.roller.weblogger.business.referrers.ReferrerQueueManagerImpl__static_init
              * 
              *  Postconditions:
              *    init'ed(mLogger)
              */
    59      private static Log mLogger = LogFactory.getLog(ReferrerQueueManagerImpl.class);
    60      
    61      private final Weblogger roller;
    62      
    63      private boolean asyncMode = false;
    64      private int numWorkers = 1;
    65      private int sleepTime = 10000;
    66      private List workers = null;
    67      private List referrerQueue = null;
    68  
    69      
    70      // private because we are a singleton
    71      @com.google.inject.Inject
             /* 
    P/P       *  Method: void org.apache.roller.weblogger.business.referrers.ReferrerQueueManagerImpl(Weblogger)
              * 
              *  Preconditions:
              *    mLogger != null
              *    org/apache/roller/weblogger/config/WebloggerConfig.config != null
              *    org/apache/roller/weblogger/config/WebloggerConfig.log != null
              * 
              *  Presumptions:
              *    java.lang.Integer:parseInt(...)@101 in -2_147_483..4_294_967
              * 
              *  Postconditions:
              *    init'ed(this.asyncMode)
              *    this.numWorkers >= 1
              *    init'ed(this.referrerQueue)
              *    this.roller == roller
              *    init'ed(this.roller)
              *    this.sleepTime in -2_147_483_000..4_294_967_000
              *    this.workers in Addr_Set{null,&new ArrayList(ReferrerQueueManagerImpl#5)}
              *    new ArrayList(ReferrerQueueManagerImpl#5) num objects <= 1
              * 
              *  Test Vectors:
              *    java.lang.Integer:parseInt(...)@90: {1..232-1}, {-231..0}
              */
    72      protected ReferrerQueueManagerImpl(Weblogger roller) {
    73          
    74          mLogger.info("Instantiating Referrer Queue Manager");
    75          
    76          this.roller = roller;
    77  
    78          // lookup config options
    79          this.asyncMode = WebloggerConfig.getBooleanProperty("referrers.asyncProcessing.enabled");
    80          
    81          mLogger.info("Asynchronous referrer processing = "+this.asyncMode);
    82          
    83          if(this.asyncMode) {
    84              
    85              
    86              String num = WebloggerConfig.getProperty("referrers.queue.numWorkers");
    87              String sleep = WebloggerConfig.getProperty("referrers.queue.sleepTime");
    88              
    89              try {
    90                  this.numWorkers = Integer.parseInt(num);
    91                  
    92                  if(numWorkers < 1)
    93                      this.numWorkers = 1;
    94                  
    95              } catch(NumberFormatException nfe) {
    96                  mLogger.warn("Invalid num workers ["+num+"], using default");
    97              }
    98              
    99              try {
   100                  // multiply by 1000 because we expect input in seconds
   101                  this.sleepTime = Integer.parseInt(sleep) * 1000;
   102              } catch(NumberFormatException nfe) {
   103                  mLogger.warn("Invalid sleep time ["+sleep+"], using default");
   104              }
   105              
   106              // create the processing queue
   107              this.referrerQueue = Collections.synchronizedList(new ArrayList());
   108              
   109              // start up workers
   110              this.workers = new ArrayList();
   111              ContinuousWorkerThread worker = null;
   112              QueuedReferrerProcessingJob job = null;
   113              for(int i=0; i < this.numWorkers; i++) {
   114                  job = new QueuedReferrerProcessingJob();
   115                  worker = new ContinuousWorkerThread("ReferrerWorker"+i, job, this.sleepTime);
   116                  workers.add(worker);
   117                  worker.start();
   118              }
   119          }
   120      }
   121      
   122      
   123      /**
   124       * Process an incoming referrer.
   125       *
   126       * If we are doing asynchronous referrer processing then the referrer will
   127       * just go into the queue for later processing.  If not then we process it
   128       * now.
   129       */
   130      public void processReferrer(IncomingReferrer referrer) {
   131          
                 /* 
    P/P           *  Method: void processReferrer(IncomingReferrer)
                  * 
                  *  Preconditions:
                  *    init'ed(this.asyncMode)
                  *    (soft) mLogger != null
                  *    (soft) org/apache/roller/weblogger/business/WebloggerFactory.webloggerProvider != null
                  *    (soft) org/apache/roller/weblogger/business/WebloggerFactory.webloggerProvider.webloggerInstance != null
                  *    (soft) org/apache/roller/weblogger/business/jpa/JPARefererManagerImpl.log != null
                  *    (soft) org/apache/roller/weblogger/business/jpa/JPAUserManagerImpl.log != null
                  *    (soft) org/apache/roller/weblogger/business/jpa/JPAWeblogManagerImpl.log != null
                  *    (soft) org/apache/roller/weblogger/business/referrers/ReferrerProcessingJob.mLogger != null
                  *    (soft) org/apache/roller/weblogger/config/WebloggerRuntimeConfig.log != null
                  *    (soft) referrer != null
                  *    ...
                  * 
                  *  Test Vectors:
                  *    this.asyncMode: {0}, {1}
                  */
   132          if(this.asyncMode) {
   133              mLogger.debug("QUEUING: "+referrer.getRequestUrl());
   134              
   135              // add to queue
   136              this.enqueue(referrer);
   137          } else {
   138              // process now
   139              ReferrerProcessingJob job = new ReferrerProcessingJob();
   140              
   141              // setup input
   142              HashMap inputs = new HashMap();
   143              inputs.put("referrer", referrer);
   144              job.input(inputs);
   145              
   146              // execute
   147              job.execute();
   148              
   149              try {
   150                  // flush changes
   151                  roller.flush();
   152              } catch (WebloggerException ex) {
   153                  mLogger.error("ERROR commiting referrer", ex);
   154              }
   155          }
   156          
   157      }
   158      
   159      
   160      /**
   161       * Place a referrer in the queue.
   162       */
   163      public void enqueue(IncomingReferrer referrer) {
                 /* 
    P/P           *  Method: void enqueue(IncomingReferrer)
                  * 
                  *  Preconditions:
                  *    this.referrerQueue != null
                  *    (soft) mLogger != null
                  * 
                  *  Test Vectors:
                  *    java.util.List:size(...)@166: {-231..250}, {251..232-1}
                  */
   164          this.referrerQueue.add(referrer);
   165          
   166          if(this.referrerQueue.size() > 250) {
   167              mLogger.warn("Referrer queue is rather full. queued="+this.referrerQueue.size());
   168          }
   169      }
   170      
   171      
   172      /**
   173       * Retrieve the next referrer in the queue.
   174       */
   175      public synchronized IncomingReferrer dequeue() {
   176          
                 /* 
    P/P           *  Method: IncomingReferrer dequeue()
                  * 
                  *  Preconditions:
                  *    this.referrerQueue != null
                  * 
                  *  Postconditions:
                  *    init'ed(return_value)
                  * 
                  *  Test Vectors:
                  *    java.util.List:isEmpty(...)@177: {1}, {0}
                  */
   177          if(!this.referrerQueue.isEmpty()) {
   178              return (IncomingReferrer) this.referrerQueue.remove(0);
   179          }
   180          
   181          return null;
   182      }
   183      
   184      
   185      /**
   186       * clean up.
   187       */
   188      public void shutdown() {
   189          
                 /* 
    P/P           *  Method: void shutdown()
                  * 
                  *  Preconditions:
                  *    init'ed(this.workers)
                  *    (soft) mLogger != null
                  * 
                  *  Presumptions:
                  *    java.util.Iterator:next(...)@197 != null
                  * 
                  *  Test Vectors:
                  *    this.workers: Addr_Set{null}, Inverse{null}
                  *    java.util.Iterator:hasNext(...)@196: {0}, {1}
                  *    java.util.List:size(...)@190: {-231..0}, {1..232-1}
                  */
   190          if(this.workers != null && this.workers.size() > 0) {
   191              mLogger.info("stopping all ReferrerQueue worker threads");
   192              
   193              // kill all of our threads
   194              WorkerThread worker = null;
   195              Iterator it = this.workers.iterator();
   196              while(it.hasNext()) {
   197                  worker = (WorkerThread) it.next();
   198                  worker.interrupt();
   199              }
   200          }
   201          
   202      }
   203      
   204  }








SofCheck Inspector Build Version : 2.18479
ReferrerQueueManagerImpl.java 2009-Jan-02 14:25:06
ReferrerQueueManagerImpl.class 2009-Sep-04 03:12:31