Cmdlets/src/XpandPwsh.Cmdlets/InvokeParallel/Invoke-Parallel.cs

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Management.Automation;
using System.Management.Automation.Runspaces;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using XpandPwsh.CmdLets;
 
namespace XpandPwsh.Cmdlets.InvokeParallel{
    [Cmdlet(VerbsLifecycle.Invoke, "Parallel")]
    [CmdletBinding]
    public class InvokeParallel : XpandCmdlet{
        private ConcurrentBag<object> _values;
        private PSVariable[] _psVariables;
 
        [Parameter]
        public override string ActivityName{ get; set; }
 
        [Parameter(Mandatory = true, Position = 1)]
        public ScriptBlock Script{ get; set; }
 
        [Parameter]
        public string[] VariablesToImport{ get; set; } = new string[0];
 
        [Parameter(Mandatory = true, Position = 0, ValueFromPipeline = true)]
        public object Value{ get; set; }
        [Parameter]
        public override int ActivityId{ get; set; }
 
        [Parameter]
        public int RetryOnError{ get; set; } = 0;
 
        [Parameter]
        public int RetryDelay{ get; set; } = 3000;
        [Parameter]
        public int StepInterval{ get; set; }
        protected override Task BeginProcessingAsync(){
            _values = new ConcurrentBag<object>();
            _psVariables = this.Invoke<PSVariable>("Get-Variable")
                .Where(variable => VariablesToImport.Contains(variable.Name)).ToArray();
            return base.BeginProcessingAsync();
        }
 
        protected override Task ProcessRecordAsync(){
            _values.Add(Value);
            return base.ProcessRecordAsync();
        }
 
        protected override Task EndProcessingAsync(){
            if (!_values.Any()){
                return Task.CompletedTask;
            }
            var signal = Enumerable.Range(0,RetryOnError).ToObservable()
                .Delay(TimeSpan.FromMilliseconds(RetryDelay)).Publish().AutoConnect();
            var eventLoopScheduler = new EventLoopScheduler(start => new Thread(start));
            var synchronizationContext = SynchronizationContext.Current;
            var values = _values.ToObservable();
            if (StepInterval > 0){
                values=values.StepInterval(TimeSpan.FromMilliseconds(StepInterval),eventLoopScheduler);
            }
            return values
                .SelectMany((o, i) => Observable.Start(() => {
                        using (var runspace = RunspaceFactory.CreateRunspace()){
                            runspace.Open();
                            runspace.SetVariable(new PSVariable("_", o));
                            runspace.SetVariable(_psVariables);
                            var psObjects = runspace.Invoke(Script.ToString());
                            var lastExitCode = runspace.Invoke("$LastExitCode").FirstOrDefault();
                            if (lastExitCode != null&& ((int) lastExitCode.BaseObject) >0){
                                var error = string.Join(Environment.NewLine,runspace.Invoke("$Error"));
                                if (!string.IsNullOrWhiteSpace(error)){
                                    throw new Exception($"ExitCode:{lastExitCode}{Environment.NewLine}Errors: {error}{Environment.NewLine}Script:{Script}");
                                }
                            }
                            runspace.Close();
                            return psObjects;
                        }
                    })
                    .RetryWhen(_ => _.SelectMany(exception => signal.Concat(Observable.Throw<int>(exception))))
                    .HandleErrors(this, ActivityName,synchronizationContext))
                .WriteObject(this,_values.Count)
                .ToTask();
        }
 
         
    }
}