Public/Read-KafkaTopic.ps1
function Read-KafkaTopic { <# .DESCRIPTION Encapsulates `Start-KafkaConsumer`, `Read-KafkaConsumer`, and `Stop-KafkaConsumer` in one command. For parameter info, consult the documentation for these commands. .OUTPUTS An array of messages consumed from the topic. .EXAMPLE Read-KafkaTopic -BrokerList 'localhost:9092' -TopicName 'test' -MessageCount 10 -FromBeginning #> [cmdletbinding(DefaultParameterSetName='UseConsumer')] param ( [Parameter(Mandatory=$true, ValueFromPipeline=$true, ParameterSetName='UseConsumer')] [ThreadJob.ThreadJob[]]$Consumer, [Parameter(Mandatory=$true, ValueFromPipeline=$true, ParameterSetName='Connect')] [string[]]$TopicName, [Parameter(Mandatory=$true, ParameterSetName='Connect')] [string[]]$BrokerList, [Parameter(ParameterSetName='Connect')] [ValidateRange(1, 9999)] [uint16]$Instances = 1, [Parameter(ParameterSetName='Connect')] [string]$Arguments, [Parameter(ParameterSetName='Connect')] [string]$ConsumerGroup, [Parameter(ParameterSetName='Connect')] [uint64]$MessageCount = 0, [Parameter(ParameterSetName='Connect')] [switch]$Persist, [Parameter(ParameterSetName='Connect')] [switch]$FromBeginning, [Parameter(ParameterSetName='UseConsumer')] [Parameter(ParameterSetName='Connect')] [uint64]$TimeoutMS = 0, [Parameter(ParameterSetName='UseConsumer')] [Parameter(ParameterSetName='Connect')] [uint32]$PollMS = 250, [Parameter(ParameterSetName='UseConsumer')] [Parameter(ParameterSetName='Connect')] [switch]$Tee, [Parameter(ParameterSetName='UseConsumer')] [switch]$AutoRemoveJob ) begin { if (-not $Consumer) { $Consumer = Start-KafkaConsumer -TopicName $TopicName -BrokerList $BrokerList -Instances $Instances -Arguments $Arguments -ConsumerGroup $ConsumerGroup -MessageCount $MessageCount -Persist:$Persist -FromBeginning:$FromBeginning $AutoRemoveJob = $true } } process { $Consumer | Read-KafkaConsumer -TimeoutMS $TimeoutMS -PollMS $PollMS -Tee:$Tee -AutoRemoveJob:$AutoRemoveJob } } |