// sonarr-repo-only/NzbDrone.Core/Providers/Jobs/JobProvider.cs
//https://github.com/kayone/NzbDrone/blob/master/NzbDrone.Core/Providers/Jobs/JobProvider.cs
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Ninject;
using NLog;
using NzbDrone.Core.Model.Notification;
using NzbDrone.Core.Repository;
using PetaPoco;
namespace NzbDrone.Core.Providers.Jobs
{
/// <summary>
/// Provides a background task runner. Tasks can be queued either by the scheduler using QueueScheduled()
/// or explicitly by calling QueueJob(type,int).
/// </summary>
public class JobProvider
{
// NLog logger for this class.
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

// Static state: shared by every JobProvider instance in the process.
// ExecutionLock is taken around reads/writes of _isRunning in QueueScheduled() and QueueJob().
private static readonly object ExecutionLock = new object();

// Pending jobs as (job type, target id) pairs; exposed through the Queue property.
private static readonly List<Tuple<Type, int>> _queue = new List<Tuple<Type, int>>();

// Set to true by QueueJob() before the processor thread is started and reset when that thread ends.
private static bool _isRunning;

// Per-instance collaborators, assigned by the injected constructor.
private readonly IDatabase _database;
private readonly NotificationProvider _notificationProvider;
private readonly IList<IJob> _jobs;

// Thread created by QueueJob() to run ProcessQueue().
private Thread _jobThread;

// Notification of the job currently being run by Execute().
private ProgressNotification _notification;
/// <summary>
/// Initializes a new instance of the <see cref="JobProvider"/> class with injected dependencies.
/// </summary>
/// <param name="database">Database used to read and persist job definitions.</param>
/// <param name="notificationProvider">Provider that job progress notifications are registered with.</param>
/// <param name="jobs">The registered job implementations.</param>
[Inject]
public JobProvider(IDatabase database, NotificationProvider notificationProvider, IList<IJob> jobs)
{
    _jobs = jobs;
    _notificationProvider = notificationProvider;
    _database = database;
}
/// <summary>
/// Parameterless constructor for the <see cref="JobProvider"/> class; assigns no dependencies.
/// </summary>
/// <remarks>Should only be used by AutoMoq</remarks>
[EditorBrowsable(EditorBrowsableState.Never)]
public JobProvider()
{
}
/// <summary>
/// Gets the queue of pending jobs as (job type, target id) pairs.
/// </summary>
/// <remarks>Returns the shared backing list itself, not a copy.</remarks>
public static List<Tuple<Type, Int32>> Queue
{
    get { return _queue; }
}
/// <summary>
/// Fetches every job definition stored in the database.
/// </summary>
/// <returns>A new list containing all stored <see cref="JobDefinition"/> rows.</returns>
public virtual List<JobDefinition> All()
{
    var definitions = _database.Fetch<JobDefinition>();
    return definitions.ToList();
}
/// <summary>
/// Persists a job definition: inserts it when its Id is 0, otherwise updates the existing row.
/// </summary>
/// <param name="definitions">Settings to be added/updated</param>
public virtual void SaveDefinition(JobDefinition definitions)
{
    var alreadyPersisted = definitions.Id != 0;

    if (alreadyPersisted)
    {
        Logger.Trace("Updating job definitions for {0}", definitions.Name);
        _database.Update(definitions);
        return;
    }

    // An Id of 0 is treated as "not stored yet".
    Logger.Trace("Adding job definitions for {0}", definitions.Name);
    _database.Insert(definitions);
}
/// <summary>
/// Iterates through all stored job definitions and queues every enabled one whose interval
/// has elapsed since its last execution.
/// </summary>
/// <remarks>
/// Will ignore request if queue is already running.
/// A due definition that has no registered implementation is skipped with a warning, so a
/// single stale row cannot prevent the remaining jobs from being scheduled.
/// </remarks>
public virtual void QueueScheduled()
{
    lock (ExecutionLock)
    {
        if (_isRunning)
        {
            Logger.Trace("Queue is already running. Ignoring scheduler's request.");
            return;
        }
    }

    // Take the clock once so every definition is compared against the same instant.
    var now = DateTime.Now;
    var dueDefinitions = All()
        .Where(d => d.Enable && (now - d.LastExecution) > TimeSpan.FromMinutes(d.Interval))
        .ToList();

    var counter = 0;
    foreach (var definition in dueDefinitions)
    {
        var typeName = definition.TypeName;

        // SingleOrDefault still throws when more than one implementation shares the type name.
        var job = _jobs.SingleOrDefault(j => j.GetType().ToString() == typeName);
        if (job == null)
        {
            Logger.Warn("No registered implementation found for scheduled job '{0}'. Skipping.", typeName);
            continue;
        }

        QueueJob(job.GetType());
        counter++;
    }

    Logger.Trace("{0} Scheduled tasks have been added to the queue", counter);
}
/// <summary>
/// Adds a job to the queue and, if no queue processor is running, starts one on a new thread.
/// </summary>
/// <param name="jobType">Type of the job that should be queued.</param>
/// <param name="targetId">The targetId could be any Id parameter eg. SeriesId. It will be passed to the job implementation
/// to allow it to filter its target of execution.</param>
/// <remarks>Job is only added to the queue if same job with the same targetId doesn't already exist in the queue.</remarks>
public virtual void QueueJob(Type jobType, int targetId = 0)
{
    Logger.Debug("Adding [{0}:{1}] to the queue", jobType.Name, targetId);

    lock (ExecutionLock)
    {
        lock (Queue)
        {
            // Tuple equality is structural, so Contains matches on (type, targetId).
            var entry = Tuple.Create(jobType, targetId);
            if (Queue.Contains(entry))
            {
                Logger.Info("[{0}:{1}] already exists in the queue. Skipping.", jobType.Name, targetId);
            }
            else
            {
                Queue.Add(entry);
                Logger.Trace("Job [{0}:{1}] added to the queue", jobType.Name, targetId);
            }
        }

        if (_isRunning)
        {
            Logger.Trace("Queue is already running. No need to start it up.");
            return;
        }

        // Claim the processor role while still holding the lock.
        _isRunning = true;
    }

    // NOTE(review): _isRunning is reset by the processor thread without taking ExecutionLock, so a
    // job queued just as that thread winds down may wait until the next QueueJob call - confirm.
    if (_jobThread != null && _jobThread.IsAlive)
    {
        Logger.Error("Execution lock has fucked up. Thread still active. Ignoring request.");
        return;
    }

    Logger.Trace("Initializing queue processor thread");
    _jobThread = new Thread(new ThreadStart(RunQueueProcessor)) { Name = "JobQueueThread" };
    _jobThread.Start();
}

/// <summary>
/// Entry point of the queue processor thread: drains the queue, logs any escaping exception
/// and always clears the running flag on exit.
/// </summary>
private void RunQueueProcessor()
{
    try
    {
        ProcessQueue();
    }
    catch (Exception e)
    {
        Logger.ErrorException("Error has occurred in queue processor thread", e);
    }
    finally
    {
        _isRunning = false;
    }
}
/// <summary>
/// Starts processing of queue synchronously: repeatedly takes the first queued job and executes
/// it until the queue is empty.
/// </summary>
/// <remarks>
/// Every access to the shared queue, including the "is there more work" check, happens while
/// holding the queue lock. An exception thrown by a job is logged and does not stop the loop.
/// </remarks>
private void ProcessQueue()
{
    bool hasPendingJobs;

    do
    {
        Tuple<Type, int> job = null;

        // Each iteration gets its own diagnostics context id so its log lines can be correlated.
        using (NestedDiagnosticsContext.Push(Guid.NewGuid().ToString()))
        {
            try
            {
                lock (Queue)
                {
                    if (Queue.Count != 0)
                    {
                        job = Queue[0];
                        Queue.RemoveAt(0);
                    }
                }

                // Run the job outside the lock so new jobs can be queued meanwhile.
                if (job != null)
                {
                    Execute(job.Item1, job.Item2);
                }
            }
            catch (Exception e)
            {
                Logger.FatalException("An error has occurred while processing queued job.", e);
            }
        }

        lock (Queue)
        {
            hasPendingJobs = Queue.Count != 0;
        }
    } while (hasPendingJobs);

    Logger.Trace("Finished processing jobs in the queue.");
}
/// <summary>
/// Executes the job synchronously and records the outcome on its job definition.
/// </summary>
/// <param name="jobType">Type of the job that should be executed</param>
/// <param name="targetId">The targetId could be any Id parameter eg. SeriesId. It will be passed to the job implementation
/// to allow it to filter its target of execution</param>
/// <remarks>
/// If no implementation of <paramref name="jobType"/> is registered, an error is logged and nothing is executed.
/// An exception thrown by the job itself is logged and marks the run as failed; it is not rethrown.
/// </remarks>
private void Execute(Type jobType, int targetId = 0)
{
    // SingleOrDefault (rather than Single) so that a missing implementation reaches the
    // null check below instead of throwing.
    var jobImplementation = _jobs.SingleOrDefault(t => t.GetType() == jobType);
    if (jobImplementation == null)
    {
        Logger.Error("Unable to locate implementation for '{0}'. Make sure it is properly registered.", jobType);
        return;
    }

    // Throws if there is not exactly one stored definition for this job type.
    var settings = All().Where(j => j.TypeName == jobType.ToString()).Single();

    using (_notification = new ProgressNotification(jobImplementation.Name))
    {
        try
        {
            Logger.Debug("Starting '{0}' job. Last execution {1}", settings.Name, settings.LastExecution);

            var sw = Stopwatch.StartNew();

            _notificationProvider.Register(_notification);
            jobImplementation.Start(_notification, targetId);
            _notification.Status = ProgressNotificationStatus.Completed;

            settings.LastExecution = DateTime.Now;
            settings.Success = true;

            sw.Stop();

            // TotalSeconds, not Seconds: Seconds is only the 0-59 component and wraps for long jobs.
            // The second value is tenths of a second.
            Logger.Debug("Job '{0}' successfully completed in {1}.{2} seconds.", jobImplementation.Name,
                         (int)sw.Elapsed.TotalSeconds, sw.Elapsed.Milliseconds / 100);
        }
        catch (Exception e)
        {
            Logger.ErrorException("An error has occurred while executing job " + jobImplementation.Name, e);
            _notification.Status = ProgressNotificationStatus.Failed;
            _notification.CurrentMessage = jobImplementation.Name + " Failed.";

            settings.LastExecution = DateTime.Now;
            settings.Success = false;
        }
    }

    //Only update last execution status if was triggered by the scheduler
    if (targetId == 0)
    {
        SaveDefinition(settings);
    }
}
/// <summary>
/// Makes sure every registered IJob instance has a job definition in the database,
/// creating one with default settings for each job that has none yet.
/// </summary>
/// <remarks>Existing definitions are left untouched.</remarks>
public virtual void Initialize()
{
    Logger.Debug("Initializing jobs. Count {0}", _jobs.Count);

    var storedDefinitions = All();

    foreach (var job in _jobs)
    {
        var typeName = job.GetType().ToString();

        if (storedDefinitions.Exists(d => d.TypeName == typeName))
        {
            continue;
        }

        var definition = new JobDefinition
                             {
                                 // A job whose default interval is not positive starts out disabled.
                                 Enable = job.DefaultInterval > 0,
                                 TypeName = typeName,
                                 Name = job.Name,
                                 Interval = job.DefaultInterval,
                                 LastExecution = new DateTime(2000, 1, 1)
                             };

        SaveDefinition(definition);
    }
}
/// <summary>
/// Gets the next scheduled run time for a specific job, computed as its last execution
/// time plus its interval in minutes.
/// (Estimated due to schedule timer)
/// </summary>
/// <param name="jobType">Type of the job to look up.</param>
/// <returns>DateTime of next scheduled job execution</returns>
public virtual DateTime NextScheduledRun(Type jobType)
{
    var matchingDefinitions = All().Where(d => d.TypeName == jobType.ToString());

    // Single() throws unless exactly one definition matches the job type.
    var definition = matchingDefinitions.Single();

    return definition.LastExecution.AddMinutes(definition.Interval);
}
}
}