handlers/duckdb.ps1
|
<#
.SYNOPSIS
    DuckDB handler — writes records into a DuckDB database via DuckDB.NET.

.DESCRIPTION
    Loads the DuckDB.NET assembly through the ImportDependency module, opens the
    configured database file (creating its parent directory when necessary) and
    executes one command per record. A failure on one record is captured in that
    record's result entry and does not stop the remaining records.

    YAML configuration (mode 1 — table mapping):
        type: duckdb
        database: "D:\Analytics\events.duckdb"
        table: brevo_events
        columnMapping:
          email:       payload.email
          event_type:  payload.event
          payload:     "$"
          received_at: received_at

    YAML configuration (mode 2 — query):
        type: duckdb
        database: "D:\Analytics\events.duckdb"
        query: "INSERT INTO brevo_events SELECT * FROM read_json_auto(?)"
        parameters:
          - "$"        # positional parameters as a list

.PARAMETER Records
    Records to write. Each record is indexed with ['id'] for the result entry and
    handed to Resolve-Field to evaluate the configured field expressions.

.PARAMETER Config
    Handler configuration. Keys read here: 'database', 'query', 'parameters',
    'table', 'columnMapping'. Mode 2 is selected whenever 'query' is not null.

.PARAMETER Endpoint
    Endpoint name; passed to Resolve-PathTemplate and used as the log prefix.

.PARAMETER ScriptDir
    Accepted for interface compatibility; not referenced by this script.

.OUTPUTS
    One hashtable per record with the keys Id, Success and Error.
#>
param(
    [object[]]  $Records,
    [hashtable] $Config,
    [string]    $Endpoint,
    [string]    $ScriptDir
)

Set-StrictMode -Version Latest

# ------------------------------------------------------------
# Load DuckDB.NET via ImportDependency
# ------------------------------------------------------------
try {
    Import-Module ImportDependency -ErrorAction Stop
    $duckDbDll = Import-Dependency -Name 'DuckDB.NET.Data' -ErrorAction Stop
    Add-Type -Path $duckDbDll -ErrorAction Stop
}
catch {
    throw "DuckDB.NET konnte nicht geladen werden. Bitte ImportDependency und DuckDB.NET.Data installieren. 
Fehler: $($_.Exception.Message)"
}

# ------------------------------------------------------------
# Open the connection
# ------------------------------------------------------------
$dbPath = Resolve-PathTemplate -Template $Config['database'] -Endpoint $Endpoint
$dbDir  = Split-Path $dbPath -Parent

# Split-Path yields an empty string for a bare file name; New-Item rejects an
# empty -Path, so only create the directory when there is one to create.
if (-not [string]::IsNullOrEmpty($dbDir)) {
    $null = New-Item -ItemType Directory -Path $dbDir -Force
}

$connStr  = "Data Source=$dbPath"
$conn     = [DuckDB.NET.Data.DuckDBConnection]::new($connStr)
$results  = [System.Collections.Generic.List[hashtable]]::new()
$useQuery = $null -ne $Config['query']

try {
    # Opened inside the try so the connection object is disposed even if Open() throws.
    $conn.Open()

    foreach ($record in $Records) {
        $cmd = $null
        try {
            $cmd = $conn.CreateCommand()

            if ($useQuery) {
                # Mode 2: query with positional parameters (list)
                $cmd.CommandText = $Config['query']

                $paramIndex = 1
                foreach ($fieldExpr in $Config['parameters']) {
                    $value = Resolve-Field -Record $record -FieldExpr $fieldExpr

                    $param = $cmd.CreateParameter()
                    $param.ParameterName = $paramIndex.ToString()
                    $param.Value = $value ?? [DBNull]::Value
                    $cmd.Parameters.Add($param) | Out-Null

                    $paramIndex++
                }
            }
            else {
                # Mode 1: table mapping
                $table   = $Config['table']
                $mapping = $Config['columnMapping']
                $cols    = [string[]]$mapping.Keys

                # Placeholders are collected in the same loop that binds the values so
                # the two can never disagree in count (also when the mapping is empty).
                $placeholders = [System.Collections.Generic.List[string]]::new()

                $idx = 1
                foreach ($col in $cols) {
                    $value = Resolve-Field -Record $record -FieldExpr $mapping[$col]

                    $param = $cmd.CreateParameter()
                    $param.ParameterName = $idx.ToString()
                    $param.Value = $value ?? [DBNull]::Value
                    $cmd.Parameters.Add($param) | Out-Null

                    # Single-quoted '$' on purpose: inside a double-quoted string "$$"
                    # is expanded as the automatic variable $$ instead of a literal '$'.
                    $placeholders.Add('$' + $idx)

                    $idx++
                }

                $cmd.CommandText = "INSERT INTO $table ($($cols -join ', ')) VALUES ($($placeholders -join ', '))"
            }

            $cmd.ExecuteNonQuery() | Out-Null
            $results.Add(@{ Id = $record['id']; Success = $true; Error = $null })
        }
        catch {
            # Per-record failure: remember the message and continue with the next record.
            $results.Add(@{ Id = $record['id']; Success = $false; Error = $_.Exception.Message })
        }
        finally {
            # Release the command's prepared-statement resources for every record.
            if ($null -ne $cmd) { $cmd.Dispose() }
        }
    }
}
finally {
    if ($null -ne $conn) {
        if ($conn.State -eq 'Open') { $conn.Close() }
        $conn.Dispose()
    }
}

Write-Log "[$Endpoint][duckdb] Ziel: $dbPath :: $($Config['table'] ?? 'via query')" -Level INFO

return $results