parser.add_argument( "--retries", type=int, default=0, help="Number of retries on failure (default: 0)" )
@staticmethod def run(cmd: List[str], timeout: int, env: Optional[Dict] = None) -> Tuple[int, str, str]: """Run command, return (exit_code, stdout, stderr).""" try: result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, env=env if env else os.environ.copy(), text=True ) return result.returncode, result.stdout, result.stderr except subprocess.TimeoutExpired: return -1, "", f"Command timed out after timeout seconds" except Exception as e: return -2, "", str(e) def worker(task_id: int, args_template: List[str], replacements: Dict[str, str], timeout: int, retries: int, env: Dict) -> TaskResult: """ Worker function executed in a separate process. Builds command by replacing placeholders and runs it. """ start_time = time.time()
def _log_result(self, result: TaskResult): """Log individual task result.""" status = "✓" if result.success else "✗" self.logger.info( f"status Task result.task_id - " f"Duration: result.duration:.2fs - " f"Retries: result.retries" ) if self.args.verbose: if result.stdout: self.logger.debug(f"Task result.task_id STDOUT:\nresult.stdout") if result.stderr: self.logger.debug(f"Task result.task_id STDERR:\nresult.stderr")
formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Console handler console = logging.StreamHandler(sys.stdout) console.setLevel(logging.DEBUG if verbose else logging.INFO) console.setFormatter(formatter) self.logger.addHandler(console) # File handler (if specified) if log_file: file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler)