PsRunspace.psm1
function Add-PsCommand { <# .Synopsis Add a command to a [System.Management.Automation.PowerShell] instance .Description Used by Invoke-Thread Uses AddScript() or AddStatement() and AddCommand() depending on the command .EXAMPLE The following demonstrates sending a Cmdlet name to the -Command parameter [powershell]::Create() | AddCommand -Command 'Write-Output' #> param( # Powershell interface to add the Command to [Parameter(ValueFromPipeline = $true)] [powershell[]]$PowershellInterface, <# Command to add to the Powershell interface This can be a scriptblock object, or a string that specifies a: Alias Function (the name of the function) ExternalScript (the path to the .ps1 file) All, Application, Cmdlet, Configuration, Filter, or Script #> [Parameter(Position = 0)] $Command, # Output from Get-PsCommandInfo # Optional, to improve performance if it will be re-used for multiple calls of Add-PsCommand [pscustomobject]$CommandInfo ) begin { [int64]$CurrentObjectIndex = 0 if ($CommandInfo -eq $null) { $CommandInfo = Get-PsCommandInfo -Command $Command } } process { ForEach ($ThisPowershell in $PowershellInterface) { switch ($CommandInfo.CommandType) { 'Alias' { # Resolve the alias to its command and start from the beginning with that command. $CommandInfo = Get-PsCommandInfo -Command $CommandInfo.CommandInfo.Definition Add-PsCommand -Command $CommandInfo.CommandInfo.Definition -CommandInfo $CommandInfo -PowershellInterface $ThisPowerShell } 'Function' { $ThisPowershell.AddScript($CommandInfo.CommandInfo.Definition) } 'ScriptBlock' { $ThisPowershell.AddScript($Command) } 'ExternalScript' { $ThisPowershell.AddScript($CommandInfo.ScriptBlock) } default { # If the type is All, Application, Cmdlet, Configuration, Filter, or Script then run the command as-is $ThisPowershell.AddStatement().AddCommand($Command) } } } } } function Get-PsCommandInfo { <# .Synopsis Get info about a PowerShell command .Description Used by Split-Thread, Invoke-Thread, and Add-PsCommand Determine whether the Command is a [System.Management.Automation.ScriptBlock] object If not, passes it to the Name parameter of Get-Command .EXAMPLE The following demonstrates sending a Cmdlet name to the -Command parameter Get-PsCommandInfo -Command 'Write-Output' #> param( <# Command to retrieve info on This can be a scriptblock object, or a string that specifies an: Alias Function (the name of the function) ExternalScript (the path to the .ps1 file) All, Application, Cmdlet, Configuration, Filter, or Script #> $Command ) if ($Command.GetType().FullName -eq 'System.Management.Automation.ScriptBlock') { $CommandType = 'ScriptBlock' } else { $CommandInfo = Get-Command $Command -ErrorAction SilentlyContinue $CommandType = $CommandInfo.CommandType if ($CommandInfo.Source) { $ModuleInfo = Get-Module -Name $CommandInfo.Source -ErrorAction SilentlyContinue } } #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tGet-PsCommandInfo`t$Command is a $CommandType" [pscustomobject]@{ CommandInfo = $CommandInfo ModuleInfo = $ModuleInfo CommandType = $CommandType SourceModuleDefinition = $ModuleInfo.Definition SourceModuleName = $CommandInfo.Source } } function Open-Thread { <# .Synopsis Prepares each thread so it is ready to execute a command and capture the output streams .Description Used by Split-Thread For each InputObject an instance will be created of [System.Management.Automation.PowerShell] Then a series of commands will be run to enable the specified output streams (all by default) #> Param( # Objects to pass to the Command as an argument or parameter [Parameter( ValueFromPipeline = $true, ValueFromPipelineByPropertyName = $true )] $InputObject, # .Net Framework runspace pool to use for the threads [Parameter( Mandatory = $true )] [System.Management.Automation.Runspaces.RunspacePool]$RunspacePool, <# Name of a property (whose value is a string) that exists on each $InputObject It will be used to represent the object in text form If left null, the object's ToString() method will be used instead. #> [string]$ObjectStringProperty, # PowerShell Command or Script to run against each InputObject [Parameter(Mandatory = $true)] $Command, # Output from Get-PsCommandInfo [pscustomobject]$CommandInfo, # Named parameter of the Command to pass InputObject to # If this is not specified, InputObject will be passed to the Command as an argument [string]$InputParameter = $null, <# Parameters to add to the Command Each parameter is a name-value pair in the hashtable: @{"ParameterName" = "Value"} @{"ParameterName" = "Value" ; "ParameterTwo" = "Value2"} #> [HashTable]$AddParam = @{}, # Switches to add to the Command [string[]]$AddSwitch = @() ) begin { [int64]$CurrentObjectIndex = 0 $ThreadCount = @($InputObject).Count } process { ForEach ($Object in $InputObject) { $CurrentObjectIndex++ if ($ObjectStringProperty -ne '') { $ObjectString = $Object."$ObjectStringProperty" } else { $ObjectString = $Object.ToString() } $PowershellInterface = [powershell]::Create() $PowershellInterface.RunspacePool = $RunspacePool $null = $PowershellInterface.Commands.Clear() Add-PsCommand -Command $Command -CommandInfo $CommandInfo -PowershellInterface $PowershellInterface If (!([string]::IsNullOrEmpty($InputParameter))) { $null = $PowershellInterface.AddParameter($InputParameter, $Object) <#NormallyCommentThisForPerformanceOptimization#>$InputParameterString = "-$InputParameter '$ObjectString'" } Else { $null = $PowershellInterface.AddArgument($Object) <#NormallyCommentThisForPerformanceOptimization#>$InputParameterString = "'$ObjectString'" } $AdditionalParameters = @() $AdditionalParameters = ForEach ($Key in $AddParam.Keys) { $null = $PowershellInterface.AddParameter($Key, $AddParam.$key) <#NormallyCommentThisForPerformanceOptimization#>"-$Key '$($AddParam.$key)'" } $AdditionalParametersString = $AdditionalParameters -join ' ' $Switches = @() $Switches = ForEach ($Switch in $AddSwitch) { $null = $PowershellInterface.AddParameter($Switch) <#NormallyCommentThisForPerformanceOptimization#>"-$Switch" } $SwitchParameterString = $Switches -join ' ' $StatusString = "Invoking thread $CurrentObjectIndex`: $Command $InputParameterString $AdditionalParametersString $SwitchParameterString" <#NormallyCommentThisForPerformanceOptimization#>#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tOpen-Thread`t$StatusString" $Progress = @{ Activity = $StatusString PercentComplete = $CurrentObjectIndex / $ThreadCount * 100 Status = "$($ThreadCount - $CurrentObjectIndex) remaining" } Write-Progress @Progress $Handle = $PowershellInterface.BeginInvoke() [PSCustomObject]@{ Handle = $Handle PowerShellInterface = $PowershellInterface Object = $Object ObjectString = $ObjectString Index = $CurrentObjectIndex Command = "$Command" } } } end { Write-Progress -Activity 'Completed' -Completed } } function Split-Thread { <# .Synopsis Split a command for a collection of input objects into multiple threads for asynchronous processing .Description The specified command will be run for each input object in a separate powershell instance with its own runspace These runspaces are part of the same runspace pool inside the same powershell.exe process .EXAMPLE The following demonstrates sending a Cmdlet name to the -Command parameter $InputObject | Split-Thread -Command 'Write-Output' .EXAMPLE The following demonstrates sending a scriptblock to the -Command parameter $InputObject | Split-Thread -Command [scriptblock]::create("Write-Output `$args[0]") .EXAMPLE The following demonstrates sending a script file path to the -Command parameter $InputObject | Split-Thread -Command "C:\Test-Command.ps1" .EXAMPLE The following demonstrates sending a function to the -Command parameter $InputObject | Split-Thread -Command 'Test-Function' .EXAMPLE The following demonstrates the -AddParam parameter $InputObject | Split-Thread -Command "Get-Service" -InputParameter ComputerName -AddParam @{"Name" = "BITS"} .EXAMPLE The following demonstrates the -AddSwitch parameter $InputObject | Split-Thread -Command "Get-Service" -AddSwitch @('RequiredServices','DependentServices') .EXAMPLE The following demonstrates the use of a threadsafe hashtable to store results The hastable can be accessed and updated from inside each runspace $ThreadsafeHashtable = [hashtable]::Synchronized(@{}) $InputObject | Split-Thread -Command "Fake-Function" -InputParameter ComputerName -AddParam @{"ResultHashTableParameter" = $ThreadsafeHashtable} #> param ( # PowerShell Command or Script to run against each InputObject [Parameter(Mandatory = $true)] $Command, # Objects to pass to the Command as an argument or parameter [Parameter( ValueFromPipeline = $true, ValueFromPipelineByPropertyName = $true )] $InputObject, # Named parameter of the Command to pass InputObject to # If this is not specified, InputObject will be passed to the Command as an argument $InputParameter = $null, # Maximum number of concurrent threads to allow [int]$Threads = 20, # Milliseconds to wait between cycles of the loop that checks threads for completion [int]$SleepTimer = 200, # Seconds to wait without receiving any new results before giving up and stopping all remaining threads [int]$Timeout = 120, <# Parameters to add to the Command Each parameter is a name-value pair in the hashtable: @{"ParameterName" = "Value"} @{"ParameterName" = "Value" ; "ParameterTwo" = "Value2"} #> [HashTable]$AddParam = @{}, # Switches to add to the Command [string[]]$AddSwitch = @(), # Names of modules to import in each runspace [String[]]$AddModule, <# Name of a property (whose value is a string) that exists on each $InputObject and can be used to represent the object in text form If left null, the object's ToString() method will be used instead. #> [string]$ObjectStringProperty ) begin { $InitialSessionState = [system.management.automation.runspaces.initialsessionstate]::CreateDefault() # Import the source module containing the specified Command in each thread $CommandInfo = Get-PsCommandInfo -Command $Command switch ($CommandInfo.ModuleInfo.ModuleType) { 'Binary' { Write-Debug "`$InitialSessionState.ImportPSModule('$($CommandInfo.ModuleInfo.Name)')" $InitialSessionState.ImportPSModule($CommandInfo.ModuleInfo.Name) } 'Script' { $ModulePath = $CommandInfo.ModuleInfo.Path | Split-Path -Parent Write-Debug "`$InitialSessionState.ImportPSModulesFromPath('$ModulePath')" $InitialSessionState.ImportPSModulesFromPath($ModulePath) } 'Manifest' { $ModulePath = $CommandInfo.ModuleInfo.Path | Split-Path -Parent Write-Debug "`$InitialSessionState.ImportPSModulesFromPath('$ModulePath')" $InitialSessionState.ImportPSModulesFromPath($ModulePath) } default { # Scriptblocks have no module to import so ModuleInfo will be null } } # Import any additional specified modules in each thread ForEach ($Module in $AddModule) { $ModuleObj = Get-Module $Module -ErrorAction SilentlyContinue switch ($ModuleObj.ModuleType) { 'Binary' { Write-Debug "`$InitialSessionState.ImportPSModule('$Module')" $InitialSessionState.ImportPSModule($Module) } default { # This is for Script or Manifest modules Write-Debug "`$InitialSessionState.ImportPSModulesFromPath('$($ModuleObj.Path | Split-Path -Parent)')" $InitialSessionState.ImportPSModulesFromPath(($ModuleObj.Path | Split-Path -Parent)) } } } # Set the preference variables for PowerShell output streams in each thread to match the current preferences $OutputStream = @('Debug', 'Verbose', 'Information', 'Warning', 'Error') ForEach ($ThisStream in $OutputStream) { if ($ThisStream -eq 'Error') { $VariableName = 'ErrorActionPreference' } else { $VariableName = "$($ThisStream)Preference" } $VariableValue = (Get-Variable -Name $VariableName).Value $VariableEntry = [System.Management.Automation.Runspaces.SessionStateVariableEntry]::new($VariableName, $VariableValue, '') $InitialSessionState.Variables.Add($VariableEntry) } $RunspacePool = [runspacefactory]::CreateRunspacePool(1, $Threads, $InitialSessionState, $Host) $VerbosePreference = 'SilentlyContinue' $RunspacePool.Open() $Global:TimedOut = $false $AllInputObjects = [System.Collections.Generic.List[psobject]]::new() } process { # Add all the input objects from the pipeline to a single collection; allows progress bars later ForEach ($ThisObject in $InputObject) { $null = $AllInputObjects.Add($ThisObject) } } end { $ThreadParameters = @{ Command = $Command InputParameter = $InputParameter InputObject = $AllInputObjects AddParam = $AddParam AddSwitch = $AddSwitch ObjectStringProperty = $ObjectStringProperty CommandInfo = $CommandInfo RunspacePool = $RunspacePool } $AllThreads = Open-Thread @ThreadParameters Wait-Thread -Thread $AllThreads -Threads $Threads -SleepTimer $SleepTimer -Timeout $Timeout -Dispose $VerbosePreference = 'Continue' if ($Global:TimedOut -eq $false) { #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tSplit-Thread`t[System.Management.Automation.Runspaces.RunspacePool]::Close()" $null = $RunspacePool.Close() #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tSplit-Thread`t[System.Management.Automation.Runspaces.RunspacePool]::Close() completed" #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tSplit-Thread`t[System.Management.Automation.Runspaces.RunspacePool]::Dispose()" $null = $RunspacePool.Dispose() #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tSplit-Thread`t[System.Management.Automation.Runspaces.RunspacePool]::Dispose() completed" } Write-Progress -Activity 'Completed' -Completed } } function Wait-Thread { <# .Synopsis Waits for a thread to be completed so the results can be returned, or for a timeout to be reached .Description Used by Split-Thread .INPUTS [PSCustomObject]$Thread .OUTPUTS Outputs the specified output streams from the threads #> param ( # Threads to wait for [Parameter( Mandatory = $true, ValueFromPipeline = $true )] [PSCustomObject[]]$Thread, # Maximum number of concurrent threads that are allowed (used only for progress display) [int]$Threads = 20, # Milliseconds to wait between cycles of the loop that checks threads for completion [int]$SleepTimer = 200, # Seconds to wait without receiving any new results before giving up and stopping all remaining threads [int]$Timeout = 120, # Dispose of the thread when it is finished [switch]$Dispose ) begin { $StopWatch = [System.Diagnostics.Stopwatch]::new() $StopWatch.Start() $AllThreads = [System.Collections.Generic.List[PSCustomObject]]::new() $RunspacePool = ($Thread | Select-Object -First 1).PowershellInterface.RunspacePool $CommandString = ($Thread | Select-Object -First 1).Command } process { ForEach ($ThisThread in $Thread) { # If the threads do not have handles, there is nothing to wait for, so output the thread as-is. # Otherwise wait for the handle to indicate completion (or a timeout to be reached) if ($ThisThread.Handle -eq $false) { $null = $ThisThread.PowerShellInterface.Streams.ClearStreams() $ThisThread } else { $null = $AllThreads.Add($ThisThread) } } } end { # If the threads have handles, we can check to see if they are complete. While (@($AllThreads | Where-Object -FilterScript { $null -ne $_.Handle }).Count -gt 0) { if ($RunspacePool) { $AvailableRunspaces = $RunspacePool.GetAvailableRunspaces() } $CleanedUpThreads = [System.Collections.Generic.List[PSCustomObject]]::new() $CompletedThreads = [System.Collections.Generic.List[PSCustomObject]]::new() $IncompleteThreads = [System.Collections.Generic.List[PSCustomObject]]::new() ForEach ($ThisThread in $AllThreads) { if ($null -eq $ThisThread.Handle) { $null = $CleanedUpThreads.Add($ThisThread) } if ($ThisThread.Handle.IsCompleted -eq $true) { $null = $CompletedThreads.Add($ThisThread) } if ($ThisThread.Handle.IsCompleted -eq $false) { $null = $IncompleteThreads.Add($ThisThread) } } $ActiveThreadCountString = "$($Threads - $AvailableRunspaces) of $Threads are active" #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$ActiveThreadCountString" #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($CompletedThreads.Count) completed threads" #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($CleanedUpThreads.Count) cleaned up threads" #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($IncompleteThreads.Count) incomplete threads" $RemainingString = "$($IncompleteThreads.ObjectString)" If ($RemainingString.Length -gt 60) { $RemainingString = $RemainingString.Substring(0, 60) + "..." } $Progress = @{ Activity = "Waiting on threads - $ActiveThreadCountString`: $CommandString" PercentComplete = ($($CleanedUpThreads).count) / @($Thread).Count * 100 Status = "$(@($IncompleteThreads).Count) remaining - $RemainingString" } Write-Progress @Progress ForEach ($CompletedThread in $CompletedThreads) { #Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($CompletedThread.PowerShellInterface.Streams.Progress.Count) Progress messages" #Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($CompletedThread.PowerShellInterface.Streams.Information.Count) Information messages" #Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($CompletedThread.PowerShellInterface.Streams.Verbose.Count) Verbose messages" #Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($CompletedThread.PowerShellInterface.Streams.Debug.Count) Debug messages" #Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`t$($CompletedThread.PowerShellInterface.Streams.Warning.Count) Warning messages" # Because $Host was used to create the RunspacePool, any output to $Host (which includes Write-Host and Write-Information and Write-Progress) has already been displayed #$CompletedThread.PowerShellInterface.Streams.Progress | ForEach-Object {Write-Progress $_} #$CompletedThread.PowerShellInterface.Streams.Information | ForEach-Object {Write-Information $_} #$CompletedThread.PowerShellInterface.Streams.Verbose | ForEach-Object {Write-Verbose $_} #$CompletedThread.PowerShellInterface.Streams.Debug | ForEach-Object {Write-Debug $_} #$CompletedThread.PowerShellInterface.Streams.Warning | ForEach-Object {Write-Warning $_} $null = $CompletedThread.PowerShellInterface.Streams.ClearStreams() if ($Dispose -eq $true) { $ThreadOutput = $CompletedThread.PowerShellInterface.EndInvoke($CompletedThread.Handle) #NormallyCommentThisForPerformanceOptimization#>if (($ThreadOutput | Measure-Object).Count -gt 0) { #NormallyCommentThisForPerformanceOptimization#>Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`tOutput (count of $($ThreadOutput.Count)) received from thread $($CompletedThread.Index): $($CompletedThread.ObjectString)" #NormallyCommentThisForPerformanceOptimization#>} #NormallyCommentThisForPerformanceOptimization#>else { #NormallyCommentThisForPerformanceOptimization#>Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`tNull result for thread $($CompletedThread.Index) ($($CompletedThread.ObjectString))" #NormallyCommentThisForPerformanceOptimization#>} $ThreadOutput $null = $CompletedThread.PowerShellInterface.Dispose() $CompletedThread.PowerShellInterface = $null $CompletedThread.Handle = $null } else { #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`tThread $($CompletedThread.Index) ($($CompletedThread.ObjectString)) is finished opening." $null = $CompletedThread.PowerShellInterface.EndInvoke($CompletedThread.Handle) $CompletedThread.Handle = $null $CompletedThread } $StopWatch.Reset() $StopWatch.Start() } If ($StopWatch.ElapsedMilliseconds / 1000 -gt $Timeout) { Write-Warning "$(Get-Date -Format s)`t$(hostname)`tWait-Thread`tReached Timeout of $Timeout seconds. Skipping $($IncompleteThreads.Count) remaining threads: $RemainingString" $Global:TimedOut = $true $IncompleteThreads | ForEach-Object { [PSCustomObject]@{ Handle = $null PowerShellInterface = $_.PowershellInterface Object = $_.Object ObjectString = $_.ObjectString Index = $_.CurrentObjectIndex Command = $_.Command } } } #CommentedForPerformanceOptimization#Write-Debug " $(Get-Date -Format s)`t$(hostname)`tWait-Thread`tSleeping $SleepTimer milliseconds" Start-Sleep -Milliseconds $SleepTimer } $StopWatch.Stop() #CommentedForPerformanceOptimization#Write-Verbose "$(Get-Date -Format s)`t$(hostname)`tWait-Thread`tFinished waiting for threads" Write-Progress -Activity 'Completed' -Completed } } <# # Dot source any functions ForEach ($ThisScript in $ScriptFiles) { # Dot source the function . $($ThisScript.FullName) } #> Export-ModuleMember -Function @('Add-PsCommand','Get-PsCommandInfo','Open-Thread','Split-Thread','Wait-Thread') |