Classes/Async.ps1
using namespace System.Collections.Generic
using namespace System.Collections.ObjectModel
using namespace System.Linq.Expressions
using namespace System.Management.Automation
using namespace System.Management.Automation.Runspaces
using namespace System.Threading
using namespace System.Threading.Tasks

# Static class that facilitates the use of traditional .NET async techniques in PowerShell.
class AsyncOps {
    # Factory used by the overloads and functions that do not take an explicit factory.
    static [PSTaskFactory] $Factory = [PSTaskFactory]::new();

    # Implementation of the 'ContinueWith' code method that PrepareTask attaches to tasks.
    # Wraps the scriptblock in an Action[Task[Collection[psobject]]] delegate, chains it onto
    # the underlying task, and prepares the resulting continuation task the same way.
    static [Task] ContinueWithCodeMethod([psobject] $instance, [scriptblock] $continuationAction) {
        $delegate = [AsyncOps]::CreateAsyncDelegate(
            $continuationAction,
            [Action[Task[Collection[psobject]]]])

        return [AsyncOps]::PrepareTask($instance.psadapted.ContinueWith($delegate))
    }

    # - Hides the result property on a Task object. This is done because the getter for Result waits
    #   for the task to finish, even if just output to the console.
    # - Adds ContinueWith code method that wraps scriptblocks with CreateAsyncDelegate
    # Returns the same task instance that was passed in.
    static [Task] PrepareTask([Task] $target) {
        $propertyList = $target.psobject.Properties.Name -notmatch 'Result' -as [string[]]
        $propertySet = [PSPropertySet]::new('DefaultDisplayPropertySet', $propertyList) -as [PSMemberInfo[]]
        $standardMembers = [PSMemberSet]::new('PSStandardMembers', $propertySet)

        $target.psobject.Members.Add($standardMembers)
        $target.psobject.Methods.Add(
            [PSCodeMethod]::new(
                'ContinueWith',
                [AsyncOps].GetMethod('ContinueWithCodeMethod')))

        return $target
    }

    # Overload that builds the delegate against the shared [AsyncOps]::Factory.
    static [MulticastDelegate] CreateAsyncDelegate([scriptblock] $function, [type] $delegateType) {
        return [AsyncOps]::CreateAsyncDelegate($function, $delegateType, [AsyncOps]::Factory)
    }

    # Create a delegate from a scriptblock that can be used in threads without runspaces, like those
    # used in Tasks or AsyncCallbacks.
    #
    # The delegate is compiled from an expression tree that, on every invocation:
    #   1. creates a PowerShell instance bound to the runspace pool of $factory (looked up by the
    #      factory's InstanceId in [PSTaskFactory]::Instances),
    #   2. adds the scriptblock text as a script and each delegate argument as a script argument,
    #   3. invokes it (piping the first argument in as input when the delegate has parameters),
    #   4. disposes the PowerShell instance, and
    #   5. converts the output to the delegate's return type when it is not void.
    static [MulticastDelegate] CreateAsyncDelegate([scriptblock] $function, [type] $delegateType, [PSTaskFactory] $factory) {
        $invokeMethod = $delegateType.GetMethod('Invoke')
        $returnType = $invokeMethod.ReturnType

        # Create a parameter expression for each parameter the delegate takes.
        $parameters = $invokeMethod.GetParameters().ForEach{
            [Expression]::Parameter($PSItem.ParameterType, $PSItem.Name)
        }

        $scriptParameters = [string]::Empty
        if ($parameters) {
            $scriptParameters = '$' + ($invokeMethod.GetParameters().Name -join ', $')
        }

        # Allow access to parameters in the following ways:
        # - By the name given to them by the delegate's invoke method
        # - $args
        # - $PSItem/$_ (first parameter only)
        $preparedScript = 'param({0}) process {{ return {{ {1} }}.InvokeReturnAsIs($PSBoundParameters.Values) }}' -f $scriptParameters, $function

        # True when Invoke must be called generically to produce a typed collection.
        $hasTypedResult = $returnType -ne [void] -and $returnType -ne [Collection[psobject]]

        # Prepare variable and constant expressions.
        $scriptText = [Expression]::Constant($preparedScript, [string])
        $ps = [Expression]::Variable([powershell], 'ps')

        $collectionResultType = [Collection[psobject]]
        if ($hasTypedResult) {
            $collectionResultType = [Collection`1].MakeGenericType($returnType)
        }

        $result = [Expression]::Variable($collectionResultType, 'result')
        $psInput = [Expression]::Variable([Object[]], 'psInput')
        $guid = [Expression]::Constant($factory.InstanceId, [guid])
        $pool = [Expression]::Property(
            [Expression]::Property(
                [Expression]::Property($null, [PSTaskFactory], 'Instances'),
                'Item',
                $guid),
            'RunspacePool')

        # Everything that can throw once the PowerShell instance exists goes into the try body so
        # that the finally clause below always disposes the instance.
        # Group the expressions by creating them in a scriptblock.
        [Expression[]] $tryBody = & {
            [Expression]::Assign([Expression]::Property($ps, 'RunspacePool'), $pool)
            [Expression]::Call($ps, 'AddScript', @(), $scriptText)

            # Expression trees do not box implicitly, so convert each parameter to object before
            # handing it to AddArgument(object). This is a no-op for reference types.
            foreach ($parameter in $parameters) {
                [Expression]::Call($ps, 'AddArgument', @(), [Expression]::Convert($parameter, [object]))
            }

            $invokeArgs = @()
            if ($parameters) {
                $firstBoxed = [Expression]::Convert($parameters[0], [object])
                [Expression]::Assign(
                    $psInput,
                    [Expression]::NewArrayInit([object], $firstBoxed -as [Expression[]]))

                $invokeArgs = @($psInput)
            }

            $invokeTypeArgs = @()
            if ($hasTypedResult) {
                $invokeTypeArgs = @($returnType)
            }

            [Expression]::Assign($result, [Expression]::Call($ps, 'Invoke', $invokeTypeArgs, $invokeArgs))
        }

        [Expression[]] $expressions = & {
            [Expression]::Assign($ps, [Expression]::Call([powershell], 'Create', @(), @()))

            [Expression]::TryFinally(
                [Expression]::Block($tryBody),
                [Expression]::Call($ps, 'Dispose', @(), @()))

            if ($returnType -ne [void]) {
                [Expression]::Call(
                    [LanguagePrimitives],
                    'ConvertTo',
                    @($returnType),
                    $result -as [Expression[]])
            } else {
                $result
            }
        }

        $block = [Expression]::Block([ParameterExpression[]]($ps, $result, $psInput), $expressions)
        $lambda = [Expression]::Lambda(
            $delegateType,
            $block,
            $parameters -as [ParameterExpression[]])

        return $lambda.Compile()
    }
}

# A TaskFactory implementation that creates tasks that run scriptblocks in a runspace pool.
class PSTaskFactory : TaskFactory[Collection[psobject]] {
    # Registry of live factories keyed by InstanceId. Delegates built by
    # AsyncOps.CreateAsyncDelegate look their factory's runspace pool up here at invocation time.
    hidden static [Dictionary[guid, PSTaskFactory]] $Instances = [Dictionary[guid, PSTaskFactory]]::new()

    hidden [RunspacePool] $RunspacePool;
    hidden [guid] $InstanceId;
    hidden [bool] $IsDisposed = $false;

    PSTaskFactory() : base() { $this.Initialize() }

    PSTaskFactory([CancellationToken] $cancellationToken) : base($cancellationToken) {
        $this.Initialize()
    }

    PSTaskFactory([TaskScheduler] $scheduler) : base($scheduler) {
        $this.Initialize()
    }

    PSTaskFactory(
        [TaskCreationOptions] $creationOptions,
        [TaskContinuationOptions] $continuationOptions) : base($creationOptions, $continuationOptions) {
        $this.Initialize()
    }

    PSTaskFactory(
        [CancellationToken] $cancellationToken,
        [TaskCreationOptions] $creationOptions,
        [TaskContinuationOptions] $continuationOptions,
        [TaskScheduler] $scheduler) : base($cancellationToken, $creationOptions, $continuationOptions, $scheduler) {
        $this.Initialize()
    }

    # Opens a runspace pool (1 to 4 runspaces) for this factory and registers the factory in
    # Instances under a freshly generated InstanceId.
    hidden [void] Initialize() {
        $this.RunspacePool = [runspacefactory]::CreateRunspacePool(1, 4)
        $this.RunspacePool.Open()
        $this.InstanceId = [guid]::NewGuid()
        [PSTaskFactory]::Instances.Add($this.InstanceId, $this)
    }

    # Can't implement IDisposable while inheriting a generic class because of a parse error, need
    # to create an issue.
    #
    # Unregisters the factory and disposes its runspace pool. Calling Dispose more than once is a
    # no-op.
    [void] Dispose() {
        if ($this.IsDisposed) {
            return
        }

        $this.IsDisposed = $true
        [PSTaskFactory]::Instances.Remove($this.InstanceId)
        $this.RunspacePool.Dispose()
    }

    # Throws InvalidOperationException when the factory has already been disposed.
    hidden [void] AssertNotDisposed() {
        if ($this.IsDisposed) {
            throw [InvalidOperationException]::new(
                'Cannot perform operation because object "PSTaskFactory" has already been disposed.')
        }
    }

    # Shortcut to AsyncOps.CreateAsyncDelegate
    hidden [MulticastDelegate] Wrap([scriptblock] $function, [type] $delegateType) {
        $this.AssertNotDisposed()
        return [AsyncOps]::CreateAsyncDelegate($function, $delegateType, $this)
    }

    # Returns the scheduler this factory was constructed with, or TaskScheduler.Current when none
    # was supplied (the base Scheduler property is null in that case).
    hidden [TaskScheduler] ResolveScheduler() {
        if ($null -ne $this.Scheduler) {
            return $this.Scheduler
        }

        return [TaskScheduler]::Current
    }

    # The remaining functions implement methods from TaskFactory. All of these methods call the base
    # method after wrapping the scriptblock to create a delegate that will work in tasks.
    [Task[Collection[psobject]]] ContinueWhenAll([Task[]] $tasks, [scriptblock] $continuationAction) {
        $this.AssertNotDisposed()
        $delegateType = [Func`2].MakeGenericType([Task[]], [Collection[psobject]])
        return [AsyncOps]::PrepareTask(
            ([TaskFactory[Collection[psobject]]]$this).ContinueWhenAll(
                $tasks,
                $this.Wrap($continuationAction, $delegateType),
                $this.CancellationToken,
                $this.ContinuationOptions,
                $this.ResolveScheduler()))
    }

    [Task[Collection[psobject]]] ContinueWhenAny([Task[]] $tasks, [scriptblock] $continuationAction) {
        $this.AssertNotDisposed()
        $delegateType = [Func`2].MakeGenericType([Task], [Collection[psobject]])
        return [AsyncOps]::PrepareTask(
            ([TaskFactory[Collection[psobject]]]$this).ContinueWhenAny(
                $tasks,
                $this.Wrap($continuationAction, $delegateType),
                $this.CancellationToken,
                $this.ContinuationOptions,
                $this.ResolveScheduler()))
    }

    [Task[Collection[psobject]]] StartNew([scriptblock] $function) {
        $this.AssertNotDisposed()
        return [AsyncOps]::PrepareTask(
            ([TaskFactory[Collection[psobject]]]$this).StartNew(
                $this.Wrap($function, [Func[Collection[psobject]]]),
                $this.CancellationToken,
                $this.CreationOptions,
                $this.ResolveScheduler()))
    }

    [Task[Collection[psobject]]] StartNew([scriptblock] $function, [object] $state) {
        $this.AssertNotDisposed()
        return [AsyncOps]::PrepareTask(
            ([TaskFactory[Collection[psobject]]]$this).StartNew(
                $this.Wrap($function, [Func[object, Collection[psobject]]]),
                $state,
                $this.CancellationToken,
                $this.CreationOptions,
                $this.ResolveScheduler()))
    }
}

# Starts a scriptblock as a task on the shared [AsyncOps]::Factory and outputs the task.
# Each pipeline input object is passed to the scriptblock as its state argument.
function async {
    [CmdletBinding()]
    param(
        [scriptblock] $ScriptBlock,

        [Parameter(ValueFromPipeline)]
        [object] $ArgumentList
    )
    process {
        if (-not $ScriptBlock) {
            return
        }

        [AsyncOps]::Factory.StartNew($ScriptBlock, $ArgumentList)
    }
}

# Collects tasks from the pipeline, waits until all of them have finished, then outputs each
# task's Result. A faulted task produces a non-terminating error instead of a result.
function await {
    [CmdletBinding()]
    param(
        [Parameter(ValueFromPipeline)]
        [Task] $Task
    )
    begin {
        $taskList = [List[Task]]::new()
    }
    process {
        if ($Task) {
            $taskList.Add($Task)
        }
    }
    end {
        if (-not $taskList.Count) {
            return
        }

        # Poll for completion, sleeping briefly between checks so the wait does not spin a CPU
        # core. IsCompleted is also true for canceled and faulted tasks.
        while (-not $taskList.TrueForAll({ param([Task]$task) $task.IsCompleted })) {
            Start-Sleep -Milliseconds 10
        }

        $taskList.ToArray().ForEach{
            if ($PSItem.IsFaulted -and $PSItem.Exception) {
                # Prefer the inner exception over the wrapping task exception when there is one.
                $exception = $PSItem.Exception.InnerException
                if (-not $exception) {
                    $exception = $PSItem.Exception
                }

                $PSCmdlet.WriteError(
                    [ErrorRecord]::new(
                        $exception,
                        $exception.GetType().Name,
                        [ErrorCategory]::InvalidResult,
                        $PSItem))
            } else {
                # Only read Result for tasks that did not fault: the getter of a faulted task
                # rethrows the failure that was already reported above.
                $PSItem.Result
            }
        }
    }
}

# Collects tasks from the pipeline and schedules a scriptblock continuation on the shared
# [AsyncOps]::Factory that runs when all of them (default) or any of them (-WhenAny) complete.
function ContinueWith {
    [CmdletBinding()]
    param(
        [scriptblock] $ContinuationAction,

        [switch] $WhenAny,

        [Parameter(ValueFromPipeline)]
        [Task] $Task
    )
    begin {
        $taskList = [List[Task]]::new()
    }
    process {
        if ($Task) {
            $taskList.Add($Task)
        }
    }
    end {
        if (-not $taskList.Count) {
            return
        }

        if ($WhenAny.IsPresent) {
            return [AsyncOps]::Factory.ContinueWhenAny($taskList, $ContinuationAction)
        }

        return [AsyncOps]::Factory.ContinueWhenAll($taskList, $ContinuationAction)
    }
}