# MadMultithreading.psm1

function New-MadThread
    {
    <#
        .SYNOPSIS
            Start given PowerShell script in a new thread

        .DESCRIPTION
            Creates a [PowerShell] instance, assigns it the given RunspacePool,
            adds the script and (if any) its parameters, and starts it
            asynchronously with BeginInvoke(). The caller receives the hooks
            needed to wait for, collect from, and dispose of the thread.
 
        .PARAMETER ScriptBlockUnique
            ScriptBlock to run in the new thread
            Required
 
        .PARAMETER RunspacePoolUnique
            RunspacePool to use for the new thread
            Required
 
        .PARAMETER ParametersUnique
            Hashtable - Parameters for the new thread
            Ignored (replaced) when UseEmbeddedParameters is present.
 
        .PARAMETER UseEmbeddedParameters
            Switch
            If present, parameter names are derived from ScriptBlockUnique
            and parameter values are set to matching variable values.
 
            Matching variables must exist with correct values.
 
            Thread parameter names cannot be 'ScriptBlockUnique',
            'RunspacePoolUnique', 'ParametersUnique', or 'UseEmbeddedParameters'.

            A script without a Param() block yields an empty parameter set.

        .OUTPUTS
            PSCustomObject with properties:
                PowerShell   - the [PowerShell] instance running the script
                Handler      - the value returned by BeginInvoke()
                RunspacePool - the RunspacePool passed in RunspacePoolUnique
 
        .NOTES
            v 1.0 3/23/18 Tim Curwick Created
            v 1.1 4/30/18 Tim Curwick Added Runspace to return object
            v 1.2 8/ 1/18 Tim Curwick Modified to improve performance from a module
    #>


    [cmdletbinding()]
    Param (
        [ScriptBlock]
        $ScriptBlockUnique,

        [System.Management.Automation.Runspaces.RunspacePool]
        $RunspacePoolUnique,
        
        [Hashtable]
        $ParametersUnique,
        
        [Switch]
        $UseEmbeddedParameters )

    If ( $UseEmbeddedParameters )
        {
        # Build parameter hashtable from the script's own Param() block.
        $ParametersUnique = @{}

        # Notes on the pipeline below:
        #  - When the script has no Param() block, ParamBlock (and therefore
        #    Parameters) is $null. A $null piped into ForEach-Object still runs
        #    the process block once, so nulls are filtered out first.
        #  - Only $_ is used (no new local variables) to avoid any chance of a
        #    helper variable shadowing a same-named variable being looked up.
        $ScriptBlockUnique.Ast.ParamBlock.Parameters |
            Where-Object   { $Null -ne $_ } |
            ForEach-Object { $_.Name.ToString().Trim( '$' ) } |
            ForEach-Object { $ParametersUnique[$_] = $PSCmdlet.SessionState.PSVariable.GetValue( $_ ) }
        }

    # Create thread
    $PowerShell = [PowerShell]::Create()
    $PowerShell.RunspacePool = $RunspacePoolUnique
    
    # Add script
    [void]$PowerShell.AddScript( $ScriptBlockUnique )
    
    # Add parameters (skipped when the hashtable is missing or empty)
    If ( $ParametersUnique.Count )
        {
        [void]$PowerShell.AddParameters( $ParametersUnique )
        }
    
    # Start thread
    $Handler = $PowerShell.BeginInvoke()
        
    # Return thread hooks
    [PSCustomObject]@{
        PowerShell   = $PowerShell
        Handler      = $Handler
        RunspacePool = $RunspacePoolUnique }
    }


function Wait-MadThread
    {
    <#
        .SYNOPSIS
            Wait for given threads to complete, optionally disposing of them

        .DESCRIPTION
            Polls the Handler.IsCompleted flag of every given thread object every
            200 milliseconds until none of them reports $False. Unless NoDispose
            is present, each thread's PowerShell object and then each thread's
            RunspacePool object is disposed of. Missing (null) PowerShell or
            RunspacePool properties are skipped.

            No timeout is implemented; the function waits until all handlers
            report completion.
 
        .PARAMETER Thread
            Array of custom objects containing the PowerShell, Handler and RunspacePool for each thread to wait for
            Required
 
        .PARAMETER NoDispose
            Switch - If present, the PowerShell and RunspacePool objects are not disposed of after completion
 
        .NOTES
            v 1.0 4/30/18 Tim Curwick Created
    #>

    [cmdletbinding()]
    Param (
        [array]$Thread,
        [switch]$NoDispose )

    # Poll until no handler reports IsCompleted = $False
    # (An empty or null Thread array yields no flags, so the loop is skipped.)
    While ( $Thread.Handler.IsCompleted -contains $False )
        {
        Start-Sleep -Milliseconds 200
        }

    If ( -not $NoDispose )
        {
        # Dispose of all PowerShell instances first, then the pools they ran in.
        # Null checks keep empty input and objects lacking a property from
        # raising "cannot call a method on a null-valued expression".
        ForEach ( $ThreadItem in $Thread )
            {
            If ( $Null -ne $ThreadItem.PowerShell )
                {
                $ThreadItem.PowerShell.Dispose()
                }
            }

        ForEach ( $ThreadItem in $Thread )
            {
            If ( $Null -ne $ThreadItem.RunspacePool )
                {
                $ThreadItem.RunspacePool.Dispose()
                }
            }
        }
    }


function Start-MadOutputThread
    {
    <#
        .SYNOPSIS
            Start a thread to export items from given queues to given CSV files

        .DESCRIPTION
            Opens a dedicated single-runspace RunspacePool and uses New-MadThread
            to start a script in it. That script repeatedly drains every report
            queue with TryTake() and appends the drained items to the report's
            CSV file, sleeping 200 milliseconds between passes, until every
            queue reports IsCompleted.
 
        .PARAMETER Reports
            Array of custom objects specifying the Queue and Path of each report
            Required for multiple reports
 
        .PARAMETER Queue
            The BlockingCollection to monitor
            Required for a single report
 
        .PARAMETER Path
            Full path and name of the output CSV file
            Required for a single report

        .OUTPUTS
            The thread object returned by New-MadThread
            (PowerShell, Handler and RunspacePool properties).
 
        .NOTES
            v 1.0 4/30/18 Tim Curwick Created
            v 1.1 5/23/18 Tim Curwick Modified to collect all available objects in queue before exporting
                                         (Preventing chatty queue and inefficient disk writes from blocking other queues)
    #>

    [cmdletbinding()]
    Param (
        [parameter( Mandatory = $True, ParameterSetName = 'Collection' )]
        $Reports,

        [parameter( Mandatory = $True, ParameterSetName = 'Single' )]
        $Queue,

        [parameter( Mandatory = $True, ParameterSetName = 'Single' )]
        [string]$Path )

    # If the Single parameter set is used
    # Build $Reports from $Queue and $Path
    If ( $PSCmdlet.ParameterSetName -eq 'Single' )
        {
        $Reports = @( @{
            Queue = $Queue
            Path  = $Path } )
        }

    ## Define script to run in the output thread

    $OutputThreadScript = 
        {
        <#
            .PARAMETER Reports
                Array of custom objects specifying the Queue and Path of each report
                Required
        #>

        [cmdletbinding()]
        Param (
            [array]$Reports )

        # Initialize collection
        $Items = [System.Collections.ArrayList]@()
        
        # Initialize empty PSObject so we can use it as a ref variable
        $Item = [pscustomobject]@{}

        # While we're still watching for work...
        While ( $Reports.ForEach{ $_.Queue.IsCompleted } -contains $False )
            {
            # For each specified report...
            ForEach ( $Report in $Reports )
                {
                # Start with an empty bucket
                $Items.Clear()

                # While there are items in the queue
                # Take the item and add it to the collection
                While ( $Report.Queue.TryTake( [ref]$Item ) )
                    {
                    # ArrayList.Add() returns the new index; discard it so it
                    # does not pile up in the thread's output stream
                    [void]$Items.Add( $Item )
                    }
                
                # If output items were received
                # Export items to output file
                # (Count is tested explicitly; the truthiness of a one-element
                #  collection depends on the element's own value)
                If ( $Items.Count )
                    {
                    $Items | Export-CSV -Path $Report.Path -Append -Force -Encoding UTF8 -NoTypeInformation
                    }
                }
            
            # Take a breath before checking for more items to process
            Start-Sleep -Milliseconds 200
            }
        }


    # Create runspace pool (exactly one runspace; only one output thread is started)
    $RunspacePool = [runspacefactory]::CreateRunspacePool( 1, 1 )
    $RunspacePool.Open()

    # Start the output thread
    # Return the thread object
    New-MadThread -ScriptBlockUnique $OutputThreadScript -RunspacePoolUnique $RunspacePool -ParametersUnique @{ Reports = $Reports }
    }


function Invoke-MadMultithread
    {
    <#
    .SYNOPSIS
        Run input against function in multiple threads

    .DESCRIPTION
        Input objects are placed on a shared input queue together with their
        input index. Worker threads (started on demand, up to Threads) take
        items from that queue, pipe each item's value to the wrapped function,
        and place the value or the caught error on a shared result queue.
        Results are written to the output stream either as they arrive (NoSort)
        or in input order. Errors are written to the error stream.

        A terminating error is thrown if Function cannot be resolved to a
        module command or a script-defined function.
 
    .PARAMETER Function
        String - Name of the defined function to run
 
    .PARAMETER Parameters
        Hashtable - Parameters (other than pipeline input) for Function
 
    .PARAMETER Threads
        Int - Number of parallel threads to use
        Defaults to 2
        Values below 1 are treated as 1
 
    .PARAMETER NoSort
        Switch - If present, results are returned in the order they complete processing.
                 If not present, results are returned in the same order as their
                 respective inputs.
 
    .PARAMETER InputObject
        Object[] - Array of objects or pipeline input to be processed by Function
 
 
    .EXAMPLE
    # Single thread
    $Groups | Get-ADGroupMember
    # Default 2 threads
    $Groups | Invoke-MadMultithread Get-ADGroupMember
 
    .EXAMPLE
    # Single thread
    $Groups | Get-ADGroupMember -Server DC01
    # 4 threads
    $Groups | Invoke-MadMultithread Get-ADGroupMember -Parameters @{ Server = 'DC01' } -Threads 4
 
    .EXAMPLE
    # Single thread
    $Groups | Get-ADGroupMember -Server DC01 -Recursive
    # 8 threads
    $Groups | Invoke-MadMultithread Get-ADGroupMember -Parameters @{ Server = 'DC01'; Recursive = $True } -Threads 8
 
    .EXAMPLE
    function YourFunction {
        Param ( [parameter( ValueFromPipeline = $True )]$InputParam, $Thing1, $Thing2 )
        Process
            {
            # Your pipeline code
            }
        }
    # Single thread
    $InputObjects | YourFunction -Thing1 'Alpha' -Thing2 27
    # Default 2 threads
    $InputObjects | Invoke-MadMultithread YourFunction -Parameters @{ Thing1 = 'Alpha'; Thing2 = 27 }
 
    .EXAMPLE
    function Start-RandomSleep {
        Param ( [parameter( ValueFromPipeline = $True )]$Thing1 )
        Process
            {
            Start-Sleep -Seconds ( Get-Random 5 )
            $Thing1
            }
        }
    # Default sorting - Output is in same order as input
    1..10 | Invoke-MadMultithread Start-RandomSleep -Threads 10
 
    # returns 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
 
 
    # NoSort option - Results returned when ready
    1..10 | Invoke-MadMultithread Start-RandomSleep -Threads 10 -NoSort
 
    # returns 7, 4, 5, 1, 6, 8, 10, 2, 3, 9
 
 
    .NOTES
        v1.0 3/16/18 Tim Curwick Created
        v1.1 8/ 2/18 Tim Curwick Modified to suppress output of Null values
    #>

    [cmdletbinding()]
    Param (
        [string]   $Function,
        [hashtable]$Parameters,
        [int]      $Threads = 2,
        [switch]   $NoSort,

        [parameter( ValueFromPipeline = $True )]
        [array]    $InputObject )

    Begin
        {
        #region Initialize Variables

            # Collection for thread references
            $RunningThreads = @()

            # Index of next input object
            $Index = 0

            # Index of next sorted result
            $ResultIndex = 0

            # Result object (needs to exist before using as a [ref] variable)
            $Result = [pscustomobject]@{}

            # Collection for results (keyed by input index)
            $Results = @{}

            # Number of threads must be greater than 0
            $Threads = [math]::Max( $Threads, 1 )

            # Cross-thread objects
            $InputQueue    = [System.Collections.Concurrent.BlockingCollection[PSObject]]@{}
            $ResultQueue   = [System.Collections.Concurrent.BlockingCollection[PSObject]]@{}
            $ThreadStatus = [System.Collections.Concurrent.ConcurrentDictionary[Int,String]]@{}

        #endregion


        #region Define wrapped function
        
            # Get the specified Function
            $Command = Get-Command $Function -ErrorAction SilentlyContinue

            # If Function is defined in a module
            # Set the wrapped function as an alias for the Function
            If ( $Command.ModuleName )
                {
                $FunctionDefinition = 'Set-Alias -Name Invoke-WrappedFunction -value ' + $Command.Name
                }

            # If Function is a script-defined function
            # Use the definition of Function to define the wrapped function
            ElseIf ( $Command.CommandType -eq 'Function' )
                {
                $FunctionDefinition = 'function Invoke-WrappedFunction { ' + $Command.Definition + ' }'
                }
            
            # Else (Function is not defined in a module and is not a script-defined function)
            # Throw error
            # (Terminating: continuing would only build an empty wrapper and
            #  fail once per input item inside the worker threads.)
            Else
                {
                throw "Unable to parse function [$Function]."
                }

            # Convert wrapped function to scriptblock
            $FunctionDefinition = [scriptblock]::Create( $FunctionDefinition )

        #endregion

        # Define thread parameters
        $ThreadParameters = @{
            InputQueue         = $InputQueue    
            ResultQueue        = $ResultQueue   
            ThreadStatus       = $ThreadStatus
            FunctionDefinition = $FunctionDefinition
            FunctionParam      = $Parameters }

        # Define script to run in each thread
        $ThreadScript =
            {
            Param(
                [System.Collections.Concurrent.BlockingCollection[PSObject]]
                $InputQueue,

                [System.Collections.Concurrent.BlockingCollection[PSObject]]
                $ResultQueue,
        
                [System.Collections.Concurrent.ConcurrentDictionary[Int,String]]
                $ThreadStatus,

                [Scriptblock]
                $FunctionDefinition,

                [Hashtable]
                $FunctionParam )

            # Define Invoke-WrappedFunction
            . $FunctionDefinition

            # Add self to list of running threads
            $ThreadID = [appdomain]::GetCurrentThreadId()
            $ThreadStatus[$ThreadID] = 'Waiting'

            # For each Item in queue...
            # (If queue is empty, will wait for item. If queue is closed, loop ends.)
            ForEach ( $Item in $InputQueue.GetConsumingEnumerable() )
                {
                # Set status to busy
                $ThreadStatus[$ThreadID] = 'Processing'

                # Define result hashtable
                $Result = @{ Index = $Item.Index; Value = $Null; Error = $Null }

                # Call wrapped function, with or without additional parameters
                try
                    {
                    If ( $FunctionParam.Keys )
                        {
                        $Result.Value = $Item.Value | Invoke-WrappedFunction @FunctionParam
                        }
                    Else
                        {
                        $Result.Value = $Item.Value | Invoke-WrappedFunction
                        }
                    }
                
                # Error
                # Add to result
                catch
                    {
                    $Result.Error = $_
                    }
    
                # Return result
                $ResultQueue.Add( $Result )

                # Set status to ready for work
                $ThreadStatus[$ThreadID] = 'Waiting'
                }

            # If this is the last running thread
            # Close the new result queue
            [void]$ThreadStatus.TryRemove( $ThreadId, [ref]$Null )
            If ( $ThreadStatus.Count -eq 0)
                {
                $ResultQueue.CompleteAdding()
                }
            }
            
        # Create runspace pool
        $RunspacePool = [runspacefactory]::CreateRunspacePool( 1, $Threads )
        $RunspacePool.Open()
        }

    Process
        {
        try
            {
            # For each input object (looping to handle non-pipeline array input)
            ForEach ( $InputElement in $InputObject )
                {

                # If we are not yet at max thread count and
                # there are no threads waiting for work
                # Start another thread
                If ( $RunningThreads.Count -lt $Threads -and
                        $ThreadStatus.Values -notcontains 'Waiting' )
                    {
                    # Create thread
                    $PowerShell = [PowerShell]::Create()
                    $PowerShell.RunspacePool = $RunspacePool
    
                    # Add script
                    [void]$PowerShell.AddScript( $ThreadScript )
    
                    # Add parameters
                    [void]$PowerShell.AddParameters( $ThreadParameters )
    
                    # Start thread
                    $Handler = $PowerShell.BeginInvoke()
        
                    # Add thread hooks to collection
                    $RunningThreads += [PSCustomObject]@{
                        PowerShell = $PowerShell
                        Handler    = $Handler }
                    }

                # Add input object to input queue
                # (Adding an index so results can be returned in the correct order)
                $InputQueue.Add( @{ Index = $Index++; Value = $InputElement } )

                # Check result queue
                # Put any results in the results collection
                # Repeat until empty
                While ( $ResultQueue.TryTake( [ref]$Result ) )
                    {
                    # If NoSort specified
                    # Return result immediately
                    If ( $NoSort )
                        {
                        # Return result value to output stream (if not null)
                        # ($Null on the left so an array value is not filtered
                        #  element-by-element by the comparison)
                        If ( $Null -ne $Result.Value )
                            {
                            $Result.Value
                            }

                        # If an error was returned
                        # Write error to error stream
                        If ( $Result.Error )
                            {
                            Write-Error -ErrorRecord $Result.Error
                            }
                        }

                    # Else (NoSort not specified)
                    # Save result to collection
                    Else
                        {
                        $Results[$Result.Index] = $Result
                        }
                    }

                # If NoSort not specified
                # Process result collection
                If ( -not $NoSort )
                    {

                    # Check results collection
                    # If the result for the next result index is found
                    # Process it
                    # Repeat as needed
                    While ( $Results.ContainsKey( $ResultIndex ) )
                        {
                        # Return result value to output stream (if not null)
                        If ( $Null -ne $Results[$ResultIndex].Value )
                            {
                            $Results[$ResultIndex].Value
                            }

                        # If an error was returned
                        # Write error to error stream
                        If ( $Results[$ResultIndex].Error )
                            {
                            Write-Error -ErrorRecord $Results[$ResultIndex].Error
                            }

                        # Remove processed result from results collection
                        $Results.Remove( $ResultIndex )

                        # Increment next result index to process
                        $ResultIndex++
                        }
                    }
                }
            }
        finally
            {
            # Catch Ctrl-C
            If ( -not $? )
                {
                # Clean up
                $InputQueue.CompleteAdding()
                $RunningThreads | ForEach-Object { $_.PowerShell.Stop(); $_.PowerShell.Dispose() }
                $RunspacePool.Dispose()
                }
            }
        }

    End
        {
        try
            {
            # Close input queue
            $InputQueue.CompleteAdding()

            # The result queue is normally closed by the last worker thread to
            # finish. If no input was received, no worker was ever started, so
            # close it here; otherwise the loop below would wait forever.
            If ( $RunningThreads.Count -eq 0 )
                {
                $ResultQueue.CompleteAdding()
                }

            # For each result in result queue
            # (Will wait for additional results until queue is closed)
            ForEach ( $Result in $ResultQueue.GetConsumingEnumerable() )
                {
                If ( $NoSort )
                    {
                    # Return result value to output stream (if not null)
                    If ( $Null -ne $Result.Value )
                        {
                        $Result.Value
                        }

                    # If an error was returned
                    # Write error to error stream
                    If ( $Result.Error )
                        {
                        Write-Error -ErrorRecord $Result.Error
                        }
                    }
                Else
                    {
                    # Put result in results collection
                    $Results[$Result.Index] = $Result

                    # Check results collection
                    # If the result for the next result index is found
                    # Process it
                    # Repeat as needed
                    While ( $Results.ContainsKey( $ResultIndex ) )
                        {
                        # Return result value to output stream (if not null)
                        If ( $Null -ne $Results[$ResultIndex].Value )
                            {
                            $Results[$ResultIndex].Value
                            }

                        # If an error was returned
                        # Write error to error stream
                        If ( $Results[$ResultIndex].Error )
                            {
                            Write-Error -ErrorRecord $Results[$ResultIndex].Error
                            }

                        # Remove processed result from results collection
                        $Results.Remove( $ResultIndex )

                        # Increment next result index to process
                        $ResultIndex++
                        }
                    }
                }
            }
        finally
            {
            # Clean up
            $RunningThreads | ForEach-Object { $_.PowerShell.Stop(); $_.PowerShell.Dispose() }
            $RunspacePool.Dispose()
            }
        }
    }