dispatch.ps1
|
<#
.SYNOPSIS Webhook Dispatcher — liest WebhookEvents aus SQL Server und verteilt sie anhand von endpoints/*.yaml an konfigurierte Ziele. .DESCRIPTION Wird als Scheduled Task alle N Minuten ausgeführt. Zustand wird in dbo.WebhookDispatchLog im SQL Server gehalten. Retry: fehlgeschlagene Einträge werden beim nächsten Run automatisch erneut verarbeitet. .NOTES Benötigt: WriteLog, powershell-yaml Module #> [CmdletBinding()] param() Set-StrictMode -Version Latest $ErrorActionPreference = 'Stop' # ------------------------------------------------------------ # Pfade # ------------------------------------------------------------ $ScriptDir = $PSScriptRoot $EnvFile = Join-Path $ScriptDir '.env' $EndpointDir = Join-Path $ScriptDir 'endpoints' $HandlerDir = Join-Path $ScriptDir 'handlers' $LogDir = Join-Path $ScriptDir 'logs' # ------------------------------------------------------------ # .env laden # ------------------------------------------------------------ function Read-EnvFile([string]$Path) { $vars = @{} if (-not (Test-Path $Path)) { return $vars } Get-Content $Path | ForEach-Object { $line = $_.Trim() if ($line -and $line -notmatch '^#' -and $line -match '^([^=]+)=(.*)$') { $vars[$Matches[1].Trim()] = $Matches[2].Trim().Trim('"').Trim("'") } } return $vars } $envVars = Read-EnvFile $EnvFile foreach ($key in $envVars.Keys) { Set-Item -Path "env:$key" -Value $envVars[$key] } # ------------------------------------------------------------ # WriteLog initialisieren # ------------------------------------------------------------ $null = New-Item -ItemType Directory -Path $LogDir -Force Import-Module WriteLog -ErrorAction Stop $LogFile = Join-Path $LogDir "dispatch_$(Get-Date -Format 'yyyy-MM-dd').log" Set-Logfile -Path $LogFile # ------------------------------------------------------------ # powershell-yaml laden # ------------------------------------------------------------ if (-not (Get-Module -ListAvailable -Name 'powershell-yaml')) { Write-Log "Installiere powershell-yaml..." -Level INFO Install-Module powershell-yaml -Scope CurrentUser -Force -AllowClobber } Import-Module powershell-yaml -ErrorAction Stop # ------------------------------------------------------------ # SQL Server Verbindung (für WebhookEvents + DispatchLog) # ------------------------------------------------------------ function Get-SqlConnection { $server = $env:SQLSERVER_HOST $db = $env:SQLSERVER_DB $winAuth = $env:SQLSERVER_WINDOWS_AUTH -eq 'true' $port = if ($env:SQLSERVER_PORT) { $env:SQLSERVER_PORT } else { '1433' } if ($winAuth) { $connStr = "Server=$server,$port;Database=$db;Integrated Security=SSPI;TrustServerCertificate=True;" } else { $connStr = "Server=$server,$port;Database=$db;User Id=$($env:SQLSERVER_USER);Password=$($env:SQLSERVER_PASS);TrustServerCertificate=True;" } $conn = New-Object System.Data.SqlClient.SqlConnection($connStr) $conn.Open() return $conn } # ------------------------------------------------------------ # Feldwert aus Record auflösen # "id" → direkte Spalte # "payload.email" → payload-Spalte als JSON parsen, dann .email # "$" → kompletter payload JSON-String # ------------------------------------------------------------ function Resolve-Field([hashtable]$Record, [string]$FieldExpr) { if ($FieldExpr -eq '$') { return $Record['payload'] } $parts = $FieldExpr -split '\.', 2 if ($parts.Count -eq 1) { return $Record[$FieldExpr] } $colName = $parts[0] $jsonPath = $parts[1] $raw = $Record[$colName] if ($null -eq $raw) { return $null } try { $obj = $raw | ConvertFrom-Json -ErrorAction Stop $jsonPath -split '\.' | ForEach-Object { if ($null -ne $obj) { $obj = $obj.$_ } } return $obj } catch { return $null } } # ------------------------------------------------------------ # Pfad-Platzhalter auflösen # {datetime}, {date}, {year}, {month}, {day}, {endpoint}, {target} # ------------------------------------------------------------ function Resolve-PathTemplate([string]$Template, [string]$Endpoint, [string]$Target = '') { $now = Get-Date return $Template ` -replace '\{datetime\}', ($now.ToString('yyyy-MM-dd_HHmm')) ` -replace '\{date\}', ($now.ToString('yyyy-MM-dd')) ` -replace '\{year\}', ($now.ToString('yyyy')) ` -replace '\{month\}', ($now.ToString('MM')) ` -replace '\{day\}', ($now.ToString('dd')) ` -replace '\{endpoint\}', $Endpoint ` -replace '\{target\}', ($Target -replace '[\\/:*?"<>|]', '_') } # ------------------------------------------------------------ # Pending Records laden — Records ohne "done" Eintrag im DispatchLog # ------------------------------------------------------------ function Get-PendingRecords { param( [System.Data.SqlClient.SqlConnection] $Conn, [string] $Endpoint, [string] $TargetName, [int] $BatchSize = 100 ) $sql = @" SELECT e.id, e.endpoint, e.payload, e.headers, e.source_ip, e.received_at, e.inserted_at FROM dbo.WebhookEvents e WHERE e.endpoint = @endpoint AND NOT EXISTS ( SELECT 1 FROM dbo.WebhookDispatchLog l WHERE l.webhook_id = e.id AND l.target_name = @target_name AND l.status = 'done' ) ORDER BY e.received_at ASC OFFSET 0 ROWS FETCH NEXT @batch ROWS ONLY "@ $cmd = $Conn.CreateCommand() $cmd.CommandText = $sql $cmd.Parameters.AddWithValue('@endpoint', $Endpoint) | Out-Null $cmd.Parameters.AddWithValue('@target_name', $TargetName) | Out-Null $cmd.Parameters.AddWithValue('@batch', $BatchSize) | Out-Null $adapter = New-Object System.Data.SqlClient.SqlDataAdapter($cmd) $table = New-Object System.Data.DataTable $adapter.Fill($table) | Out-Null $records = [System.Collections.Generic.List[hashtable]]::new() foreach ($row in $table.Rows) { $record = @{} foreach ($col in $table.Columns) { $record[$col.ColumnName] = if ($row[$col] -is [System.DBNull]) { $null } else { $row[$col] } } $records.Add($record) } return $records } # ------------------------------------------------------------ # DispatchLog Eintrag schreiben (MERGE — insert oder update) # ------------------------------------------------------------ function Write-DispatchLog { param( [System.Data.SqlClient.SqlConnection] $Conn, [long] $WebhookId, [string] $Endpoint, [string] $TargetName, [string] $TargetType, [string] $Status, [string] $ErrorMsg = $null ) $sql = @" MERGE dbo.WebhookDispatchLog AS t USING (SELECT @webhook_id AS webhook_id, @target_name AS target_name) AS s ON t.webhook_id = s.webhook_id AND t.target_name = s.target_name WHEN MATCHED THEN UPDATE SET status = @status, attempts = t.attempts + 1, error = @error, dispatched_at = SYSUTCDATETIME() WHEN NOT MATCHED THEN INSERT (webhook_id, endpoint, target_name, target_type, status, attempts, error, dispatched_at) VALUES (@webhook_id, @endpoint, @target_name, @target_type, @status, 1, @error, SYSUTCDATETIME()); "@ $cmd = $Conn.CreateCommand() $cmd.CommandText = $sql $cmd.Parameters.AddWithValue('@webhook_id', $WebhookId) | Out-Null $cmd.Parameters.AddWithValue('@endpoint', $Endpoint) | Out-Null $cmd.Parameters.AddWithValue('@target_name', $TargetName) | Out-Null $cmd.Parameters.AddWithValue('@target_type', $TargetType) | Out-Null $cmd.Parameters.AddWithValue('@status', $Status) | Out-Null $cmd.Parameters.AddWithValue('@error', [object]($ErrorMsg ?? [DBNull]::Value)) | Out-Null $cmd.ExecuteNonQuery() | Out-Null } # ------------------------------------------------------------ # Hauptschleife # ------------------------------------------------------------ Write-Log "=== Webhook Dispatcher gestartet ===" -Level INFO $conn = $null try { $conn = Get-SqlConnection Write-Log "Verbunden: $($env:SQLSERVER_HOST)/$($env:SQLSERVER_DB)" -Level INFO $yamlFiles = Get-ChildItem -Path $EndpointDir -Filter '*.yaml' -ErrorAction SilentlyContinue if (-not $yamlFiles) { Write-Log "Keine YAML-Dateien in: $EndpointDir" -Level WARN return } foreach ($yamlFile in $yamlFiles) { $cfg = Get-Content $yamlFile.FullName -Raw | ConvertFrom-Yaml $endpoint = $cfg.endpoint if ($cfg.enabled -eq $false) { Write-Log "[$endpoint] Übersprungen (enabled: false)" -Level INFO continue } $batchSize = if ($cfg.batchSize) { [int]$cfg.batchSize } else { 100 } Write-Log "[$endpoint] Starte Verarbeitung..." -Level INFO foreach ($target in $cfg.targets) { $targetName = $target.name $targetType = $target.type $handlerPath = Join-Path $HandlerDir "$targetType.ps1" if (-not (Test-Path $handlerPath)) { Write-Log "[$endpoint][$targetName] Handler nicht gefunden: $handlerPath" -Level ERROR continue } $records = Get-PendingRecords -Conn $conn -Endpoint $endpoint ` -TargetName $targetName -BatchSize $batchSize if ($records.Count -eq 0) { Write-Log "[$endpoint][$targetName] Keine neuen Records" -Level INFO continue } Write-Log "[$endpoint][$targetName] $($records.Count) Records" -Level INFO $doneCount = 0 $failCount = 0 try { # Handler aufrufen — gibt Liste von @{Id; Success; Error} zurück $results = & $handlerPath ` -Records $records ` -Config $target ` -Endpoint $endpoint ` -ScriptDir $ScriptDir foreach ($result in $results) { if ($result.Success) { Write-DispatchLog -Conn $conn -WebhookId $result.Id ` -Endpoint $endpoint -TargetName $targetName ` -TargetType $targetType -Status 'done' $doneCount++ } else { Write-DispatchLog -Conn $conn -WebhookId $result.Id ` -Endpoint $endpoint -TargetName $targetName ` -TargetType $targetType -Status 'failed' -ErrorMsg $result.Error $failCount++ Write-Log "[$endpoint][$targetName] ID=$($result.Id) failed: $($result.Error)" -Level WARN } } } catch { $errMsg = $_.Exception.Message Write-Log "[$endpoint][$targetName] Handler-Fehler: $errMsg" -Level ERROR foreach ($record in $records) { Write-DispatchLog -Conn $conn -WebhookId $record['id'] ` -Endpoint $endpoint -TargetName $targetName ` -TargetType $targetType -Status 'failed' -ErrorMsg $errMsg $failCount++ } } Write-Log "[$endpoint][$targetName] done=$doneCount failed=$failCount" -Level INFO } } } catch { Write-Log "Kritischer Fehler: $($_.Exception.Message)" -Level ERROR Write-Log $_.ScriptStackTrace -Level ERROR exit 1 } finally { if ($null -ne $conn -and $conn.State -eq 'Open') { $conn.Close() } Write-Log "=== Dispatcher beendet ===" -Level INFO } |