handlers/etl.ps1
|
<#
.SYNOPSIS ETL Handler — liest aus einer beliebigen SQL-Quelle und schreibt das Resultset via SqlBulkCopy in eine Zieltabelle. YAML-Konfiguration: type: etl logResultSet: true logPreviewRows: 0 # 0 = alle Zeilen, N = nur erste N source: windowsAuth: true server: localhost # optional database: SourceDB queryFile: 'C:\Scripts\sql\read-source.sql' # oder: query: "SELECT ..." parameters: cutoff: "2026-01-01" target: windowsAuth: true server: localhost # optional database: ZielDB table: dbo.ZielTabelle batchSize: 1000 timeout: 60 columnMapping: # optional — leer = Spaltennamen müssen übereinstimmen ZielEmail: Email ZielName: FullName #> # Password is read from an environment variable and must be converted for SimplySql credential auth. [Diagnostics.CodeAnalysis.SuppressMessageAttribute('PSAvoidUsingConvertToSecureStringWithPlainText', '')] param( [object[]] $Records, [hashtable] $Config, [string] $Endpoint, [string] $ScriptDir ) Set-StrictMode -Version Latest function Open-TargetConnection([hashtable]$Cfg, [string]$ConnName) { $server = if ($Cfg['server']) { $Cfg['server'] } else { $env:SQLSERVER_HOST } $db = $Cfg['database'] $port = if ($Cfg['port']) { $Cfg['port'] } else { if ($env:SQLSERVER_PORT) { $env:SQLSERVER_PORT } else { '1433' } } $winAuth = if ($null -ne $Cfg['windowsAuth']) { [bool]$Cfg['windowsAuth'] } else { $env:SQLSERVER_WINDOWS_AUTH -eq 'true' } $serverStr = if ($port -ne '1433') { "$server,$port" } else { $server } if ($winAuth) { Open-SQLConnection -Server $serverStr -Database $db -ConnectionName $ConnName } else { $user = if ($Cfg['user']) { $Cfg['user'] } else { $env:SQLSERVER_USER } $pass = if ($Cfg['password']) { $Cfg['password'] } else { $env:SQLSERVER_PASS } $secPass = ConvertTo-SecureString $pass -AsPlainText -Force $cred = New-Object System.Management.Automation.PSCredential($user, $secPass) Open-SQLConnection -Server $serverStr -Database $db -Credential $cred -ConnectionName $ConnName } } # Konfiguration $sourceCfg = $Config['source'] $targetCfg = $Config['target'] $logResultSet = if ($null -ne $Config['logResultSet']) { [bool]$Config['logResultSet'] } else { $true } $logPreviewRows = if ($Config['logPreviewRows']) { [int]$Config['logPreviewRows'] } else { 0 } $bulkBatchSize = if ($targetCfg['batchSize']) { [int]$targetCfg['batchSize'] } else { 1000 } $bulkTimeout = if ($targetCfg['timeout']) { [int]$targetCfg['timeout'] } else { 60 } if (-not $sourceCfg) { throw "ETL: 'source' fehlt in der YAML-Konfiguration." } if (-not $targetCfg) { throw "ETL: 'target' fehlt in der YAML-Konfiguration." } if (-not $targetCfg['table']) { throw "ETL: 'target.table' fehlt." } $srcConn = "etl_src_$([guid]::NewGuid().ToString('N').Substring(0,8))" $tgtConn = "etl_tgt_$([guid]::NewGuid().ToString('N').Substring(0,8))" try { # ------------------------------------------------------------ # 1. Source Query ausführen # ------------------------------------------------------------ Open-TargetConnection -Cfg $sourceCfg -ConnName $srcConn if ($sourceCfg['queryFile']) { $qfPath = $sourceCfg['queryFile'] if (-not (Test-Path $qfPath)) { throw "source.queryFile nicht gefunden: $qfPath" } $sourceQuery = Get-Content -Path $qfPath -Raw -Encoding UTF8 Write-Log "[$Endpoint][etl] Source query aus Datei: $qfPath" -Severity INFO } elseif ($sourceCfg['query']) { $sourceQuery = $sourceCfg['query'] } else { throw "ETL: Weder 'source.query' noch 'source.queryFile' konfiguriert." } $srcParams = @{} if ($sourceCfg['parameters']) { foreach ($pName in $sourceCfg['parameters'].Keys) { $srcParams[$pName] = $sourceCfg['parameters'][$pName] } } $rows = Invoke-SqlQuery -ConnectionName $srcConn -Query $sourceQuery -Parameters $srcParams $rowCount = @($rows).Count if ($rowCount -eq 0) { Write-Log "[$Endpoint][etl] Keine Zeilen in Source — nichts zu schreiben." -Severity INFO return @(@{ Id = 0; Success = $true; Error = $null; RowCount = 0 }) } $colNames = $rows[0].PSObject.Properties.Name Write-Log "[$Endpoint][etl] Source: $rowCount Zeilen, Spalten: $($colNames -join ', ')" -Severity INFO # ------------------------------------------------------------ # 2. Resultset loggen # ------------------------------------------------------------ if ($logResultSet) { $rowsToLog = if ($logPreviewRows -gt 0) { @($rows) | Select-Object -First $logPreviewRows } else { $rows } $logObjects = $rowsToLog | ForEach-Object { $obj = [ordered]@{} $_.PSObject.Properties | ForEach-Object { $obj[$_.Name] = $_.Value } $obj } $preview = if ($logPreviewRows -gt 0 -and $rowCount -gt $logPreviewRows) { " (Vorschau: erste $logPreviewRows von $rowCount)" } else { "" } $jsonLog = $logObjects | ConvertTo-Json -Depth 5 -Compress:($rowCount -gt 100) Write-Log "[$Endpoint][etl] Resultset$preview`: $jsonLog" -Severity INFO } # ------------------------------------------------------------ # 3. DataTable aufbauen (mit optionalem Spalten-Mapping) # ------------------------------------------------------------ $dataTable = New-Object System.Data.DataTable $mapping = $targetCfg['columnMapping'] if ($mapping -and $mapping.Count -gt 0) { foreach ($zielSpalte in $mapping.Keys) { $dataTable.Columns.Add($zielSpalte) | Out-Null } foreach ($row in $rows) { $newRow = $dataTable.NewRow() foreach ($zielSpalte in $mapping.Keys) { $quellSpalte = $mapping[$zielSpalte] $val = $row.PSObject.Properties[$quellSpalte]?.Value $newRow[$zielSpalte] = if ($null -ne $val) { $val } else { [DBNull]::Value } } $dataTable.Rows.Add($newRow) } Write-Log "[$Endpoint][etl] Spalten-Mapping: $($mapping.Keys -join ', ')" -Severity INFO } else { foreach ($col in $colNames) { $dataTable.Columns.Add($col) | Out-Null } foreach ($row in $rows) { $newRow = $dataTable.NewRow() foreach ($col in $colNames) { $val = $row.PSObject.Properties[$col]?.Value $newRow[$col] = if ($null -ne $val) { $val } else { [DBNull]::Value } } $dataTable.Rows.Add($newRow) } } # ------------------------------------------------------------ # 4. SqlBulkCopy in Ziel # ------------------------------------------------------------ Open-TargetConnection -Cfg $targetCfg -ConnName $tgtConn $tgtSqlConn = (Get-SqlConnection -ConnectionName $tgtConn).SqlConnection $bulk = New-Object System.Data.SqlClient.SqlBulkCopy($tgtSqlConn) $bulk.DestinationTableName = $targetCfg['table'] $bulk.BatchSize = $bulkBatchSize $bulk.BulkCopyTimeout = $bulkTimeout foreach ($col in $dataTable.Columns) { $bulk.ColumnMappings.Add($col.ColumnName, $col.ColumnName) | Out-Null } $bulk.WriteToServer($dataTable) $bulk.Close() Write-Log "[$Endpoint][etl] $rowCount Zeilen nach $($targetCfg['table']) geschrieben." -Severity INFO return @(@{ Id = 0; Success = $true; Error = $null; RowCount = $rowCount }) } catch { $errMsg = $_.Exception.Message Write-Log "[$Endpoint][etl] Fehler: $errMsg" -Severity ERROR return @(@{ Id = 0; Success = $false; Error = $errMsg; RowCount = 0 }) } finally { Close-SqlConnection -ConnectionName $srcConn -ErrorAction SilentlyContinue Close-SqlConnection -ConnectionName $tgtConn -ErrorAction SilentlyContinue } |