# Private/Invoke-VBBulkInsert.ps1

# ============================================================
# FUNCTION : Invoke-VBBulkInsert
# VERSION : 1.0.0
# CHANGED : 2026-05-07 -- Initial build
# AUTHOR : Vibhu Bhatnagar
# PURPOSE : Bulk-insert parsed DNS records into SQLite using prepared statements
# ENCODING : UTF-8 with BOM
# ============================================================

<#
.SYNOPSIS
    Inserts a buffer of parsed DNS log rows into the DNSLog SQLite table using a
    prepared statement inside a batched transaction.
 
.DESCRIPTION
    Consumes the List[object[]] buffer produced by Invoke-VBDNSLogParser and writes
    every row to the DNSLog table using the following SQLite performance strategy:
 
    1. PRAGMA tuning applied once per connection before any inserts
       (synchronous=NORMAL, cache_size=64MB, temp_store=MEMORY, mmap_size=256MB)
    2. Single INSERT statement compiled once (prepared statement)
    3. Rows committed in batches of -BatchSize (default 50,000) to control memory
       and reduce disk sync frequency vs. one transaction per file
    4. Each PS7 parallel thread creates its own SQLiteConnection (WAL mode allows this)
 
    The 21-column parameter binding order MUST match the object[] index contract
    defined in Invoke-VBDNSLogParser. Never reorder without updating both functions.
 
    After all rows are inserted, writes one audit row to the ImportLog table.
 
.PARAMETER DatabasePath
    Full path to the SQLite .db file. Must be initialised by Initialize-VBDNSLogDatabase.
 
.PARAMETER Buffer
    The List[object[]] of parsed rows from Invoke-VBDNSLogParser.
 
.PARAMETER SourceFile
    Full path of the log file being imported (written to ImportLog).
 
.PARAMETER RecordCount
    Number of records inserted (written to ImportLog).
 
.PARAMETER ErrorCount
    Number of parse errors encountered (written to ImportLog).
 
.PARAMETER FileHash
    SHA256 hash of the source file (written to ImportLog).
 
.PARAMETER DurationSeconds
    Parse + insert duration for this file (written to ImportLog).
 
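.PARAMETER BatchSize
    Number of rows per transaction commit. Default: 50,000.
 
.PARAMETER ProgressState
    Optional ConcurrentDictionary shared with the main-thread polling loop. When supplied,
    this function writes the keys InsertTotalRows (int64), InsertRowsInserted (int64) and
    InsertDone (bool) so the caller can render insert progress.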
 
.OUTPUTS
    [int] Number of rows successfully inserted into DNSLog.
 
.NOTES
    Version : 1.0.0
    Author : Vibhu Bhatnagar
    Modified : 2026-05-07
    Category : Private
    Called by: Import-VBDNSLog (inside ForEach-Object -Parallel thread)
 
    Requires: PSSQLite module (Install-Module PSSQLite)
    WAL mode must be set on the database (done by Initialize-VBDNSLogDatabase).
    Never pass a SQLiteConnection across parallel thread boundaries.
#>

function Invoke-VBBulkInsert {
    # Writes every row of -Buffer into the DNSLog table over a dedicated SQLite connection,
    # committing every -BatchSize rows, then records one audit row in ImportLog.
    # Returns the number of DNSLog rows inserted. On failure the currently open transaction
    # is rolled back (batches that were already committed stay committed) and the original
    # error is rethrown to the caller.
    [CmdletBinding()]
    [OutputType([int])]
    param(
        # Full path to the SQLite database file.
        [Parameter(Mandatory = $true)]
        [string]$DatabasePath,

        # Parsed rows; each object[] supplies indices 0..20, bound to @p0..@p20 in order.
        # AllowEmptyCollection: a Mandatory collection parameter otherwise rejects an empty
        # list at bind time, so a file with zero parsed rows could never get its ImportLog row.
        [Parameter(Mandatory = $true)]
        [AllowEmptyCollection()]
        [System.Collections.Generic.List[object[]]]$Buffer,

        # Path of the imported log file (written to ImportLog.FilePath / FileName).
        [Parameter(Mandatory = $true)]
        [string]$SourceFile,

        # Record count reported by the caller (written to ImportLog.RecordCount).
        [Parameter(Mandatory = $true)]
        [int]$RecordCount,

        # Parse error count reported by the caller (written to ImportLog.ErrorCount).
        [Parameter(Mandatory = $true)]
        [int]$ErrorCount,

        # Hash of the source file (written to ImportLog.FileHash).
        [Parameter(Mandatory = $true)]
        [string]$FileHash,

        # Duration reported by the caller (written to ImportLog.DurationSeconds).
        [Parameter(Mandatory = $true)]
        [double]$DurationSeconds,

        # Rows per transaction commit. Must be >= 1 because it is the divisor of the
        # batch-boundary check; 0 would raise a divide-by-zero after the first row.
        [ValidateRange(1, [int]::MaxValue)]
        [int]$BatchSize = 50000,

        # Shared progress state written by this function; read by the main thread polling loop.
        # Keys written: InsertRowsInserted (int64), InsertTotalRows (int64), InsertDone (bool).
        [System.Collections.Concurrent.ConcurrentDictionary[string,object]]$ProgressState
    )

    # Number of values taken from each buffer row (@p0..@p20). @p21 is the shared ImportedAt.
    $rowFieldCount = 21

    $conn         = $null
    $tx           = $null
    $cmd          = $null
    $pragmaCmd    = $null
    $logCmd       = $null
    $rowsInserted = 0

    try {
        # Step 1 -- Open a connection owned by this call only. WAL lets readers proceed while
        # a write is in flight, but writers on separate connections still take turns.
        $conn = [System.Data.SQLite.SQLiteConnection]::new("Data Source=$DatabasePath;Version=3;")
        $conn.Open()

        # Step 2 -- PRAGMA tuning (applies to this connection only, so set before any inserts)
        $pragmaCmd = $conn.CreateCommand()
        $pragmaCmd.CommandText = @"
PRAGMA synchronous = NORMAL;
PRAGMA cache_size = -65536;
PRAGMA temp_store = MEMORY;
PRAGMA mmap_size = 268435456;
"@

        $null = $pragmaCmd.ExecuteNonQuery()

        # Step 3 -- Build the INSERT command once; only parameter values change per row
        $cmd = $conn.CreateCommand()
        $cmd.CommandText = @"
INSERT INTO DNSLog (
    LogDateTime, LogDate, LogTime, ThreadId, PacketId,
    Protocol, Direction, IPAddress, IPVersion, IsPrivate,
    TransactionId, PacketKind, Opcode, FlagsHex, FlagsChar,
    ResponseCode, Status, Error, QueryType, QueryName,
    SourceFile, ImportedAt
) VALUES (
    @p0, @p1, @p2, @p3, @p4,
    @p5, @p6, @p7, @p8, @p9,
    @p10, @p11, @p12, @p13, @p14,
    @p15, @p16, @p17, @p18, @p19,
    @p20, @p21
)
"@

        # Invariant culture keeps the timestamp ISO-8601 regardless of the host's regional
        # settings (':' and the calendar year are culture-dependent in DateTime.ToString).
        $importedAt = (Get-Date).ToString(
            'yyyy-MM-ddTHH:mm:ss',
            [System.Globalization.CultureInfo]::InvariantCulture)

        # Create the per-row parameters once and keep the objects, so the row loop assigns
        # values by array index instead of building "@p$i" and searching by name every time.
        $rowParams = [object[]]::new($rowFieldCount)
        for ($i = 0; $i -lt $rowFieldCount; $i++) {
            $rowParams[$i] = $cmd.Parameters.AddWithValue("@p$i", $null)
        }
        $null = $cmd.Parameters.AddWithValue('@p21', $importedAt)

        # Step 4 -- Begin first transaction
        $tx = $conn.BeginTransaction()
        $cmd.Transaction = $tx

        # Total rows for progress percentage
        $totalRows = $Buffer.Count

        # Seed shared state so the polling loop transitions to the insert stage immediately
        if ($null -ne $ProgressState) {
            $ProgressState['InsertTotalRows']   = [long]$totalRows
            $ProgressState['InsertRowsInserted'] = [long]0
            $ProgressState['InsertDone']        = $false
        }

        # Step 5 -- Iterate buffer and bind each object[] row by index
        foreach ($row in $Buffer) {
            for ($i = 0; $i -lt $rowFieldCount; $i++) {
                $rowParams[$i].Value = $row[$i]
            }
            # Assigning to $null discards the affected-row count without a pipeline per row.
            $null = $cmd.ExecuteNonQuery()
            $rowsInserted++

            # Batch commit every BatchSize rows -- begin new transaction immediately
            # Note: do NOT Dispose() the old $tx before reassigning -- let GC handle it.
            # Disposing mid-loop invalidates the SQLiteCommand.Transaction reference on some
            # PSSQLite/System.Data.SQLite builds and causes subsequent rows to be lost.
            if ($rowsInserted % $BatchSize -eq 0) {
                $tx.Commit()
                $tx = $conn.BeginTransaction()
                $cmd.Transaction = $tx
                Write-Verbose "Invoke-VBBulkInsert: Committed batch at $rowsInserted rows"

                # Update shared state every batch -- polling loop renders the bar on the main thread
                if ($null -ne $ProgressState) {
                    $ProgressState['InsertRowsInserted'] = [long]$rowsInserted
                }
            }
        }

        # Step 6 -- Final commit for the last partial (possibly empty) batch
        $tx.Commit()
        $tx = $null

        # Step 7 -- Write ImportLog audit row.
        # ServerName is the host part of a UNC path (\\server\...); empty for any other path.
        $serverName = ''
        if ($SourceFile -match '^\\\\([^\\]+)\\') {
            $serverName = $Matches[1]
        }

        $logCmd = $conn.CreateCommand()
        $logCmd.CommandText = @"
INSERT OR IGNORE INTO ImportLog
    (FilePath, FileName, FileHash, RecordCount, ErrorCount, DurationSeconds, ImportedAt, ServerName)
VALUES
    (@fp, @fn, @fh, @rc, @ec, @ds, @ia, @sn)
"@

        $null = $logCmd.Parameters.AddWithValue('@fp', $SourceFile)
        $null = $logCmd.Parameters.AddWithValue('@fn', (Split-Path $SourceFile -Leaf))
        $null = $logCmd.Parameters.AddWithValue('@fh', $FileHash)
        $null = $logCmd.Parameters.AddWithValue('@rc', $RecordCount)
        $null = $logCmd.Parameters.AddWithValue('@ec', $ErrorCount)
        $null = $logCmd.Parameters.AddWithValue('@ds', $DurationSeconds)
        $null = $logCmd.Parameters.AddWithValue('@ia', $importedAt)
        $null = $logCmd.Parameters.AddWithValue('@sn', $serverName)
        $null = $logCmd.ExecuteNonQuery()

        # Signal the polling loop that insert is complete with final row count
        if ($null -ne $ProgressState) {
            $ProgressState['InsertRowsInserted'] = [long]$rowsInserted
            $ProgressState['InsertDone']        = $true
        }

        Write-Verbose "Invoke-VBBulkInsert: Inserted $rowsInserted rows. ImportLog updated."
    }
    catch {
        # Undo the uncommitted batch, then clear $tx so finally does not roll back twice.
        # NOTE(review): InsertDone is not set on failure; presumably the caller notices the
        # rethrown error rather than waiting on the flag -- confirm against the polling loop.
        if ($null -ne $tx) {
            try { $tx.Rollback() } catch { }
            $tx = $null
        }
        throw
    }
    finally {
        # $tx is still set here only if the try block was left without reaching catch
        # (e.g. the pipeline was stopped); roll back before the connection goes away.
        if ($null -ne $tx)        { try { $tx.Rollback()       } catch { } }
        if ($null -ne $logCmd)    { try { $logCmd.Dispose()    } catch { } }
        if ($null -ne $cmd)       { try { $cmd.Dispose()       } catch { } }
        if ($null -ne $pragmaCmd) { try { $pragmaCmd.Dispose() } catch { } }
        if ($null -ne $conn)      { try { $conn.Close(); $conn.Dispose() } catch { } }
    }

    return $rowsInserted
}