modules/Session.psm1
using namespace System.Management.Automation
using module '.\Enums.psm1'
using module '.\FeedProcessor\AuditLog\Message.psm1'
using module '.\Config.psm1'
using module '.\SessionConfig.psm1'
using module '.\AuditLog.psm1'
using module '.\Downloader\DownloaderBase.psm1'
using module '.\Downloader\DownloaderFactory.psm1'
using module '.\FeedProcessor\ProcessorBase.psm1'
using module '.\FeedProcessor\ProcessorFactory.psm1'

# A Session bundles a SessionConfig, the AuditLogFile built from it, and the
# bookkeeping for background download jobs and the file watcher that belongs
# to them. All live sessions are tracked in the static [Session]::activeSessions.
class Session {
    #region Properties
    # True when the downloaders of this session were started as background jobs.
    hidden [bool] $async
    [int] $sessionId
    [datetime] $createdAt
    [SessionConfig] $config
    [AuditLogFile] $auditLogFile
    # Running download jobs; values are job ids (see getJobStates / ResumeJobs).
    [hashtable] $downloadJobs
    # Id returned by RegisterFileWatcher; 0 means "no watcher registered".
    [int] $watcherId
    [bool] $convert
    [bool] $saveConverted
    #endregion

    #region Session
    static [Session[]] $activeSessions = @()
    static [int] $currentSessionId = 0
    static [int] $nextSessionId = 1

    # Hidden constructor: assigns the next free id, registers the instance in
    # activeSessions and initialises its state through Reset().
    hidden Session([OPERATOR] $operator, [ENVIRONMENT] $environment, [bool] $convert, [bool]$saveConverted) {
        $this.sessionId = [Session]::nextSessionId
        [Session]::activeSessions += $this
        [Session]::nextSessionId += 1
        $this.Reset($operator, $environment, $convert, $saveConverted)
    }

    # Returns the session whose id equals currentSessionId, or $null if none matches.
    static [Session] getCurrent() {
        foreach ($session in [Session]::activeSessions) {
            if ($session.sessionId -eq [Session]::currentSessionId) {
                return $session
            }
        }
        return $null
    }

    # Returns the active session with the given id, or $null if none matches.
    static [Session] getSession([int] $sessionId) {
        foreach ($session in [Session]::activeSessions) {
            if ($session.sessionId -eq $sessionId) {
                return $session
            }
        }
        return $null
    }

    # Factory wrapper around the hidden constructor.
    static [Session] NewSession([OPERATOR] $operator, [ENVIRONMENT] $environment, [bool] $convert, [bool]$saveConverted) {
        $session = [Session]::new($operator, $environment, $convert, $saveConverted)
        return $session
    }

    # Stamps createdAt with the current time and forwards the scope to the config.
    [void] setAnalyzedScope([SOURCE[]] $sources, [long] $matchId, [string] $referentTime, [string] $timeSpan) {
        $this.createdAt = (Get-Date)
        $this.config.setAnalyzedScope($sources, $matchId, $referentTime, $timeSpan)
    }

    # Removes the session with the given id from activeSessions and unregisters
    # its file watcher. If the removed session was the current one, the current
    # pointer moves to the last remaining session (or 0 when none is left).
    static [void] Recycle([int] $sessionId) {
        if ([Session]::activeSessions) {
            $remainingSessions = @()
            $lastId = 0
            foreach ($session in [Session]::activeSessions) {
                if ($session.sessionId -ne $sessionId) {
                    $remainingSessions += $session
                    $lastId = $session.sessionId
                }
                else {
                    $session.UnregisterFileWatcher()
                }
            }
            [Session]::activeSessions = $remainingSessions
            if ($sessionId -eq [Session]::currentSessionId) {
                [Session]::currentSessionId = $lastId
            }
        }
    }

    # Rebuilds the config and the audit log file, clears the download job table
    # and drops any registered file watcher.
    [void] Reset([OPERATOR] $operator, [ENVIRONMENT] $environment, [bool] $convert, [bool]$saveConverted) {
        $this.config = [SessionConfig]::new($operator, $environment)
        $this.convert = $convert
        $this.saveConverted = $saveConverted
        $this.downloadJobs = @{}
        $this.UnregisterFileWatcher()
        $this.auditLogFile = [AuditLogFile]::new($this.config, $this.convert, $this.saveConverted)
    }
    #endregion

    #region Session info
    # Returns SessionInfo() for every active session.
    static [PSCustomObject[]] AllSessionInfo() {
        $retValue = @()
        foreach ($session in [Session]::activeSessions) {
            $retValue += $session.SessionInfo()
        }
        return $retValue
    }

    # Builds a summary object for this session. Besides the fixed columns, one
    # note property per SOURCE enum name is added: the formatted content length
    # when content is loaded, 0 when the source is analyzed but has no content,
    # and '-' when the source is not part of the analyzed scope.
    [PSCustomObject] SessionInfo() {
        $currentIndicator = ''
        if ($this.auditLogFile.async) {
            $this.auditLogFile.ResumeJobs()
        }
        if ($this.sessionId -eq [Session]::currentSessionId) {
            $currentIndicator = '*'
        }
        $retValue = [PSCustomObject]@{
            Id           = $this.sessionId
            Current      = $currentIndicator
            AsJob        = $this.async
            CreatedAt    = $this.createdAt
            Operator     = $this.config.operator
            Environment  = $this.config.environment
            Timezone     = $this.config.timezone
            MatchId      = $this.config.matchId
            From         = $this.config.afterDateTime
            To           = $this.config.beforeDateTime
            DownloadJobs = $this.getJobStates($this.downloadJobs)
            MountingJobs = $this.getJobStates($this.auditLogFile.jobs)
        }
        foreach ($sourceName in [Enum]::GetNames('SOURCE')) {
            $src = [Enum]::Parse('SOURCE', $sourceName)
            if ($this.auditLogFile.fileContent -and $this.auditLogFile.fileContent[$src]) {
                $retValue | Add-Member $src ("{0:N0}" -f $this.auditLogFile.fileContent[$src].Length)
            }
            else {
                if ($src -in $this.config.analyzedSources) {
                    $retValue | Add-Member $src 0
                }
                else {
                    $retValue | Add-Member $src '-'
                }
            }
        }
        return $retValue
    }
    #endregion

    #region Jobs
    # True when the session's downloaders were registered as background jobs.
    [bool] isAsync() {
        return $this.async
    }

    # Returns $false only for an async session that still has both download
    # jobs and mounting jobs; $true otherwise.
    [bool] canImport() {
        # isAsync must be *called*: a bare `$this.isAsync` yields a method
        # reference object, which is always truthy, so the async check was a no-op.
        # NOTE(review): canRead() combines the two job checks with -or while this
        # uses -and - confirm that requiring both kinds of jobs is intended.
        if ($this.isAsync() -and $this.downloadJobs.Count -gt 0 -and $this.auditLogFile.jobs.Count -gt 0) {
            return $false
        }
        else {
            return $true
        }
    }

    # Returns $false when the session is async and the given source still has
    # a download job or a mounting job registered; $true otherwise.
    [bool] canRead([SOURCE] $source) {
        # Same fix as in canImport(): invoke isAsync() instead of referencing it.
        if ($this.isAsync() -and ($this.downloadJobs[$source] -or $this.auditLogFile.jobs[$source])) {
            return $false
        }
        else {
            return $true
        }
    }

    # Summarises the states of the jobs whose ids are values of $jobs as
    # "State: count, State: count"; returns '-' when no matching job exists.
    hidden [string] getJobStates([hashtable] $jobs) {
        $jobStates = @{}
        if ($jobs) {
            foreach ($job in (Get-Job | Where-Object { $jobs.ContainsValue($_.id) })) {
                if ($jobStates.Contains($job.State)) {
                    $jobStates[$job.State] += 1
                }
                else {
                    $jobStates.Add($job.State, 1)
                }
            }
        }
        $retValue = '-'
        foreach ($item in $jobStates.Keys) {
            if ($retValue -eq '-') {
                $retValue = ('{0}: {1}' -f @($item, $jobStates[$item]))
            }
            else {
                $retValue = $retValue + (', {0}: {1}' -f @($item, $jobStates[$item]))
            }
        }
        return $retValue
    }

    # Calls ResumeJobs() on every async session and on its audit log file.
    static [void] ResumeAllJobs() {
        foreach ($session in [Session]::activeSessions) {
            if ($session.IsAsync()) {
                #Write-Host "Inside $($session.sessionId)"
                $session.ResumeJobs()
                $session.auditLogFile.ResumeJobs()
            }
        }
    }

    # For each completed download job of an analyzed source: hands the source
    # over to the audit log file, forgets the job and removes it from the
    # job table of the PowerShell session. The job name is cast to SOURCE.
    [void] ResumeJobs() {
        foreach ($job in (Get-Job | Where-Object { $this.downloadJobs.ContainsValue($_.id) -and $_.State -eq [JobState]::Completed -and ([SOURCE]($_.name) -in $this.config.analyzedSources) })) {
            #Write-Host "Deal with job: $($job.id)"
            $this.auditLogFile.addJob([SOURCE]($job.name))
            $this.downloadJobs.Remove([SOURCE]($job.name))
            Stop-Job -id $job.id
            Remove-Job -id $job.id
        }
    }

    # Watches the parent directory of $file for changes to files matching
    # "<matchId>-*.*" and runs Resume-SessionJobs on every Changed event.
    # Returns the id of the object produced by Register-ObjectEvent.
    hidden [int] RegisterFileWatcher([string] $file, [long] $matchId) {
        $filePath = (Resolve-Path (Split-Path $file -Parent))
        $fileName = ('{0}-*.*' -f $matchId)
        $watcher = New-Object IO.FileSystemWatcher $filePath.Path, $fileName -Property @{
            IncludeSubdirectories = $false
            EnableRaisingEvents   = $true
        }
        $onChange = (Register-ObjectEvent $watcher -EventName "Changed" -SourceIdentifier $matchId -Action { Resume-SessionJobs })
        return $onChange.id
    }

    # Tears down the watcher registered by RegisterFileWatcher, if any, and
    # resets watcherId to 0.
    hidden [void] UnregisterFileWatcher() {
        if ($this.watcherId) {
            # Best effort: a failure to unregister is deliberately ignored.
            # NOTE(review): watcherId holds the id of the event job returned by
            # Register-ObjectEvent; verify it is also valid as a -SubscriptionId.
            try {
                Unregister-Event -SubscriptionId $this.watcherId
            }
            catch { }
            Stop-Job -id $this.watcherId
            Remove-Job -id $this.watcherId
            $this.watcherId = 0
        }
    }
    #endregion

    #region Register actors
    # Starts a download for every analyzed source, unless download jobs are
    # already registered. With $AsJob the returned job entries are collected in
    # downloadJobs and a file watcher is registered afterwards.
    [void] RegisterDownloader([bool] $AsJob) {
        if ($this.downloadJobs.Count -eq 0) {
            $this.async = $AsJob
            [DownloaderFactory]::Init($this.config)
            $downloader = [DownloaderFactory]::current
            if ($global:DEBUG) {
                Write-Host "No async downloaders..." -ForegroundColor Yellow
            }
            foreach ($src in $this.config.analyzedSources) {
                if ($this.async) {
                    if ($global:DEBUG) {
                        Write-Host "Download initiated ($src)" -ForegroundColor Yellow
                    }
                    $this.downloadJobs += $downloader.Download($src, $this.async)
                    if ($global:DEBUG) {
                        Write-Host "File watcher active ($($downloader.fileName))" -ForegroundColor Yellow
                    }
                }
                else {
                    $downloader.Download($src, $false)
                }
            }
            if ($this.async) {
                Write-Host 'Background jobs activated.'
                $this.watcherId = $this.RegisterFileWatcher($downloader.fileName, $this.config.matchId)
            }
        }
        else {
            Write-Host 'There is active download at the moment, please consider using new session.'
        }
    }

    # Delegates mounting of all log files to the audit log file.
    [void] RegisterLogFiles([bool] $AsJob, [bool] $AsDownload) {
        $this.auditLogFile.MountAllFiles($AsJob, $AsDownload)
    }

    # Queries the URI built by the config for the given external key and
    # returns the "id" field of the response. On any request error the result
    # falls back to $null, which the [int] return type coerces to 0.
    [int] getMatchIdByExternal([string] $externalKey, [int] $matchId) {
        $uri = $this.config.UriFindExternalId($externalKey, $this.auditLogFile.getExternalByKey($externalKey), $matchId)
        try {
            $json = (Invoke-RestMethod -Uri $uri)
            $definitionMatchId = $json.id
        }
        catch {
            $definitionMatchId = $null
        }
        return $definitionMatchId
    }
    #endregion
}

#region Validators
# Value generator for [ValidateSet()]: offers the keys of
# [ProcessorFactory]::mapExternalKeyPrefix after initialising the factory
# from the loaded configuration.
class ValidExternalKeys : IValidateSetValuesGenerator {
    [string[]] GetValidValues() {
        # TODO double check
        [ProcessorFactory]::Init([Config]::Load().processors)
        return ([ProcessorFactory]::mapExternalKeyPrefix.Keys)
    }
}
#endregion