Cmdlets/src/XpandPosh.Cmdlets/RXExtensions.cs
using System;
using System.Management.Automation; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; using System.Threading; using NuGet.Common; using XpandPosh.CmdLets; namespace XpandPosh.Cmdlets{ internal static class RXExtensions{ public static IObservable<Tuple<TSource1, TSource2>> CreatePermutations<TSource1,TSource2>(this IObservable<TSource1> source,IObservable<TSource2> other,IScheduler scheduler=null) { if (scheduler == null) scheduler = Scheduler.CurrentThread; return Observable.Create<Tuple<TSource1, TSource2>>(observer =>{ var replay = other.Replay(scheduler); var sequence = source.SelectMany(i => replay.Select(j => Tuple.Create(i,j))); return new CompositeDisposable(replay.Connect(), sequence.Subscribe(observer)); }); } public static IObservable<T> IgnoreException<T,TException>(this IObservable<T> source, PSCmdlet cmdlet,object targetObject) where TException:Exception{ return source.ObserveOn(SynchronizationContext.Current) .Catch<T, TException>(exception => { cmdlet.WriteError(new ErrorRecord(exception, $"{exception.GetHashCode()}",ErrorCategory.InvalidOperation, targetObject)); return Observable.Empty<T>(); }); } public static IObservable<TSource> HandleErrors<TSource>(this IObservable<TSource> source,XpandCmdlet cmdlet, object targetObject=null,SynchronizationContext context=null){ context = context ?? SynchronizationContext.Current; return source.HandleErrors<TSource, Exception>(cmdlet, targetObject,context); } public static IObservable<TSource> HandleErrors<TSource,TException>(this IObservable<TSource> source, XpandCmdlet cmdlet,object targetObject,SynchronizationContext context=null) where TException:Exception{ context = context ?? SynchronizationContext.Current; targetObject = targetObject ?? cmdlet.ActivityName; return source.ObserveOn(context) .Catch<TSource,TException>(exception => { var errorAction = cmdlet.ErrorAction(); if (errorAction==ActionPreference.SilentlyContinue) return Observable.Return(default(TSource)); var errorRecord = new ErrorRecord(exception, exception.GetHashCode().ToString(),ErrorCategory.InvalidOperation, targetObject); cmdlet.WriteError(errorRecord); if (errorAction==ActionPreference.Stop){ return Observable.Throw<TSource>(exception); } if (errorAction==ActionPreference.Ignore||errorAction==ActionPreference.Continue) return Observable.Return(default(TSource)); throw new NotImplementedException($"{errorAction}"); }); } public static IObservable<T> WriteProgress<T>(this IObservable<T> source, IProgressCmdlet cmdlet,int itemsCount){ return source.Select((objects, i) => { var percentComplete = i * 100 / itemsCount; cmdlet.WriteProgress(new ProgressRecord(cmdlet.ActivityId, cmdlet.ActivityName,string.Format(cmdlet.ActivityStatus, percentComplete)){PercentComplete = percentComplete}); return objects; }).Finally(() => cmdlet.WriteProgressCompletion(new ProgressRecord(cmdlet.ActivityId, cmdlet.ActivityName, cmdlet.ActivityStatus){PercentComplete = 100},cmdlet.CompletionMessage)); } public static IObservable<T> WriteVerboseObject<T>(this IObservable<T> source, Cmdlet cmdlet,Func<T,string> text=null,SynchronizationContext synchronizationContext=null){ synchronizationContext = synchronizationContext ?? SynchronizationContext.Current; text = text ?? (arg => $"{arg}"); return source.ObserveOn(synchronizationContext).Select((arg1, i) => { cmdlet.WriteVerbose($"{arg1.GetType().Name}: {text(arg1)}"); return arg1; }); } public static IObservable<T> WriteObject<T>(this IObservable<T> source,Cmdlet cmdlet,int? progressItemsTotalCount=null,bool enumerateCollection=true){ var writeObject = source.ObserveOn(SynchronizationContext.Current).Do(obj => cmdlet.WriteObject(obj,enumerateCollection)); return progressItemsTotalCount.HasValue ? writeObject.WriteProgress((IProgressCmdlet) cmdlet, progressItemsTotalCount.Value) : writeObject; } public static IObservable<T> ToObservable<T>(this IEnumeratorAsync<T> enumeratorAsync){ var nextItem = Observable.Defer(() => enumeratorAsync.MoveNextAsync().ToObservable()); return nextItem.Repeat().TakeUntil(b => !b).Select(b => enumeratorAsync.Current); } } } |