#------------------------------------------------------------------------------
# AzLogDcrIngestPS.psm1
# Helper module for sending data to Azure LogAnalytics via the Log Ingestion
# API using Data Collection Rules (DCR) and Data Collection Endpoints (DCE).
#------------------------------------------------------------------------------
# NOTE(review): the bare "Function <Name>" lines below are placeholders from the
# original extraction - their definitions live outside this chunk and are kept
# verbatim. Confirm against the complete module file.
Function Add-CollectionTimeToAllEntriesInArray
Function Add-ColumnDataToAllEntriesInArray
Function Build-DataArrayToAlignWithSchema
Function CheckCreateUpdate-TableDcr-Structure

function Compress-GzipBytes {
<#
 .SYNOPSIS
    Compresses a byte array using gzip (active version).

 .DESCRIPTION
    Takes raw bytes and returns gzip-compressed bytes using
    System.IO.Compression.GZipStream. Used by the fast path and BatchAmount
    path for compressing bulk JSON payloads.

 .PARAMETER InputBytes
    The raw byte array to compress.

 .OUTPUTS
    [byte[]] Gzip-compressed byte array.
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [byte[]]$InputBytes
    )

    $outputStream = New-Object System.IO.MemoryStream
    try {
        # leaveOpen = $true: the gzip stream must be disposed BEFORE ToArray()
        # so its final block is flushed, while the MemoryStream stays usable.
        $gzipStream = New-Object System.IO.Compression.GZipStream(
            $outputStream,
            [System.IO.Compression.CompressionMode]::Compress,
            $true
        )
        try {
            $gzipStream.Write($InputBytes, 0, $InputBytes.Length)
        }
        finally {
            $gzipStream.Dispose()
        }
        return $outputStream.ToArray()
    }
    finally {
        $outputStream.Dispose()
    }
}

Function Convert-CimArrayToObjectFixStructure
Function Convert-PSArrayToObjectFixStructure

function ConvertFrom-SecureStringToPlainText {
<#
 .SYNOPSIS
    Converts a SecureString to its plain-text value.

 .DESCRIPTION
    Marshals the SecureString into an unmanaged BSTR, copies it out as a
    managed string, and zeroes/frees the unmanaged buffer afterwards so the
    secret does not linger in unmanaged memory.

 .PARAMETER SecureString
    The SecureString to convert.

 .OUTPUTS
    [string] The plain-text value.
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [System.Security.SecureString]$SecureString
    )

    $bstr = [System.Runtime.InteropServices.Marshal]::SecureStringToBSTR($SecureString)
    try {
        [System.Runtime.InteropServices.Marshal]::PtrToStringBSTR($bstr)
    }
    finally {
        # Always zero + free the unmanaged copy of the secret
        [System.Runtime.InteropServices.Marshal]::ZeroFreeBSTR($bstr)
    }
}

Function CreateUpdate-AzDataCollectionRuleLogIngestCustomLog
Function CreateUpdate-AzLogAnalyticsCustomLogTableDcr
Function Delete-AzDataCollectionRules
Function Delete-AzLogAnalyticsCustomLogTables
Function Filter-ObjectExcludeProperty

function Get-AzAccessTokenManagement {
<#
 .SYNOPSIS
    Gets an access token for the Azure Resource Manager endpoint and returns
    ready-to-use REST headers.

 .DESCRIPTION
    Wraps Get-AzTokenForResource for 'https://management.azure.com/' and
    returns a header hashtable (Content-Type, Accept, Authorization) suitable
    for Invoke-RestMethod / Invoke-WebRequest against ARM.

 .PARAMETER AzAppId
    App registration (service principal) client id - optional.

 .PARAMETER AzAppSecret
    App registration secret - optional.

 .PARAMETER TenantId
    AAD tenant id - optional.

 .PARAMETER UseManagedIdentity
    Use Azure Managed Identity (IMDS) for authentication.

 .PARAMETER ManagedIdentityClientId
    Client id of a user-assigned managed identity (optional).

 .OUTPUTS
    [hashtable] REST headers including the Bearer token.
#>
    [CmdletBinding()]
    param(
        [string]$AzAppId,
        [string]$AzAppSecret,
        [string]$TenantId,
        [switch]$UseManagedIdentity,
        [string]$ManagedIdentityClientId
    )

    $token = Get-AzTokenForResource `
        -ResourceUrl 'https://management.azure.com/' `
        -AzAppId $AzAppId `
        -AzAppSecret $AzAppSecret `
        -TenantId $TenantId `
        -UseManagedIdentity:$UseManagedIdentity `
        -ManagedIdentityClientId $ManagedIdentityClientId

    return @{
        'Content-Type'  = 'application/json'
        'Accept'        = 'application/json'
        'Authorization' = "Bearer $token"
    }
}

Function Get-AzDataCollectionRuleTransformKql
Function Get-AzDceListAll
Function Get-AzDcrDceDetails
Function Get-AzDcrListAll
Function Get-AzLogAnalyticsTableAzDataCollectionRuleStatus

function Get-AzLogIngestBatchEndIndex {
<#
 .SYNOPSIS
    Finds the optimal batch end index using cumulative sums and binary search
    (active version).

 .DESCRIPTION
    Uses the pre-computed CumulativePayloadSize array from the cache to find
    batch boundaries in O(log n) time. For compressed payloads, it probes the
    actual compression ratio with a small sample (100 rows), then uses that
    ratio to estimate the uncompressed ceiling, finds the candidate via binary
    search, and verifies with a single real compression call. The compression
    ratio is adaptive - it learns from each batch and improves accuracy for
    subsequent batches in the same ingestion run.

 .PARAMETER Cache
    The row cache object created by New-AzLogIngestRowJsonCache, containing
    pre-serialized row bytes and cumulative payload sizes.

 .PARAMETER StartIndex
    The first row index of the batch.

 .PARAMETER MaxPayloadBytes
    Maximum allowed payload size in bytes (typically 1MB = 1048576).

 .PARAMETER EnableCompression
    When set, payloads are gzip-compressed and the function uses adaptive
    ratio estimation to find the optimal batch size.

 .OUTPUTS
    [int] The last row index that fits within the byte limit.
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        $Cache,
        [Parameter(Mandatory)]
        [int]$StartIndex,
        [Parameter(Mandatory)]
        [int]$MaxPayloadBytes,
        [switch]$EnableCompression
    )

    if ($StartIndex -ge $Cache.Count) {
        throw "StartIndex '$StartIndex' is outside the data range."
    }

    # -- Helper: get uncompressed payload size for rows [Start..End] ------
    # Uses the pre-computed CumulativePayloadSize array - O(1) per lookup.
    # CumulativePayloadSize[i] = size of JSON array for rows [0..i]
    # For a sub-range [Start..End]:
    #   size = 2 + (sum of row bytes in range) + (End - Start) commas
    function Get-UncompressedPayloadSize {
        param([int]$FromIndex, [int]$ToIndex)

        if ($FromIndex -eq 0) {
            return $Cache.CumulativePayloadSize[$ToIndex]
        }

        # CumulativePayloadSize[i] = 2 + runningSum[0..i] + i
        # runningSum[From..To]     = runningSum[0..To] - runningSum[0..From-1]
        # payload                  = 2 + runningSum[From..To] + (To - From) commas
        $rawTo   = $Cache.CumulativePayloadSize[$ToIndex] - 2 - $ToIndex                    # = runningSum[0..To]
        $rawFrom = $Cache.CumulativePayloadSize[$FromIndex - 1] - 2 - ($FromIndex - 1)      # = runningSum[0..From-1]
        $rangeSum = $rawTo - $rawFrom
        return (2 + $rangeSum + ($ToIndex - $FromIndex))
    }

    # Validate single record fits
    $singleRowSize = Get-UncompressedPayloadSize -FromIndex $StartIndex -ToIndex $StartIndex
    if (-not $EnableCompression -and $singleRowSize -gt $MaxPayloadBytes) {
        throw "A single record ($singleRowSize bytes) is larger than the allowed payload size of $MaxPayloadBytes bytes."
    }
    if ($EnableCompression) {
        $singlePayload = Get-AzLogIngestPayloadBytesFromCache `
            -Cache $Cache -StartIndex $StartIndex -EndIndex $StartIndex -EnableCompression
        if ($singlePayload.Length -gt $MaxPayloadBytes) {
            throw "A single record is larger than the allowed payload size of $MaxPayloadBytes bytes in the selected transfer format."
        }
    }

    # -- Binary search for the last index that fits the byte limit --------
    # This is O(log n) using the cumulative sum - no row-by-row scanning.
    function Find-LastFittingIndex {
        param([long]$ByteLimit)

        $lo = $StartIndex
        $hi = $Cache.Count - 1
        $lastGood = $StartIndex
        while ($lo -le $hi) {
            $mid = [int][Math]::Floor(($lo + $hi) / 2)
            $size = Get-UncompressedPayloadSize -FromIndex $StartIndex -ToIndex $mid
            if ($size -le $ByteLimit) {
                $lastGood = $mid
                $lo = $mid + 1
            }
            else {
                $hi = $mid - 1
            }
        }
        return $lastGood
    }

    if (-not $EnableCompression) {
        # Exact - binary search on uncompressed size
        return (Find-LastFittingIndex -ByteLimit $MaxPayloadBytes)
    }

    # -- Compressed path -------------------------------------------------
    # Probe compression ratio with a small sample, then binary search
    # on uncompressed size using the estimated ratio, then verify.
    if (-not (Get-Variable -Name '_gzipRatioEstimate' -Scope Script -ErrorAction SilentlyContinue) -or
        $null -eq $script:_gzipRatioEstimate) {

        $sampleEnd = [Math]::Min($StartIndex + 99, $Cache.Count - 1)
        $samplePayload = Get-AzLogIngestPayloadBytesFromCache `
            -Cache $Cache -StartIndex $StartIndex -EndIndex $sampleEnd -EnableCompression
        $sampleRawSize = Get-UncompressedPayloadSize -FromIndex $StartIndex -ToIndex $sampleEnd

        if ($sampleRawSize -gt 0) {
            $script:_gzipRatioEstimate = [Math]::Max(0.01, $samplePayload.Length / $sampleRawSize)
        }
        else {
            $script:_gzipRatioEstimate = 0.15
        }

        Write-Verbose ("  Compression probe: {0:N0} raw -> {1:N0} compressed (ratio {2:P1})" -f `
            $sampleRawSize, $samplePayload.Length, $script:_gzipRatioEstimate)
    }

    $ratio = $script:_gzipRatioEstimate

    # Binary search on uncompressed size using estimated ceiling
    $targetUncompressedCeiling = [long][Math]::Floor($MaxPayloadBytes / $ratio * 0.95)
    $candidateEnd = Find-LastFittingIndex -ByteLimit $targetUncompressedCeiling
    $cumulativeBytes = Get-UncompressedPayloadSize -FromIndex $StartIndex -ToIndex $candidateEnd

    # Verify with real compression
    Write-Progress -Activity "Calculating batch size" `
        -Status "Verifying compression for $($candidateEnd - $StartIndex + 1) rows ..." `
        -PercentComplete 80 -Id 3
    try {
        $candidatePayload = Get-AzLogIngestPayloadBytesFromCache `
            -Cache $Cache -StartIndex $StartIndex -EndIndex $candidateEnd -EnableCompression
        $candidateCompressedSize = $candidatePayload.Length

        # Update ratio (exponentially-weighted blend: 30% old, 70% observed)
        if ($cumulativeBytes -gt 0) {
            $observedRatio = $candidateCompressedSize / $cumulativeBytes
            $script:_gzipRatioEstimate = 0.3 * $script:_gzipRatioEstimate + 0.7 * $observedRatio
        }

        if ($candidateCompressedSize -le $MaxPayloadBytes) {
            # Fits. Try to extend if there's headroom.
            if ($candidateEnd -lt ($Cache.Count - 1) -and $candidateCompressedSize -lt ($MaxPayloadBytes * 0.85)) {
                $remainingBytes = $MaxPayloadBytes - $candidateCompressedSize
                $estRawPerRow = if ($candidateEnd -gt $StartIndex) {
                    [Math]::Ceiling($cumulativeBytes / ($candidateEnd - $StartIndex + 1))
                }
                else {
                    $Cache.RowByteLengths[$StartIndex]
                }
                $estExtraRows = [Math]::Max(1, [int][Math]::Floor($remainingBytes / ($estRawPerRow * $ratio)))
                $extendedEnd = [Math]::Min($candidateEnd + $estExtraRows, $Cache.Count - 1)

                if ($extendedEnd -gt $candidateEnd) {
                    $extPayload = Get-AzLogIngestPayloadBytesFromCache `
                        -Cache $Cache -StartIndex $StartIndex -EndIndex $extendedEnd -EnableCompression

                    if ($extPayload.Length -le $MaxPayloadBytes) {
                        # Extended guess fits - binary search UPWARD from it
                        $lastGood = $extendedEnd
                        $lo = $extendedEnd + 1
                        $hi = [Math]::Min($extendedEnd + $estExtraRows, $Cache.Count - 1)
                        while ($lo -le $hi) {
                            $mid = [int][Math]::Floor(($lo + $hi) / 2)
                            $midPayload = Get-AzLogIngestPayloadBytesFromCache `
                                -Cache $Cache -StartIndex $StartIndex -EndIndex $mid -EnableCompression
                            if ($midPayload.Length -le $MaxPayloadBytes) {
                                $lastGood = $mid
                                $lo = $mid + 1
                            }
                            else {
                                $hi = $mid - 1
                            }
                        }
                        return $lastGood
                    }
                    else {
                        # Extended guess too big - binary search BETWEEN candidate and extension
                        $lo = $candidateEnd + 1
                        $hi = $extendedEnd - 1
                        $lastGood = $candidateEnd
                        while ($lo -le $hi) {
                            $mid = [int][Math]::Floor(($lo + $hi) / 2)
                            $midPayload = Get-AzLogIngestPayloadBytesFromCache `
                                -Cache $Cache -StartIndex $StartIndex -EndIndex $mid -EnableCompression
                            if ($midPayload.Length -le $MaxPayloadBytes) {
                                $lastGood = $mid
                                $lo = $mid + 1
                            }
                            else {
                                $hi = $mid - 1
                            }
                        }
                        return $lastGood
                    }
                }
            }
            return $candidateEnd
        }
        else {
            # Too big - binary search downward with real compression
            $lo = $StartIndex
            $hi = $candidateEnd - 1
            $lastGood = $StartIndex
            while ($lo -le $hi) {
                $mid = [int][Math]::Floor(($lo + $hi) / 2)
                $midPayload = Get-AzLogIngestPayloadBytesFromCache `
                    -Cache $Cache -StartIndex $StartIndex -EndIndex $mid -EnableCompression
                if ($midPayload.Length -le $MaxPayloadBytes) {
                    $lastGood = $mid
                    $lo = $mid + 1
                }
                else {
                    $hi = $mid - 1
                }
            }
            return $lastGood
        }
    } # end try
    finally {
        Write-Progress -Activity "Calculating batch size" -Id 3 -Completed
    }
}

function Get-AzLogIngestPayloadBytesFromCache {
<#
 .SYNOPSIS
    Assembles a JSON payload from cached row bytes, optionally gzip-compressed.

 .DESCRIPTION
    Builds the final byte payload for a range of rows [StartIndex..EndIndex]
    from the pre-serialized cache. For compressed payloads, streams row bytes
    directly into a GZipStream without intermediate string allocation.
    For uncompressed payloads, uses System.Buffer.BlockCopy for efficient
    byte assembly.

 .PARAMETER Cache
    The row cache object from New-AzLogIngestRowJsonCache.

 .PARAMETER StartIndex
    First row index (inclusive).

 .PARAMETER EndIndex
    Last row index (inclusive).

 .PARAMETER EnableCompression
    When set, the assembled payload is gzip-compressed via streaming.

 .OUTPUTS
    [byte[]] The JSON array payload, optionally gzip-compressed.
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        $Cache,
        [Parameter(Mandatory)]
        [int]$StartIndex,
        [Parameter(Mandatory)]
        [int]$EndIndex,
        [switch]$EnableCompression
    )

    if ($StartIndex -lt 0 -or $EndIndex -ge $Cache.Count -or $EndIndex -lt $StartIndex) {
        throw "Invalid cache range StartIndex=$StartIndex EndIndex=$EndIndex"
    }

    $bracketOpen  = [byte]0x5B   # '['
    $bracketClose = [byte]0x5D   # ']'
    $comma        = [byte]0x2C   # ','

    if ($EnableCompression) {
        $outputStream = New-Object System.IO.MemoryStream
        try {
            # Normalized to the canonical type name GZipStream (consistent with
            # Compress-GzipBytes).
            $gzipStream = New-Object System.IO.Compression.GZipStream(
                $outputStream,
                [System.IO.Compression.CompressionMode]::Compress,
                $true)
            try {
                $gzipStream.WriteByte($bracketOpen)
                for ($i = $StartIndex; $i -le $EndIndex; $i++) {
                    if ($i -gt $StartIndex) {
                        $gzipStream.WriteByte($comma)
                    }
                    $rowBytes = $Cache.RowUtf8Bytes[$i]
                    $gzipStream.Write($rowBytes, 0, $rowBytes.Length)
                }
                $gzipStream.WriteByte($bracketClose)
            }
            finally {
                $gzipStream.Dispose()
            }
            return $outputStream.ToArray()
        }
        finally {
            $outputStream.Dispose()
        }
    }

    # Uncompressed: assemble from pre-cached byte arrays
    $totalSize = 2   # the surrounding '[' + ']'
    for ($i = $StartIndex; $i -le $EndIndex; $i++) {
        $totalSize += $Cache.RowByteLengths[$i]
    }
    $totalSize += [Math]::Max(0, $EndIndex - $StartIndex)   # commas between rows

    $result = New-Object byte[] $totalSize
    $pos = 0
    $result[$pos++] = $bracketOpen
    for ($i = $StartIndex; $i -le $EndIndex; $i++) {
        if ($i -gt $StartIndex) {
            $result[$pos++] = $comma
        }
        $rowBytes = $Cache.RowUtf8Bytes[$i]
        [System.Buffer]::BlockCopy($rowBytes, 0, $result, $pos, $rowBytes.Length)
        $pos += $rowBytes.Length
    }
    $result[$pos] = $bracketClose
    return $result
}

function Get-AzTokenForResource {
<#
 .SYNOPSIS
    Acquires an access token for a given Azure resource URL.

 .DESCRIPTION
    Tries authentication methods in this order:
      1. Managed Identity via the IMDS endpoint (when -UseManagedIdentity $true)
      2. Service principal (client credentials) when AzAppId/AzAppSecret/TenantId are all provided
      3. Fallback: the current Az PowerShell context (Get-AzAccessToken)

 .PARAMETER ResourceUrl
    The resource/audience to request a token for
    (e.g. 'https://management.azure.com/' or 'https://monitor.azure.com/').

 .PARAMETER AzAppId
    Service principal client id (optional).

 .PARAMETER AzAppSecret
    Service principal secret (optional).

 .PARAMETER TenantId
    AAD tenant id (optional).

 .PARAMETER UseManagedIdentity
    $true to authenticate via the Azure Instance Metadata Service (IMDS).

 .PARAMETER ManagedIdentityClientId
    Client id of a user-assigned managed identity (optional).

 .OUTPUTS
    [string] Bearer access token.
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string]$ResourceUrl,
        [string]$AzAppId,
        [string]$AzAppSecret,
        [string]$TenantId,
        [Nullable[bool]]$UseManagedIdentity = $null,
        [string]$ManagedIdentityClientId
    )

    Add-Type -AssemblyName System.Web

    if ($UseManagedIdentity -eq $true) {
        # IMDS endpoint - only reachable from inside an Azure VM/App Service etc.
        $resourceEncoded = [System.Web.HttpUtility]::UrlEncode($ResourceUrl)
        $uri = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=$resourceEncoded"
        if ($ManagedIdentityClientId) {
            $clientIdEncoded = [System.Web.HttpUtility]::UrlEncode($ManagedIdentityClientId)
            $uri = "$uri&client_id=$clientIdEncoded"
        }
        $tokenResponse = Invoke-RestMethod `
            -UseBasicParsing `
            -Uri $uri `
            -Method Get `
            -Headers @{ Metadata = 'true' } `
            -ErrorAction Stop
        return $tokenResponse.access_token
    }

    if ($AzAppId -and $AzAppSecret -and $TenantId) {
        # Client-credentials (service principal) flow against AAD v2 endpoint
        $scope = [System.Web.HttpUtility]::UrlEncode("$($ResourceUrl.TrimEnd('/'))/.default")
        $body = "client_id=$AzAppId&scope=$scope&client_secret=$AzAppSecret&grant_type=client_credentials"
        $tokenEndpoint = "https://login.microsoftonline.com/$TenantId/oauth2/v2.0/token"
        $tokenResponse = Invoke-RestMethod `
            -UseBasicParsing `
            -Uri $tokenEndpoint `
            -Method Post `
            -Body $body `
            -ContentType 'application/x-www-form-urlencoded' `
            -ErrorAction Stop
        return $tokenResponse.access_token
    }

    # Fallback: current Az context (requires Az.Accounts)
    $accessToken = Get-AzAccessToken -ResourceUrl $ResourceUrl -AsSecureString -ErrorAction Stop
    return (ConvertFrom-SecureStringToPlainText -SecureString $accessToken.Token)
}

function Get-JsonPayloadBytes {
<#
 .SYNOPSIS
    Converts a data array to JSON bytes, optionally gzip-compressed (active version).

 .DESCRIPTION
    Last-loaded version - this is the one PowerShell uses at runtime.
    Serializes data to compact JSON, encodes to UTF-8, optionally compresses.

 .PARAMETER Data
    The data array to serialize.

 .PARAMETER EnableCompression
    When set, applies gzip compression.

 .OUTPUTS
    [byte[]] UTF-8 JSON bytes (gzip-compressed when -EnableCompression is set).
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [array]$Data,
        [switch]$EnableCompression
    )

    $json  = ConvertTo-Json -Depth 100 -InputObject @($Data) -Compress
    $bytes = [System.Text.Encoding]::UTF8.GetBytes($json)
    if ($EnableCompression) {
        return (Compress-GzipBytes -InputBytes $bytes)
    }
    return $bytes
}

Function Get-ObjectSchemaAsArray
Function Get-ObjectSchemaAsHash

function New-AzLogIngestRowJsonCache {
<#
 .SYNOPSIS
    Pre-serializes all data rows into a cache with cumulative byte sums for
    fast batch sizing.

 .DESCRIPTION
    Serializes each row to compact JSON, encodes to UTF-8 bytes, and builds a
    cumulative payload size array. This enables O(log n) binary search for
    batch boundaries instead of O(n) row-by-row scanning. Only used when data
    exceeds 1 MB and automatic batch sizing is needed (no BatchAmount set).
    Shows a progress bar during serialization for large datasets.

    The cache contains:
      - RowUtf8Bytes:          pre-encoded byte arrays for each row
      - RowByteLengths:        byte length of each row
      - CumulativePayloadSize: cumulative JSON array size for rows [0..i]
      - Count:                 total number of rows

 .PARAMETER Data
    The source data array to cache.

 .OUTPUTS
    [PSCustomObject] Cache object with RowUtf8Bytes, RowByteLengths,
    CumulativePayloadSize, Count.
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [array]$Data
    )

    $totalRows = $Data.Count
    $rowUtf8Bytes = New-Object 'System.Collections.Generic.List[byte[]]'
    $rowByteLengths = New-Object 'int[]' $totalRows

    # Cumulative sum of payload bytes:
    # CumulativePayloadSize[i] = size of JSON array for rows [0..i]
    # Formula: 2 (brackets) + sum(rowByteLengths[0..i]) + i (commas between rows)
    $cumulativePayloadSize = New-Object 'long[]' $totalRows

    $progressInterval = [Math]::Max(1, [int]($totalRows / 25))
    $runningSum = [long]0

    for ($i = 0; $i -lt $totalRows; $i++) {
        $rowJson  = ConvertTo-Json -Depth 100 -InputObject $Data[$i] -Compress
        $rowBytes = [System.Text.Encoding]::UTF8.GetBytes($rowJson)

        $null = $rowUtf8Bytes.Add($rowBytes)
        $rowByteLengths[$i] = $rowBytes.Length
        $runningSum += $rowBytes.Length

        # Payload size for rows [0..i] = 2 (brackets) + runningSum + i (commas)
        $cumulativePayloadSize[$i] = 2 + $runningSum + $i

        if ($i % $progressInterval -eq 0 -or $i -eq ($totalRows - 1)) {
            $pct = [Math]::Round((($i + 1) / $totalRows) * 100)
            Write-Progress -Activity "Preparing data for upload" `
                -Status "Serializing row $($i + 1) of $totalRows" `
                -PercentComplete $pct -Id 1
        }
    }
    Write-Progress -Activity "Preparing data for upload" -Id 1 -Completed

    return [pscustomobject]@{
        RowUtf8Bytes          = $rowUtf8Bytes
        RowByteLengths        = $rowByteLengths
        CumulativePayloadSize = $cumulativePayloadSize
        Count                 = $totalRows
    }
}

function Post-AzLogAnalyticsLogIngestCustomLogDcrDce-Output {
<#
 .SYNOPSIS
    Send data to LogAnalytics using Log Ingestion API and Data Collection Rule
    (combined).

 .DESCRIPTION
    Combined function that wraps Get-AzDcrDceDetails and
    Post-AzLogAnalyticsLogIngestCustomLogDcrDce.

    Supports gzip compression and Azure Managed Identity, configured either
    globally via $global:EnableCompressionDefault /
    $global:UseManagedIdentityDefault, or per call via -EnableCompression /
    -UseManagedIdentity. Priority: per-call parameter > global default > off.

 .PARAMETER EnableCompression
    Enables gzip compression ($true / $false / $null).
    $null = use $global:EnableCompressionDefault. If global not set,
    compression is off.

 .PARAMETER UseManagedIdentity
    Uses Managed Identity authentication ($true / $false / $null).
    $null = use $global:UseManagedIdentityDefault. If global not set,
    managed identity is off.

 .PARAMETER ManagedIdentityClientId
    Client ID of user-assigned managed identity. Only needed for
    user-assigned (not system-assigned).

 .PARAMETER BatchAmount
    Forces a specific number of records per batch. Overrides automatic 1 MB
    batch sizing.

 .EXAMPLE
    # Global defaults - set once, applies to all calls
    $global:EnableCompressionDefault = $true
    $global:UseManagedIdentityDefault = $false
    Post-AzLogAnalyticsLogIngestCustomLogDcrDce-Output -DceName $DceName -DcrName $DcrName `
        -Data $DataVariable -TableName $TableName `
        -AzAppId $LogIngestAppId -AzAppSecret $LogIngestAppSecret -TenantId $TenantId

 .EXAMPLE
    # Per-call override
    Post-AzLogAnalyticsLogIngestCustomLogDcrDce-Output -DceName $DceName -DcrName $DcrName `
        -Data $DataVariable -TableName $TableName `
        -AzAppId $LogIngestAppId -AzAppSecret $LogIngestAppSecret -TenantId $TenantId `
        -EnableCompression $true

 .EXAMPLE
    # Managed Identity with compression
    Post-AzLogAnalyticsLogIngestCustomLogDcrDce-Output -DceName $DceName -DcrName $DcrName `
        -Data $DataVariable -TableName $TableName `
        -UseManagedIdentity $true -EnableCompression $true
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [array]$Data,
        [Parameter(Mandatory)]
        [AllowEmptyString()]
        [string]$DcrName,
        [Parameter(Mandatory)]
        [AllowEmptyString()]
        [string]$DceName,
        [Parameter(Mandatory)]
        [string]$TableName,
        [string]$BatchAmount,
        [bool]$EnableUploadViaLogHub = $false,
        [string]$LogHubPath,
        [string]$AzAppId,
        [string]$AzAppSecret,
        [string]$TenantId,
        [Nullable[bool]]$EnableCompression = $null,
        [Nullable[bool]]$UseManagedIdentity = $null,
        [string]$ManagedIdentityClientId
    )

    # Resolve effective switches: per-call parameter > global default > off
    if ($EnableCompression -eq $null) {
        $EnableCompression = if ($null -ne $global:EnableCompressionDefault) { $global:EnableCompressionDefault } else { $false }
    }
    if ($UseManagedIdentity -eq $null) {
        $UseManagedIdentity = if ($null -ne $global:UseManagedIdentityDefault) { $global:UseManagedIdentityDefault } else { $false }
    }

    if (($EnableUploadViaLogHub -eq $false) -or ($null -eq $EnableUploadViaLogHub)) {
        $azDcrDceDetails = Get-AzDcrDceDetails `
            -DcrName $DcrName `
            -DceName $DceName `
            -AzAppId $AzAppId `
            -AzAppSecret $AzAppSecret `
            -TenantId $TenantId `
            -Verbose:$VerbosePreference

        # NOTE(review): indexes into the Get-AzDcrDceDetails result array -
        # [2]=DCE ingestion URI, [6]=DCR immutable id, [7]=DCR stream name.
        # Verify against the Get-AzDcrDceDetails definition (outside this chunk).
        return (Post-AzLogAnalyticsLogIngestCustomLogDcrDce `
            -DceUri $azDcrDceDetails[2] `
            -DcrImmutableId $azDcrDceDetails[6] `
            -TableName $TableName `
            -DcrStream $azDcrDceDetails[7] `
            -Data $Data `
            -BatchAmount $BatchAmount `
            -AzAppId $AzAppId `
            -AzAppSecret $AzAppSecret `
            -TenantId $TenantId `
            -EnableCompression $EnableCompression `
            -UseManagedIdentity $UseManagedIdentity `
            -ManagedIdentityClientId $ManagedIdentityClientId `
            -Verbose:$VerbosePreference)
    }

    if (($EnableUploadViaLogHub -eq $true) -and $LogHubPath -and $Data) {
        # Log-hub mode: persist the payload (plus ingestion settings) to disk
        # for a downstream forwarder instead of posting directly.
        $logHubData = [pscustomobject]@{
            Source     = $env:ComputerName
            UploadTime = (Get-Date -Format 'yyyy-MM-dd_HH-mm-ss')
            TableName  = $TableName
            DceName    = $DceName
            DcrName    = $DcrName
            Data       = @($Data)
        }
        if ($BatchAmount) {
            $logHubData | Add-Member -MemberType NoteProperty -Name BatchAmount -Value $BatchAmount
        }
        if ($EnableCompression -eq $true) {
            $logHubData | Add-Member -MemberType NoteProperty -Name EnableCompression -Value $true
        }
        if ($UseManagedIdentity -eq $true) {
            $logHubData | Add-Member -MemberType NoteProperty -Name UseManagedIdentity -Value $true
        }
        if ($ManagedIdentityClientId) {
            $logHubData | Add-Member -MemberType NoteProperty -Name ManagedIdentityClientId -Value $ManagedIdentityClientId
        }

        $logHubFileName = Join-Path $LogHubPath ($env:ComputerName + '__' + $TableName + '__' + (Get-Date -Format 'yyyy-MM-dd_HH-mm-ss') + '.json')
        Write-Host "Writing log-data to file $logHubFileName (log-hub)"
        $logHubData | ConvertTo-Json -Depth 25 | Out-File -FilePath $logHubFileName -Encoding utf8 -Force
    }
}

function Post-AzLogAnalyticsLogIngestCustomLogDcrDce {
<#
 .SYNOPSIS
    Send data to LogAnalytics using Log Ingestion API and Data Collection Rule.

 .DESCRIPTION
    Posts data to Azure LogAnalytics via the Log Ingestion API. Automatically
    handles batch sizing to stay within the 1 MB payload limit.

    Supports gzip compression and Managed Identity authentication via:
      - Global defaults: $global:EnableCompressionDefault / $global:UseManagedIdentityDefault
      - Per-call parameters: -EnableCompression / -UseManagedIdentity
    Priority: per-call parameter > global default > off.

 .PARAMETER EnableCompression
    Enables gzip compression ($true / $false / $null).
    $null = use $global:EnableCompressionDefault. If global not set,
    compression is off.

 .PARAMETER UseManagedIdentity
    Uses Managed Identity authentication ($true / $false / $null).
    $null = use $global:UseManagedIdentityDefault. If global not set,
    managed identity is off.

 .PARAMETER ManagedIdentityClientId
    Client ID of user-assigned managed identity.

 .PARAMETER BatchAmount
    Forces a specific number of records per batch. Overrides automatic 1 MB
    batch sizing.
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string]$DceURI,
        [Parameter(Mandatory)]
        [string]$DcrImmutableId,
        [Parameter(Mandatory)]
        [string]$DcrStream,
        [Parameter(Mandatory)]
        [array]$Data,
        [Parameter(Mandatory)]
        [string]$TableName,
        [string]$BatchAmount,
        [string]$AzAppId,
        [string]$AzAppSecret,
        [string]$TenantId,
        [Nullable[bool]]$EnableCompression = $null,
        [Nullable[bool]]$UseManagedIdentity = $null,
        [string]$ManagedIdentityClientId
    )

    if ($EnableCompression -eq $null) {
        # If the GLOBAL variable is defined, use it for all calls; otherwise OFF
        $EnableCompression = if ($null -ne $global:EnableCompressionDefault) { $global:EnableCompressionDefault } else { $false }
    }
    if ($UseManagedIdentity -eq $null) {
        $UseManagedIdentity = if ($null -ne $global:UseManagedIdentityDefault) { $global:UseManagedIdentityDefault } else { $false }
    }

    if (-not $Data -or @($Data).Count -eq 0) {
        return
    }

    $bearerToken = Get-AzTokenForResource `
        -ResourceUrl 'https://monitor.azure.com/' `
        -AzAppId $AzAppId `
        -AzAppSecret $AzAppSecret `
        -TenantId $TenantId `
        -UseManagedIdentity $UseManagedIdentity `
        -ManagedIdentityClientId $ManagedIdentityClientId

    $headers = @{ 'Authorization' = "Bearer $bearerToken" }
    if ($EnableCompression -eq $true) {
        $headers['Content-Encoding'] = 'gzip'
    }

    $maxPayloadBytes = 1MB
    $totalDataLines  = @($Data).Count
    $indexLoopFrom   = 0
    $resultLast      = $null

    # -- Fast path: try sending everything in one shot -------------------
    # Serialize the entire array at once (much faster than per-row) and
    # check if it fits. For most tables this succeeds and skips all the
    # per-row cache/cumulative-sum machinery entirely.
    if (-not $BatchAmount) {
        Write-Progress -Activity "Preparing $totalDataLines rows for upload to [ $($TableName)_CL ]" `
            -Status "Serializing data ..." -PercentComplete 20 -Id 2

        $bulkJson  = ConvertTo-Json -Depth 100 -InputObject @($Data) -Compress
        $bulkBytes = [System.Text.Encoding]::UTF8.GetBytes($bulkJson)

        if ($EnableCompression -eq $true) {
            Write-Progress -Activity "Preparing $totalDataLines rows for upload to [ $($TableName)_CL ]" `
                -Status "Compressing payload ..." -PercentComplete 60 -Id 2
            $bulkPayload = Compress-GzipBytes -InputBytes $bulkBytes
        }
        else {
            $bulkPayload = $bulkBytes
        }
        Write-Progress -Activity "Preparing $totalDataLines rows for upload to [ $($TableName)_CL ]" -Id 2 -Completed

        if ($bulkPayload.Length -le $maxPayloadBytes) {
            # Everything fits in one batch - send it directly, no cache needed
            $compressionText = if ($EnableCompression -eq $true) { "Compression=ON" } else { "Compression=OFF" }
            if ($UseManagedIdentity -eq $true) {
                $authText = "Auth=ManagedIdentity"
            }
            elseif ($AzAppId -and $AzAppSecret -and $TenantId) {
                $authText = "Auth=SPN"
            }
            else {
                $authText = "Auth=AzContext"
            }

            $payloadPct = [Math]::Round(($bulkPayload.Length / $maxPayloadBytes) * 100, 1)
            Write-Verbose ("  Batch: {0} rows, payload {1:N0} bytes ({2}% of 1 MB limit) [fast path]" -f `
                $totalDataLines, $bulkPayload.Length, $payloadPct)

            Write-Host ""
            Write-Host "  Posting data to LogAnalytics table [ $($TableName)_CL ]"
            Write-Host "  Rows : 1..$totalDataLines / $totalDataLines"
            Write-Host "  $compressionText | $authText"
            Write-Host ""

            $uri = "$($DceURI.TrimEnd('/'))/dataCollectionRules/$($DcrImmutableId)/streams/$($DcrStream)?api-version=2021-11-01-preview"
            Write-Verbose ("POST {0} with {1}-byte payload" -f $uri, $bulkPayload.Length)

            try {
                $result = Invoke-WebRequest `
                    -UseBasicParsing `
                    -Uri $uri `
                    -Method Post `
                    -Headers $headers `
                    -ContentType 'application/json; charset=utf-8' `
                    -Body ([byte[]]$bulkPayload) `
                    -ErrorAction Stop

                if ($result.StatusCode -in 200,202,204) {
                    Write-Host "  SUCCESS - data uploaded to LogAnalytics" -ForegroundColor Green
                    return $result
                }
                else {
                    throw "Unexpected status code returned from Log Ingestion API: $($result.StatusCode)"
                }
            }
            catch {
                $responseText = $null
                $statusCode = $null
                if ($_.Exception.Response) {
                    try { $statusCode = [int]$_.Exception.Response.StatusCode } catch {}
                    try {
                        $stream = $_.Exception.Response.GetResponseStream()
                        if ($stream) {
                            $reader = New-Object System.IO.StreamReader($stream)
                            $responseText = $reader.ReadToEnd()
                            $reader.Dispose()
                        }
                    } catch {}
                }
                throw "Log Ingestion API request failed. HTTP Status: $statusCode Response: $responseText"
            }
        }

        # Bulk didn't fit - fall through to per-row batching
        Write-Verbose "  Bulk payload ($($bulkPayload.Length) bytes) exceeds 1 MB limit - switching to batched upload"
        $bulkJson = $null; $bulkBytes = $null; $bulkPayload = $null   # free memory
    }

    # -- Per-row batching (only reached when data exceeds 1 MB or BatchAmount is set) --
    $compressionText = if ($EnableCompression -eq $true) { "Compression=ON" } else { "Compression=OFF" }
    if ($UseManagedIdentity -eq $true) {
        $authText = "Auth=ManagedIdentity"
    }
    elseif ($AzAppId -and $AzAppSecret -and $TenantId) {
        $authText = "Auth=SPN"
    }
    else {
        $authText = "Auth=AzContext"
    }

    $uri = "$($DceURI.TrimEnd('/'))/dataCollectionRules/$($DcrImmutableId)/streams/$($DcrStream)?api-version=2021-11-01-preview"

    if ($BatchAmount) {
        # -- Fixed batch size: skip cache, serialize each chunk directly --
        $fixedBatchSize = [int]$BatchAmount
        if ($fixedBatchSize -lt 1) {
            throw "BatchAmount must be 1 or higher."
        }

        $indexLoopFrom = 0
        $resultLast = $null
        $batchNumber = 0
        do {
            $batchNumber++
            $indexLoopTo   = [Math]::Min(($indexLoopFrom + $fixedBatchSize - 1), ($totalDataLines - 1))
            $batchRowCount = $indexLoopTo - $indexLoopFrom + 1

            $pctDone = [Math]::Round((($indexLoopTo + 1) / $totalDataLines) * 100)
            Write-Progress -Activity "Uploading to LogAnalytics table [ $($TableName)_CL ]" `
                -Status "Sending batch $batchNumber (rows $($indexLoopFrom + 1)..$($indexLoopTo + 1) of $totalDataLines) ..." `
                -PercentComplete $pctDone -Id 2

            # Serialize this chunk directly - one ConvertTo-Json call, no cache
            $batchData = @($Data[$indexLoopFrom..$indexLoopTo])
            $json  = ConvertTo-Json -Depth 100 -InputObject @($batchData) -Compress
            $bytes = [System.Text.Encoding]::UTF8.GetBytes($json)
            if ($EnableCompression -eq $true) {
                $payloadBytes = Compress-GzipBytes -InputBytes $bytes
            }
            else {
                $payloadBytes = $bytes
            }

            $payloadPct = [Math]::Round(($payloadBytes.Length / $maxPayloadBytes) * 100, 1)
            Write-Verbose ("  Batch: {0} rows, payload {1:N0} bytes ({2}% of 1 MB limit)" -f `
                $batchRowCount, $payloadBytes.Length, $payloadPct)

            if ($payloadBytes.Length -gt $maxPayloadBytes) {
                throw "Payload size ($($payloadBytes.Length) bytes) exceeds the 1 MB transaction limit. Reduce BatchAmount."
            }

            if ($totalDataLines -gt 1) {
                Write-Host ""
                Write-Host "  Posting data to LogAnalytics table [ $($TableName)_CL ]"
                Write-Host "  Rows : $($indexLoopFrom + 1)..$($indexLoopTo + 1) / $totalDataLines"
                Write-Host "  $compressionText | $authText"
                Write-Host ""
            }
            else {
                Write-Host ""
                Write-Host "  Posting data to LogAnalytics table [ $($TableName)_CL ]"
                Write-Host "  Rows : 1 / 1"
                Write-Host "  $compressionText | $authText"
                Write-Host ""
            }

            Write-Verbose ("POST {0} with {1}-byte payload" -f $uri, $payloadBytes.Length)
            try {
                $result = Invoke-WebRequest `
                    -UseBasicParsing -Uri $uri -Method Post -Headers $headers `
                    -ContentType 'application/json; charset=utf-8' `
                    -Body ([byte[]]$payloadBytes) -ErrorAction Stop

                if ($result.StatusCode -in 200,202,204) {
                    Write-Host "  SUCCESS - data uploaded to LogAnalytics" -ForegroundColor Green
                    $resultLast = $result
                }
                else {
                    throw "Unexpected status code returned from Log Ingestion API: $($result.StatusCode)"
                }
            }
            catch {
                $responseText = $null; $statusCode = $null
                if ($_.Exception.Response) {
                    try { $statusCode = [int]$_.Exception.Response.StatusCode } catch {}
                    try {
                        $stream = $_.Exception.Response.GetResponseStream()
                        if ($stream) {
                            $reader = New-Object System.IO.StreamReader($stream)
                            $responseText = $reader.ReadToEnd()
                            $reader.Dispose()
                        }
                    } catch {}
                }
                throw "Log Ingestion API request failed. HTTP Status: $statusCode Response: $responseText"
            }

            $indexLoopFrom = $indexLoopTo + 1
        } until ($indexLoopFrom -ge $totalDataLines)

        Write-Progress -Activity "Uploading to LogAnalytics table [ $($TableName)_CL ]" -Id 2 -Completed
        return $resultLast
    }

    # -- Auto-sizing path: build cache + cumulative sums for binary search --
    $cache = New-AzLogIngestRowJsonCache -Data $Data

    # Reset adaptive compression ratio for this ingestion run
    $script:_gzipRatioEstimate = $null

    $batchNumber = 0
    do {
        $batchNumber++

        $pctDone = [Math]::Round(($indexLoopFrom / $totalDataLines) * 100)
        Write-Progress -Activity "Uploading to LogAnalytics table [ $($TableName)_CL ]" `
            -Status "Calculating batch $batchNumber size (row $($indexLoopFrom + 1) of $totalDataLines) ..." `
            -PercentComplete $pctDone `
            -Id 2

        $indexLoopTo = Get-AzLogIngestBatchEndIndex `
            -Cache $cache `
            -StartIndex $indexLoopFrom `
            -MaxPayloadBytes $maxPayloadBytes `
            -EnableCompression:($EnableCompression -eq $true)

        $pctDone = [Math]::Round((($indexLoopTo + 1) / $totalDataLines) * 100)
        Write-Progress -Activity "Uploading to LogAnalytics table [ $($TableName)_CL ]" `
            -Status "Sending batch $batchNumber (rows $($indexLoopFrom + 1)..$($indexLoopTo + 1) of $totalDataLines) ..." `
            -PercentComplete $pctDone `
            -Id 2

        $payloadBytes = Get-AzLogIngestPayloadBytesFromCache `
            -Cache $cache `
            -StartIndex $indexLoopFrom `
            -EndIndex $indexLoopTo `
            -EnableCompression:($EnableCompression -eq $true)

        $batchRowCount = $indexLoopTo - $indexLoopFrom + 1
        $payloadPct = [Math]::Round(($payloadBytes.Length / $maxPayloadBytes) * 100, 1)
        Write-Verbose ("  Batch: {0} rows, payload {1:N0} bytes ({2}% of 1 MB limit)" -f `
            $batchRowCount, $payloadBytes.Length, $payloadPct)

        if ($payloadBytes.Length -gt $maxPayloadBytes) {
            throw "Payload size ($($payloadBytes.Length) bytes) exceeds the 1 MB transaction limit in the selected transfer format."
        }

        if ($totalDataLines -gt 1) {
            Write-Host ""
            Write-Host "  Posting data to LogAnalytics table [ $($TableName)_CL ]"
            Write-Host "  Rows : $($indexLoopFrom + 1)..$($indexLoopTo + 1) / $totalDataLines"
            Write-Host "  $compressionText | $authText"
            Write-Host ""
        }
        else {
            Write-Host ""
            Write-Host "  Posting data to LogAnalytics table [ $($TableName)_CL ]"
            Write-Host "  Rows : 1 / 1"
            Write-Host "  $compressionText | $authText"
            Write-Host ""
        }

        Write-Verbose ("POST {0} with {1}-byte payload" -f $uri, $payloadBytes.Length)
        try {
            $result = Invoke-WebRequest `
                -UseBasicParsing -Uri $uri -Method Post -Headers $headers `
                -ContentType 'application/json; charset=utf-8' `
                -Body ([byte[]]$payloadBytes) -ErrorAction Stop

            if ($result.StatusCode -in 200,202,204) {
                Write-Host "  SUCCESS - data uploaded to LogAnalytics" -ForegroundColor Green
                $resultLast = $result
            }
            else {
                throw "Unexpected status code returned from Log Ingestion API: $($result.StatusCode)"
            }
        }
        catch {
            $responseText = $null
            $statusCode = $null
            if ($_.Exception.Response) {
                try { $statusCode = [int]$_.Exception.Response.StatusCode } catch {}
                try {
                    $stream = $_.Exception.Response.GetResponseStream()
                    if ($stream) {
                        $reader = New-Object System.IO.StreamReader($stream)
                        $responseText = $reader.ReadToEnd()
                        $reader.Dispose()
                    }
                } catch {}
            }
            throw "Log Ingestion API request failed. HTTP Status: $statusCode Response: $responseText"
        }

        $indexLoopFrom = $indexLoopTo + 1
    } until ($indexLoopFrom -ge $totalDataLines)

    Write-Progress -Activity "Uploading to LogAnalytics table [ $($TableName)_CL ]" -Id 2 -Completed
    return $resultLast
}

Function Update-AzDataCollectionRuleDceEndpoint
Function Update-AzDataCollectionRuleResetTransformKqlDefault
Function Update-AzDataCollectionRuleTransformKql
Function ValidateFix-AzLogAnalyticsTableSchemaColumnNames
# NOTE(review): stray help fragment from the extraction, kept as a comment:
# "Function will automatically look check in a global variable
# ($global:AzDcrDetails) - or do a query using Azure Resource Graph to find
# DCR with name" - presumably belongs to Get-AzDcrDceDetails; verify against
# the complete module file.