Public/Invoke-OnePAMQuery.ps1
|
function Invoke-OnePAMQuery {
    <#
    .SYNOPSIS
        Executes a single SQL query against a OnePAM database resource.

    .DESCRIPTION
        Connects to a database resource, runs a query, and returns results as
        PSCustomObject array suitable for pipeline processing. The WebSocket
        connection is opened, used, and closed automatically.

        Each result row becomes one object whose properties are named after the
        result columns. Blank column names are replaced with "Column<N>" (N is
        the 1-based column position) and repeated names receive a numeric
        suffix ("id", "id_1", ...), so no column of the result set is dropped.

    .PARAMETER Resource
        Resource name or UUID.

    .PARAMETER Query
        SQL query string to execute.

    .OUTPUTS
        PSCustomObject. One object per result row.

    .EXAMPLE
        Invoke-OnePAMQuery -Resource "prod-db" -Query "SELECT * FROM users LIMIT 10"

    .EXAMPLE
        Invoke-OnePAMQuery -Resource "prod-db" -Query "SELECT count(*) FROM orders" | Format-Table

    .EXAMPLE
        $results = Invoke-OnePAMQuery -Resource "analytics-db" -Query "SELECT * FROM events"
        $results | Export-Csv -Path events.csv
    #>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory, Position = 0)]
        [string]$Resource,

        [Parameter(Mandatory, Position = 1)]
        [Alias('SQL')]
        [string]$Query
    )

    # --- Resolve the resource and make sure it is usable as a database target ---
    $res = Get-OnePAMResource -Name $Resource
    if (-not $res) {
        throw "Resource '$Resource' not found."
    }

    $resId = if ($res.id) { $res.id } elseif ($res.ID) { $res.ID } else { throw "Cannot determine resource ID." }
    # The ID is interpolated into the API path below, so it is checked first.
    Assert-OpSafePathSegment -Value $resId -Label 'resource ID'

    # A resource without a type is let through; only an explicit non-database type is rejected.
    if ($res.type -and $res.type -ne 'database') {
        throw "Resource '$Resource' is type '$($res.type)', not database."
    }

    # --- Open a session for the resource ---
    $session = Invoke-OpApi -Method POST -Path "/api/v1/resources/$resId/connect"
    if (-not $session.session_id) {
        throw 'Failed to create session.'
    }
    # The session ID becomes part of the WebSocket URL path.
    Assert-OpSafePathSegment -Value $session.session_id -Label 'session ID'

    # --- Work out the WebSocket endpoint ---
    $cfg = Get-OpConfig
    if ($session.direct) {
        # Direct sessions go to the configured API host; the scheme mirrors api_base.
        $baseUri  = [Uri]$cfg.api_base
        $wsScheme = if ($baseUri.Scheme -eq 'https') { 'wss' } else { 'ws' }
        $hostPort = $baseUri.Authority
    }
    else {
        if (-not $session.proxy_host) {
            throw 'Session response missing proxy_host.'
        }
        # Proxied sessions always use wss; the port is only spelled out when it is not 443.
        $wsScheme = 'wss'
        $hostPort = $session.proxy_host
        if ($session.proxy_port -and $session.proxy_port -ne 443) {
            $hostPort = "$($session.proxy_host):$($session.proxy_port)"
        }
    }

    # Proxied sessions carry the session token as a query-string parameter.
    $queryParts = [System.Collections.Generic.List[string]]::new()
    if (-not $session.direct -and $session.token) {
        $queryParts.Add("token=$([System.Uri]::EscapeDataString($session.token))")
    }

    $wsUrl = "${wsScheme}://${hostPort}/gateway/db/$($session.session_id)"
    if ($queryParts.Count -gt 0) {
        $wsUrl += "?$($queryParts -join '&')"
    }

    # Direct sessions send the stored access token as a Bearer header instead.
    $wsHeaders = @{}
    if ($session.direct) {
        $authToken = Get-OpToken
        if ($authToken) {
            $wsHeaders['Authorization'] = "Bearer $($authToken.access_token)"
        }
    }

    $ws = New-OpWebSocket -Uri $wsUrl -Headers $wsHeaders
    try {
        # --- Handshake: the first message must be of type 'connected' ---
        $connResp = Receive-OpWsMessage -WebSocket $ws -TimeoutSeconds 30
        if ($connResp.MessageType -eq 'Close') {
            throw 'Connection closed by server.'
        }

        $connMsg = $connResp.Text | ConvertFrom-Json
        if ($connMsg.type -eq 'error') {
            throw "Database connection failed: $($connMsg.message)"
        }
        if ($connMsg.type -ne 'connected') {
            throw "Unexpected server response: $($connMsg.type)"
        }

        # --- Send the query ---
        $queryMsg = @{
            type = 'query'
            id   = 'q1'
            sql  = $Query
        } | ConvertTo-Json -Compress
        Send-OpWsMessage -WebSocket $ws -Message $queryMsg

        # --- Collect the response stream until 'complete' or 'error' ---
        $columns = @()
        $results = [System.Collections.Generic.List[PSCustomObject]]::new()

        while ($true) {
            $resp = Receive-OpWsMessage -WebSocket $ws -TimeoutSeconds 120
            if ($resp.MessageType -eq 'Close') {
                throw 'Connection closed during query.'
            }

            $msg = $resp.Text | ConvertFrom-Json
            switch ($msg.type) {
                'columns' {
                    # Resolve property names once per result set. Keys of an
                    # [ordered] dictionary are case-insensitive, so two columns
                    # with the same name (e.g. "SELECT a.id, b.id") would
                    # overwrite each other and a null name would make the index
                    # assignment fail. Blank names fall back to "Column<N>" and
                    # repeats get "_1", "_2", ... appended.
                    $usedNames = [System.Collections.Generic.HashSet[string]]::new(
                        [System.StringComparer]::OrdinalIgnoreCase)
                    $ordinal = 0
                    $columns = @(
                        foreach ($rawName in @($msg.columns)) {
                            $ordinal++
                            $baseName = [string]$rawName
                            if ([string]::IsNullOrWhiteSpace($baseName)) {
                                $baseName = "Column$ordinal"
                            }

                            $name   = $baseName
                            $suffix = 1
                            # HashSet.Add returns $false when the name is already taken.
                            while (-not $usedNames.Add($name)) {
                                $name = "${baseName}_$suffix"
                                $suffix++
                            }
                            $name
                        }
                    )
                }
                'row' {
                    $values = @($msg.values)
                    $obj = [ordered]@{}
                    for ($i = 0; $i -lt $columns.Count; $i++) {
                        # Rows shorter than the column list are padded with $null;
                        # values beyond the column list are ignored.
                        $val = if ($i -lt $values.Count) { $values[$i] } else { $null }
                        $obj[$columns[$i]] = $val
                    }
                    $results.Add([PSCustomObject]$obj)
                }
                'complete' {
                    return $results.ToArray()
                }
                'error' {
                    throw "Query error: $($msg.message)"
                }
                # Any other message type is ignored and the loop keeps reading.
            }
        }
    }
    finally {
        # Runs on return and on throw alike, so the socket is always closed.
        Close-OpWebSocket -WebSocket $ws
    }
}