JobProvider now fully works based on a queuing logic, which allows more than one job to be queued. (EasyButton included!)

This commit is contained in:
unknown 2011-05-17 00:04:49 -07:00
parent fdd6e37b24
commit 2f786bf424
9 changed files with 160 additions and 51 deletions

View File

@ -15,11 +15,11 @@ namespace NzbDrone.Core.Test
[Test] [Test]
public void Run_Jobs_Updates_Last_Execution() public void Run_Jobs_Updates_Last_Execution()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new FakeJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
//Act //Act
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
@ -37,11 +37,11 @@ namespace NzbDrone.Core.Test
public void Run_Jobs_Updates_Last_Execution_Mark_as_unsuccesful() public void Run_Jobs_Updates_Last_Execution_Mark_as_unsuccesful()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new BrokenJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new BrokenJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
//Act //Act
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
@ -61,11 +61,11 @@ namespace NzbDrone.Core.Test
//after execution so the job can successfully run. //after execution so the job can successfully run.
public void can_run_job_again() public void can_run_job_again()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new FakeJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
@ -82,11 +82,11 @@ namespace NzbDrone.Core.Test
//after execution so the job can successfully run. //after execution so the job can successfully run.
public void can_run_broken_job_again() public void can_run_broken_job_again()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new BrokenJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new BrokenJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
@ -103,17 +103,17 @@ namespace NzbDrone.Core.Test
//after execution so the job can successfully run. //after execution so the job can successfully run.
public void can_run_async_job_again() public void can_run_async_job_again()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new FakeJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
var firstRun = timerProvider.BeginExecute(typeof(FakeJob)); var firstRun = timerProvider.QueueJob(typeof(FakeJob));
Thread.Sleep(2000); Thread.Sleep(2000);
var secondRun = timerProvider.BeginExecute(typeof(FakeJob)); var secondRun = timerProvider.QueueJob(typeof(FakeJob));
Assert.IsTrue(firstRun); Assert.IsTrue(firstRun);
Assert.IsTrue(secondRun); Assert.IsTrue(secondRun);
@ -125,17 +125,17 @@ namespace NzbDrone.Core.Test
//after execution so the job can successfully run. //after execution so the job can successfully run.
public void can_run_broken_async_job_again() public void can_run_broken_async_job_again()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new BrokenJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new BrokenJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
var firstRun = timerProvider.BeginExecute(typeof(FakeJob)); var firstRun = timerProvider.QueueJob(typeof(FakeJob));
Thread.Sleep(2000); Thread.Sleep(2000);
var secondRun = timerProvider.BeginExecute(typeof(FakeJob)); var secondRun = timerProvider.QueueJob(typeof(FakeJob));
Assert.IsTrue(firstRun); Assert.IsTrue(firstRun);
Assert.IsTrue(secondRun); Assert.IsTrue(secondRun);
@ -146,11 +146,11 @@ namespace NzbDrone.Core.Test
//after execution so the job can successfully run. //after execution so the job can successfully run.
public void can_run_two_jobs_at_the_same_time() public void can_run_two_jobs_at_the_same_time()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new SlowJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new SlowJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
@ -172,15 +172,47 @@ namespace NzbDrone.Core.Test
} }
[Test]
//This test will confirm that the concurrency checks are rest
//after execution so the job can successfully run.
public void can_queue_jobs_at_the_same_time()
{
var slowJob = new SlowJob();
IEnumerable<IJob> fakeJobs = new List<IJob> { slowJob };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize();
var thread1 = new Thread(() => timerProvider.QueueJob(typeof(SlowJob)));
var thread2 = new Thread(() => timerProvider.QueueJob(typeof(SlowJob)));
thread1.Start();
thread2.Start();
thread1.Join();
thread2.Join();
Thread.Sleep(5000);
Assert.AreEqual(1, slowJob.ExexutionCount);
}
[Test] [Test]
public void Init_Jobs() public void Init_Jobs()
{ {
var fakeTimer = new FakeJob(); var fakeTimer = new FakeJob();
IEnumerable<IJob> fakeTimers = new List<IJob> { fakeTimer }; IEnumerable<IJob> fakeJobs = new List<IJob> { fakeTimer };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
@ -207,11 +239,11 @@ namespace NzbDrone.Core.Test
for (int i = 0; i < 2; i++) for (int i = 0; i < 2; i++)
{ {
var fakeTimer = new FakeJob(); var fakeTimer = new FakeJob();
IEnumerable<IJob> fakeTimers = new List<IJob> { fakeTimer }; IEnumerable<IJob> fakeJobs = new List<IJob> { fakeTimer };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(repo); mocker.SetConstant(repo);
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
@ -238,11 +270,11 @@ namespace NzbDrone.Core.Test
for (int i = 0; i < 2; i++) for (int i = 0; i < 2; i++)
{ {
var disabledJob = new DisabledJob(); var disabledJob = new DisabledJob();
IEnumerable<IJob> fakeTimers = new List<IJob> { disabledJob }; IEnumerable<IJob> fakeJobs = new List<IJob> { disabledJob };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(repo); mocker.SetConstant(repo);
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize(); timerProvider.Initialize();
@ -264,11 +296,11 @@ namespace NzbDrone.Core.Test
[Test] [Test]
public void Get_Next_Execution_Time() public void Get_Next_Execution_Time()
{ {
IEnumerable<IJob> fakeTimers = new List<IJob> { new FakeJob() }; IEnumerable<IJob> fakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer(); var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyRepository()); mocker.SetConstant(MockLib.GetEmptyRepository());
mocker.SetConstant(fakeTimers); mocker.SetConstant(fakeJobs);
//Act //Act
var timerProvider = mocker.Resolve<JobProvider>(); var timerProvider = mocker.Resolve<JobProvider>();
@ -333,7 +365,7 @@ namespace NzbDrone.Core.Test
public void Start(ProgressNotification notification, int targetId) public void Start(ProgressNotification notification, int targetId)
{ {
throw new InvalidOperationException(); throw new ApplicationException("Broken job is broken");
} }
} }
@ -349,9 +381,12 @@ namespace NzbDrone.Core.Test
get { return 15; } get { return 15; }
} }
public int ExexutionCount { get; set; }
public void Start(ProgressNotification notification, int targetId) public void Start(ProgressNotification notification, int targetId)
{ {
Thread.Sleep(5000); Thread.Sleep(2000);
ExexutionCount++;
} }
} }
} }

View File

@ -35,7 +35,7 @@ namespace NzbDrone.Core.Test
public static IRepository GetEmptyRepository() public static IRepository GetEmptyRepository()
{ {
return GetEmptyRepository(true); return GetEmptyRepository(false);
} }
public static IRepository GetEmptyRepository(bool enableLogging) public static IRepository GetEmptyRepository(bool enableLogging)

View File

@ -40,11 +40,13 @@ namespace NzbDrone.Core.Providers.Jobs
try try
{ {
notification.CurrentMessage = String.Format("Beginning Delete of Series: {0}", seriesId); var title = _seriesProvider.GetSeries(seriesId).Title;
notification.CurrentMessage = String.Format("Deleting '{0}' from database", title);
_seriesProvider.DeleteSeries(seriesId); _seriesProvider.DeleteSeries(seriesId);
notification.CurrentMessage = String.Format("Successfully deleted Series: {0}", seriesId); notification.CurrentMessage = String.Format("Successfully deleted '{0}'", title);
} }
catch (Exception e) catch (Exception e)
{ {

View File

@ -20,6 +20,7 @@ namespace NzbDrone.Core.Providers.Jobs
private static readonly object ExecutionLock = new object(); private static readonly object ExecutionLock = new object();
private Thread _jobThread; private Thread _jobThread;
private static bool _isRunning; private static bool _isRunning;
private static readonly List<Tuple<Type, Int32>> Queue = new List<Tuple<Type, int>>();
private ProgressNotification _notification; private ProgressNotification _notification;
@ -86,7 +87,7 @@ namespace NzbDrone.Core.Providers.Jobs
foreach (var pendingTimer in pendingJobs) foreach (var pendingTimer in pendingJobs)
{ {
var timerClass = _jobs.Where(t => t.GetType().ToString() == pendingTimer.TypeName).FirstOrDefault(); var timerClass = _jobs.Where(t => t.GetType().ToString() == pendingTimer.TypeName).FirstOrDefault();
Execute(timerClass.GetType(), 0); Execute(timerClass.GetType());
} }
} }
finally finally
@ -101,29 +102,48 @@ namespace NzbDrone.Core.Providers.Jobs
/// Starts the execution of a job asynchronously /// Starts the execution of a job asynchronously
/// </summary> /// </summary>
/// <param name="jobType">Type of the job that should be executed.</param> /// <param name="jobType">Type of the job that should be executed.</param>
/// <param name="queueAllowed">If the job is allowed to be queued in case another task is aready running.</param>
/// <param name="targetId">The targetId could be any Id parameter eg. SeriesId. it will be passed to the job implementation /// <param name="targetId">The targetId could be any Id parameter eg. SeriesId. it will be passed to the job implementation
/// to allow it to filter it's target of execution.</param> /// to allow it to filter it's target of execution.</param>
/// <returns>True if ran, false if skipped</returns> /// <returns>True if ran, false if skipped</returns>
public virtual bool BeginExecute(Type jobType, int targetId = 0) /// <remarks>Job is only added to the queue if same job with the same targetId doesn't already exist in the queue.</remarks>
public virtual bool QueueJob(Type jobType, int targetId = 0)
{ {
Logger.Debug("Adding job {0} ->{1} to the queue", jobType, targetId);
lock (Queue)
{
var queueTuple = new Tuple<Type, int>(jobType, targetId);
if (Queue.Contains(queueTuple))
{
Logger.Info("Job {0} ->{1} already exists in queue. Skipping.", jobType, targetId);
return false;
}
Queue.Add(queueTuple);
Logger.Debug("Job {0} ->{1} added to the queue", jobType, targetId);
}
lock (ExecutionLock) lock (ExecutionLock)
{ {
if (_isRunning) if (_isRunning)
{ {
Logger.Info("Another job is already running. Ignoring request."); Logger.Trace("Queue is already running. Ignoreing request.");
return false; return true;
} }
_isRunning = true;
} }
if (_jobThread == null || !_jobThread.IsAlive) if (_jobThread == null || !_jobThread.IsAlive)
{ {
Logger.Trace("Initializing background thread"); Logger.Trace("Initializing queue processor thread");
ThreadStart starter = () => ThreadStart starter = () =>
{ {
try try
{ {
Execute(jobType, targetId); ProcessQueue();
} }
finally finally
{ {
@ -131,7 +151,7 @@ namespace NzbDrone.Core.Providers.Jobs
} }
}; };
_jobThread = new Thread(starter) { Name = "TimerThread", Priority = ThreadPriority.BelowNormal }; _jobThread = new Thread(starter) { Name = "JobQueueThread", Priority = ThreadPriority.BelowNormal };
_jobThread.Start(); _jobThread.Start();
} }
@ -143,6 +163,55 @@ namespace NzbDrone.Core.Providers.Jobs
return true; return true;
} }
/// <summary>
/// Starts processing of queue.
/// </summary>
private void ProcessQueue()
{
Tuple<Type, int> job = null;
try
{
lock (Queue)
{
if (Queue.Count != 0)
{
job = Queue[0];
}
}
if (job != null)
{
Execute(job.Item1, job.Item2);
}
}
catch (Exception e)
{
Logger.FatalException("An error has occured while processing queued job.", e);
}
finally
{
if (job != null)
{
Queue.Remove(job);
}
}
//Try to find next job is last run found a job.
if (job != null)
{
ProcessQueue();
}
else
{
Logger.Debug("Finished processing jobs in the queue.");
}
return;
}
/// <summary> /// <summary>
/// Executes the job /// Executes the job
/// </summary> /// </summary>
@ -221,7 +290,7 @@ namespace NzbDrone.Core.Providers.Jobs
/// Gets the next scheduled run time for the job /// Gets the next scheduled run time for the job
/// (Estimated due to schedule timer) /// (Estimated due to schedule timer)
/// </summary> /// </summary>
/// <returns>DateTime of next scheduled job</returns> /// <returns>DateTime of next scheduled job execution</returns>
public virtual DateTime NextScheduledRun(Type jobType) public virtual DateTime NextScheduledRun(Type jobType)
{ {
var job = All().Where(t => t.TypeName == jobType.ToString()).FirstOrDefault(); var job = All().Where(t => t.TypeName == jobType.ToString()).FirstOrDefault();

View File

@ -42,9 +42,9 @@ namespace NzbDrone.Core.Providers.Jobs
foreach (var series in seriesToScan) foreach (var series in seriesToScan)
{ {
notification.CurrentMessage = "Scanning for files: " + series.Title; notification.CurrentMessage = string.Format("Scanning disk for '{0}'", series.Title);
_mediaFileProvider.Scan(series); _mediaFileProvider.Scan(series);
notification.CurrentMessage = "Media File Scan completed for " + series.Title; notification.CurrentMessage = string.Format("Media File Scan completed for '{0}'", series.Title);
} }
} }
} }

View File

@ -39,21 +39,23 @@ namespace NzbDrone.Core.Providers.Jobs
private void ScanSeries(ProgressNotification notification) private void ScanSeries(ProgressNotification notification)
{ {
var syncList = _seriesProvider.GetAllSeries().Where(s => s.LastInfoSync == null).ToList(); var syncList = _seriesProvider.GetAllSeries().Where(s => s.LastInfoSync == null).ToList();
if (syncList.Count == 0) return; if (syncList.Count == 0)
{
return;
}
foreach (var currentSeries in syncList) foreach (var currentSeries in syncList)
{ {
try try
{ {
notification.CurrentMessage = String.Format("Searching For: {0}", new DirectoryInfo(currentSeries.Path).Name); notification.CurrentMessage = String.Format("Searching for '{0}'", new DirectoryInfo(currentSeries.Path).Name);
var updatedSeries = _seriesProvider.UpdateSeriesInfo(currentSeries.SeriesId); var updatedSeries = _seriesProvider.UpdateSeriesInfo(currentSeries.SeriesId);
notification.CurrentMessage = String.Format("Downloading episode info For: {0}", notification.CurrentMessage = String.Format("Downloading episode info for '{0}'",
updatedSeries.Title); updatedSeries.Title);
_episodeProvider.RefreshEpisodeInfo(updatedSeries.SeriesId); _episodeProvider.RefreshEpisodeInfo(updatedSeries.SeriesId);
notification.CurrentMessage = String.Format("Scanning disk for {0} files", notification.CurrentMessage = String.Format("Scanning disk for '{0}' files", updatedSeries.Title);
updatedSeries.Title);
_mediaFileProvider.Scan(_seriesProvider.GetSeries(updatedSeries.SeriesId)); _mediaFileProvider.Scan(_seriesProvider.GetSeries(updatedSeries.SeriesId));
} }

View File

@ -36,7 +36,7 @@ namespace NzbDrone.Web.Controllers
[HttpPost] [HttpPost]
public JsonResult ScanNewSeries() public JsonResult ScanNewSeries()
{ {
_jobProvider.BeginExecute(typeof(NewSeriesUpdate)); _jobProvider.QueueJob(typeof(NewSeriesUpdate));
return new JsonResult(); return new JsonResult();
} }

View File

@ -54,7 +54,7 @@ namespace NzbDrone.Web.Controllers
public ActionResult RssSync() public ActionResult RssSync()
{ {
_jobProvider.BeginExecute(typeof(RssSyncJob)); _jobProvider.QueueJob(typeof(RssSyncJob));
return RedirectToAction("Index"); return RedirectToAction("Index");
} }
@ -119,7 +119,7 @@ namespace NzbDrone.Web.Controllers
seriesInDb.RemoveAll(s => s.SeriesId == id); seriesInDb.RemoveAll(s => s.SeriesId == id);
//Start removing this series //Start removing this series
_jobProvider.BeginExecute(typeof(DeleteSeriesJob), id); _jobProvider.QueueJob(typeof(DeleteSeriesJob), id);
var series = GetSeriesModels(seriesInDb); var series = GetSeriesModels(seriesInDb);
return View(new GridModel(series)); return View(new GridModel(series));
@ -274,7 +274,7 @@ namespace NzbDrone.Web.Controllers
public ActionResult UpdateInfo(int seriesId) public ActionResult UpdateInfo(int seriesId)
{ {
//Syncs the episodes on disk for the specified series //Syncs the episodes on disk for the specified series
_jobProvider.BeginExecute(typeof(UpdateInfoJob), seriesId); _jobProvider.QueueJob(typeof(UpdateInfoJob), seriesId);
return RedirectToAction("Details", new { seriesId }); return RedirectToAction("Details", new { seriesId });
} }

View File

@ -68,6 +68,7 @@ Global
{193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Any CPU.Build.0 = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x64.ActiveCfg = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x64.ActiveCfg = Debug|Any CPU
{193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x86.ActiveCfg = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x86.ActiveCfg = Debug|Any CPU
{193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x86.Build.0 = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x86.Build.0 = Debug|Any CPU