Public/Out-KafkaTopic.ps1
function Out-KafkaTopic { <# .DESCRIPTION Encapsulates `Start-KafkaProducer` and `Stop-KafkaConsumer` in one command. For parameter info, consult the documentation for these commands. .PARAMETER Producer If an existing producer process is passed, it will be reused. .PARAMETER Format The format to serialize messages. Can be 'json' (default) or 'text'. .PARAMETER PassThru When specified, messages are passed through the pipeline. .OUTPUTS None, unless `-PassThru` is specified. .EXAMPLE 'hello world' | Out-KafkaTopic -BrokerList 'localhost:9092' -TopicName 'test' # or $p = Start-KafkaProducer -BrokerList 'localhost:9092' -TopicName 'test' 1..3 | ForEach-Object { 'hello world' } | Out-KafkaTopic -Producer $p $p | Stop-KafkaProducer #> [cmdletbinding(DefaultParameterSetName='Connect')] param ( [Parameter(Mandatory=$true, ValueFromPipeline=$true)] [object[]]$Messages, [Parameter(Mandatory=$true, ParameterSetName='UseProducer')] [System.Diagnostics.Process]$Producer, [Parameter(Mandatory=$true, ParameterSetName='Connect')] [string]$TopicName, [Parameter(ParameterSetName='Connect')] [string[]]$BrokerList = @('localhost:9092'), [Parameter(ParameterSetName='Connect')] [uint64]$TimeoutMS = 1000, [Parameter(ParameterSetName='Connect')] [uint32]$BatchSize = 100, [Parameter(ParameterSetName='Connect')] [uint16]$MaxRetries = 3, [Parameter(ParameterSetName='Connect')] [string]$Arguments, [Parameter(ParameterSetName='Connect')] [ValidateSet('json', 'text')] [string]$Format = 'json', [switch]$PassThru ) begin { [bool]$keep_alive = $true if (-not $Producer) { $Producer = Start-KafkaProducer -TopicName $TopicName -BrokerList $BrokerList -TimeoutMS $TimeoutMS -BatchSize $BatchSize -MaxRetries $MaxRetries -Arguments $Arguments -ErrorAction Stop $keep_alive = $false } [scriptblock]$convert_obj = $null if ($Format -eq 'json') { $convert_obj = { param([object]$obj) return $($obj | ConvertTo-Json -Compress) } } else { $convert_obj = { param([object]$obj) return $($obj | Out-String -NoNewline) } } } process { foreach ($msg in $Messages) { $Producer.StandardInput.WriteLine($(& $convert_obj $msg)) if ($PassThru) { Write-Output $msg } } } end { if (-not $keep_alive) { $null = $Producer | Stop-KafkaProducer } } } |