Public/Invoke-OnePAMQuery.ps1

function Invoke-OnePAMQuery {
    <#
    .SYNOPSIS
        Executes a single SQL query against a OnePAM database resource.
    .DESCRIPTION
        Looks up the resource, requests a session for it, opens a WebSocket to
        the database gateway, sends the query and collects the streamed rows
        into PSCustomObject instances suitable for pipeline processing.
        The WebSocket connection is opened, used, and closed automatically.

        Property names on the returned objects come from the column names
        reported by the server. Because object property names must be unique
        and non-empty, unnamed columns are exposed as "Column<N>" (N is the
        1-based column position) and repeated names receive a numeric suffix
        ("id", "id_2", "id_3", ...). Name comparison is case-insensitive.
    .PARAMETER Resource
        Resource name or UUID.
    .PARAMETER Query
        SQL query string to execute.
    .OUTPUTS
        PSCustomObject. One object per result row; nothing when the query
        yields no rows.
    .EXAMPLE
        Invoke-OnePAMQuery -Resource "prod-db" -Query "SELECT * FROM users LIMIT 10"
    .EXAMPLE
        Invoke-OnePAMQuery -Resource "prod-db" -Query "SELECT count(*) FROM orders" | Format-Table
    .EXAMPLE
        $results = Invoke-OnePAMQuery -Resource "analytics-db" -Query "SELECT * FROM events"
        $results | Export-Csv -Path events.csv
    #>

    [CmdletBinding()]
    param(
        [Parameter(Mandatory, Position = 0)]
        [string]$Resource,

        [Parameter(Mandatory, Position = 1)]
        [Alias('SQL')]
        [string]$Query
    )

    $res = Get-OnePAMResource -Name $Resource
    if (-not $res) { throw "Resource '$Resource' not found." }

    $resId = if ($res.id) { $res.id } elseif ($res.ID) { $res.ID } else { throw "Cannot determine resource ID." }
    # The ID is interpolated into the request path below, so validate it first.
    Assert-OpSafePathSegment -Value $resId -Label 'resource ID'

    # A resource without a type is let through; only an explicit non-database type is rejected.
    if ($res.type -and $res.type -ne 'database') {
        throw "Resource '$Resource' is type '$($res.type)', not database."
    }

    $session = Invoke-OpApi -Method POST -Path "/api/v1/resources/$resId/connect"

    if (-not $session.session_id) {
        throw 'Failed to create session.'
    }
    # The session ID becomes part of the WebSocket URL path.
    Assert-OpSafePathSegment -Value $session.session_id -Label 'session ID'

    $cfg = Get-OpConfig
    if ($session.direct) {
        # Direct sessions talk to the API host itself, mirroring its TLS setting.
        $baseUri = [Uri]$cfg.api_base
        $wsScheme = if ($baseUri.Scheme -eq 'https') { 'wss' } else { 'ws' }
        $hostPort = $baseUri.Authority
    }
    else {
        # Proxied sessions always use wss against the host named in the session response.
        if (-not $session.proxy_host) { throw 'Session response missing proxy_host.' }
        $wsScheme = 'wss'
        $hostPort = $session.proxy_host
        if ($session.proxy_port -and $session.proxy_port -ne 443) {
            $hostPort = "$($session.proxy_host):$($session.proxy_port)"
        }
    }

    # Proxied sessions carry the session token in the query string.
    $queryParts = [System.Collections.Generic.List[string]]::new()
    if (-not $session.direct -and $session.token) {
        $queryParts.Add("token=$([System.Uri]::EscapeDataString($session.token))")
    }

    $wsUrl = "${wsScheme}://${hostPort}/gateway/db/$($session.session_id)"
    if ($queryParts.Count -gt 0) { $wsUrl += "?$($queryParts -join '&')" }

    # Direct sessions authenticate with the caller's bearer token instead.
    $wsHeaders = @{}
    if ($session.direct) {
        $authToken = Get-OpToken
        if ($authToken) {
            $wsHeaders['Authorization'] = "Bearer $($authToken.access_token)"
        }
    }

    $ws = New-OpWebSocket -Uri $wsUrl -Headers $wsHeaders

    try {
        # The first frame must be a 'connected' acknowledgement.
        $connResp = Receive-OpWsMessage -WebSocket $ws -TimeoutSeconds 30
        # NOTE(review): Receive-OpWsMessage's timeout behaviour is not visible here; guard
        # against a null result so a silent server cannot cause confusing downstream errors.
        if ($null -eq $connResp) { throw 'No response received from server.' }
        if ($connResp.MessageType -eq 'Close') { throw 'Connection closed by server.' }

        $connMsg = $connResp.Text | ConvertFrom-Json
        if ($connMsg.type -eq 'error') { throw "Database connection failed: $($connMsg.message)" }
        if ($connMsg.type -ne 'connected') { throw "Unexpected server response: $($connMsg.type)" }

        $queryMsg = @{
            type = 'query'
            id   = 'q1'
            sql  = $Query
        } | ConvertTo-Json -Compress
        Send-OpWsMessage -WebSocket $ws -Message $queryMsg

        $columns = @()
        $results = [System.Collections.Generic.List[PSCustomObject]]::new()

        while ($true) {
            $resp = Receive-OpWsMessage -WebSocket $ws -TimeoutSeconds 120
            # Without this guard a null result would make the loop spin forever.
            if ($null -eq $resp) { throw 'No response received from server.' }
            if ($resp.MessageType -eq 'Close') { throw 'Connection closed during query.' }

            $msg = $resp.Text | ConvertFrom-Json

            # Message types other than the four below are ignored.
            switch ($msg.type) {
                'columns' {
                    # Resolve property names once per result set. Property names are
                    # case-insensitive and must be unique and non-empty, so raw column
                    # names cannot be used verbatim: duplicates (e.g. "a.id, b.id")
                    # would overwrite each other and an empty name would fail the
                    # PSCustomObject conversion.
                    $rawNames = if ($null -eq $msg.columns) { @() } else { @($msg.columns) }
                    $taken = [System.Collections.Generic.HashSet[string]]::new([System.StringComparer]::OrdinalIgnoreCase)
                    $resolved = [System.Collections.Generic.List[string]]::new()
                    for ($c = 0; $c -lt $rawNames.Count; $c++) {
                        $baseName = [string]$rawNames[$c]
                        if ([string]::IsNullOrEmpty($baseName)) { $baseName = "Column$($c + 1)" }

                        $candidate = $baseName
                        $suffix = 2
                        # HashSet.Add returns $false when the name is already in use.
                        while (-not $taken.Add($candidate)) {
                            $candidate = "${baseName}_$suffix"
                            $suffix++
                        }
                        $resolved.Add($candidate)
                    }
                    $columns = $resolved.ToArray()
                }

                'row' {
                    $values = @($msg.values)
                    $obj = [ordered]@{}
                    for ($i = 0; $i -lt $columns.Count; $i++) {
                        # Short rows are padded with $null; extra values are dropped.
                        $val = if ($i -lt $values.Count) { $values[$i] } else { $null }
                        $obj[$columns[$i]] = $val
                    }
                    $results.Add([PSCustomObject]$obj)
                }

                'complete' {
                    return $results.ToArray()
                }

                'error' {
                    throw "Query error: $($msg.message)"
                }
            }
        }
    }
    finally {
        # Always release the socket, whether the query completed or threw.
        Close-OpWebSocket -WebSocket $ws
    }
}