Scheduler built with observables v3 (follow-up) - now testable
$begingroup$
This is the second follow-up to my previous question about a Scheduler built with observables.
Although the last version worked correctly, the only way to see that was to run it in LINQPad, which I didn't like very much. I prefer to have proper tests, so I've redesigned it a little to make testing possible.
Core
The Scheduler itself is now pretty simple. It requires an observable that produces ticks, which it turns into a hot observable. On each tick, each job's triggers are evaluated and the jobs whose triggers match are executed.
Unscheduling a job is a blocking operation when a timeout is specified.
public class Scheduler : IDisposable
{
private readonly IConnectableObservable<DateTime> _scheduler;
private readonly IDisposable _disconnect;
public Scheduler(IObservable<DateTime> ticks)
{
// Not using .RefCount here because it should be ticking regardless of subscriptions.
_scheduler = ticks.Publish();
_disconnect = _scheduler.Connect();
}
public IDisposable Schedule(Job job, CancellationToken cancellationToken = default)
{
var unschedule =
_scheduler
// .ToList the results so that all triggers have the chance to evaluate the tick.
.Where(tick => job.Triggers.Select(t => t.Matches(tick)).ToList().Any(x => x))
.Subscribe(timestamp => job.Execute(cancellationToken));
return Disposable.Create(() =>
{
job.Continuation.Wait(job.UnscheduleTimeout);
unschedule.Dispose();
});
}
public void Dispose()
{
// Stop ticking.
_disconnect.Dispose();
}
}
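To illustrate why the .ToList() call in Schedule matters, here is a minimal sketch that does not use Rx. CountingProbe and TriggerEvaluation are hypothetical names introduced only for this example; the probe stands in for a stateful trigger and records how often it was asked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a stateful trigger: it counts how often it was asked.
public sealed class CountingProbe
{
    private readonly bool _result;

    public CountingProbe(bool result) => _result = result;

    public int Calls { get; private set; }

    public bool Matches(DateTime tick)
    {
        Calls++;
        return _result;
    }
}

public static class TriggerEvaluation
{
    // Any() stops at the first match, so later probes never see the tick.
    public static bool AnyLazy(IEnumerable<CountingProbe> probes, DateTime tick) =>
        probes.Any(p => p.Matches(tick));

    // ToList() materializes every result first, so each probe sees the tick.
    public static bool AnyEager(IEnumerable<CountingProbe> probes, DateTime tick) =>
        probes.Select(p => p.Matches(tick)).ToList().Any(x => x);
}
```

With two probes that both return true, AnyLazy asks only the first one, while AnyEager asks both. For a trigger that advances an internal counter on every call, skipping it would presumably leave it out of step with the ticks.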
The Job class handles a job's triggers and tasks and keeps the number of running tasks within the specified max-degree-of-parallelism.
public class Job
{
private readonly List<Task> _tasks = new List<Task>();
public Job(string name, IEnumerable<Trigger> trigger, Func<CancellationToken, Task> action)
{
Name = name;
Triggers = trigger.ToList();
Action = action;
}
public string Name { get; }
public IEnumerable<Trigger> Triggers { get; }
public Func<CancellationToken, Task> Action { get; }
public Action<Job> OnMisfire { get; set; }
public DegreeOfParallelism MaxDegreeOfParallelism { get; set; } = 1;
public TimeSpan UnscheduleTimeout { get; set; }
public Task Continuation => Task.WhenAll(_tasks).ContinueWith(_ => _tasks.Clear());
public int Count => _tasks.Count;
public void Execute(CancellationToken cancellationToken)
{
if (CanExecute())
{
var jobTask = Action(cancellationToken);
_tasks.Add(jobTask);
jobTask.ContinueWith(_ => _tasks.Remove(jobTask), cancellationToken);
}
else
{
OnMisfire?.Invoke(this);
}
}
private bool CanExecute()
{
return
MaxDegreeOfParallelism.Equals(DegreeOfParallelism.Unlimited) ||
Count < MaxDegreeOfParallelism.Value;
}
}
The DegreeOfParallelism is an anti-primitive-obsession wrapper for int.
public class DegreeOfParallelism : Primitive<int>
{
private const int UnlimitedValue = -1;
public DegreeOfParallelism(int value) : base(value) { }
public static readonly DegreeOfParallelism Unlimited = new DegreeOfParallelism(UnlimitedValue);
protected override void Validate(int value)
{
if (value == UnlimitedValue)
{
return;
}
if (value < 1)
{
throw new ArgumentException("Value must be positive.");
}
}
public static implicit operator DegreeOfParallelism(int value) => new DegreeOfParallelism(value);
}
It is backed by the base class Primitive<T>, which implements basic operators, equality and comparison.
[PublicAPI]
[CannotApplyEqualityOperator]
public abstract class Primitive<T> : IEquatable<Primitive<T>>, IComparable<Primitive<T>>
{
private static readonly IComparer<Primitive<T>> Comparable = ComparerFactory<Primitive<T>>.Create(p => p.Value);
protected Primitive(T value)
{
// ReSharper disable once VirtualMemberCallInConstructor - it's ok to do this here because Validate is stateless.
Validate(Value = value);
}
protected abstract void Validate(T value);
[AutoEqualityProperty]
public T Value { get; }
#region IEquatable
public bool Equals(Primitive<T> other) => AutoEquality<Primitive<T>>.Comparer.Equals(this, other);
public override bool Equals(object obj) => obj is Primitive<T> other && Equals(other);
public override int GetHashCode() => AutoEquality<Primitive<T>>.Comparer.GetHashCode(this);
#endregion
#region IComparable
public int CompareTo(Primitive<T> other) => Comparable.Compare(this, other);
#endregion
public static implicit operator T(Primitive<T> primitive) => primitive.Value;
}
The triggers I use are currently very simple too. There is just a base class that provides a single method for checking whether the trigger Matches a tick. I have two implementations.
public abstract class Trigger
{
public abstract bool Matches(DateTime tick);
}
public class CronTrigger : Trigger
{
private readonly CronExpression _cronExpression;
public CronTrigger(string cronExpression)
{
_cronExpression = CronExpression.Parse(cronExpression);
}
public string Schedule => _cronExpression.ToString();
public override bool Matches(DateTime tick)
{
return _cronExpression.Contains(tick);
}
}
public class CountTrigger : Trigger
{
public CountTrigger(int count)
{
Counter = new InfiniteCounter(count);
}
public IInfiniteCounter Counter { get; }
public override bool Matches(DateTime tick)
{
Counter.MoveNext();
return Counter.Position == InfiniteCounterPosition.Last;
}
}
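InfiniteCounter is not shown here, so the following is only a sketch of the behaviour CountTrigger appears to rely on, assuming the counter wraps around after count ticks. EveryNthTickTrigger is a hypothetical name and a plain int replaces the real counter.

```csharp
using System;

// Hypothetical stand-in for CountTrigger; assumes the counter wraps after `count` ticks.
public sealed class EveryNthTickTrigger
{
    private readonly int _count;
    private int _position; // number of ticks seen in the current cycle

    public EveryNthTickTrigger(int count)
    {
        if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
        _count = count;
    }

    // Each call advances the counter, so the trigger must see every tick exactly once.
    public bool Matches(DateTime tick)
    {
        _position++;
        if (_position < _count) return false;
        _position = 0; // start the next cycle
        return true;
    }
}
```

Under that assumption, a trigger created with a count of 2 matches the second, fourth, sixth tick and so on, regardless of the DateTime value passed in.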
Testing
So far I've created two tests for it (with xUnit): one testing the Job and a bigger one testing the Scheduler.
The first test checks that the maximum number of tasks is not exceeded and that the Continuation task works correctly.
public class JobTest
{
[Fact]
public async Task Job_executes_no_more_than_specified_number_of_times()
{
var misfireCount = 0;
var job = new Job("test", Enumerable.Empty<Trigger>(), async token => await Task.Delay(TimeSpan.FromSeconds(3), token))
{
OnMisfire = j => misfireCount++,
MaxDegreeOfParallelism = 2
};
job.Execute(CancellationToken.None);
job.Execute(CancellationToken.None);
job.Execute(CancellationToken.None);
Assert.Equal(2, job.Count);
Assert.Equal(1, misfireCount);
// Wait until all jobs are completed.
await job.Continuation;
Assert.Equal(0, job.Count);
}
}
Testing the Scheduler is now possible by using an observable that ticks only when I tell it to:
public class SchedulerTest
{
[Fact]
public void Executes_job_according_to_triggers()
{
var job1ExecuteCount = 0;
var job2ExecuteCount = 0;
var misfireCount = 0;
var subject = new Subject<DateTime>();
var scheduler = new Scheduler(subject);
var unschedule1 = scheduler.Schedule(new Job("test-1", new[] { new CountTrigger(2) }, async token =>
{
Interlocked.Increment(ref job1ExecuteCount);
await Task.Delay(TimeSpan.FromSeconds(3), token);
})
{
MaxDegreeOfParallelism = 2,
OnMisfire = _ => Interlocked.Increment(ref misfireCount),
UnscheduleTimeout = TimeSpan.FromSeconds(4)
});
var unschedule2 = scheduler.Schedule(new Job("test-2", new[] { new CountTrigger(3) }, async token =>
{
Interlocked.Increment(ref job2ExecuteCount);
await Task.Delay(TimeSpan.FromSeconds(3), token);
})
{
MaxDegreeOfParallelism = 2,
OnMisfire = _ => Interlocked.Increment(ref misfireCount),
UnscheduleTimeout = TimeSpan.FromSeconds(4)
});
// Scheduler was just initialized and should not have executed anything yet.
Assert.Equal(0, job1ExecuteCount);
Assert.Equal(0, job2ExecuteCount);
// Tick once.
subject.OnNext(DateTime.Now);
// Still nothing should be executed.
Assert.Equal(0, job1ExecuteCount);
Assert.Equal(0, job2ExecuteCount);
// Now tick twice...
subject.OnNext(DateTime.Now);
subject.OnNext(DateTime.Now);
// Unschedule the job. This blocking call waits until all tasks are completed.
unschedule1.Dispose();
unschedule2.Dispose();
// Tick once again. Nothing should be executed anymore.
subject.OnNext(DateTime.Now);
// ...this should have matched the two triggers.
Assert.Equal(1, job1ExecuteCount);
Assert.Equal(1, job2ExecuteCount);
Assert.Equal(0, misfireCount);
}
}
Utilities
There are two more classes that drive the automatic scheduler.
One creates an observable that ticks every second:
public static class Tick
{
public static IObservable<DateTime> EverySecond(IDateTime dateTime)
{
return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());
}
}
The other provides an extension that fills in missing seconds caused by occasional glitches in the ticking clock:
public static class ObservableExtensions
{
public static IObservable<DateTime> FixMissingSeconds(this IObservable<DateTime> tick, IDateTime dateTime)
{
var last = dateTime.Now().TruncateMilliseconds();
return tick.SelectMany(_ =>
{
var now = dateTime.Now().TruncateMilliseconds();
var gap = (now - last).Ticks / TimeSpan.TicksPerSecond;
// If we missed one second due to time inaccuracy,
// this makes sure to publish the missing second too
// so that all jobs at that second can also be triggered.
return
Enumerable
.Range(0, (int)gap)
.Select(second => last = last.AddSeconds(1));
});
}
}
public interface IDateTime
{
DateTime Now();
}
public class DateTimeUtc : IDateTime
{
public DateTime Now() => DateTime.UtcNow;
}
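The gap-filling idea in FixMissingSeconds can be tried out without Rx. The sketch below mirrors the body of the SelectMany lambda; SecondGapFiller and its Truncate helper are hypothetical, and Truncate is only an assumed equivalent of the TruncateMilliseconds extension, which is not shown. Unlike Enumerable.Range, the loop simply returns nothing when the clock moves backwards.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper mirroring the body of the SelectMany lambda, without Rx.
public sealed class SecondGapFiller
{
    private DateTime _last;

    public SecondGapFiller(DateTime start) => _last = Truncate(start);

    // Returns every whole second after the last published one, up to `now`.
    public IReadOnlyList<DateTime> Next(DateTime now)
    {
        var gap = (Truncate(now) - _last).Ticks / TimeSpan.TicksPerSecond;
        var seconds = new List<DateTime>();
        for (var i = 0; i < gap; i++)
        {
            _last = _last.AddSeconds(1);
            seconds.Add(_last);
        }
        return seconds;
    }

    // Assumed equivalent of the TruncateMilliseconds extension, which is not shown.
    private static DateTime Truncate(DateTime value) =>
        new DateTime(value.Ticks - value.Ticks % TimeSpan.TicksPerSecond, value.Kind);
}
```

Starting at 12:00:00.250, a call with 12:00:01.100 yields only 12:00:01. A following call with 12:00:03.050 yields 12:00:02 and 12:00:03, so the skipped second is published too, and a further call within the same second yields nothing.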
This time my main focus is on testability and thread-safety. Do you think I need any locking or synchronisation anywhere? I'm not entirely sure I have thought of everything. How about testing it? Can you see anything that cannot be tested, and do you think the two tests are sane?
c# async-await observer-pattern scheduled-tasks integration-testing
$endgroup$
$begingroup$
The code (still a little bit messy and with some other less relevant experimental stuff) can be found in my repository here.
$endgroup$
– t3chb0t
9 mins ago
add a comment |
$begingroup$
This is a 2nd follow-up to my previous one about a Scheduler built with observables.
Although the last one was working correctly, it was only possible to see this in LINQPad which I didn't like very much. I prefer to have proper tests so I've redesigned it a little bit to make testing possible.
Core
The Scheduler itself is now pretty simple. It requires an observable that produces ticks which it turns into a hot-observable. On each tick a each job's triggers are evaluated and jobs which match the trigger are executed.
Unscheduling jobs is a blocking operation when a timeout is pecified.
public class Scheduler : IDisposable
{
private readonly IConnectableObservable<DateTime> _scheduler;
private readonly IDisposable _disconnect;
public Scheduler(IObservable<DateTime> ticks)
{
// Not using .RefCount here because it should be ticking regardless of subscriptions.
_scheduler = ticks.Publish();
_disconnect = _scheduler.Connect();
}
public IDisposable Schedule(Job job, CancellationToken cancellationToken = default)
{
var unschedule =
_scheduler
// .ToList the results so that all triggers have the chance to evaluate the tick.
.Where(tick => job.Triggers.Select(t => t.Matches(tick)).ToList().Any(x => x))
.Subscribe(timestamp => job.Execute(cancellationToken));
return Disposable.Create(() =>
{
job.Continuation.Wait(job.UnscheduleTimeout);
unschedule.Dispose();
});
}
public void Dispose()
{
// Stop ticking.
_disconnect.Dispose();
}
}
The Job class is handling job's triggers and tasks and keeping them within the specified max-degree-of-parallelism.
public class Job
{
private readonly List<Task> _tasks = new List<Task>();
public Job(string name, IEnumerable<Trigger> trigger, Func<CancellationToken, Task> action)
{
Name = name;
Triggers = trigger.ToList();
Action = action;
}
public string Name { get; }
public IEnumerable<Trigger> Triggers { get; }
public Func<CancellationToken, Task> Action { get; }
public Action<Job> OnMisfire { get; set; }
public DegreeOfParallelism MaxDegreeOfParallelism { get; set; } = 1;
public TimeSpan UnscheduleTimeout { get; set; }
public Task Continuation => Task.WhenAll(_tasks).ContinueWith(_ => _tasks.Clear());
public int Count => _tasks.Count;
public void Execute(CancellationToken cancellationToken)
{
if (CanExecute())
{
var jobTask = Action(cancellationToken);
_tasks.Add(jobTask);
jobTask.ContinueWith(_ => _tasks.Remove(jobTask), cancellationToken);
}
else
{
OnMisfire?.Invoke(this);
}
}
private bool CanExecute()
{
return
MaxDegreeOfParallelism.Equals(DegreeOfParallelism.Unlimited) ||
Count < MaxDegreeOfParallelism.Value;
}
}
The DegreeOfParallelism is an anti-primitive-obsession wrapper for int.
public class DegreeOfParallelism : Primitive<int>
{
private const int UnlimitedValue = -1;
public DegreeOfParallelism(int value) : base(value) { }
public static readonly DegreeOfParallelism Unlimited = new DegreeOfParallelism(UnlimitedValue);
protected override void Validate(int value)
{
if (value == UnlimitedValue)
{
return;
}
if (value < 1)
{
throw new ArgumentException("Value must be positive.");
}
}
public static implicit operator DegreeOfParallelism(int value) => new DegreeOfParallelism(value);
}
It is supported by the base class Primitive<T> which implements basic operators, equality and comparer.
[PublicAPI]
[CannotApplyEqualityOperator]
public abstract class Primitive<T> : IEquatable<Primitive<T>>, IComparable<Primitive<T>>
{
private static readonly IComparer<Primitive<T>> Comparable = ComparerFactory<Primitive<T>>.Create(p => p.Value);
protected Primitive(T value)
{
// ReSharper disable once VirtualMemberCallInConstructor - it's ok to do this here because Validate is stateless.
Validate(Value = value);
}
protected abstract void Validate(T value);
[AutoEqualityProperty]
public T Value { get; }
#region IEquatable
public bool Equals(Primitive<T> other) => AutoEquality<Primitive<T>>.Comparer.Equals(this, other);
public override bool Equals(object obj) => obj is T other && Equals(other);
public override int GetHashCode() => AutoEquality<Primitive<T>>.Comparer.GetHashCode(this);
#endregion
#region IComparable
public int CompareTo(Primitive<T> other) => Comparable.Compare(this, other);
#endregion
public static implicit operator T(Primitive<T> primitive) => primitive.Value;
}
Triggers that I use are currently very simple too. It's just a base class that provides a single method for checking whether the trigger Matches. I have two of them.
public abstract class Trigger
{
public abstract bool Matches(DateTime tick);
}
public class CronTrigger : Trigger
{
private readonly CronExpression _cronExpression;
public CronTrigger(string cronExpression)
{
_cronExpression = CronExpression.Parse(cronExpression);
}
public string Schedule => _cronExpression.ToString();
public override bool Matches(DateTime tick)
{
return _cronExpression.Contains(tick);
}
}
public class CountTrigger : Trigger
{
public CountTrigger(int count)
{
Counter = new InfiniteCounter(count);
}
public IInfiniteCounter Counter { get; }
public override bool Matches(DateTime tick)
{
Counter.MoveNext();
return Counter.Position == InfiniteCounterPosition.Last;
}
}
Testing
So far I've created two tests for it (with XUnit). One testing the Job and a bigger one testing the Scheduler.
The first test checks whether the max-number of tasks is not exceeded and whether the Continuation tasks works correctly.
public class JobTest
{
[Fact]
public async Task Job_executes_no_more_than_specified_number_of_times()
{
var misfireCount = 0;
var job = new Job("test", Enumerable.Empty<Trigger>(), async token => await Task.Delay(TimeSpan.FromSeconds(3), token))
{
OnMisfire = j => misfireCount++,
MaxDegreeOfParallelism = 2
};
job.Execute(CancellationToken.None);
job.Execute(CancellationToken.None);
job.Execute(CancellationToken.None);
Assert.Equal(2, job.Count);
Assert.Equal(1, misfireCount);
// Wait until all jobs are completed.
await job.Continuation;
Assert.Equal(0, job.Count);
}
}
Testing Scheduler is now possible by using an observable that is ticking as I say:
public class SchedulerTest
{
[Fact]
public void Executes_job_according_to_triggers()
{
var job1ExecuteCount = 0;
var job2ExecuteCount = 0;
var misfireCount = 0;
var subject = new Subject<DateTime>();
var scheduler = new Scheduler(subject);
var unschedule1 = scheduler.Schedule(new Job("test-1", new { new CountTrigger(2) }, async token =>
{
Interlocked.Increment(ref job1ExecuteCount);
await Task.Delay(TimeSpan.FromSeconds(3), token);
})
{
MaxDegreeOfParallelism = 2,
OnMisfire = _ => Interlocked.Increment(ref misfireCount),
UnscheduleTimeout = TimeSpan.FromSeconds(4)
});
var unschedule2 = scheduler.Schedule(new Job("test-2", new { new CountTrigger(3) }, async token =>
{
Interlocked.Increment(ref job2ExecuteCount);
await Task.Delay(TimeSpan.FromSeconds(3), token);
})
{
MaxDegreeOfParallelism = 2,
OnMisfire = _ => Interlocked.Increment(ref misfireCount),
UnscheduleTimeout = TimeSpan.FromSeconds(4)
});
// Scheduler was just initialized and should not have executed anything yet.
Assert.Equal(0, job1ExecuteCount);
Assert.Equal(0, job2ExecuteCount);
// Tick once.
subject.OnNext(DateTime.Now);
// Still nothing should be executed.
Assert.Equal(0, job1ExecuteCount);
Assert.Equal(0, job2ExecuteCount);
// Now tick twice...
subject.OnNext(DateTime.Now);
subject.OnNext(DateTime.Now);
// Unschedule the job. This blocking call waits until all tasks are completed.
unschedule1.Dispose();
unschedule2.Dispose();
// Tick once again. Nothing should be executed anymore.
subject.OnNext(DateTime.Now);
// ...this should have matched the two triggers.
Assert.Equal(1, job1ExecuteCount);
Assert.Equal(1, job2ExecuteCount);
Assert.Equal(0, misfireCount);
}
}
Utilities
There are two more classes that drive the automatic scheduler.
One creates an observable that is ticking every-second:
public static class Tick
{
public static IObservable<DateTime> EverySecond(IDateTime dateTime)
{
return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());
}
}
the other provides and extension that is fixing missing seconds due to the occasional glitches in the ticking clock:
public static class ObservableExtensions
{
public static IObservable<DateTime> FixMissingSeconds(this IObservable<DateTime> tick, IDateTime dateTime)
{
var last = dateTime.Now().TruncateMilliseconds();
return tick.SelectMany(_ =>
{
var now = dateTime.Now().TruncateMilliseconds();
var gap = (now - last).Ticks / TimeSpan.TicksPerSecond;
// If we missed one second due to time inaccuracy,
// this makes sure to publish the missing second too
// so that all jobs at that second can also be triggered.
return
Enumerable
.Range(0, (int)gap)
.Select(second => last = last.AddSeconds(1));
});
}
}
public interface IDateTime
{
DateTime Now();
}
public class DateTimeUtc : IDateTime
{
public DateTime Now() => DateTime.UtcNow;
}
This time my main focus is on testability and thread-safety. Do you think I need any locking or synchronisation anywhere? I'm not entirely sure I have thought of everythig. How about testing it? Can you see anything that cannot be tested and do you think the two tests are sane?
c# async-await observer-pattern scheduled-tasks integration-testing
$endgroup$
This is a 2nd follow-up to my previous one about a Scheduler built with observables.
Although the last one was working correctly, it was only possible to see this in LINQPad which I didn't like very much. I prefer to have proper tests so I've redesigned it a little bit to make testing possible.
Core
The Scheduler itself is now pretty simple. It requires an observable that produces ticks which it turns into a hot-observable. On each tick a each job's triggers are evaluated and jobs which match the trigger are executed.
Unscheduling jobs is a blocking operation when a timeout is pecified.
public class Scheduler : IDisposable
{
private readonly IConnectableObservable<DateTime> _scheduler;
private readonly IDisposable _disconnect;
public Scheduler(IObservable<DateTime> ticks)
{
// Not using .RefCount here because it should be ticking regardless of subscriptions.
_scheduler = ticks.Publish();
_disconnect = _scheduler.Connect();
}
public IDisposable Schedule(Job job, CancellationToken cancellationToken = default)
{
var unschedule =
_scheduler
// .ToList the results so that all triggers have the chance to evaluate the tick.
.Where(tick => job.Triggers.Select(t => t.Matches(tick)).ToList().Any(x => x))
.Subscribe(timestamp => job.Execute(cancellationToken));
return Disposable.Create(() =>
{
job.Continuation.Wait(job.UnscheduleTimeout);
unschedule.Dispose();
});
}
public void Dispose()
{
// Stop ticking.
_disconnect.Dispose();
}
}
The Job class is handling job's triggers and tasks and keeping them within the specified max-degree-of-parallelism.
public class Job
{
private readonly List<Task> _tasks = new List<Task>();
public Job(string name, IEnumerable<Trigger> trigger, Func<CancellationToken, Task> action)
{
Name = name;
Triggers = trigger.ToList();
Action = action;
}
public string Name { get; }
public IEnumerable<Trigger> Triggers { get; }
public Func<CancellationToken, Task> Action { get; }
public Action<Job> OnMisfire { get; set; }
public DegreeOfParallelism MaxDegreeOfParallelism { get; set; } = 1;
public TimeSpan UnscheduleTimeout { get; set; }
public Task Continuation => Task.WhenAll(_tasks).ContinueWith(_ => _tasks.Clear());
public int Count => _tasks.Count;
public void Execute(CancellationToken cancellationToken)
{
if (CanExecute())
{
var jobTask = Action(cancellationToken);
_tasks.Add(jobTask);
jobTask.ContinueWith(_ => _tasks.Remove(jobTask), cancellationToken);
}
else
{
OnMisfire?.Invoke(this);
}
}
private bool CanExecute()
{
return
MaxDegreeOfParallelism.Equals(DegreeOfParallelism.Unlimited) ||
Count < MaxDegreeOfParallelism.Value;
}
}
The DegreeOfParallelism is an anti-primitive-obsession wrapper for int.
public class DegreeOfParallelism : Primitive<int>
{
private const int UnlimitedValue = -1;
public DegreeOfParallelism(int value) : base(value) { }
public static readonly DegreeOfParallelism Unlimited = new DegreeOfParallelism(UnlimitedValue);
protected override void Validate(int value)
{
if (value == UnlimitedValue)
{
return;
}
if (value < 1)
{
throw new ArgumentException("Value must be positive.");
}
}
public static implicit operator DegreeOfParallelism(int value) => new DegreeOfParallelism(value);
}
It is supported by the base class Primitive<T> which implements basic operators, equality and comparer.
[PublicAPI]
[CannotApplyEqualityOperator]
public abstract class Primitive<T> : IEquatable<Primitive<T>>, IComparable<Primitive<T>>
{
private static readonly IComparer<Primitive<T>> Comparable = ComparerFactory<Primitive<T>>.Create(p => p.Value);
protected Primitive(T value)
{
// ReSharper disable once VirtualMemberCallInConstructor - it's ok to do this here because Validate is stateless.
Validate(Value = value);
}
protected abstract void Validate(T value);
[AutoEqualityProperty]
public T Value { get; }
#region IEquatable
public bool Equals(Primitive<T> other) => AutoEquality<Primitive<T>>.Comparer.Equals(this, other);
public override bool Equals(object obj) => obj is T other && Equals(other);
public override int GetHashCode() => AutoEquality<Primitive<T>>.Comparer.GetHashCode(this);
#endregion
#region IComparable
public int CompareTo(Primitive<T> other) => Comparable.Compare(this, other);
#endregion
public static implicit operator T(Primitive<T> primitive) => primitive.Value;
}
The triggers I use are currently very simple too. There is just a base class that provides a single method for checking whether the trigger Matches a tick, and I have two implementations of it.
public abstract class Trigger
{
public abstract bool Matches(DateTime tick);
}
public class CronTrigger : Trigger
{
private readonly CronExpression _cronExpression;
public CronTrigger(string cronExpression)
{
_cronExpression = CronExpression.Parse(cronExpression);
}
public string Schedule => _cronExpression.ToString();
public override bool Matches(DateTime tick)
{
return _cronExpression.Contains(tick);
}
}
public class CountTrigger : Trigger
{
public CountTrigger(int count)
{
Counter = new InfiniteCounter(count);
}
public IInfiniteCounter Counter { get; }
public override bool Matches(DateTime tick)
{
Counter.MoveNext();
return Counter.Position == InfiniteCounterPosition.Last;
}
}
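InfiniteCounter isn't shown here, so the following is only a minimal, self-contained sketch of how I expect a count-based trigger to behave. EveryNthTrigger and EveryNthTriggerDemo are hypothetical names for this illustration, and the sketch assumes the counter wraps around after count ticks and reports a match on the last position of each cycle.

```csharp
using System;
using System.Linq;

public abstract class Trigger
{
    public abstract bool Matches(DateTime tick);
}

// Hypothetical stand-in for CountTrigger: assumes the counter wraps around
// after `count` ticks and matches on the last position of each cycle.
public class EveryNthTrigger : Trigger
{
    private readonly int _count;
    private int _position;

    public EveryNthTrigger(int count)
    {
        if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
        _count = count;
    }

    public override bool Matches(DateTime tick)
    {
        // Stateful: every call advances the counter, so it has to be called exactly once per tick.
        _position = (_position % _count) + 1;
        return _position == _count;
    }
}

public static class EveryNthTriggerDemo
{
    public static void Main()
    {
        var trigger = new EveryNthTrigger(2);
        var matches = Enumerable.Range(0, 4).Select(_ => trigger.Matches(DateTime.UtcNow)).ToList();
        Console.WriteLine(string.Join(", ", matches)); // False, True, False, True
    }
}
```

Because a trigger like this changes state on every call, each one has to see every tick. That is why Schedule materializes the trigger results with .ToList() before calling .Any().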
Testing
So far I've created two tests for it (with xUnit): one testing the Job and a bigger one testing the Scheduler.
The first test checks that the maximum number of tasks is not exceeded and that the Continuation task works correctly.
public class JobTest
{
[Fact]
public async Task Job_executes_no_more_than_specified_number_of_times()
{
var misfireCount = 0;
var job = new Job("test", Enumerable.Empty<Trigger>(), async token => await Task.Delay(TimeSpan.FromSeconds(3), token))
{
OnMisfire = j => misfireCount++,
MaxDegreeOfParallelism = 2
};
job.Execute(CancellationToken.None);
job.Execute(CancellationToken.None);
job.Execute(CancellationToken.None);
Assert.Equal(2, job.Count);
Assert.Equal(1, misfireCount);
// Wait until all jobs are completed.
await job.Continuation;
Assert.Equal(0, job.Count);
}
}
Testing the Scheduler is now possible by using an observable that ticks only when I tell it to:
public class SchedulerTest
{
[Fact]
public void Executes_job_according_to_triggers()
{
var job1ExecuteCount = 0;
var job2ExecuteCount = 0;
var misfireCount = 0;
var subject = new Subject<DateTime>();
var scheduler = new Scheduler(subject);
var unschedule1 = scheduler.Schedule(new Job("test-1", new[] { new CountTrigger(2) }, async token =>
{
Interlocked.Increment(ref job1ExecuteCount);
await Task.Delay(TimeSpan.FromSeconds(3), token);
})
{
MaxDegreeOfParallelism = 2,
OnMisfire = _ => Interlocked.Increment(ref misfireCount),
UnscheduleTimeout = TimeSpan.FromSeconds(4)
});
var unschedule2 = scheduler.Schedule(new Job("test-2", new[] { new CountTrigger(3) }, async token =>
{
Interlocked.Increment(ref job2ExecuteCount);
await Task.Delay(TimeSpan.FromSeconds(3), token);
})
{
MaxDegreeOfParallelism = 2,
OnMisfire = _ => Interlocked.Increment(ref misfireCount),
UnscheduleTimeout = TimeSpan.FromSeconds(4)
});
// Scheduler was just initialized and should not have executed anything yet.
Assert.Equal(0, job1ExecuteCount);
Assert.Equal(0, job2ExecuteCount);
// Tick once.
subject.OnNext(DateTime.Now);
// Still nothing should be executed.
Assert.Equal(0, job1ExecuteCount);
Assert.Equal(0, job2ExecuteCount);
// Now tick twice...
subject.OnNext(DateTime.Now);
subject.OnNext(DateTime.Now);
// Unschedule the job. This blocking call waits until all tasks are completed.
unschedule1.Dispose();
unschedule2.Dispose();
// Tick once again. Nothing should be executed anymore.
subject.OnNext(DateTime.Now);
// ...this should have matched the two triggers.
Assert.Equal(1, job1ExecuteCount);
Assert.Equal(1, job2ExecuteCount);
Assert.Equal(0, misfireCount);
}
}
Utilities
There are two more classes that drive the automatic scheduler.
One creates an observable that ticks every second:
public static class Tick
{
public static IObservable<DateTime> EverySecond(IDateTime dateTime)
{
return
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => dateTime.Now());
}
}
The other provides an extension that fills in missing seconds caused by occasional glitches in the ticking clock:
public static class ObservableExtensions
{
public static IObservable<DateTime> FixMissingSeconds(this IObservable<DateTime> tick, IDateTime dateTime)
{
var last = dateTime.Now().TruncateMilliseconds();
return tick.SelectMany(_ =>
{
var now = dateTime.Now().TruncateMilliseconds();
var gap = (now - last).Ticks / TimeSpan.TicksPerSecond;
// If we missed one second due to time inaccuracy,
// this makes sure to publish the missing second too
// so that all jobs at that second can also be triggered.
return
Enumerable
.Range(0, (int)gap)
.Select(second => last = last.AddSeconds(1));
});
}
}
public interface IDateTime
{
DateTime Now();
}
public class DateTimeUtc : IDateTime
{
public DateTime Now() => DateTime.UtcNow;
}
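To illustrate what FixMissingSeconds is meant to publish when the clock skips a second, here is a minimal sketch that applies the same gap arithmetic to a plain list of clock readings instead of an observable, so it runs without System.Reactive. FillGaps, GapFillDemo and the Truncate helper are names I made up for this illustration; Truncate is only an assumed stand-in for the TruncateMilliseconds extension, which isn't shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class GapFillDemo
{
    // Same arithmetic as FixMissingSeconds, applied to a plain sequence of clock readings.
    public static List<DateTime> FillGaps(DateTime start, IEnumerable<DateTime> readings)
    {
        var last = Truncate(start);
        var result = new List<DateTime>();
        foreach (var reading in readings)
        {
            var now = Truncate(reading);
            var gap = (now - last).Ticks / TimeSpan.TicksPerSecond;
            // Publish every whole second between the last published one and now.
            for (var i = 0; i < gap; i++)
            {
                last = last.AddSeconds(1);
                result.Add(last);
            }
        }
        return result;
    }

    // Assumed behaviour of TruncateMilliseconds: drop everything below one second.
    private static DateTime Truncate(DateTime value) =>
        new DateTime(value.Ticks - value.Ticks % TimeSpan.TicksPerSecond, value.Kind);

    public static void Main()
    {
        var start = new DateTime(2019, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var readings = new[]
        {
            start.AddSeconds(1.01), // on time: second 1 is published
            start.AddSeconds(1.99), // still second 1: nothing is published
            start.AddSeconds(3.02), // second 2 was skipped: seconds 2 and 3 are published
        };
        foreach (var tick in FillGaps(start, readings))
        {
            // Prints 00:00:01, 00:00:02 and 00:00:03 on separate lines.
            Console.WriteLine(tick.ToString("HH:mm:ss", CultureInfo.InvariantCulture));
        }
    }
}
```

The second reading shows the other side of the fix: a tick that arrives twice within the same second produces no output, so jobs are not triggered twice for that second.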
This time my main focus is on testability and thread-safety. Do you think I need any locking or synchronisation anywhere? I'm not entirely sure I have thought of everything. How about testing it? Can you see anything that cannot be tested, and do you think the two tests are sane?
c# async-await observer-pattern scheduled-tasks integration-testing
asked 9 mins ago
t3chb0t
The code (still a little bit messy and with some other less relevant experimental stuff) can be found in my repository here.
– t3chb0t
9 mins ago