Start-MapReduce.ps1
function Start-MapReduce {
    <#
    .Synopsis
        Runs a MapReduce on a set of data
    .Description
        Runs a MapReduce on a set of data, sequentially or in background jobs, on one or more machines.

        MapReduce is an approach to summarizing data.  With MapReduce:

        * A Map function takes each value of incoming data and returns one or more objects summarizing that data
        * The summarized data is grouped and passed to one or more functions that reduce that data into a final result

        Every Map and Reduce script must declare exactly two parameters.  The name of the first
        (positional) parameter is that script's key:

        * A Map script is called once for every input object whose key property is non-empty, as:
              Map <value of the key property> <the whole input object>
        * All map results are grouped by the Reduce script's key property, the groups are ordered by
          key (descending), and the Reduce script is called once per group with a non-empty name, as:
              Reduce <group name> <objects in the group>

        Unless -RunMapSequentially / -RunReduceSequentially is set, the data is split into shards and
        each shard is processed in its own job (Start-Job, a local workflow job for Map steps on
        Windows PowerShell 3.0-5.1, or Invoke-Command -AsJob for -Grid nodes), with at most
        -BatchSize jobs running at the same time.
    .Example
        New-Object PSObject -Property @{ Text = "the quick brown fox jumped over the lazy dog" } |
            Start-MapReduce -Map {
                param($Text, $data)
                foreach ($w in ($text -split ' ')) {
                    New-Object PSObject -Property @{
                        Word = $w
                        Count = 1
                    }
                }
            } -Reduce {
                param($Word, $WordCount)
                New-Object PSObject -Property @{
                    Word = $word
                    Count = $WordCount | Measure-Object -Property Count -Sum | Select-Object -ExpandProperty Sum
                }
            }
    .Link
        http://en.wikipedia.org/wiki/Mapreduce
    #>
    [CmdletBinding(DefaultParameterSetName='Local')]
    [OutputType([PSObject])]
    param(
        # One or more map functions.  Each must declare exactly two parameters.
        [ValidateScript({
            foreach ($sb in $_) {
                Set-Item function:Map0 -Value $sb
                $mapFunc = $ExecutionContext.SessionState.InvokeCommand.GetCommand("Map0", "Function")
                Remove-Item function:Map0
                if ($mapFunc.Parameters.Count -ne 2) {
                    throw "Map functions must have two parameters"
                }
            }
            return $true
        })]
        [Parameter(Mandatory=$true,Position=0)]
        [ScriptBlock[]]
        $Map,

        # One or more reduce functions.  Each must declare exactly two parameters.
        [Parameter(Mandatory=$true,Position=1)]
        [ValidateScript({
            foreach ($sb in $_) {
                Set-Item function:Reduce0 -Value $sb
                $reduceFunc = $ExecutionContext.SessionState.InvokeCommand.GetCommand("Reduce0", "Function")
                Remove-Item function:Reduce0
                if ($reduceFunc.Parameters.Count -ne 2) {
                    throw "Reduce functions must have two parameters"
                }
            }
            return $true
        })]
        [ScriptBlock[]]
        $Reduce,

        # A set of input data
        [Parameter(Mandatory=$true,ValueFromPipeline=$true)]
        [PSObject[]]
        $InputObject,

        # The number of records in each shard (each job processes one shard).
        # By default, the square root of the number of items (for Reduce, of the number of groups).
        [Uint32]
        $ShardSize,

        # The maximum number of jobs that run at the same time.
        # By default, the square root of the shard size.
        [Uint32]
        $BatchSize,

        # The pause between job scheduling rounds.  By default, 1/4th of a second.
        [Timespan]
        $Buffer = [Timespan]::FromMilliseconds(250),

        # If set, reduce will be run sequentially
        [Switch]
        $RunReduceSequentially,

        # If set, map will be run sequentially
        [Switch]
        $RunMapSequentially,

        # A list of computers that will be used in the map reduce
        [Parameter(Mandatory=$true,ParameterSetName='Grid')]
        [Alias('Node')]
        [string[]]
        $Grid,

        # If set, the local machine will be included in the grid
        [Parameter(ParameterSetName='Grid')]
        [Switch]
        $IncludeLocalhost,

        # The credential to use when remoting to the grid
        [Parameter(ParameterSetName='Grid')]
        [Management.Automation.PSCredential]
        $Credential,

        # The ratio of use of the local node to remote nodes in the grid.
        # By default, localhost is added to the grid once, so it gets 1/N of the jobs, where N is the number of nodes including localhost.
        # If provided, localhost is inserted once before every LocalRatio remote nodes.  For instance, a LocalRatio of 1 runs every
        # other job on the local machine, a LocalRatio of 2 runs every 3rd job on the local machine, and so on.
        [Parameter(ParameterSetName='Grid')]
        [Uint32]
        $LocalRatio
    )

    begin {
        $accumulatedObjects = New-Object Collections.ArrayList
        $progressId = Get-Random
    }

    process {
        # All input is collected first: shard sizes depend on the total item count.
        $null = $accumulatedObjects.AddRange($InputObject)
        Write-Progress "Accumulating Input" "$($accumulatedObjects.Count) Objects Accumulated" -Id $progressId
    }

    end {
        # Default sizes.  Both are clamped to at least 1 so the sharding loop always advances and at
        # least one job can be running.  (The parameters are [Uint32], so these values are rounded.)
        if (-not $PSBoundParameters.ShardSize) {
            $ShardSize = [Math]::Max(1, [Math]::Sqrt($accumulatedObjects.Count))
        }
        if (-not $PSBoundParameters.BatchSize) {
            $BatchSize = [Math]::Max(1, [Math]::Sqrt($ShardSize))
        }

        if ($IncludeLocalhost -and $Grid) {
            if (-not $LocalRatio) {
                $Grid = @('localhost') + $Grid
            } else {
                # 'localhost' followed by the next LocalRatio remote nodes, repeated.
                $newGrid = @(
                    for ($gc = 0; $gc -lt $Grid.Count; $gc += $LocalRatio) {
                        'localhost'
                        $Grid[$gc..([Math]::Min($gc + $LocalRatio, $Grid.Count) - 1)]
                    }
                )
                $Grid = $newGrid
            }
        }

        # Job name -> job description, so a job whose remote session broke can be relaunched.
        $script:JobsLaunched = @{}

        # Returns the name of the first positional parameter of a map/reduce script (its key).
        $getKeyName = {
            param([ScriptBlock] $StepScript)
            Set-Item function:MapReduceKey0 -Value $StepScript
            try {
                $ExecutionContext.SessionState.InvokeCommand.GetCommand('MapReduceKey0', 'Function').Parameters.Values |
                    Sort-Object { $_.ParameterSets.Values[0].Position } |
                    Select-Object -First 1 -ExpandProperty Name
            } finally {
                Remove-Item function:MapReduceKey0 -ErrorAction SilentlyContinue
            }
        }

        # Splits $Items into shards of at most $Size items and returns a Queue of job descriptions
        # (JobScript, JobData.Data, JobData.Offset, and ComputerName/Credential when -Grid is used;
        # grid nodes are assigned round-robin).
        $newJobQueue = {
            param($Items, $Size, $JobScript)
            $queue = New-Object Collections.Queue
            $gridIndex = 0
            for ($start = 0; $start -lt $Items.Count; $start += $Size) {
                Write-Progress "Creating Shards" "Of $Size" -PercentComplete ($start * 100 / $Items.Count)
                $last = [Math]::Min($start + $Size, $Items.Count) - 1
                $jobItem = @{
                    JobScript = $JobScript
                    JobData   = @{
                        Data   = @($Items[$start..$last])
                        Offset = $start
                    }
                }
                if ($Grid) {
                    $jobItem.ComputerName = $Grid[$gridIndex]
                    if ($Credential) {
                        $jobItem.Credential = $Credential
                    }
                    $gridIndex = ($gridIndex + 1) % $Grid.Count
                }
                $queue.Enqueue($jobItem)
            }
            # The unary comma stops PowerShell from enumerating the queue on output.
            , $queue
        }

        # Starts one job for one shard, records it in $script:JobsLaunched, and returns the job.
        #   $s       - the map or reduce script the job runs
        #   $jobInfo - a job description produced by $newJobQueue
        $launchJob = {
            param($s, $jobInfo)
            # -contains compares by reference; -like would treat [ ] * ? in the script text as wildcards.
            $isMap = $Map -contains $s
            $isRemote = $jobInfo.ComputerName -and $jobInfo.ComputerName -ne 'localhost'
            # Workflows exist only in Windows PowerShell 3.0-5.1; PowerShell 6+ cannot parse 'workflow'.
            $canUseWorkflow = $PSVersionTable.PSVersion.Major -ge 3 -and $PSVersionTable.PSEdition -ne 'Core'

            $j = if ($isRemote) {
                $invokeParameters = @{
                    ArgumentList = $s, $jobInfo.JobData.Data
                    ScriptBlock  = $jobInfo.JobScript
                    ComputerName = $jobInfo.ComputerName
                    AsJob        = $true
                    ErrorAction  = 'Stop'
                }
                if ($jobInfo.Credential) {
                    $invokeParameters.Credential = $jobInfo.Credential
                }
                Invoke-Command @invokeParameters
            } elseif ($isMap -and $canUseWorkflow) {
                # Local map: a generated workflow maps each item of the shard with foreach -parallel.
                $mapKey = & $getKeyName $s
                $mapReduceWorkflow = @"
workflow mapReduceStep {
    param([Parameter(Position=0)][PSObject[]]`$data)
    foreach -parallel(`$d in `$data) {
        `$d | inlinescript {
            `$d = `$input | Select-Object -First 1
            Write-Verbose 'mapping key'
            Write-Verbose "mapping `$(`$d | Out-String)"
            `$key = `$d.'$mapKey'
            Write-Verbose "key is `$key"
            Write-Verbose 'running map'
            . {$($s)} `$key `$d
        }
    }
}
"@
                . ([ScriptBlock]::Create($mapReduceWorkflow))
                mapReduceStep $jobInfo.JobData.Data -AsJob
            } else {
                Start-Job -ArgumentList $s, $jobInfo.JobData.Data -ScriptBlock $jobInfo.JobScript -ErrorAction Stop
            }

            $script:JobsLaunched[$j.Name] = $jobInfo
            $j
        }

        # Runs every job description in $Queue with at most $BatchSize jobs running at once and emits
        # the jobs' output as it arrives.  A job whose remote session broke (PSSessionStateBroken) is
        # dropped and its description re-queued, so it is relaunched exactly once per break.
        $invokeJobQueue = {
            param([Collections.Queue] $Queue, $StepScript, [string] $Label)
            $jobQueueSize = $Queue.Count
            $running = New-Object Collections.ArrayList
            $launched = 0
            $finished = 0

            while ($Queue.Count -gt 0 -or $running.Count -gt 0) {
                # Top up the running set.
                while ($Queue.Count -gt 0 -and $running.Count -lt $BatchSize) {
                    $jobInfo = $Queue.Dequeue()
                    $launched++
                    $perc = $launched * 100 / $jobQueueSize
                    if ($perc -le 100) {
                        Write-Progress "Launching $Label Batches" "$launched Launched - $finished Finished" -PercentComplete $perc
                    } else {
                        Write-Progress "Launching $Label Batches (Rescheduled Items)" "$launched Launched - $finished Finished" -PercentComplete 100
                    }
                    foreach ($newJob in @(& $launchJob $StepScript $jobInfo)) {
                        $null = $running.Add($newJob)
                    }
                }

                foreach ($job in @($running)) {
                    # State is sampled before receiving, so output produced in between is picked up next round.
                    $isFinished = $job.State -eq 'Completed' -or $job.State -eq 'Failed' -or $job.State -eq 'Stopped'
                    $isRescheduled = $false
                    try {
                        Receive-Job -Job $job -ErrorAction Stop
                    } catch {
                        if ($_.FullyQualifiedErrorId -eq 'PSSessionStateBroken') {
                            Write-Verbose "Rescheduling $($job.Name)"
                            $Queue.Enqueue($script:JobsLaunched[$job.Name])
                            $isRescheduled = $true
                        } else {
                            Write-Error $_
                        }
                    }
                    if ($isFinished -or $isRescheduled) {
                        if ($isFinished) {
                            $finished++
                        }
                        $running.Remove($job)
                        Remove-Job -Job $job -Force
                    }
                }

                if ($Queue.Count -gt 0 -or $running.Count -gt 0) {
                    Start-Sleep -Milliseconds $Buffer.TotalMilliseconds
                }
            }
        }

        # Applies one map script to a set of objects.  Also passed to Start-Job / Invoke-Command, so it
        # must not use any variable of this function.
        $RunMap = {
            param($mapScript, $accumulatedObjects)
            Set-Item function:Map0 -Value $mapScript
            $mapFunction = $ExecutionContext.SessionState.InvokeCommand.GetCommand("Map0", "Function")
            $MapKey = $mapFunction.Parameters.Values |
                Sort-Object { $_.ParameterSets.Values[0].Position } |
                Select-Object -First 1 -ExpandProperty Name
            foreach ($obj in $accumulatedObjects) {
                # Objects whose key property is missing or empty are skipped.
                if ($obj.$MapKey) {
                    & $mapFunction $obj.$MapKey $obj
                }
            }
        }

        # Applies one reduce script to a set of Group-Object groups.  Also passed to Start-Job /
        # Invoke-Command, so it must not use any variable of this function.
        $RunReduce = {
            param($reduceScript, $reducedGroup)
            Set-Item function:Reduce0 -Value $reduceScript
            $reduceFunction = $ExecutionContext.SessionState.InvokeCommand.GetCommand("Reduce0", "Function")
            foreach ($group in $reducedGroup) {
                if ($group.Name) {
                    & $reduceFunction $group.Name $group.Group
                }
            }
        }

        #region Map
        $startMapTime = [DateTime]::Now
        Write-Verbose "Starting Map @ $startMapTime"
        $allMapResults = @(
            foreach ($m in $Map) {
                $mapKey = & $getKeyName $m
                Write-Progress "Mapping $mapKey" " " -Id $progressId
                if ($RunMapSequentially) {
                    & $RunMap -mapScript $m -AccumulatedObjects $accumulatedObjects
                } else {
                    $jobQueue = & $newJobQueue $accumulatedObjects $ShardSize $RunMap
                    & $invokeJobQueue $jobQueue $m 'Map'
                }
            }
        )
        Write-Verbose "Time spent mapping $([DateTime]::Now - $startMapTime)"
        #endregion Map

        #region Reduce
        $reduceStartedAt = [DateTime]::Now
        Write-Verbose "Starting Reduce @ $reduceStartedAt"
        foreach ($r in $Reduce) {
            $reduceKey = & $getKeyName $r
            Write-Progress "Reducing $reduceKey" " " -Id $progressId

            $sortStartTime = [DateTime]::Now
            Write-Verbose "Starting Sort & Group @ $sortStartTime"
            Write-Progress "Grouping MapResults" "By $reduceKey" -Id $progressId
            $groupedResults = @($allMapResults | Group-Object -Property $reduceKey)
            # Order the groups themselves, by key value, descending.  (Sort-Object -InputObject <array>
            # sorts nothing, and Group-Object's output order should not be relied on.)
            Write-Progress "Sorting MapResults" "By $reduceKey" -Id $progressId
            $groupedResults = @($groupedResults | Sort-Object -Property { $_.Values[0] } -Descending)
            Write-Verbose "Time spent in Sort & Group $([DateTime]::Now - $sortStartTime)"

            if ($RunReduceSequentially) {
                & $RunReduce $r $groupedResults
            } else {
                if (-not $PSBoundParameters.ShardSize) {
                    $ShardSize = [Math]::Max(1, [Math]::Sqrt($groupedResults.Count))
                }
                if (-not $PSBoundParameters.BatchSize) {
                    $BatchSize = [Math]::Max(1, [Math]::Sqrt($ShardSize))
                }
                $jobQueue = & $newJobQueue $groupedResults $ShardSize $RunReduce
                & $invokeJobQueue $jobQueue $r 'Reduce'
            }
        }
        #endregion Reduce
        Write-Verbose "Time spent in Reduce $([DateTime]::Now - $reduceStartedAt)"
        Write-Progress -Activity 'Start-MapReduce' -Id $progressId -Completed
    }
}