// TaskJob.cs

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// This code was part of PSThreadJob.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Management.Automation;
using System.Threading;
using System.Threading.Tasks;

namespace PowerProcess
{

    /// <summary>
    /// JobSourceAdapter
    /// </summary>
    public sealed class TaskJobSourceAdapter : JobSourceAdapter
    {
        #region Members

        private readonly ConcurrentDictionary<Guid, Job2> _repository;

        #endregion

        #region Constructor

        /// <summary>
        /// Constructor
        /// </summary>
        public TaskJobSourceAdapter()
        {
            Name = nameof(TaskJobSourceAdapter);
            _repository = new ConcurrentDictionary<Guid, Job2>();
        }

        #endregion

        #region JobSourceAdapter Implementation

        /// <summary>
        /// NewJob
        /// </summary>
        public override Job2? NewJob(JobInvocationInfo specification)
        {
            var job = specification.Parameters[0][0].Value as TaskJob;
            if (job != null)
            {
                _repository.TryAdd(job.InstanceId, job);
            }
            return job;
        }

        /// <summary>
        /// GetJobs
        /// </summary>
        public override IList<Job2> GetJobs()
        {
            return _repository.Values.ToArray();
        }

        /// <summary>
        /// GetJobsByName
        /// </summary>
        public override IList<Job2> GetJobsByName(string name, bool recurse)
        {
            var rtnList = new List<Job2>();
            foreach (var job in _repository.Values)
            {
                if (job.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
                {
                    rtnList.Add(job);
                }
            }
            return rtnList;
        }

        /// <summary>
        /// GetJobsByCommand
        /// </summary>
        public override IList<Job2> GetJobsByCommand(string command, bool recurse)
        {
            var rtnList = new List<Job2>();
            foreach (var job in _repository.Values)
            {
                if (job.Command.Equals(command, StringComparison.OrdinalIgnoreCase))
                {
                    rtnList.Add(job);
                }
            }
            return rtnList;
        }

        /// <summary>
        /// GetJobByInstanceId
        /// </summary>
        public override Job2? GetJobByInstanceId(Guid instanceId, bool recurse)
        {
            if (_repository.TryGetValue(instanceId, out var job))
            {
                return job;
            }
            return null;
        }

        /// <summary>
        /// GetJobBySessionId
        /// </summary>
        public override Job2? GetJobBySessionId(int id, bool recurse)
        {
            foreach (var job in _repository.Values)
            {
                if (job.Id == id)
                {
                    return job;
                }
            }
            return null;
        }

        /// <summary>
        /// GetJobsByState
        /// </summary>
        public override IList<Job2> GetJobsByState(JobState state, bool recurse)
        {
            var rtnList = new List<Job2>();
            foreach (var job in _repository.Values)
            {
                if (job.JobStateInfo.State == state)
                {
                    rtnList.Add(job);
                }
            }
            return rtnList;
        }

        /// <summary>
        /// GetJobsByFilter
        /// </summary>
        public override IList<Job2> GetJobsByFilter(Dictionary<string, object> filter, bool recurse)
        {
            throw new PSNotSupportedException();
        }

        /// <summary>
        /// RemoveJob
        /// </summary>
        public override void RemoveJob(Job2 job)
        {
            if (_repository.TryGetValue(job.InstanceId, out var removeJob))
            {
                removeJob.StopJob();
                _repository.TryRemove(job.InstanceId, out _);
            }
        }

        #endregion
    }

    /// <summary>
    /// Job
    /// </summary>
    public sealed class TaskJob : Job2
    {
        #region Private members

        private readonly Action<TaskJob> _runTask;
        private readonly CancellationTokenSource _cancelSource;

        #endregion

        #region Properties

        /// <summary>
        /// Specifies the job definition for the JobManager
        /// </summary>
        public JobDefinition TaskJobDefinition
        {
            get;
            private set;
        }

        #endregion

        #region Constructors

#pragma warning disable CS8618
        private TaskJob() { }
#pragma warning restore CS8618

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="name"></param>
        /// <param name="runTask"></param>
        /// <param name="cancelSource"></param>
        public TaskJob(
            PSCmdlet psCmdlet,
            string? name,
            Action<TaskJob> runTask,
            CancellationTokenSource cancelSource)
            : base(String.Empty, name)
        {
            _runTask = runTask;
            _cancelSource = cancelSource;

            this.PSJobTypeName = "TaskJob"
                ;
            // Hook up data streams.
            this.Output = new PSDataCollection<PSObject>();
            this.Output.EnumeratorNeverBlocks = true;

            this.Error = new PSDataCollection<ErrorRecord>();
            this.Error.EnumeratorNeverBlocks = true;

            this.Progress = new PSDataCollection<ProgressRecord>();
            this.Progress.EnumeratorNeverBlocks = true;

            this.Verbose = new PSDataCollection<VerboseRecord>();
            this.Verbose.EnumeratorNeverBlocks = true;

            this.Warning = new PSDataCollection<WarningRecord>();
            this.Warning.EnumeratorNeverBlocks = true;

            this.Debug = new PSDataCollection<DebugRecord>();
            this.Debug.EnumeratorNeverBlocks = true;

            this.Information = new PSDataCollection<InformationRecord>();
            this.Information.EnumeratorNeverBlocks = true;

            // Create the JobManager job definition and job specification, and add to the JobManager.
            TaskJobDefinition = new JobDefinition(typeof(TaskJobSourceAdapter), "", Name);
            var parameterCollection = new Dictionary<string, object> {
                { nameof(psCmdlet.JobManager.NewJob), this }
            };
            var jobSpecification = new JobInvocationInfo(TaskJobDefinition, parameterCollection);
            var newJob = psCmdlet.JobManager.NewJob(jobSpecification);
            System.Diagnostics.Debug.Assert(newJob == this, "JobManager must return this job");
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Dispose
        /// </summary>
        /// <param name="disposing"></param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _cancelSource.Cancel();
                base.Output.Complete();
                base.Error.Complete();
                base.Progress.Complete();
                base.Verbose.Complete();
                base.Warning.Complete();
                base.Debug.Complete();
                base.Information.Complete();
            }

            base.Dispose(disposing);
        }

        /// <summary>
        /// StatusMessage
        /// </summary>
        public override string StatusMessage
        {
            get { return string.Empty; }
        }

        /// <summary>
        /// HasMoreData
        /// </summary>
        public override bool HasMoreData
        {
            get
            {
                return (this.Output.Count > 0 ||
                        this.Error.Count > 0 ||
                        this.Progress.Count > 0 ||
                        this.Verbose.Count > 0 ||
                        this.Debug.Count > 0 ||
                        this.Warning.Count > 0);
            }
        }

        /// <summary>
        /// Location
        /// </summary>
        public override string Location
        {
            get { return "PowerShell"; }
        }

        /// <summary>
        /// ReportError
        /// </summary>
        /// <param name="e"></param>
        public void ReportError(Exception e)
        {
            try
            {
                SetJobState(JobState.Failed);

                this.Error.Add(
                        new ErrorRecord(e, "ThreadJobError", ErrorCategory.InvalidOperation, this));
            }
            catch (ObjectDisposedException)
            {
                // Ignore. Thrown if Job is disposed (race condition.).
            }
            catch (PSInvalidOperationException)
            {
                // Ignore. Thrown if Error collection is closed (race condition.).
            }
        }

        #endregion

        #region Base class overrides

        /// <summary>
        /// StartJob
        /// </summary>
        public override void StartJob()
        {
            if (this.JobStateInfo.State != JobState.NotStarted)
            {
                throw new Exception(PSThreadJobResources.CannotStartJob);
            }
            try
            {
                Task.Factory.StartNew(() =>
                {
                    Thread.CurrentThread.Name = $"TaskJob ${this.Name}";
                    try
                    {
                        SetJobState(JobState.Running);
                        _runTask(this);
                        SetJobState(JobState.Completed);
                    }
                    catch (Exception e)
                    {
                        ReportError(e);
                    }
                }, _cancelSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            }
            catch (Exception e)
            {
                ReportError(e);
            }
        }

        /// <summary>
        /// StartJobAsync
        /// </summary>
        public override void StartJobAsync()
        {
            this.StartJob();
            this.OnStartJobCompleted(
                new AsyncCompletedEventArgs(null, false, this));
        }

        /// <summary>
        /// OnStartJobCompleted
        /// </summary>
        /// <param name="eventArgs"></param>
        protected override void OnStartJobCompleted(AsyncCompletedEventArgs eventArgs)
        {
            base.OnStartJobCompleted(eventArgs);
        }

        /// <summary>
        /// StopJob
        /// </summary>
        public override void StopJob()
        {
            if (this.JobStateInfo.State == JobState.Failed) return;
            if (this.JobStateInfo.State != JobState.Running)
            {
                throw new Exception(PSThreadJobResources.CannotStopJob);
            }
            try
            {
                _cancelSource.Cancel();
            }
            catch (Exception e)
            {
                ReportError(e);
            }
        }

        /// <summary>
        /// StopJob
        /// </summary>
        /// <param name="force"></param>
        /// <param name="reason"></param>
        public override void StopJob(bool force, string reason)
        {
            StopJob();
        }

        /// <summary>
        /// StopJobAsync
        /// </summary>
        public override void StopJobAsync()
        {
            this.StartJob();
            this.OnStopJobCompleted(
                new AsyncCompletedEventArgs(null, false, this));
        }

        /// <summary>
        /// StopJobAsync
        /// </summary>
        /// <param name="force"></param>
        /// <param name="reason"></param>
        public override void StopJobAsync(bool force, string reason)
        {
            StopJobAsync();
        }

        /// <summary>
        /// OnStopJobCompleted
        /// </summary>
        /// <param name="eventArgs"></param>
        protected override void OnStopJobCompleted(AsyncCompletedEventArgs eventArgs)
        {
            base.OnStopJobCompleted(eventArgs);
        }

        #region Not implemented

        /// <summary>
        /// SuspendJob
        /// </summary>
        /// <param name="force"></param>
        /// <param name="reason"></param>
        public override void SuspendJob(bool force, string reason)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// SuspendJob
        /// </summary>
        public override void SuspendJob()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// SuspendJobAsync
        /// </summary>
        public override void SuspendJobAsync()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// SuspendJobAsync
        /// </summary>
        /// <param name="force"></param>
        /// <param name="reason"></param>
        public override void SuspendJobAsync(bool force, string reason)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// ResumeJob
        /// </summary>
        public override void ResumeJob()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// ResumeJobAsync
        /// </summary>
        public override void ResumeJobAsync()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// UnblockJob
        /// </summary>
        public override void UnblockJob()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// UnblockJobAsync
        /// </summary>
        public override void UnblockJobAsync()
        {
            throw new NotImplementedException();
        }

        #endregion

        #endregion

    }

}