Private/ConvertTo-ProducerCommand.ps1
function ConvertTo-ProducerCommand {
    <#
    .SYNOPSIS
        Builds the executable path and argument string for a Kafka producer process.

    .DESCRIPTION
        Combines the directory returned by Get-KafkaHome with 'kafkacat' (plus '.exe'
        on Windows). If that file exists, kafkacat producer arguments are generated.
        Otherwise the command falls back to the kafka-console-producer script under
        'bin' ('bin\windows\*.bat' on Windows, 'bin/*.sh' elsewhere).

        Nothing is executed here; the function only returns the path and arguments.

    .PARAMETER BrokerList
        One or more broker addresses; they are joined with ',' on the command line.

    .PARAMETER TopicName
        Topic passed to the producer.

    .PARAMETER Arguments
        Optional extra arguments appended verbatim (after a single space) to the
        generated argument string.

    .PARAMETER TimeoutMS
        When greater than 0, emitted as queue.buffering.max.ms (kafkacat) or
        --timeout (console producer). Default: 1000.

    .PARAMETER BatchSize
        When greater than 0, emitted as queue.buffering.max.messages (kafkacat) or
        --batch-size (console producer). Default: 200.

    .PARAMETER MaxRetries
        Emitted as message.send.max.retries (kafkacat) or
        --message-send-max-retries (console producer). Default: 3.

    .OUTPUTS
        pscustomobject with two properties: 'path' (the executable or script to run)
        and 'args' (the argument string).

    .NOTES
        If the console-producer script is missing, a non-terminating error carrying a
        System.IO.FileNotFoundException is written and the object is still returned;
        use -ErrorAction Stop to make that condition terminating.
    #>
    [cmdletbinding()]
    param(
        [Parameter(Mandatory=$true)]
        [string[]]$BrokerList,

        [Parameter(Mandatory=$true)]
        [string]$TopicName,

        [string]$Arguments,

        [uint64]$TimeoutMS = 1000,

        [uint32]$BatchSize = 200,

        [uint16]$MaxRetries = 3
    )

    [pscustomobject]$kafka = [pscustomobject]@{'path'=$null;'args'=$null}

    # Get-KafkaHome is defined elsewhere; its result is used here as a base directory.
    $kafka.path = Get-KafkaHome

    [string]$brokers = $BrokerList -join ','
    [string]$kafkacat = [System.IO.Path]::Combine($kafka.path, 'kafkacat')

    # $IsWindows is only evaluated when the major version is 6 or later, because
    # -or short-circuits; anything older is treated as Windows.
    [bool]$is_win = $($PSVersionTable.PSVersion.Major -lt 6 -or $IsWindows)
    if ($is_win) {
        $kafkacat += '.exe'
    }

    if (Test-Path $kafkacat) {
        $kafka.path = $kafkacat

        # Each property gets its own -X switch. Appending further properties to the
        # first -X value with commas would make them part of that property's value.
        $kafka.args = "-b $brokers -t $TopicName -P -X message.send.max.retries=$MaxRetries"
        if ($TimeoutMS -gt 0) {
            $kafka.args += " -X queue.buffering.max.ms=$TimeoutMS"
        }
        if ($BatchSize -gt 0) {
            $kafka.args += " -X queue.buffering.max.messages=$BatchSize"
        }
    }
    else {
        if ($is_win) {
            $kafka.path = [System.IO.Path]::Combine($kafka.path, 'bin', 'windows', 'kafka-console-producer.bat')
        }
        else {
            $kafka.path = [System.IO.Path]::Combine($kafka.path, 'bin', 'kafka-console-producer.sh')
        }

        # Non-terminating by design: the caller decides via -ErrorAction whether a
        # missing script aborts the operation.
        if (-not (Test-Path $kafka.path)) {
            Write-Error -Exception $([System.IO.FileNotFoundException]::new($kafka.path))
        }

        $kafka.args = "--broker-list $brokers --topic $TopicName --message-send-max-retries $MaxRetries"
        if ($TimeoutMS -gt 0) {
            $kafka.args += " --timeout $TimeoutMS"
        }
        if ($BatchSize -gt 0) {
            $kafka.args += " --batch-size $BatchSize"
        }
    }

    if ($Arguments) {
        $kafka.args += ' ' + $Arguments
    }

    Write-Verbose $("{0} {1}" -f $kafka.path, $kafka.args)
    return $kafka
}