<?php 
/* 
 * This deamon monitors a directory for files, and will process each file in 
 * its own thread. 
 * 
 * Sample daemon using Benoit Perroud's MultiThreaded Daemon (MTD) 
 *     See http://code.google.com/p/phpmultithreadeddaemon/  
 *    and http://phpmultithreaddaemon.blogspot.com/ for more information 
 * 
 * Modifications by Daniel Kadosh, Affinegy Inc., June 2009 
 * Made many enhancements for robustness: 
 * - Truly daemonized by starting process group with posix_setsid() 
 * - Surrounded key items in try/catch for proper logging 
 * - Handling of SIGTERM/ SIGQUIT = no more children spawned, wait for all threads to die. 
 * - Handling of SIGHUP = call to loadConfig() method 
 * - Created PID file to ensure only 1 copy of the daemon is running 
 * - Full cleanup of semaphores & PID file on exit 
 * 
 */ 
error_reporting(E_ALL); 
require_once 'class.MTDaemon.php'; 
 
// Set number of threads for daemon 
$nThreads = 2; 
 
// Optional: Init logging class 
$sLogfile = $argv[0].'.log'; // Set to null for logging to stdout 
MTLog::getInstance($sLogfile)->setVerbosity(MTLog::INFO); 
 
// Directory names 
$sBaseDir = dirname(__FILE__); 
$aDir['queue'] = "$sBaseDir/_queue"; // incoming files 
$aDir['done'] = "$sBaseDir/_done";   // files parsed OK 
$aDir['error'] = "$sBaseDir/_error"; // files not parsed OK 
 
////////////////// For DEMO: populate _queue directory 
$sQueuePath = $aDir['queue']; 
if ( !file_exists($sQueuePath) ) mkdir($sQueuePath, 0775, true); 
if ( !is_dir($sQueuePath) ) die('This should be a directory: '.$sQueuePath); 
$sThisDir = dirname(__FILE__); 
echo `cp $sThisDir/*.php $sQueuePath`; 
////////////////// For DEMO: populate _queue directory 
 
class MTFileParser extends MTDaemon { 
    /* 
     * Optional: read in config files or perform other "init" actions 
     * Called from MTDaemon's _prerun(), plus when a SIGHUP signal is received. 
     * NOTE: Runs under the PARENT process. 
     */ 
    public function loadConfig() { 
        // Already locked from within handle() loop, so don't lock/unlock. 
 
        // Could load $aDir settings from a config file, but just using the global's setting here 
        global $aDir; 
        foreach ( $aDir as $sPath ) { 
            if ( !file_exists($sPath) ) mkdir($sPath, 0775, true); 
        } 
        $this->setVar('aDir', $aDir); 
 
        global $argv; 
        MTLog::getInstance()->info($argv[0].': Monitoring Queue dir='.$aDir['queue']); 
    } 
 
    /*  
     * Function to return quickly with (a) no work to do, or (b) next file to process 
     * NOTE: Runs under the PARENT process 
     * Could keep open DB connection to affinegy_master, and figure out $build here 
     */ 
    public function getNext($slot) { 
        $sFileToProcess = null; 
 
        // Get shared arrays 
        $this->lock(); 
        $aDir = $this->getVar('aDir'); 
        $aFileList = $this->getVar('aFileList'); // FIFO queue 
        $aFilesInProcess = $this->getVar('aFilesInProcess'); 
        if ( $aFilesInProcess==null ) $aFilesInProcess = array(); 
 
        // Use ls to get files, only if the $aFileList queue is empty 
        if ( !$aFileList && is_dir($aDir['queue']) ) { 
            $sCmd = 'ls -1 '.$aDir['queue']; 
            exec($sCmd, $aFileList); 
            // Take out files already in process 
            if ( count($aFileList) && count($aFilesInProcess) ) { 
                $aFileList = array_diff($aFileList, $aFilesInProcess); 
            } 
        } 
 
        // Pull out a file to process from FIFO queue 
        if ( count($aFileList) ) { 
            $sFileToProcess = array_shift($aFileList); 
            array_push($aFilesInProcess, $sFileToProcess); 
        } 
 
        // Store shared arrays back 
        $this->setVar('aFileList', $aFileList); 
        $this->setVar('aFilesInProcess', $aFilesInProcess); 
        $this->unlock(); 
        if ( !$sFileToProcess ) return null; 
        return $aDir['queue'].'/'.$sFileToProcess; // $sFileName in run() method below 
    } 
 
    /*  
     * Do main work here. 
     * NOTE: Runs under a CHILD process 
     */ 
    public function run($sFileName, $slot) { 
        $sMsg = 'slot='.$slot.' file='.basename($sFileName); 
        MTLog::getInstance()->info('## Start '.$sMsg); 
        try { 
            // Call parsing function that returns TRUE if OK, FALSE if not. 
            // Could instantiate an object of a file-parsing class and run it instead... 
            $bProcessedOK = ParseFile($sFileName); 
        } catch( Exception $e ) { 
            MTLog::getInstance()->error('ProcessFile error '.$sMsg.': '.$e->getMessage()); 
            $bProcessedOK = false; 
        } 
 
        // Done, take file off "in-process" list 
        $this->lock(); 
        $aFilesInProcess = $this->getVar('aFilesInProcess'); 
        $nFileToRemove = array_search(basename($sFileName), $aFilesInProcess); 
        unset($aFilesInProcess[$nFileToRemove]); 
        $this->setVar('aFilesInProcess', $aFilesInProcess); 
 
        // Move file out of queue directory 
        $aDir = $this->getVar('aDir'); 
        $sDestDir = ($bProcessedOK ? $aDir['done'] : $aDir['error']); 
        $sDestFile = $sDestDir.'/'.basename($sFileName); 
        rename($sFileName, $sDestFile); 
        $this->unlock(); 
 
        MTLog::getInstance()->info('-- End '.$sMsg); 
        return 0; 
    } 
} 
 
// For demo purposes, function that actually does the work on a file 
function ParseFile($sFileName) { 
    $sContent = file_get_contents($sFileName); 
    if ( !$sContent ) return false; 
 
    $nRand = rand(10, 20); 
    MTLog::getInstance()->info('ParseFile() read '.strlen($sContent).' bytes from file '.$sFileName.' |pausing '.$nRand.' seconds'); 
    sleep($nRand); 
    return true; 
} 
 
// Run daemon, start threads 
try { 
    $mttest = null; 
    $mttest = new MTFileParser($nThreads); // Init 
    $mttest->handle();                     // Run threads 
} catch( Exception $e ) { 
    if ( $mttest==null ) { 
        $sErr = $argv[0].': Daemon failed to start: '.$e->getMessage(); 
    } else { 
        $sErr = $argv[0].': Daemon died: '.$e->getMessage(); 
    } 
    MTLog::getInstance()->error($sErr); 
    die($sErr."\n"); 
} 
 
?> 
 
 |