script/tools/public/Invoke-CbsRunspace.ps1
Function Invoke-CbsRunspace {
    <#
    .SYNOPSIS
    Runs a script block once per input object on a runspace pool and emits the results.

    .DESCRIPTION
    Builds an InitialSessionState containing the requested modules, shared variables and
    functions, opens a runspace pool with it and starts one PowerShell instance per input
    object. The current item is passed to the script block as $_. Output written by the
    script blocks is collected in a shared buffer and written to the pipeline as it becomes
    available, so results are NOT guaranteed to be in input order.

    Inside the script block the helper Invoke-WithLock { ... } can be used to serialize
    access to shared state; it locks on the shared variable $Lock.

    .PARAMETER InputObject
    The objects to process. Accepts pipeline input.

    .PARAMETER ScriptBlock
    The script to run for each input object. It is re-created from its text inside the
    runspace, so it cannot see the caller's local variables; use -SharedVariables instead.

    .PARAMETER SharedVariables
    Variables to define in every runspace (name -> value). The same object instances are
    handed to all runspaces. A "Lock" entry is added to (or overwritten in) this hashtable.

    .PARAMETER AutoImports
    Import every currently loaded module whose file does not live under a directory
    listed in $env:PSModulePath into the runspaces.

    .PARAMETER MaxThreads
    Maximum number of runspaces in the pool, and of jobs in flight at once. Must be >= 1.

    .PARAMETER Commands
    Additional functions to define inside the runspaces.

    .PARAMETER Modules
    Modules to import into the runspaces (the .psd1 next to a .psm1 is preferred).

    .EXAMPLE
    1..10 | Invoke-CbsRunspace { $_ * $Factor } -SharedVariables @{ Factor = 2 } -MaxThreads 4
    #>
    Param (
        [Parameter(Mandatory, ValueFromPipeline)][PSObject[]]$InputObject,
        [Parameter(Mandatory, Position = 0)][ScriptBlock]$ScriptBlock,
        [Parameter()][HashTable]$SharedVariables = @{},
        [Parameter()][Switch]$AutoImports,
        [Parameter()][ValidateRange(1, [int]::MaxValue)][int]$MaxThreads = 30,
        [Parameter()][System.Management.Automation.FunctionInfo[]]$Commands = @(),
        [Parameter()][System.Management.Automation.PSModuleInfo[]]$Modules = @()
    )

    Begin {
        Write-Verbose "Begin $($MyInvocation.InvocationName)"

        [System.Management.Automation.Runspaces.InitialSessionState]$sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

        # Prefer the module manifest (.psd1) over the script module (.psm1) when it exists.
        # Kept as a script block rather than a nested function so that it is not picked up
        # by the "import all unscoped functions" step further down.
        $resolveManifest = {
            Param([string]$Path)
            $manifest = $Path -replace "psm1$", "psd1"
            if (Test-Path $manifest) { $manifest } else { $Path }
        }

        if ($AutoImports) {
            # PSModulePath is separated by ';' on Windows and ':' elsewhere.
            $moduleRoots = @(
                "$env:PSModulePath".Split([System.IO.Path]::PathSeparator) |
                    ForEach-Object { $_.TrimEnd([char[]]'/\') } |
                    Where-Object { $_ }
            )
            $comparison = [System.StringComparison]::OrdinalIgnoreCase

            # Modules that live on PSModulePath are skipped; only modules loaded from
            # elsewhere are imported explicitly by path.
            [string[]]$modulePaths = Get-Module |
                Where-Object {
                    $modulePath = $_.Path
                    # Dynamic modules have no file to import.
                    if (-not $modulePath) { return $false }
                    foreach ($root in $moduleRoots) {
                        if ($modulePath.StartsWith(($root + '/'), $comparison) -or
                            $modulePath.StartsWith(($root + '\'), $comparison)) {
                            return $false
                        }
                    }
                    return $true
                } |
                Select-Object -ExpandProperty Path |
                ForEach-Object { & $resolveManifest $_ }

            foreach ($path in $modulePaths) {
                Write-Verbose "Importing module $($path)"
                $sessionState.ImportPSModule($path)
            }
        }

        if ($Modules) {
            $Modules |
                Select-Object -ExpandProperty Path |
                Where-Object { $_ } |
                ForEach-Object { & $resolveManifest $_ } |
                ForEach-Object {
                    Write-Verbose "Importing module $_"
                    $sessionState.ImportPSModule($_)
                }
        }

        # One lock object shared by every runspace; used by Invoke-WithLock below.
        $SharedVariables["Lock"] = New-Object System.Object
        $SharedVariables.Keys | ForEach-Object {
            $sessionState.Variables.Add(
                (New-Object System.Management.Automation.Runspaces.SessionStateVariableEntry($_, $SharedVariables[$_], $null))
            )
        }

        Function Invoke-WithLock {
            Param([Parameter(Mandatory)][ScriptBlock]$ScriptBlock)
            [System.Threading.Monitor]::Enter($Lock)
            try {
                & $ScriptBlock
            }
            finally {
                # Always release, otherwise a failing script block would block every
                # other runspace waiting on the lock.
                [System.Threading.Monitor]::Exit($Lock)
            }
        }
        $lockFunction = Get-Content Function:\Invoke-WithLock
        $sessionFunction = [System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new("Invoke-WithLock", $lockFunction)
        $sessionState.Commands.Add($sessionFunction)

        # Import all unscoped functions into the runspace, this will let us pretend we are in the
        # same scope in terms of functions. This renders the "Commands" parameter somewhat redundant.
        # Both operands are forced to arrays so the concatenation also works when only a
        # single function (or none) is found.
        @(Get-ChildItem Function:\ | Where-Object { !$_.Source }) + @($Commands | Where-Object { $_ }) | ForEach-Object {
            Write-Verbose "Importing command $($_.Name)"
            $sessionState.Commands.Add(
                [System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new($_.Name, $_.Definition)
            )
        }

        Write-Verbose "Creating runspace pool with [$maxThreads] runspaces"
        $runspacePool = [runspacefactory]::CreateRunspacePool(1, $maxThreads, $sessionState, $Host)
        $runspacePool.Open()

        # Mutable state shared with _ProcessJobs (a hashtable so the nested function can
        # update it without scope modifiers).
        $data = @{
            jobs        = @()
            runningJobs = 0
        }

        Write-Verbose "Importing preferences into runspaces"
        # When called from a module a function will not have the preference variables set.
        # We can retrieve them from $PSCmdlet.SessionState
        $preferences = (Get-Variable | Where-Object Name -like *Preference | ForEach-Object {
            $variable = $PSCmdlet.SessionState.PSVariable.Get($_.Name)
            # Not every *Preference variable visible here exists (with a value) in the
            # caller's session state; skip those instead of indexing into $null.
            if ($null -eq $variable -or $null -eq $variable.Value) { return }
            if ($variable.Value.PSObject.TypeNames[0] -eq "System.Boolean") {
                '${0} = ${1}' -f $_.Name, $variable.Value
            } else {
                '${0} = "{1}"' -f $_.Name, $variable.Value
            }
        }) -join "`n"

        # The user script is prefixed with a param block for $_ and the preference assignments.
        $parameterizedScriptBlock = [ScriptBlock]::Create("param(`$_)`n$preferences`n$ScriptBlock")
        Write-Verbose "Created script block:`n$($parameterizedScriptBlock.ToString())"

        # The jobs take no pipeline input, so the input buffer is completed immediately.
        $inBuffer = [System.Management.Automation.PSDataCollection[PSObject]]::new()
        $inBuffer.Complete()
        # All jobs write into the same output buffer.
        $outBuffer = [System.Management.Automation.PSDataCollection[PSObject]]::new()

        Function _ProcessJobs {
            # Remove all finished jobs from the jobs list, store the results and clean up.
            $data.jobs = @($data.jobs | ForEach-Object {
                if ($_.Handle.IsCompleted) {
                    # If the job crashes the thread object may be $null.
                    # Simply remove it from the list and keep going if this happens.
                    if ($_.Thread) {
                        $_.Thread.EndInvoke($_.Handle) | Out-Null
                        $_.Thread.Dispose()
                        $_.Thread = $Null
                        $_.Handle = $Null
                    }
                    $data.runningJobs--
                } else {
                    $_ | Write-Output
                }
            })

            # This outputs the result of all the completed jobs so far.
            if ($outBuffer.Count -gt 0) {
                $outBuffer.ReadAll() | Write-Output
            }
            Start-Sleep -Milliseconds 10
        }
    }

    Process {
        $InputObject | ForEach-Object {
            Write-Verbose "Process $_ $($MyInvocation.InvocationName)"

            # Throttle BEFORE counting the new job: wait until a slot is free. Counting
            # first would spin forever with -MaxThreads 1 because no job exists yet.
            while ($data.runningJobs -ge $MaxThreads) {
                _ProcessJobs
            }

            $powerShellThread = [powershell]::Create().AddScript($parameterizedScriptBlock, $True)
            [void]$powerShellThread.AddParameter("_", $_)
            $powerShellThread.RunspacePool = $runspacePool
            $handle = $powerShellThread.BeginInvoke($inBuffer, $outBuffer)

            # Only count the job once it has actually been started.
            $data.jobs += [PSCustomObject]@{
                Handle = $handle
                Thread = $powerShellThread
            }
            $data.runningJobs++
        }
    }

    End {
        try {
            Write-Verbose "End-Start $($MyInvocation.InvocationName)"
            # Wait for all threads to complete.
            while (@($data.jobs).Count -ne 0) {
                _ProcessJobs
            }
        }
        finally {
            # Flush whatever is still buffered.
            if ($outBuffer.Count -gt 0) {
                $outBuffer.ReadAll() | Write-Output
            }
            # Clean up after ourselves.
            $runspacePool.Close()
            $runspacePool.Dispose()
            $inBuffer.Dispose()
            $outBuffer.Dispose()
            Write-Verbose "End-End $($MyInvocation.InvocationName)"
        }
    }
}