Private/Swarm.ps1

# ══════════════════════════════════════════════════════════════════════════════
# SWARM ORCHESTRATION — Internals
# ══════════════════════════════════════════════════════════════════════════════
#
# Architecture
# ────────────
# Invoke-LLMSwarm runs an ORCHESTRATOR completion that decomposes the goal
# into a JSON task list. Each task runs in a RunspacePool that clones the
# user's session. Tasks can declare DependsOn, forming a DAG — the dispatcher
# releases each task only once all its dependencies have finished successfully.
#
# Thread communication uses a [ConcurrentDictionary[string,object]] shared
# across runspaces (same process, no serialization boundary):
# $script:SwarmShared["result::<taskId>"] = [LLMResponse] set by workers
# $script:SwarmShared["status::<taskId>"] = <status> running|done|failed

$script:SwarmShared = [System.Collections.Concurrent.ConcurrentDictionary[string,object]]::new()

# ── Render helpers (swarm-specific) ──────────────────────────────────────────

function script:Write-SwarmHeader {
    param([string]$Goal, [int]$TaskCount)
    $c = $script:C; $b = $script:Box; $w = script:Get-Width
    $label = " SWARM $($b.Bullet) $TaskCount tasks "
    $lp = $b.H * 3; $rp = $b.H * [Math]::Max(2, $w - $label.Length - 8)
    Write-Host ""
    Write-Host "$($c.Cyan)$lp$($c.Bold)$label$($c.Reset)$($c.Cyan)$rp$($c.Reset)"
    Write-Host " $($c.Silver)Goal:$($c.Reset) $($c.White)$Goal$($c.Reset)"
    Write-Host ""
}

function script:Write-SwarmTaskLine {
    param([PSCustomObject]$Task, [string]$Override='')
    $c = $script:C; $b = $script:Box
    $status = if ($Override) { $Override } else { $Task.Status }
    $icon = switch ($status) {
        'pending'  { "$($c.Slate)○$($c.Reset)" }
        'waiting'  { "$($c.Yellow)◔$($c.Reset)" }
        'running'  { "$($c.Cyan)◕$($c.Reset)" }
        'done'     { "$($c.Green)$($b.Tick)$($c.Reset)" }
        'failed'   { "$($c.Red)$($b.X)$($c.Reset)" }
        'skipped'  { "$($c.Silver)—$($c.Reset)" }
        default    { "$($c.Silver)?$($c.Reset)" }
    }
    $depStr = if ($Task.DependsOn.Count -gt 0) { " $($c.Slate)← $($Task.DependsOn -join ', ')$($c.Reset)" } else { '' }
    $elapsed = if ($Task.ElapsedSec -gt 0) { " $($c.Dim)$([math]::Round($Task.ElapsedSec,1))s$($c.Reset)" } else { '' }
    Write-Host " $icon $($c.Amber)$($Task.Id.PadRight(6))$($c.Reset) $($c.White)$($Task.Name)$($c.Reset)$depStr$elapsed"
}

function script:Write-SwarmSummary {
    param([PSCustomObject]$Result)
    $c = $script:C; $b = $script:Box; $w = script:Get-Width
    Write-Host ""
    script:Write-Rule -Label "SWARM COMPLETE" -Color $c.Cyan
    $done    = @($Result.Tasks | Where-Object Status -eq 'done').Count
    $failed  = @($Result.Tasks | Where-Object Status -eq 'failed').Count
    $skipped = @($Result.Tasks | Where-Object Status -eq 'skipped').Count
    Write-Host " $($c.Silver)Tasks $($c.Reset)$($c.Green)$done done$($c.Reset) $($c.Red)$failed failed$($c.Reset) $($c.Slate)$skipped skipped$($c.Reset)"
    Write-Host " $($c.Silver)Tokens $($c.Reset)$($Result.TotalTokens) $($c.Slate)(in: $($Result.InputTokens) out: $($Result.OutputTokens))$($c.Reset)"
    Write-Host " $($c.Silver)Wall time $($c.Reset)$([math]::Round($Result.TotalSec,2))s"
    Write-Host ""
    script:Write-Rule -Label "SYNTHESIS" -Color $c.Amber
    Write-Host ""
    script:Write-ResponseBox -Content $Result.Synthesis -Provider $Result.Provider `
        -Model $Result.Model -InputTokens 0 -OutputTokens 0 `
        -StopReason 'synthesis' -ElapsedSec 0
}

# ── Orchestrator: decompose goal into task list ───────────────────────────────

function script:Invoke-OrchestratorDecompose {
    param([string]$Goal, [string]$Provider, [string]$Model, [string]$Context, [int]$MaxTasks)

    $schema = @"
You are a task orchestrator. Decompose the goal into parallel sub-tasks for specialist LLM agents.

Rules:
- Emit ONLY a raw JSON array, no markdown, no explanation.
- Each element: { "id": "t1", "name": "<short label>", "prompt": "<full task prompt>", "dependsOn": [] }
- "id" must be unique short strings: t1, t2, t3 …
- "dependsOn" is an array of ids that must complete before this task starts. Use [] for tasks that can run immediately.
- Maximum $MaxTasks tasks. Prefer parallelism — only add dependencies when the task genuinely needs prior results.
- Each "prompt" must be fully self-contained — the worker agent has no other context.
- If a task needs the result of a prior task, say so explicitly in the prompt: "Given the result from task <id>: {{result::<id>}}, ..."
  The harness will substitute {{result::<id>}} with the actual JSON result before dispatching.

Context about the environment:
$Context

Goal: $Goal
"@


    Write-Verbose "Orchestrator decomposing goal into max $MaxTasks tasks ($Provider/$Model)"
    $resp = script:Invoke-ProviderCompletion -Provider $Provider -Model $Model `
        -SystemPrompt $schema -Messages @(@{role='user';content="Decompose this goal into tasks."}) `
        -MaxTokens 2048 -WithEnv $false

    $json = $resp.Content -replace '```json',''-replace '```','' -replace '(?s)^[^[\{]*','' | ForEach-Object { $_.Trim() }
    try {
        $tasks = $json | ConvertFrom-Json
        Write-Verbose "Orchestrator produced $(@($tasks).Count) task(s)"
        return @{ Tasks=$tasks; Tokens=$resp.TotalTokens; InputTokens=$resp.InputTokens; OutputTokens=$resp.OutputTokens }
    } catch {
        Write-Error "Orchestrator produced invalid JSON task list: $_" -ErrorAction Stop
    }
}

# ── Worker scriptblock — runs inside each RunspacePool runspace ───────────────

$script:WorkerBlock = {
    param(
        [string]$TaskId,
        [string]$TaskPrompt,
        [string]$Provider,
        [string]$Model,
        [System.Collections.Concurrent.ConcurrentDictionary[string,object]]$Shared
    )

    try {
        $Shared["status::$TaskId"] = 'running'
        $resp = Invoke-LLMAgent -Prompt $TaskPrompt -Provider $Provider -Model $Model -Quiet
        $Shared["result::$TaskId"] = $resp
        $Shared["status::$TaskId"] = 'done'
    } catch {
        $Shared["result::$TaskId"] = $_.ToString()
        $Shared["status::$TaskId"] = 'failed'
    }
}

# ── RunspacePool factory — clones the user's session for parallel workers ─────

function script:New-SwarmRunspacePool {
    param([int]$MaxRunspaces = 4)

    $iss = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault2()
    $mods = @(Get-Module)
    foreach ($mod in $mods) {
        $iss.ImportPSModule($mod.Name)
    }
    foreach ($entry in [System.Environment]::GetEnvironmentVariables().GetEnumerator()) {
        $iss.EnvironmentVariables.Add(
            [System.Management.Automation.Runspaces.SessionStateVariableEntry]::new(
                $entry.Key, $entry.Value, ''))
    }
    $varCount = script:Import-GlobalVariables -ISS $iss

    $pool = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspacePool(
        1, $MaxRunspaces, $iss, $Host)
    $pool.Open()
    Write-Verbose "RunspacePool opened: max=$MaxRunspaces, $($mods.Count) modules cloned, $varCount global vars"
    $pool
}

# ── DAG dispatcher ────────────────────────────────────────────────────────────

function script:Invoke-SwarmDispatcher {
    param(
        [PSCustomObject[]]$Tasks,
        [string]$Provider,
        [string]$Model,
        [System.Collections.Concurrent.ConcurrentDictionary[string,object]]$Shared,
        [int]$MaxRunspaces = 4,
        [int]$PollMs = 400,
        [int]$TimeoutSec = 300
    )

    Write-Verbose "Swarm dispatcher: $($Tasks.Count) tasks, maxRunspaces=$MaxRunspaces, timeout=${TimeoutSec}s"
    $pool = script:New-SwarmRunspacePool -MaxRunspaces $MaxRunspaces

    $state = @{}
    foreach ($t in $Tasks) {
        $state[$t.id] = [PSCustomObject]@{
            Id         = $t.id
            Name       = $t.name
            Prompt     = $t.prompt
            DependsOn  = @($t.dependsOn)
            Status     = 'pending'
            Pipeline   = $null
            AsyncResult= $null
            StartedAt  = $null
            ElapsedSec = 0.0
            Result     = $null
            Error      = $null
        }
    }

    $startTime = [datetime]::UtcNow
    $allIds    = $state.Keys

    foreach ($t in $state.Values) { script:Write-SwarmTaskLine -Task $t }
    Write-Host ""

    try {
        do {
            $anyProgress = $false

            foreach ($id in $allIds) {
                $t = $state[$id]
                if ($t.Status -notin 'pending','waiting') { continue }

                $depFailed = @($t.DependsOn | Where-Object { $state[$_].Status -eq 'failed' -or $state[$_].Status -eq 'skipped' })
                if ($depFailed.Count -gt 0) {
                    $t.Status = 'skipped'
                    $t.Error  = "Skipped: dependency $($depFailed -join ', ') failed"
                    Write-Warning "Task $id skipped: dependency $($depFailed -join ', ') failed"
                    script:Write-SwarmTaskLine -Task $t
                    $anyProgress = $true
                    continue
                }

                $depsReady = ($t.DependsOn.Count -eq 0) -or
                    ($t.DependsOn | ForEach-Object { $state[$_].Status } | Where-Object { $_ -ne 'done' } | Measure-Object).Count -eq 0

                if (-not $depsReady) {
                    if ($t.Status -ne 'waiting') { $t.Status = 'waiting'; script:Write-SwarmTaskLine -Task $t }
                    continue
                }

                $prompt = $t.Prompt
                foreach ($depId in $t.DependsOn) {
                    $depObj = $Shared["result::$depId"]
                    if ($depObj -and $depObj.Content) {
                        $prompt = $prompt -replace "{{result::$depId}}", $depObj.Content
                    }
                }

                $t.Status    = 'running'
                $t.StartedAt = [datetime]::UtcNow
                Write-Verbose "Task $id dispatching: $($t.Name)"
                $ps = [PowerShell]::Create()
                $ps.RunspacePool = $pool
                $null = $ps.AddScript($script:WorkerBlock).
                    AddArgument($id).
                    AddArgument($prompt).
                    AddArgument($Provider).
                    AddArgument($Model).
                    AddArgument($Shared)
                $t.Pipeline    = $ps
                $t.AsyncResult = $ps.BeginInvoke()

                script:Write-SwarmTaskLine -Task $t
                $anyProgress = $true
            }

            foreach ($id in $allIds) {
                $t = $state[$id]
                if ($t.Status -ne 'running' -or $null -eq $t.Pipeline) { continue }

                if ($t.AsyncResult.IsCompleted) {
                    try { $t.Pipeline.EndInvoke($t.AsyncResult) } catch {}
                    $t.ElapsedSec = ([datetime]::UtcNow - $t.StartedAt).TotalSeconds
                    $t.Status     = $Shared["status::$id"] ?? 'failed'
                    $t.Result     = $Shared["result::$id"]
                    if ($t.Status -eq 'failed' -and $t.Result -is [string]) {
                        $t.Error = $t.Result
                        Write-Warning "Task $id failed ($([math]::Round($t.ElapsedSec,1))s): $($t.Error.Substring(0, [math]::Min(100, $t.Error.Length)))"
                    } else {
                        Write-Verbose "Task $id completed ($([math]::Round($t.ElapsedSec,1))s)"
                    }

                    $t.Pipeline.Dispose()
                    $t.Pipeline    = $null
                    $t.AsyncResult = $null

                    script:Write-SwarmTaskLine -Task $t
                    $anyProgress = $true
                }
            }

            $allDone = ($state.Values | Where-Object { $_.Status -notin 'done','failed','skipped' } | Measure-Object).Count -eq 0
            if (-not $allDone) { Start-Sleep -Milliseconds $PollMs }

            if (([datetime]::UtcNow - $startTime).TotalSeconds -gt $TimeoutSec) {
                Write-Warning "Swarm timed out after ${TimeoutSec}s — cancelling remaining tasks"
                script:Write-Status "Swarm timed out after ${TimeoutSec}s" 'warn'
                foreach ($id in $allIds) {
                    $t = $state[$id]
                    if ($t.Pipeline) {
                        $t.Pipeline.Stop()
                        $t.Pipeline.Dispose()
                        $t.Pipeline    = $null
                        $t.AsyncResult = $null
                        if ($t.Status -eq 'running') {
                            $t.Status    = 'failed'
                            $t.Error     = "Timed out after ${TimeoutSec}s"
                            $t.ElapsedSec= ([datetime]::UtcNow - $t.StartedAt).TotalSeconds
                            script:Write-SwarmTaskLine -Task $t
                        }
                    }
                    if ($t.Status -eq 'pending' -or $t.Status -eq 'waiting') {
                        $t.Status = 'skipped'
                        $t.Error  = 'Skipped: swarm timed out'
                        script:Write-SwarmTaskLine -Task $t
                    }
                }
                break
            }

        } while (-not $allDone)
    }
    finally {
        $pool.Close()
        $pool.Dispose()
    }

    return $state.Values
}

# ── Synthesis: orchestrator reassembles results ───────────────────────────────

function script:Invoke-OrchestratorSynthesize {
    param([string]$Goal, [PSCustomObject[]]$TaskResults, [string]$Provider, [string]$Model)

    $done = @($TaskResults | Where-Object Status -eq 'done').Count
    $failed = @($TaskResults | Where-Object Status -eq 'failed').Count
    Write-Verbose "Synthesizing $done successful / $failed failed task(s)"
    $resultBlocks = $TaskResults | ForEach-Object {
        $content = if ($_.Result -is [PSCustomObject] -and $_.Result.Content) { $_.Result.Content }
                   else { $_.Error ?? 'no result' }
        "<task id=""$($_.Id)"" name=""$($_.Name)"" status=""$($_.Status)"">$content</task>"
    }

    $prompt = @"
Goal: $Goal

Worker results:
$($resultBlocks -join "`n")

Synthesize a single, coherent, well-structured answer to the original goal using all successful worker results.
Acknowledge any failed tasks and explain what impact that has on completeness.
"@


    $resp = script:Invoke-ProviderCompletion -Provider $Provider -Model $Model `
        -SystemPrompt 'You are a synthesis agent. Produce a clear, consolidated answer from the worker results provided.' `
        -Messages @(@{role='user';content=$prompt}) -MaxTokens 2048 -WithEnv $false

    return @{ Content=$resp.Content; Tokens=$resp.TotalTokens; InputTokens=$resp.InputTokens; OutputTokens=$resp.OutputTokens }
}