Public/Start-KafkaConsumer.ps1
function Start-KafkaConsumer { <# .DESCRIPTION Starts a Kafka consumer process in a dedicated thread. .PARAMETER TopicName The Kafka topic from which messages will be consumed. .PARAMETER BrokerList The Kafka broker(s) to connect to. .PARAMETER Instances The number of consumers to start. .PARAMETER Arguments Custom arguments passed to the Kafka CLI. .PARAMETER ConsumerGroup The name of a Kafka consumer group to store offsets and distribute workload. .PARAMETER Persist If specified, the consumer will continue to poll the Kafka topic even after consuming the last available message. .PARAMETER FromBeginning If specified (and no stored offset already exists), the consumer will read from the beginning of the topic instead of the end. .OUTPUTS An array of ThreadJob objects, one for each new consumer. .EXAMPLE $c = Start-KafkaConsumer -BrokerList 'localhost:9092' -TopicName 'test' #> [cmdletbinding()] param( [Parameter(Mandatory=$true, ValueFromPipeline=$true)] [string[]]$TopicName, [Parameter(Mandatory=$true)] [string[]]$BrokerList, [ValidateRange(1, 9999)] [uint16]$Instances = 1, [string]$Arguments, [string]$ConsumerGroup, [uint64]$MessageCount = 0, [switch]$Persist, [switch]$FromBeginning ) process { foreach ($topic in $TopicName) { $kafka = ConvertTo-ConsumerCommand -BrokerList $BrokerList -TopicName $topic -Arguments $Arguments -ConsumerGroup $ConsumerGroup -MessageCount $MessageCount -Persist:$Persist -FromBeginning:$FromBeginning 1..$Instances | ForEach-Object { Start-ThreadJob -ArgumentList $kafka.path, $kafka.args.Split(' '), $ErrorActionPreference, $VerbosePreference, $WarningPreference, $DebugPreference -ScriptBlock { param([string]$FilePath, [string[]]$ArgumentList, $ErrorActionPreference = [System.Management.Automation.ActionPreference]::Stop, $VerbosePreference = [System.Management.Automation.ActionPreference]::Continue, $WarningPreference = [System.Management.Automation.ActionPreference]::Continue, $DebugPreference = [System.Management.Automation.ActionPreference]::Continue) if (-not $ArgumentList) { & $FilePath } elseif ($ArgumentList.Length -eq 1) { & $FilePath $ArgumentList[0] } else { & $FilePath $ArgumentList } if ($LASTEXITCODE -ne $null) { Write-Debug $LASTEXITCODE } else { Write-Debug $([int](-not $?)) } } -Verbose -ErrorAction 'Continue' -WarningAction 'Continue' -Debug } } } } |