Cmdlets/src/XpandPwsh.Cmdlets/InvokeParallel/Invoke-Parallel.cs

using System;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Linq;
using System.Management.Automation;
using System.Management.Automation.Runspaces;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using XpandPwsh.CmdLets;
 
namespace XpandPwsh.Cmdlets.InvokeParallel{
    [Cmdlet(VerbsLifecycle.Invoke, "Parallel")]
    [CmdletBinding]
    public class InvokeParallel : XpandCmdlet{
        private PSVariable[] _psVariables;
        private ConcurrentBag<object> _values;
 
        [Parameter]
        public override string ActivityName{ get; set; }
 
        [Parameter(Mandatory = true, Position = 1)]
        public ScriptBlock Script{ get; set; }
 
        [Parameter]
        public string[] VariablesToImport{ get; set; } = new string[0];
 
        [Parameter(Mandatory = true, Position = 0, ValueFromPipeline = true)]
        public object Value{ get; set; }
 
        [Parameter]
        public override int ActivityId{ get; set; }
 
        [Parameter]
        public int RetryOnError{ get; set; } = 0;
 
        [Parameter]
        public int LimitConcurrency{ get; set; }
 
        [Parameter]
        public int RetryDelay{ get; set; } = 3000;
 
        [Parameter]
        public int StepInterval{ get; set; }
 
        protected override Task BeginProcessingAsync(){
            _values = new ConcurrentBag<object>();
            _psVariables = this.Invoke<PSVariable>("Get-Variable")
                .Where(variable => VariablesToImport.Select(s => s.ToLower()).Contains(variable.Name.ToLower())).ToArray();
 
            return base.BeginProcessingAsync();
        }
 
        protected override Task ProcessRecordAsync(){
            _values.Add(Value);
            return base.ProcessRecordAsync();
        }
 
        protected override Task EndProcessingAsync(){
            if (!_values.Any()) return Task.CompletedTask;
            if (_values.Count > Environment.ProcessorCount){
                StepInterval = 50;
            }
            var values = _values.ToObservable();
            if (StepInterval > 0){
                var eventLoopScheduler = new EventLoopScheduler(start => new Thread(start));
                values = values.StepInterval(TimeSpan.FromMilliseconds(StepInterval), eventLoopScheduler);
            }
             
 
            var retrySignal = Enumerable.Range(0, RetryOnError).ToObservable()
                .Delay(TimeSpan.FromMilliseconds(RetryDelay)).Publish().AutoConnect();
            values = LimitConcurrency > 0 ? InvokeWithLimit(values, retrySignal) : values.SelectMany(o => Start(o, retrySignal));
            return values
                .WriteObject(this, _values.Count)
                .HandleErrors(this, ActivityName,SynchronizationContext.Current)
                .ToTask();
        }
 
        private IObservable<Collection<PSObject>> InvokeWithLimit(IObservable<object> values,IObservable<int> retrySignal){
            return values.Select((o, i) => Observable.Defer(() => Start(o,retrySignal)))
                .Merge(LimitConcurrency);
        }
 
        private IObservable<Collection<PSObject>> Start(object o, IObservable<int> retrySignal){
 
            return Observable.Start(() => {
                using (var runspace = RunspaceFactory.CreateRunspace()){
                    runspace.Open();
                    runspace.SetVariable(new PSVariable("_", o));
                    runspace.SetVariable(_psVariables);
                    var psObjects = runspace.Invoke(Script.ToString());
                    var lastExitCode = runspace.Invoke("$LastExitCode").FirstOrDefault();
                    if (lastExitCode != null && (int) lastExitCode.BaseObject > 0){
                        var error = string.Join(Environment.NewLine, runspace.Invoke("$Error"));
                        if (!string.IsNullOrWhiteSpace(error))
                            throw new Exception(
                                $"ExitCode:{lastExitCode}{Environment.NewLine}Errors: {error}{Environment.NewLine}Script:{Script}");
                    }
 
                    runspace.Close();
                    return psObjects;
                }
            }).RetryWhen(_ => _.SelectMany(exception => retrySignal.Concat(Observable.Throw<int>(exception))));
        }
    }
}