Add proper error handling to WorkerCLICommand with --threads > 1
authorTim Düsterhus <duesterhus@woltlab.com>
Fri, 14 Aug 2020 09:39:29 +0000 (11:39 +0200)
committerTim Düsterhus <duesterhus@woltlab.com>
Fri, 14 Aug 2020 09:39:29 +0000 (11:39 +0200)
Fixes #3507

wcfsetup/install/files/lib/system/cli/command/WorkerCLICommand.class.php

index 45df1ebb6e10d108837249b30808b0d5327789c3..5601f48c478d87d61cbc287c282020312bbe0313 100644 (file)
@@ -7,6 +7,7 @@ use wcf\system\io\File;
 use wcf\system\Regex;
 use wcf\util\CLIUtil;
 use wcf\util\DirectoryUtil;
+use wcf\util\JSON;
 use wcf\util\StringUtil;
 use Zend\Console\Exception\RuntimeException as ArgvException;
 use Zend\Console\Getopt as ArgvParser;
@@ -144,23 +145,39 @@ class WorkerCLICommand implements IArgumentedCLICommand {
                if (!empty($_ENV['WCF_WORKER_STATUS_FD'])) {
                        $output = new File("php://fd/".$_ENV['WCF_WORKER_STATUS_FD'], "w");
                }
-               for ($i = ($threadId !== null ? ($threadId + 1) : 0); $progress < 100; $i += $threads) {
-                       $worker->setLoopCount($i);
-                       $worker->validate();
-                       
-                       // execute worker
-                       $worker->execute();
-                       $worker->finalize();
-                       
-                       // update progress
-                       $progress = $worker->getProgress();
-                       $progressbar->update($progress);
+               try {
+                       for ($i = ($threadId !== null ? ($threadId + 1) : 0); $progress < 100; $i += $threads) {
+                               $worker->setLoopCount($i);
+                               $worker->validate();
+                               
+                               // execute worker
+                               $worker->execute();
+                               $worker->finalize();
+                               
+                               // update progress
+                               $progress = $worker->getProgress();
+                               $progressbar->update($progress);
+                               if ($output) {
+                                       $output->write(JSON::encode([
+                                               'iteration' => $i,
+                                               'progress' => $progress,
+                                       ])."\n");
+                               }
+                       }
+               }
+               catch (\Exception $e) {
                        if ($output) {
-                               $output->write("$i,$progress\n");
+                               $output->write(JSON::encode([
+                                       'error' => (string) $e,
+                               ]));
                        }
+                       throw $e;
                }
                if ($output) {
-                       $output->write("done,$progress");
+                       $output->write(JSON::encode([
+                               'finished' => true,
+                               'progress' => $progress,
+                       ])."\n");
                        $output->close();
                }
                $progressbar->update($progress);
@@ -192,8 +209,10 @@ class WorkerCLICommand implements IArgumentedCLICommand {
                
                // Invoke the worker processes with the same command line ...
                $arguments = $_SERVER['argv'];
-               // ... with the quiet argument.
+               // ... with the quiet argument ...
                $arguments[] = '-qqqqq';
+               // ... and requesting an unclean exit of an Exception is thrown.
+               $arguments[] = '--exitOnFail';
                $commandLine = PHP_BINARY.' '.implode(' ', array_map('escapeshellarg', $arguments));
                
                Log::debug('Using "'.$commandLine.'" as the worker command line.');
@@ -209,117 +228,159 @@ class WorkerCLICommand implements IArgumentedCLICommand {
                        'WCF_SESSION_ID' => CLIWCF::getSession()->sessionID,
                ]);
                
-               // 1) Spawn the processes.
-               $processes = [];
-               for ($threadId = 0; $threadId < $threads; $threadId++) {
-                       $process = proc_open($commandLine, $fileDescriptors, $pipes, null, $env, null);
-                       
-                       $consoleBar = new ConsoleProgressBar([
-                               'width' => CLIWCF::getTerminal()->getWidth()
-                       ]);
-                       $consoleBar->setElements([
-                               ConsoleProgressBar::ELEMENT_PERCENT,
-                               ConsoleProgressBar::ELEMENT_BAR,
-                               ConsoleProgressBar::ELEMENT_ETA,
-                               ConsoleProgressBar::ELEMENT_TEXT,
-                       ]);
-                       $consoleBar->setTextWidth(30);
-                       $progressbar = new ProgressBar($consoleBar);
-                       $progressbar->update(0, 'T'.$threadId.': spawned');
-                       echo "\n";
-                       
-                       $processes[] = [
-                               'threadId' => $threadId,
-                               'pipes' => $pipes,
-                               'process' => $process,
-                               'progressbar' => $progressbar,
-                       ];
-               }
-               
-               // 2) Start processing in all processes.
-               foreach ($processes as $processData) {
-                       $workerCommand = 'worker --threads='.$threads.' --threadId='.$processData['threadId'].' "'.addcslashes(get_class($worker), "\\\"");
-                       fwrite($processData['pipes'][0], $workerCommand."\n");
-                       fclose($processData['pipes'][0]);
-               }
-               
-               // 3) Handle their status output.
-               while (true) {
-                       // 3.1) Check which processes' status FD is readable.
-                       $read = array_filter(array_map(function ($processData) {
-                               return $processData['pipes'][3];
-                       }, $processes), function ($handle) {
-                               return $handle !== false;
-                       });
-                       $write = null;
-                       $except = null;
-                       
-                       // 3.2) Exit if all status FDs have been closed.
-                       if (empty($read)) {
-                               break;
+               try {
+                       // 1) Spawn the processes.
+                       $processes = [];
+                       for ($threadId = 0; $threadId < $threads; $threadId++) {
+                               $process = proc_open($commandLine, $fileDescriptors, $pipes, null, $env, null);
+                               
+                               $consoleBar = new ConsoleProgressBar([
+                                       'width' => CLIWCF::getTerminal()->getWidth()
+                               ]);
+                               $consoleBar->setElements([
+                                       ConsoleProgressBar::ELEMENT_PERCENT,
+                                       ConsoleProgressBar::ELEMENT_BAR,
+                                       ConsoleProgressBar::ELEMENT_ETA,
+                                       ConsoleProgressBar::ELEMENT_TEXT,
+                               ]);
+                               $consoleBar->setTextWidth(30);
+                               $progressbar = new ProgressBar($consoleBar);
+                               $progressbar->update(0, 'T'.$threadId.': spawned');
+                               echo "\n";
+                               
+                               $processes[] = [
+                                       'threadId' => $threadId,
+                                       'pipes' => $pipes,
+                                       'process' => $process,
+                                       'progressbar' => $progressbar,
+                               ];
                        }
                        
-                       stream_select($read, $write, $except, 2, null);
+                       // 2) Start processing in all processes.
+                       foreach ($processes as $processData) {
+                               $workerCommand = 'worker --threads='.$threads.' --threadId='.$processData['threadId'].' "'.addcslashes(get_class($worker), "\\\"");
+                               fwrite($processData['pipes'][0], $workerCommand."\n");
+                               fclose($processData['pipes'][0]);
+                       }
                        
-                       // 3.3) Rerender the progressbars with the updated status information.
-                       echo "\033[".$threads."A"; // Move up $threads lines to move into the line of the first progress bar.
-                       foreach ($processes as &$processData) {
-                               $status = proc_get_status($processData['process']);
-                               if ($status === false) {
-                                       throw new \LogicException('Unreachable');
+                       // 3) Handle their status output.
+                       while (true) {
+                               // 3.1) Check which processes' status FD is readable.
+                               $read = array_filter(array_map(function ($processData) {
+                                       return $processData['pipes'][3];
+                               }, $processes), function ($handle) {
+                                       return $handle !== false;
+                               });
+                               $write = null;
+                               $except = null;
+                               
+                               // 3.2) Exit if all status FDs have been closed.
+                               if (empty($read)) {
+                                       break;
                                }
                                
-                               $line = false;
-                               if ($processData['pipes'][3] !== false) {
-                                       if (in_array($processData['pipes'][3], $read)) {
-                                               $line = fgets($processData['pipes'][3]);
+                               stream_select($read, $write, $except, 2, null);
+                               
+                               // 3.3) Rerender the progressbars with the updated status information.
+                               echo "\033[".$threads."A"; // Move up $threads lines to move into the line of the first progress bar.
+                               $cursorOffset = -$threads;
+                               foreach ($processes as &$processData) {
+                                       $status = proc_get_status($processData['process']);
+                                       if ($status === false) {
+                                               throw new \LogicException('Unreachable');
                                        }
                                        
-                                       if (feof($processData['pipes'][3])) {
-                                               fclose($processData['pipes'][3]);
-                                               $processData['pipes'][3] = false;
+                                       $line = false;
+                                       if ($processData['pipes'][3] !== false) {
+                                               if (in_array($processData['pipes'][3], $read)) {
+                                                       $line = fgets($processData['pipes'][3]);
+                                               }
+                                               
+                                               if (feof($processData['pipes'][3])) {
+                                                       fclose($processData['pipes'][3]);
+                                                       $processData['pipes'][3] = false;
+                                               }
                                        }
-                               }
-                               
-                               $statusPrefix = 'T'.$processData['threadId'].': ';
-                               if ($line) {
-                                       // If a line could be read we update the progressbar with the data sent.
-                                       list($iteration, $progress) = explode(',', StringUtil::trim($line));
-                                       if ($iteration === 'done') {
-                                               $processData['progressbar']->update($progress, $statusPrefix.'finished');
+                                       
+                                       $statusPrefix = 'T'.$processData['threadId'].': ';
+                                       if ($line) {
+                                               // If a line could be read we update the progressbar with the data sent.
+                                               $parsedLine = JSON::decode(StringUtil::trim($line));
+                                               
+                                               if (!empty($parsedLine['error'])) {
+                                                       fwrite(STDERR, str_repeat("\n", -$cursorOffset + 1));
+                                                       fwrite(STDERR, str_repeat("=", 20)."\n");
+                                                       fwrite(STDERR, "Error in thread ".$processData['threadId']."\n");
+                                                       fwrite(STDERR, $parsedLine['error']."\n");
+                                                       fwrite(STDERR, str_repeat("=", 20)."\n");
+                                                       fwrite(STDERR, str_repeat("\n", $threads + $cursorOffset + 1));
+                                               }
+                                               else if (isset($parsedLine['finished'])) {
+                                                       $processData['progressbar']->update($parsedLine['progress'], $statusPrefix.'finished');
+                                               }
+                                               else {
+                                                       $processData['progressbar']->update($parsedLine['progress'], $statusPrefix.'loop#'.$parsedLine['iteration']);
+                                               }
+                                       }
+                                       else if (!$status['running']) {
+                                               // If the process exited we update the text status to indicate so.
+                                               $processData['progressbar']->update(null, $statusPrefix.'exited');
                                        }
                                        else {
-                                               $processData['progressbar']->update($progress, $statusPrefix.'loop#'.$iteration);
+                                               // Otherwise just rerender the bar without changing anything.
+                                               $processData['progressbar']->update();
+                                       }
+                                       echo "\n";
+                                       $cursorOffset++;
+                                       
+                                       // Check the exit code after processing the status line, to allow for dumping the error message.
+                                       if (!$status['running'] && $status['exitcode'] != -1) {
+                                               if ($status['exitcode']) {
+                                                       throw new \Exception('Unclean exit of thread '.$processData['threadId'].' detected. Exiting.');
+                                               }
                                        }
                                }
-                               else if (!$status['running']) {
-                                       // If the process exited we update the text status to indicate so.
-                                       $processData['progressbar']->update(null, $statusPrefix.'exited');
+                               unset($processData);
+                       }
+                       
+                       // 4) Make sure to update the text status to 'exited' for every process to not confuse the user.
+                       echo "\033[".$threads."A";
+                       foreach ($processes as $processData) {
+                               $status = proc_get_status($processData['process']);
+                               if ($status === false) {
+                                       throw new \LogicException('Unreachable');
+                               }
+                               
+                               if (!$status['running']) {
+                                       $processData['progressbar']->update(null, 'T'.$processData['threadId'].': exited');
                                }
                                else {
-                                       // Otherwise just rerender the bar without changing anything.
-                                       $processData['progressbar']->update();
+                                       throw new \LogicException('Unreachable');
                                }
                                echo "\n";
                        }
-                       unset($processData);
                }
-               
-               // 4) Make sure to update the text status to 'exited' for every process to not confuse the user.
-               echo "\033[".$threads."A";
-               foreach ($processes as &$processData) {
-                       $status = proc_get_status($processData['process']);
-                       if ($status === false) {
-                               throw new \LogicException('Unreachable');
-                       }
-                       
-                       if (!$status['running']) {
-                               $processData['progressbar']->update(null, 'T'.$processData['threadId'].': exited');
-                       }
-                       else {
-                               throw new \LogicException('Unreachable');
+               finally {
+                       if ($cursorOffset) {
+                               // Move out of the progress bar area.
+                               echo str_repeat("\n", -$cursorOffset);
                        }
                        echo "\n";
+
+                       // Kill all remaining processes and check their status to not leave zombies.
+                       // There should not be any running processes at this point, except in case of
+                       // an unclean exit of one process
+                       foreach ($processes as $processData) {
+                               if (proc_get_status($processData['process'])['running']) {
+                                       echo "Killing thread ".$processData['threadId']."\n";
+                                       proc_terminate($processData['process'], 9);
+                                       $i = 0;
+                                       while (proc_get_status($processData['process'])['running']) {
+                                               usleep(100000);
+                                               if ($i++ > 3) echo "Waiting for exit\n";
+                                       }
+                               }
+                       }
                }
        }