private/Start-ParallelExecution.ps1
|
function Start-ParallelExecution {
    <#
    .SYNOPSIS
        Manages runspace pool for parallel file/batch processing.

    .DESCRIPTION
        Creates and manages a runspace pool to process multiple batches in parallel.
        Each runspace imports the module and calls Invoke-AITool recursively with
        -NoParallel to prevent nested parallelization.

        Results are streamed to the pipeline as each runspace completes. Failed
        results (Success -eq $false) are passed to Update-ErrorTracking; when it
        reports ShouldBailOut, the bailed-out flag is set, polling stops and any
        remaining runspaces are stopped and disposed.

    .PARAMETER Batches
        Array of batches, where each batch is an array of file paths.

    .PARAMETER ToolName
        The AI tool to use for processing.

    .PARAMETER PromptText
        The prompt text to use.

    .PARAMETER MaxThreads
        Maximum number of concurrent threads.

    .PARAMETER ContextFiles
        Array of static context file paths.

    .PARAMETER Model
        The model to use.

    .PARAMETER ReasoningEffort
        The reasoning effort level.

    .PARAMETER DisableRetry
        Whether to disable retry logic.

    .PARAMETER MaxRetryMinutes
        Maximum retry time in minutes.

    .PARAMETER SkipModified
        Whether to skip modified files.

    .PARAMETER BatchSize
        The batch size being used.

    .PARAMETER ContextFilter
        Optional scriptblock for dynamic context.

    .PARAMETER ContextFilterBase
        Base directories for context filter.

    .PARAMETER MaxErrors
        Maximum errors before bail-out.

    .PARAMETER MaxTokenErrors
        Maximum token errors before bail-out.

    .PARAMETER ModuleRoot
        The module root path for importing in runspaces.

    .PARAMETER ErrorCountRef
        Reference to error count variable.

    .PARAMETER TokenErrorCountRef
        Reference to token error count variable.

    .PARAMETER BailedOutRef
        Reference to bailed out flag.

    .OUTPUTS
        Results are output directly to the pipeline as they complete.

    .EXAMPLE
        $params = @{
            Batches    = $batches
            ToolName   = "Claude"
            PromptText = "Review code"
            MaxThreads = 3
            ModuleRoot = $ModuleRoot
        }
        Start-ParallelExecution @params
    #>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [array]$Batches,

        [Parameter(Mandatory)]
        [string]$ToolName,

        [Parameter(Mandatory)]
        [string]$PromptText,

        [Parameter()]
        [int]$MaxThreads = 3,

        [Parameter()]
        [string[]]$ContextFiles,

        [Parameter()]
        [string]$Model,

        [Parameter()]
        [string]$ReasoningEffort,

        [Parameter()]
        [switch]$DisableRetry,

        [Parameter()]
        [int]$MaxRetryMinutes = 240,

        [Parameter()]
        [switch]$SkipModified,

        [Parameter()]
        [int]$BatchSize = 1,

        [Parameter()]
        [scriptblock]$ContextFilter,

        [Parameter()]
        [string[]]$ContextFilterBase,

        [Parameter()]
        [int]$MaxErrors = 10,

        [Parameter()]
        [int]$MaxTokenErrors = 3,

        [Parameter(Mandatory)]
        [string]$ModuleRoot,

        [Parameter(Mandatory)]
        [ref]$ErrorCountRef,

        [Parameter(Mandatory)]
        [ref]$TokenErrorCountRef,

        [Parameter(Mandatory)]
        [ref]$BailedOutRef
    )

    $totalFiles = ($Batches | ForEach-Object { $_.Count } | Measure-Object -Sum).Sum
    if ($BatchSize -gt 1) {
        Write-PSFMessage -Level Verbose -Message "Processing $totalFiles files in $($Batches.Count) batches in parallel (max $MaxThreads concurrent batches)"
    } else {
        Write-PSFMessage -Level Verbose -Message "Processing $totalFiles files in parallel (max $MaxThreads concurrent)"
    }

    $parallelStartTime = Get-Date
    $allDurations = [System.Collections.ArrayList]::new()

    # Get the module path for loading in runspaces
    $modulePsmPath = Join-Path $ModuleRoot "aitools.psm1"

    # Create runspace pool
    $pool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads)
    $pool.ApartmentState = "MTA"
    $pool.Open()

    # A generic list keeps .Count reliable even when a single entry remains
    # (a bare PSCustomObject has no usable .Count on Windows PowerShell 5.1)
    # and avoids re-allocating an array on every add/remove.
    $runspaces = [System.Collections.Generic.List[object]]::new()

    # Create scriptblock for parallel execution
    $scriptblock = {
        param(
            $ModulePath,
            $BatchFiles,
            $Prompt,
            $Tool,
            $Context,
            $Model,
            $ReasoningEffort,
            $DisableRetry,
            $MaxRetryMinutes,
            $SkipModified,
            $BatchSize,
            $ContextFilter,
            $ContextFilterBase
        )

        # Set environment variables for LiteLLM
        $env:LITELLM_NUM_RETRIES = '0'

        # Import the module
        Import-Module $ModulePath -ErrorAction Stop

        # Build parameters for recursive call
        $params = @{
            Path       = $BatchFiles
            Prompt     = $Prompt
            Tool       = $Tool
            NoParallel = $true
            BatchSize  = $BatchSize
        }

        # Optional values are only forwarded when they were actually supplied.
        if ($Context) { $params['Context'] = $Context }
        if ($Model) { $params['Model'] = $Model }
        if ($ReasoningEffort) { $params['ReasoningEffort'] = $ReasoningEffort }
        if ($DisableRetry) { $params['DisableRetry'] = $DisableRetry }
        if ($MaxRetryMinutes) { $params['MaxRetryMinutes'] = $MaxRetryMinutes }
        if ($SkipModified) { $params['SkipModified'] = $SkipModified }
        if ($ContextFilter) { $params['ContextFilter'] = $ContextFilter }
        if ($ContextFilterBase) { $params['ContextFilterBase'] = $ContextFilterBase }

        Invoke-AITool @params
    }

    try {
        # Create and start runspaces for each batch
        $batchIndex = 0
        foreach ($batch in $Batches) {
            $batchIndex++
            $batchFileNames = ($batch | ForEach-Object { [System.IO.Path]::GetFileName($_) }) -join ', '
            Write-PSFMessage -Level Debug -Message "Queuing batch $batchIndex of $($Batches.Count) for parallel processing: $batchFileNames"

            $progressParams = @{
                Activity        = "Starting parallel processing with $ToolName"
                Status          = "Queuing batch $batchIndex/$($Batches.Count) ($($batch.Count) file(s))"
                PercentComplete = ($batchIndex / $Batches.Count) * 100
            }
            Write-Progress @progressParams

            # Arguments are positional and must match the param() order of $scriptblock.
            $runspace = [PowerShell]::Create()
            $null = $runspace.AddScript($scriptblock)
            $null = $runspace.AddArgument($modulePsmPath)
            $null = $runspace.AddArgument($batch)
            $null = $runspace.AddArgument($PromptText)
            $null = $runspace.AddArgument($ToolName)
            $null = $runspace.AddArgument($ContextFiles)
            $null = $runspace.AddArgument($Model)
            $null = $runspace.AddArgument($ReasoningEffort)
            $null = $runspace.AddArgument($DisableRetry)
            $null = $runspace.AddArgument($MaxRetryMinutes)
            $null = $runspace.AddArgument($SkipModified)
            $null = $runspace.AddArgument($BatchSize)
            # NOTE(review): the scriptblock instance is handed to another runspace as-is;
            # confirm the filter behaves correctly when invoked from a pool thread.
            $null = $runspace.AddArgument($ContextFilter)
            $null = $runspace.AddArgument($ContextFilterBase)
            $runspace.RunspacePool = $pool

            $runspaces.Add([PSCustomObject]@{
                    Pipe   = $runspace
                    Status = $runspace.BeginInvoke()
                    Batch  = $batch
                    Index  = $batchIndex
                })
        }

        Write-PSFMessage -Level Verbose -Message "All runspaces started, waiting for completion and streaming results..."
        Write-Progress -Activity "Starting parallel processing with $ToolName" -Completed

        $processingActivity = if ($BatchSize -gt 1) {
            "Processing batches in parallel with $ToolName"
        } else {
            "Processing files in parallel with $ToolName"
        }
        Write-Progress -Activity $processingActivity -Status "Waiting for results..." -PercentComplete 0

        # Poll runspaces and output results as they complete
        $completedBatchCount = 0
        $totalBatches = $Batches.Count

        while ($runspaces.Count -gt 0 -and -not $BailedOutRef.Value) {
            # Iterate over a snapshot so finished entries can be removed from the live list.
            foreach ($runspace in $runspaces.ToArray()) {
                if (-not $runspace.Status.IsCompleted) { continue }

                try {
                    $result = $runspace.Pipe.EndInvoke($runspace.Status)

                    if ($result) {
                        $completedBatchCount++

                        if ($BatchSize -gt 1) {
                            $batchFileNames = ($runspace.Batch | ForEach-Object { [System.IO.Path]::GetFileName($_) }) -join ', '
                            Write-PSFMessage -Level Verbose -Message "Completed batch $completedBatchCount of $totalBatches ($($runspace.Batch.Count) files)"
                            $progressParams = @{
                                Activity        = $processingActivity
                                Status          = "Completed batch: $batchFileNames ($completedBatchCount/$totalBatches)"
                                PercentComplete = ($completedBatchCount / $totalBatches) * 100
                            }
                        } else {
                            # @() guards against a batch that is a bare string, where [0] would yield a character.
                            $fileName = [System.IO.Path]::GetFileName(@($runspace.Batch)[0])
                            Write-PSFMessage -Level Verbose -Message "Completed $completedBatchCount of $totalBatches files"
                            $progressParams = @{
                                Activity        = $processingActivity
                                Status          = "Completed: $fileName ($completedBatchCount/$totalBatches)"
                                PercentComplete = ($completedBatchCount / $totalBatches) * 100
                            }
                        }
                        Write-Progress @progressParams

                        # Process results. EndInvoke returns a collection (not a System.Array),
                        # so every item is handled individually regardless of how many there are.
                        foreach ($r in $result) {
                            if ($r.Duration) {
                                $null = $allDurations.Add($r.Duration.TotalSeconds)
                            }

                            # Check for errors
                            if ($r.Success -eq $false) {
                                $resultText = $r.Result | Out-String
                                $trackingParams = @{
                                    ResultText      = $resultText
                                    Success         = $false
                                    MaxErrors       = $MaxErrors
                                    MaxTokenErrors  = $MaxTokenErrors
                                    ErrorCount      = $ErrorCountRef
                                    TokenErrorCount = $TokenErrorCountRef
                                }
                                $tracking = Update-ErrorTracking @trackingParams
                                if ($tracking.ShouldBailOut) {
                                    $BailedOutRef.Value = $true
                                }
                            }

                            $r  # Output to pipeline
                        }
                    } else {
                        # No output from the runspace is treated as a skipped batch.
                        $completedBatchCount++
                        if ($BatchSize -gt 1) {
                            $batchFileNames = ($runspace.Batch | ForEach-Object { [System.IO.Path]::GetFileName($_) }) -join ', '
                            Write-PSFMessage -Level Verbose -Message "Skipped batch (fresh check): $batchFileNames"
                        } else {
                            Write-PSFMessage -Level Verbose -Message "Skipped (fresh check): $(@($runspace.Batch)[0])"
                        }
                    }
                } catch {
                    $batchDesc = if ($BatchSize -gt 1) { "batch $($runspace.Index)" } else { @($runspace.Batch)[0] }
                    Write-PSFMessage -Level Error -Message "Error retrieving result for $batchDesc : $_"
                } finally {
                    $runspace.Pipe.Dispose()
                    $null = $runspaces.Remove($runspace)
                }
            }

            if ($runspaces.Count -gt 0) {
                Start-Sleep -Milliseconds 100
            }
        }

        # Clean up remaining runspaces if bailed out
        if ($BailedOutRef.Value -and $runspaces.Count -gt 0) {
            Write-PSFMessage -Level Warning -Message "Cleaning up $($runspaces.Count) remaining runspace(s) after bail-out"
            foreach ($runspace in $runspaces) {
                try {
                    $runspace.Pipe.Stop()
                    $runspace.Pipe.Dispose()
                } catch {
                    Write-PSFMessage -Level Debug -Message "Error disposing runspace: $_"
                }
            }
            # Already stopped and disposed; prevent the finally block from doing it again.
            $runspaces.Clear()
        }

        Write-PSFMessage -Level Verbose -Message "All parallel processing complete"
        Write-Progress -Activity $processingActivity -Completed

        # Report timing statistics
        $parallelEndTime = Get-Date
        $totalParallelTime = ($parallelEndTime - $parallelStartTime).TotalSeconds
        # Cast so that an empty duration list yields 0 rather than $null.
        $totalSequentialTime = [double]($allDurations | Measure-Object -Sum).Sum
        $timeSaved = $totalSequentialTime - $totalParallelTime
        $percentSaved = if ($totalSequentialTime -gt 0) { ($timeSaved / $totalSequentialTime) * 100 } else { 0 }

        Write-PSFMessage -Level Verbose -Message "Parallel execution completed in $([Math]::Round($totalParallelTime, 1))s vs estimated sequential time of $([Math]::Round($totalSequentialTime, 1))s"
        if ($timeSaved -gt 0) {
            Write-PSFMessage -Level Verbose -Message "Time saved: $([Math]::Round($timeSaved, 1))s ($([Math]::Round($percentSaved, 1))% faster)"
        }
    } finally {
        # Clean up runspace pool
        if ($pool) {
            Write-PSFMessage -Level Verbose -Message "Cleaning up runspace pool"

            # Anything still tracked here was interrupted (exception or pipeline stop).
            foreach ($runspace in $runspaces) {
                if ($runspace.Pipe) {
                    try {
                        $runspace.Pipe.Stop()
                        $runspace.Pipe.Dispose()
                    } catch {
                        Write-PSFMessage -Level Debug -Message "Error disposing runspace: $_"
                    }
                }
            }

            try {
                $pool.Close()
                $pool.Dispose()
                Write-PSFMessage -Level Verbose -Message "Runspace pool cleaned up successfully"
            } catch {
                Write-PSFMessage -Level Warning -Message "Error closing runspace pool: $_"
            }
        }
    }
}