sonarr-repo-only/NzbDrone.Core/Jobs/JobController.cs

188 lines
6.4 KiB
C#
Raw Normal View History

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
2013-03-05 02:34:38 +00:00
using System.Threading.Tasks;
using NLog;
using NzbDrone.Common.Messaging;
2013-04-19 04:46:18 +00:00
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Model.Notification;
2011-12-02 01:33:17 +00:00
using NzbDrone.Core.Providers;
2013-03-05 05:37:33 +00:00
namespace NzbDrone.Core.Jobs
{
/// <summary>
/// Contract for the job queue: exposes the queue state and accepts new work items.
/// </summary>
public interface IJobController
{
    /// <summary>
    /// Gets the job items that are currently held in the queue.
    /// </summary>
    IEnumerable<JobQueueItem> Queue { get; }

    /// <summary>
    /// Gets a value indicating whether the controller considers itself busy.
    /// </summary>
    bool IsProcessing { get; }

    /// <summary>
    /// Queues a job identified by its type name.
    /// </summary>
    /// <param name="jobTypeString">Type name of the job to queue.</param>
    /// <returns>A flag reporting whether the request was accepted.</returns>
    bool Enqueue(string jobTypeString);

    /// <summary>
    /// Queues a job of the given type.
    /// </summary>
    /// <param name="jobType">Type of the job to queue.</param>
    /// <param name="options">Optional job-specific options; defaults to <c>null</c>.</param>
    /// <param name="source">Who requested the job; defaults to <see cref="JobQueueItem.JobSourceType.User"/>.</param>
    void Enqueue(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User);

    /// <summary>
    /// Queues the jobs that are pending according to the schedule.
    /// </summary>
    void EnqueueScheduled();
}
2013-04-19 04:46:18 +00:00
public class JobController : IJobController, IHandle<ApplicationShutdownRequested>
{
// Collaborators supplied through the constructor.
private readonly IEnumerable<IJob> _jobs;
private readonly IJobRepository _jobRepository;
private readonly NotificationProvider _notificationProvider;
private readonly Logger _logger;

// Pending work. Producers add items in Enqueue; the processor loop removes them with Take().
private readonly BlockingCollection<JobQueueItem> _queue = new BlockingCollection<JobQueueItem>();

// Cancelled when an ApplicationShutdownRequested message is handled.
private readonly CancellationTokenSource _cancellationTokenSource;

// Progress notification of the job being executed; reassigned on every Execute call.
private ProgressNotification _notification;
/// <summary>
/// Creates the controller and starts the background queue processor.
/// </summary>
/// <param name="notificationProvider">Receives the progress notification of each executed job.</param>
/// <param name="jobs">All registered job implementations.</param>
/// <param name="jobRepository">Source of job definitions and pending jobs.</param>
/// <param name="logger">Logger used by the controller.</param>
public JobController(NotificationProvider notificationProvider, IEnumerable<IJob> jobs, IJobRepository jobRepository, Logger logger)
{
    _notificationProvider = notificationProvider;
    _jobs = jobs;
    _jobRepository = jobRepository;
    _logger = logger;

    _cancellationTokenSource = new CancellationTokenSource();

    // ProcessQueue loops for the lifetime of the application and spends most of that
    // time blocked waiting for work. LongRunning hints the scheduler to give it a
    // dedicated thread instead of permanently occupying a thread-pool worker, and
    // TaskScheduler.Default keeps it off whatever scheduler happens to be current
    // when the controller is constructed.
    Task.Factory.StartNew(
        ProcessQueue,
        _cancellationTokenSource.Token,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
}
/// <summary>
/// Gets a value indicating whether the controller considers itself busy.
/// The flag is raised by <c>Enqueue</c> and <c>Execute</c> and lowered by the
/// processor loop right before it waits for the next queue item.
/// </summary>
public bool IsProcessing { get; private set; }

/// <summary>
/// Gets the underlying queue, exposed as a plain enumerable.
/// </summary>
public IEnumerable<JobQueueItem> Queue
{
    get { return _queue; }
}
2013-03-05 02:34:38 +00:00
/// <summary>
/// Queues every pending job reported by the repository, tagged as scheduler-triggered.
/// Does nothing while <see cref="IsProcessing"/> is set.
/// </summary>
/// <remarks>
/// A pending definition whose type name matches no registered job is logged and skipped,
/// so one orphaned definition cannot prevent the remaining jobs from being scheduled.
/// </remarks>
public void EnqueueScheduled()
{
    if (IsProcessing)
    {
        _logger.Trace("Queue is already running. Ignoring scheduler request.");
        return;
    }

    var pendingJobs = new List<Type>();

    foreach (var definition in _jobRepository.GetPendingJobs())
    {
        var typeName = definition.Type;

        // Definitions are matched to implementations by the implementation's full type name.
        var implementation = _jobs.SingleOrDefault(t => t.GetType().ToString() == typeName);

        if (implementation == null)
        {
            _logger.Warn("Unable to locate implementation for pending job '{0}'. Skipping.", typeName);
            continue;
        }

        pendingJobs.Add(implementation.GetType());
    }

    foreach (var jobType in pendingJobs)
    {
        Enqueue(jobType, source: JobQueueItem.JobSourceType.Scheduler);
    }

    _logger.Trace("{0} Scheduled tasks have been added to the queue", pendingJobs.Count);
}
2013-03-05 02:34:38 +00:00
/// <summary>
/// Wraps the request in a <see cref="JobQueueItem"/> and adds it to the queue unless
/// the queue already contains an equal item.
/// </summary>
/// <param name="jobType">Type of the job to queue.</param>
/// <param name="options">Optional job-specific options stored on the queue item.</param>
/// <param name="source">Origin of the request stored on the queue item.</param>
public void Enqueue(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User)
{
    // The busy flag is raised up front, before the duplicate check below.
    IsProcessing = true;

    var candidate = new JobQueueItem
    {
        JobType = jobType,
        Options = options,
        Source = source
    };

    _logger.Debug("Attempting to queue {0}", candidate);

    // Check-then-add must be atomic with respect to other producers.
    lock (_queue)
    {
        if (_queue.Contains(candidate))
        {
            _logger.Info("{0} already exists in the queue. Skipping. current items in queue: {1}", candidate, _queue.Count);
            return;
        }

        _queue.Add(candidate);
        _logger.Trace("Job {0} added to the queue. current items in queue: {1}", candidate, _queue.Count);
    }
}
2013-03-05 02:34:38 +00:00
/// <summary>
/// Resolves a job type from its name and queues it with default options and source.
/// </summary>
/// <param name="jobTypeString">Type name passed to <see cref="Type.GetType(string)"/>.</param>
/// <returns>
/// <c>true</c> when the type was resolved and handed to the queue;
/// <c>false</c> when no type with that name could be resolved.
/// </returns>
public bool Enqueue(string jobTypeString)
{
    var resolved = Type.GetType(jobTypeString);

    if (resolved != null)
    {
        Enqueue(resolved);
        return true;
    }

    return false;
}
/// <summary>
/// Body of the background processor: takes items off the queue one at a time and
/// executes them until shutdown is requested through the cancellation token.
/// </summary>
/// <remarks>
/// The token passed to <c>Task.Factory.StartNew</c> only prevents a task from starting;
/// it does not interrupt a running one. The loop therefore has to observe the token
/// itself, both between jobs and while blocked waiting for the next item.
/// </remarks>
private void ProcessQueue()
{
    var token = _cancellationTokenSource.Token;

    while (!token.IsCancellationRequested)
    {
        try
        {
            // Nothing is running while we wait for the next item.
            IsProcessing = false;

            // Blocks until an item is available or shutdown is requested.
            var item = _queue.Take(token);

            Execute(item);
        }
        catch (OperationCanceledException e)
        {
            if (token.IsCancellationRequested)
            {
                _logger.Debug("Shutdown requested. Stopping queue processor.");
                break;
            }

            // A cancellation that did not originate from our token is treated like any other failure.
            _logger.ErrorException("Error has occurred in queue processor thread", e);
        }
        catch (ThreadAbortException e)
        {
            // The runtime re-raises the abort after this handler, which ends the loop.
            _logger.Warn(e.Message);
        }
        catch (Exception e)
        {
            // Keep the processor alive; a single failing item must not stop the queue.
            _logger.ErrorException("Error has occurred in queue processor thread", e);
        }
    }

    // The processor has stopped, so nothing is being processed any more.
    IsProcessing = false;
}
/// <summary>
/// Runs a single queued job: locates its implementation, executes it under a
/// progress notification, records the outcome on the job definition and, for
/// runs without options, persists that definition.
/// </summary>
/// <param name="queueItem">The queue item describing which job to run and with which options.</param>
private void Execute(JobQueueItem queueItem)
{
    IsProcessing = true;

    var jobImplementation = _jobs.SingleOrDefault(t => t.GetType() == queueItem.JobType);

    if (jobImplementation == null)
    {
        _logger.Error("Unable to locate implementation for '{0}'. Make sure it is properly registered.", queueItem.JobType);
        return;
    }

    var jobDefinition = _jobRepository.GetDefinition(queueItem.JobType);

    using (_notification = new ProgressNotification(jobImplementation.Name))
    {
        try
        {
            _logger.Debug("Starting {0}. Last execution {1}", queueItem, jobDefinition.LastExecution);

            var sw = Stopwatch.StartNew();

            _notificationProvider.Register(_notification);
            jobImplementation.Start(_notification, queueItem.Options);
            _notification.Status = ProgressNotificationStatus.Completed;

            jobDefinition.LastExecution = DateTime.Now;
            jobDefinition.Success = true;

            sw.Stop();

            // Format the total duration in one step. The previous "{1:0}.{2}" form rounded
            // the whole seconds and then appended the tenths digit, so 1.9s printed as "2.9".
            _logger.Debug("Job {0} successfully completed in {1:0.0} seconds.", queueItem, sw.Elapsed.TotalSeconds);
        }
        catch (Exception e)
        {
            // A failing job is reported and recorded; it must not take down the queue processor.
            _logger.ErrorException("An error has occurred while executing job [" + jobImplementation.Name + "].", e);
            _notification.Status = ProgressNotificationStatus.Failed;
            _notification.CurrentMessage = jobImplementation.Name + " Failed.";

            jobDefinition.LastExecution = DateTime.Now;
            jobDefinition.Success = false;
        }
    }

    // Persist the outcome only for runs that were queued without options.
    // NOTE(review): the earlier comment said this should apply to scheduler-triggered runs
    // only, but the check is on Options rather than queueItem.Source - confirm the intent.
    if (queueItem.Options == null)
    {
        _jobRepository.Update(jobDefinition);
    }
}
2013-04-19 04:46:18 +00:00
/// <summary>
/// Reacts to an application shutdown request by cancelling the token source
/// whose token was handed to the queue-processing task in the constructor.
/// </summary>
/// <param name="message">The shutdown message; its contents are not read.</param>
public void Handle(ApplicationShutdownRequested message)
{
    _cancellationTokenSource.Cancel();
}
}
}