leader election (#537)

* init leader leaderelection

* fix format

* remove unused import

* add config map

* fix space

* add multi lock

* lease lock

* cp case LeaderElection

* more test cases

* port all testcases from java

* fix space

* fix default timeout

* try to fix flasky  gh action by reducing renew deadline

* try debug failed gh action
This commit is contained in:
Boshi Lian
2020-12-17 10:03:10 -08:00
committed by GitHub
parent c0e96f516b
commit 9a90881e31
12 changed files with 1283 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
using System.Threading;
using System.Threading.Tasks;
namespace k8s.LeaderElection
{
/// <summary>
/// ILock offers a common interface for locking on arbitrary resources used in leader election. The Interface is used to hide the details on specific implementations in order to allow them to change over time.
/// </summary>
public interface ILock
{
/// <summary>
/// Get returns the LeaderElectionRecord
/// </summary>
/// <param name="cancellationToken">token to cancel the task</param>
/// <returns>the record</returns>
Task<LeaderElectionRecord> GetAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Create attempts to create a LeaderElectionRecord
/// </summary>
/// <param name="record">record to create</param>
/// <param name="cancellationToken">token to cancel the task</param>
/// <returns>true if created</returns>
Task<bool> CreateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default);
/// <summary>
/// Update will update and existing LeaderElectionRecord
/// </summary>
/// <param name="record">record to create</param>
/// <param name="cancellationToken">token to cancel the task</param>
/// <returns>true if updated</returns>
Task<bool> UpdateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default);
/// <summary>
/// the locks Identity
/// </summary>
string Identity { get; }
/// <summary>
/// Describe is used to convert details on current resource lock into a string
/// </summary>
/// <returns>resource lock description</returns>
string Describe();
}
}

View File

@@ -0,0 +1,20 @@
using System;
namespace k8s.LeaderElection
{
public class LeaderElectionConfig
{
public ILock Lock { get; set; }
public TimeSpan LeaseDuration { get; set; } = TimeSpan.FromSeconds(15);
public TimeSpan RenewDeadline { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan RetryPeriod { get; set; } = TimeSpan.FromSeconds(2);
public LeaderElectionConfig(ILock @lock)
{
Lock = @lock;
}
}
}

View File

@@ -0,0 +1,74 @@
using System;
namespace k8s.LeaderElection
{
/// <summary>
/// LeaderElectionRecord is the record that is stored in the leader election annotation.
/// This information should be used for observational purposes only and could be replaced with a random string (e.g. UUID) with only slight modification of this code.
/// </summary>
public class LeaderElectionRecord
{
/// <summary>
/// the ID that owns the lease. If empty, no one owns this lease and all callers may acquire.
/// </summary>
public string HolderIdentity { get; set; }
/// <summary>
/// LeaseDuration in seconds
/// </summary>
public int LeaseDurationSeconds { get; set; }
/// <summary>
/// acquire time
/// </summary>
// public DateTimeOffset? AcquireTime { get; set; }
public DateTime? AcquireTime { get; set; }
/// <summary>
/// renew time
/// </summary>
// public DateTimeOffset? RenewTime { get; set; }
public DateTime? RenewTime { get; set; }
/// <summary>
/// leader transitions
/// </summary>
public int LeaderTransitions { get; set; }
protected bool Equals(LeaderElectionRecord other)
{
return HolderIdentity == other?.HolderIdentity && Nullable.Equals(AcquireTime, other.AcquireTime) && Nullable.Equals(RenewTime, other.RenewTime);
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj))
{
return false;
}
if (ReferenceEquals(this, obj))
{
return true;
}
if (obj.GetType() != this.GetType())
{
return false;
}
return Equals((LeaderElectionRecord)obj);
}
public override int GetHashCode()
{
unchecked
{
var hashCode = (HolderIdentity != null ? HolderIdentity.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ AcquireTime.GetHashCode();
hashCode = (hashCode * 397) ^ RenewTime.GetHashCode();
return hashCode;
}
}
}
}

View File

@@ -0,0 +1,248 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Rest;
namespace k8s.LeaderElection
{
public class LeaderElector : IDisposable
{
private const double JitterFactor = 1.2;
private readonly LeaderElectionConfig config;
/// <summary>
/// OnStartedLeading is called when a LeaderElector client starts leading
/// </summary>
public event Action OnStartedLeading;
/// <summary>
/// OnStoppedLeading is called when a LeaderElector client stops leading
/// </summary>
public event Action OnStoppedLeading;
/// <summary>
/// OnNewLeader is called when the client observes a leader that is
/// not the previously observed leader. This includes the first observed
/// leader when the client starts.
/// </summary>
public event Action<string> OnNewLeader;
private volatile LeaderElectionRecord observedRecord;
private DateTimeOffset observedTime = DateTimeOffset.MinValue;
private string reportedLeader;
public LeaderElector(LeaderElectionConfig config)
{
this.config = config;
}
public bool IsLeader()
{
return observedRecord?.HolderIdentity != null && observedRecord?.HolderIdentity == config.Lock.Identity;
}
public string GetLeader()
{
return observedRecord?.HolderIdentity;
}
public async Task RunAsync(CancellationToken cancellationToken = default)
{
await AcquireAsync(cancellationToken).ConfigureAwait(false);
try
{
OnStartedLeading?.Invoke();
// renew loop
for (; ; )
{
cancellationToken.ThrowIfCancellationRequested();
var acq = Task.Run(async () =>
{
try
{
while (!await TryAcquireOrRenew(cancellationToken).ConfigureAwait(false))
{
await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
MaybeReportTransition();
}
}
catch
{
// ignore
return false;
}
return true;
});
if (await Task.WhenAny(acq, Task.Delay(config.RenewDeadline, cancellationToken))
.ConfigureAwait(false) == acq)
{
var succ = await acq.ConfigureAwait(false);
if (succ)
{
await Task.Delay(config.RetryPeriod, cancellationToken).ConfigureAwait(false);
// retry
continue;
}
// renew failed
}
// timeout
break;
}
}
finally
{
OnStoppedLeading?.Invoke();
}
}
private async Task<bool> TryAcquireOrRenew(CancellationToken cancellationToken)
{
var l = config.Lock;
var leaderElectionRecord = new LeaderElectionRecord()
{
HolderIdentity = l.Identity,
LeaseDurationSeconds = config.LeaseDuration.Seconds,
AcquireTime = DateTime.UtcNow,
RenewTime = DateTime.UtcNow,
LeaderTransitions = 0,
};
// 1. obtain or create the ElectionRecord
LeaderElectionRecord oldLeaderElectionRecord = null;
try
{
oldLeaderElectionRecord = await l.GetAsync(cancellationToken).ConfigureAwait(false);
}
catch (HttpOperationException e)
{
if (e.Response.StatusCode != HttpStatusCode.NotFound)
{
return false;
}
}
if (oldLeaderElectionRecord?.AcquireTime == null ||
oldLeaderElectionRecord?.RenewTime == null ||
oldLeaderElectionRecord?.HolderIdentity == null)
{
var created = await l.CreateAsync(leaderElectionRecord, cancellationToken).ConfigureAwait(false);
if (created)
{
observedRecord = leaderElectionRecord;
observedTime = DateTimeOffset.Now;
return true;
}
return false;
}
// 2. Record obtained, check the Identity & Time
if (!Equals(observedRecord, oldLeaderElectionRecord))
{
observedRecord = oldLeaderElectionRecord;
observedTime = DateTimeOffset.Now;
}
if (!string.IsNullOrEmpty(oldLeaderElectionRecord.HolderIdentity)
&& observedTime + config.LeaseDuration > DateTimeOffset.Now
&& !IsLeader())
{
// lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity
return false;
}
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
// here. Let's correct it before updating.
if (IsLeader())
{
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime;
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions;
}
else
{
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1;
}
var updated = await l.UpdateAsync(leaderElectionRecord, cancellationToken).ConfigureAwait(false);
if (!updated)
{
return false;
}
observedRecord = leaderElectionRecord;
observedTime = DateTimeOffset.Now;
return true;
}
private async Task AcquireAsync(CancellationToken cancellationToken)
{
for (; ; )
{
try
{
var delay = config.RetryPeriod.Milliseconds;
var acq = TryAcquireOrRenew(cancellationToken);
if (await Task.WhenAny(acq, Task.Delay(delay, cancellationToken))
.ConfigureAwait(false) == acq)
{
if (await acq.ConfigureAwait(false))
{
return;
}
}
delay = (int)(delay * JitterFactor);
}
finally
{
MaybeReportTransition();
}
}
}
private void MaybeReportTransition()
{
if (observedRecord == null)
{
return;
}
if (observedRecord.HolderIdentity == reportedLeader)
{
return;
}
reportedLeader = observedRecord.HolderIdentity;
OnNewLeader?.Invoke(reportedLeader);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
}
}
/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
}

View File

@@ -0,0 +1,36 @@
using System.Threading;
using System.Threading.Tasks;
using k8s.Models;
namespace k8s.LeaderElection.ResourceLock
{
public class ConfigMapLock : MetaObjectAnnotationLock<V1ConfigMap>
{
public ConfigMapLock(IKubernetes client, string @namespace, string name, string identity)
: base(client, @namespace, name, identity)
{
}
protected override Task<V1ConfigMap> ReadMetaObjectAsync(IKubernetes client, string name,
string namespaceParameter,
CancellationToken cancellationToken)
{
return client.ReadNamespacedConfigMapAsync(name, namespaceParameter, cancellationToken: cancellationToken);
}
protected override Task<V1ConfigMap> CreateMetaObjectAsync(IKubernetes client, V1ConfigMap obj,
string namespaceParameter,
CancellationToken cancellationToken)
{
return client.CreateNamespacedConfigMapAsync(obj, namespaceParameter, cancellationToken: cancellationToken);
}
protected override Task<V1ConfigMap> ReplaceMetaObjectAsync(IKubernetes client, V1ConfigMap obj, string name,
string namespaceParameter,
CancellationToken cancellationToken)
{
return client.ReplaceNamespacedConfigMapAsync(obj, name, namespaceParameter,
cancellationToken: cancellationToken);
}
}
}

View File

@@ -0,0 +1,31 @@
using System.Threading;
using System.Threading.Tasks;
using k8s.Models;
namespace k8s.LeaderElection.ResourceLock
{
public class EndpointsLock : MetaObjectAnnotationLock<V1Endpoints>
{
public EndpointsLock(IKubernetes client, string @namespace, string name, string identity)
: base(client, @namespace, name, identity)
{
}
protected override Task<V1Endpoints> ReadMetaObjectAsync(IKubernetes client, string name, string namespaceParameter, CancellationToken cancellationToken)
{
return client.ReadNamespacedEndpointsAsync(name, namespaceParameter, cancellationToken: cancellationToken);
}
protected override Task<V1Endpoints> CreateMetaObjectAsync(IKubernetes client, V1Endpoints obj, string namespaceParameter,
CancellationToken cancellationToken)
{
return client.CreateNamespacedEndpointsAsync(obj, namespaceParameter, cancellationToken: cancellationToken);
}
protected override Task<V1Endpoints> ReplaceMetaObjectAsync(IKubernetes client, V1Endpoints obj, string name, string namespaceParameter,
CancellationToken cancellationToken)
{
return client.ReplaceNamespacedEndpointsAsync(obj, name, namespaceParameter, cancellationToken: cancellationToken);
}
}
}

View File

@@ -0,0 +1,74 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using k8s.Models;
namespace k8s.LeaderElection.ResourceLock
{
public class LeaseLock : MetaObjectLock<V1Lease>
{
public LeaseLock(IKubernetes client, string @namespace, string name, string identity)
: base(client, @namespace, name, identity)
{
}
protected override Task<V1Lease> ReadMetaObjectAsync(IKubernetes client, string name, string namespaceParameter,
CancellationToken cancellationToken)
{
return client.ReadNamespacedLeaseAsync(name, namespaceParameter, cancellationToken: cancellationToken);
}
protected override LeaderElectionRecord GetLeaderElectionRecord(V1Lease obj)
{
if (obj == null)
{
return null;
}
return new LeaderElectionRecord()
{
AcquireTime = obj.Spec.AcquireTime,
HolderIdentity = obj.Spec.HolderIdentity,
LeaderTransitions = obj.Spec.LeaseTransitions ?? 0,
LeaseDurationSeconds = obj.Spec.LeaseDurationSeconds ?? 15, // 15 = default value
RenewTime = obj.Spec.RenewTime,
};
}
protected override V1Lease SetLeaderElectionRecord(LeaderElectionRecord record, V1Lease metaObj)
{
if (record == null)
{
throw new NullReferenceException(nameof(record));
}
if (metaObj == null)
{
throw new NullReferenceException(nameof(metaObj));
}
metaObj.Spec = new V1LeaseSpec()
{
AcquireTime = record.AcquireTime,
HolderIdentity = record.HolderIdentity,
LeaseTransitions = record.LeaderTransitions,
LeaseDurationSeconds = record.LeaseDurationSeconds,
RenewTime = record.RenewTime,
};
return metaObj;
}
protected override Task<V1Lease> CreateMetaObjectAsync(IKubernetes client, V1Lease obj, string namespaceParameter,
CancellationToken cancellationToken)
{
return client.CreateNamespacedLeaseAsync(obj, namespaceParameter, cancellationToken: cancellationToken);
}
protected override Task<V1Lease> ReplaceMetaObjectAsync(IKubernetes client, V1Lease obj, string name, string namespaceParameter,
CancellationToken cancellationToken)
{
return client.ReplaceNamespacedLeaseAsync(obj, name, namespaceParameter, cancellationToken: cancellationToken);
}
}
}

View File

@@ -0,0 +1,51 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using k8s.Models;
using Microsoft.Rest;
using Newtonsoft.Json;
namespace k8s.LeaderElection.ResourceLock
{
public abstract class MetaObjectAnnotationLock<T> : MetaObjectLock<T>
where T : class, IMetadata<V1ObjectMeta>, new()
{
private readonly JsonSerializerSettings serializationSettings;
private readonly JsonSerializerSettings derializationSettings;
protected MetaObjectAnnotationLock(IKubernetes client, string @namespace, string name, string identity)
: base(client, @namespace, name, identity)
{
serializationSettings = client.SerializationSettings;
derializationSettings = client.DeserializationSettings;
}
private const string LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader";
protected override LeaderElectionRecord GetLeaderElectionRecord(T obj)
{
var recordRawStringContent = obj.GetAnnotation(LeaderElectionRecordAnnotationKey);
if (string.IsNullOrEmpty(recordRawStringContent))
{
return new LeaderElectionRecord();
}
var record =
JsonConvert.DeserializeObject<LeaderElectionRecord>(
recordRawStringContent,
derializationSettings);
return record;
}
protected override T SetLeaderElectionRecord(LeaderElectionRecord record, T metaObj)
{
metaObj.SetAnnotation(
LeaderElectionRecordAnnotationKey,
JsonConvert.SerializeObject(record, serializationSettings));
return metaObj;
}
}
}

View File

@@ -0,0 +1,104 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using k8s.Models;
using Microsoft.Rest;
using Newtonsoft.Json;
namespace k8s.LeaderElection.ResourceLock
{
public abstract class MetaObjectLock<T> : ILock
where T : class, IMetadata<V1ObjectMeta>, new()
{
private IKubernetes client;
private string ns;
private string name;
private string identity;
private T metaObjCache;
protected MetaObjectLock(IKubernetes client, string @namespace, string name, string identity)
{
this.client = client ?? throw new ArgumentNullException(nameof(client));
ns = @namespace;
this.name = name;
this.identity = identity;
}
public string Identity => identity;
public async Task<LeaderElectionRecord> GetAsync(CancellationToken cancellationToken = default)
{
var obj = await ReadMetaObjectAsync(client, name, ns, cancellationToken).ConfigureAwait(false);
var record = GetLeaderElectionRecord(obj);
Interlocked.Exchange(ref metaObjCache, obj);
return record;
}
protected abstract Task<T> ReadMetaObjectAsync(IKubernetes client, string name, string namespaceParameter, CancellationToken cancellationToken);
public async Task<bool> CreateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default)
{
var metaObj = new T
{
Metadata = new V1ObjectMeta() { Name = name, NamespaceProperty = ns },
};
metaObj = SetLeaderElectionRecord(record, metaObj);
try
{
var createdObj = await CreateMetaObjectAsync(client, metaObj, ns, cancellationToken)
.ConfigureAwait(false);
Interlocked.Exchange(ref metaObjCache, createdObj);
return true;
}
catch (HttpOperationException)
{
// ignore
}
return false;
}
protected abstract LeaderElectionRecord GetLeaderElectionRecord(T obj);
protected abstract T SetLeaderElectionRecord(LeaderElectionRecord record, T metaObj);
protected abstract Task<T> CreateMetaObjectAsync(IKubernetes client, T obj, string namespaceParameter, CancellationToken cancellationToken);
public async Task<bool> UpdateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default)
{
var metaObj = Interlocked.CompareExchange(ref metaObjCache, null, null);
if (metaObj == null)
{
throw new InvalidOperationException("endpoint not initialized, call get or create first");
}
metaObj = SetLeaderElectionRecord(record, metaObj);
try
{
var replacedObj = await ReplaceMetaObjectAsync(client, metaObj, name, ns, cancellationToken).ConfigureAwait(false);
Interlocked.Exchange(ref metaObjCache, replacedObj);
return true;
}
catch (HttpOperationException)
{
// ignore
}
return false;
}
protected abstract Task<T> ReplaceMetaObjectAsync(IKubernetes client, T obj, string name, string namespaceParameter, CancellationToken cancellationToken);
public string Describe()
{
return $"{ns}/{name}";
}
}
}

View File

@@ -0,0 +1,41 @@
using System.Threading;
using System.Threading.Tasks;
namespace k8s.LeaderElection.ResourceLock
{
public class MultiLock : ILock
{
private ILock primary;
private ILock secondary;
public MultiLock(ILock primary, ILock secondary)
{
this.primary = primary;
this.secondary = secondary;
}
public Task<LeaderElectionRecord> GetAsync(CancellationToken cancellationToken = default)
{
return primary.GetAsync(cancellationToken);
}
public async Task<bool> CreateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default)
{
return await primary.CreateAsync(record, cancellationToken).ConfigureAwait(false)
&& await secondary.CreateAsync(record, cancellationToken).ConfigureAwait(false);
}
public async Task<bool> UpdateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default)
{
return await primary.UpdateAsync(record, cancellationToken).ConfigureAwait(false)
&& await secondary.UpdateAsync(record, cancellationToken).ConfigureAwait(false);
}
public string Identity => primary.Identity;
public string Describe()
{
return primary.Describe();
}
}
}

View File

@@ -5,6 +5,8 @@ using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using k8s.LeaderElection;
using k8s.LeaderElection.ResourceLock;
using k8s.Models;
using Microsoft.AspNetCore.JsonPatch;
using Microsoft.Rest;
@@ -228,6 +230,102 @@ namespace k8s.E2E
}
[MinikubeFact]
public void LeaderIntegrationTest()
{
var client = CreateClient();
var namespaceParameter = "default";
void Cleanup()
{
var endpoints = client.ListNamespacedEndpoints(namespaceParameter);
void DeleteEndpoints(string name)
{
while (endpoints.Items.Any(p => p.Metadata.Name == name))
{
try
{
client.DeleteNamespacedEndpoints(name, namespaceParameter);
}
catch (HttpOperationException e)
{
if (e.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
{
return;
}
}
}
}
DeleteEndpoints("leaderendpoint");
}
var cts = new CancellationTokenSource();
var leader1acq = new ManualResetEvent(false);
var leader1lose = new ManualResetEvent(false);
try
{
Cleanup();
var tasks = new List<Task>();
// 1
{
var l = new EndpointsLock(client, namespaceParameter, "leaderendpoint", "leader1");
var le = new LeaderElector(new LeaderElectionConfig(l)
{
LeaseDuration = TimeSpan.FromSeconds(1),
RetryPeriod = TimeSpan.FromMilliseconds(400),
});
le.OnStartedLeading += () => leader1acq.Set();
le.OnStoppedLeading += () => leader1lose.Set();
tasks.Add(le.RunAsync(cts.Token));
}
// wait 1 become leader
Assert.True(leader1acq.WaitOne(TimeSpan.FromSeconds(30)));
// 2
{
var l = new EndpointsLock(client, namespaceParameter, "leaderendpoint", "leader2");
var le = new LeaderElector(new LeaderElectionConfig(l)
{
LeaseDuration = TimeSpan.FromSeconds(1),
RetryPeriod = TimeSpan.FromMilliseconds(400),
});
var leader2init = new ManualResetEvent(false);
le.OnNewLeader += _ =>
{
leader2init.Set();
};
tasks.Add(le.RunAsync());
Assert.True(leader2init.WaitOne(TimeSpan.FromSeconds(30)));
Assert.Equal("leader1", le.GetLeader());
cts.Cancel();
Assert.True(leader1lose.WaitOne(TimeSpan.FromSeconds(30)));
Task.Delay(TimeSpan.FromSeconds(3)).Wait();
Assert.True(le.IsLeader());
}
Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(30));
}
finally
{
Cleanup();
}
}
private static IKubernetes CreateClient()
{
return new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig());

View File

@@ -0,0 +1,460 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices.ComTypes;
using System.Threading;
using System.Threading.Tasks;
using k8s.LeaderElection;
using Moq;
using Xunit;
namespace k8s.Tests.LeaderElection
{
public class LeaderElectionTests
{
public LeaderElectionTests()
{
MockResourceLock.ResetGloablRecord();
}
[Fact]
public void SimpleLeaderElection()
{
var electionHistory = new List<string>();
var leadershipHistory = new List<string>();
var renewCount = 3;
var mockLock = new MockResourceLock("mock") { UpdateWillFail = () => renewCount <= 0, };
mockLock.OnCreate += _ =>
{
renewCount--;
electionHistory.Add("create record");
leadershipHistory.Add("get leadership");
};
mockLock.OnUpdate += _ =>
{
renewCount--;
electionHistory.Add("update record");
};
mockLock.OnChange += _ => { electionHistory.Add("change record"); };
var leaderElectionConfig = new LeaderElectionConfig(mockLock)
{
LeaseDuration = TimeSpan.FromMilliseconds(1000),
RetryPeriod = TimeSpan.FromMilliseconds(500),
RenewDeadline = TimeSpan.FromMilliseconds(600),
};
var countdown = new CountdownEvent(2);
Task.Run(() =>
{
var leaderElector = new LeaderElector(leaderElectionConfig);
leaderElector.OnStartedLeading += () =>
{
leadershipHistory.Add("start leading");
countdown.Signal();
};
leaderElector.OnStoppedLeading += () =>
{
leadershipHistory.Add("stop leading");
countdown.Signal();
};
leaderElector.RunAsync().Wait();
});
countdown.Wait(TimeSpan.FromSeconds(10));
Assert.True(electionHistory.SequenceEqual(new[] { "create record", "update record", "update record" }));
Assert.True(leadershipHistory.SequenceEqual(new[] { "get leadership", "start leading", "stop leading" }));
}
[Fact]
public void LeaderElection()
{
var electionHistory = new List<string>();
var leadershipHistory = new List<string>();
var renewCountA = 3;
var mockLockA = new MockResourceLock("mockA") { UpdateWillFail = () => renewCountA <= 0 };
mockLockA.OnCreate += (_) =>
{
renewCountA--;
electionHistory.Add("A creates record");
leadershipHistory.Add("A gets leadership");
};
mockLockA.OnUpdate += (_) =>
{
renewCountA--;
electionHistory.Add("A updates record");
};
mockLockA.OnChange += (_) => { leadershipHistory.Add("A gets leadership"); };
var leaderElectionConfigA = new LeaderElectionConfig(mockLockA)
{
LeaseDuration = TimeSpan.FromMilliseconds(500),
RetryPeriod = TimeSpan.FromMilliseconds(300),
RenewDeadline = TimeSpan.FromMilliseconds(400),
};
var renewCountB = 4;
var mockLockB = new MockResourceLock("mockB") { UpdateWillFail = () => renewCountB <= 0 };
mockLockB.OnCreate += (_) =>
{
renewCountB--;
electionHistory.Add("B creates record");
leadershipHistory.Add("B gets leadership");
};
mockLockB.OnUpdate += (_) =>
{
renewCountB--;
electionHistory.Add("B updates record");
};
mockLockB.OnChange += (_) => { leadershipHistory.Add("B gets leadership"); };
var leaderElectionConfigB = new LeaderElectionConfig(mockLockB)
{
LeaseDuration = TimeSpan.FromMilliseconds(500),
RetryPeriod = TimeSpan.FromMilliseconds(300),
RenewDeadline = TimeSpan.FromMilliseconds(400),
};
var lockAStopLeading = new ManualResetEvent(false);
var testLeaderElectionLatch = new CountdownEvent(4);
Task.Run(() =>
{
var leaderElector = new LeaderElector(leaderElectionConfigA);
leaderElector.OnStartedLeading += () =>
{
leadershipHistory.Add("A starts leading");
testLeaderElectionLatch.Signal();
};
leaderElector.OnStoppedLeading += () =>
{
leadershipHistory.Add("A stops leading");
testLeaderElectionLatch.Signal();
lockAStopLeading.Set();
};
leaderElector.RunAsync().Wait();
});
lockAStopLeading.WaitOne(TimeSpan.FromSeconds(3));
Task.Run(() =>
{
var leaderElector = new LeaderElector(leaderElectionConfigB);
leaderElector.OnStartedLeading += () =>
{
leadershipHistory.Add("B starts leading");
testLeaderElectionLatch.Signal();
};
leaderElector.OnStoppedLeading += () =>
{
leadershipHistory.Add("B stops leading");
testLeaderElectionLatch.Signal();
};
leaderElector.RunAsync().Wait();
});
testLeaderElectionLatch.Wait(TimeSpan.FromSeconds(10));
Assert.True(electionHistory.SequenceEqual(
new[]
{
"A creates record", "A updates record", "A updates record",
"B updates record", "B updates record", "B updates record", "B updates record",
}));
Assert.True(leadershipHistory.SequenceEqual(
new[]
{
"A gets leadership", "A starts leading", "A stops leading",
"B gets leadership", "B starts leading", "B stops leading",
}));
}
[Fact]
public void LeaderElectionWithRenewDeadline()
{
var electionHistory = new List<string>();
var leadershipHistory = new List<string>();
var renewCount = 3;
var mockLock = new MockResourceLock("mock") { UpdateWillFail = () => renewCount <= 0, };
mockLock.OnCreate += _ =>
{
renewCount--;
electionHistory.Add("create record");
leadershipHistory.Add("get leadership");
};
mockLock.OnUpdate += _ =>
{
renewCount--;
electionHistory.Add("update record");
};
mockLock.OnChange += _ => { electionHistory.Add("change record"); };
mockLock.OnTryUpdate += _ => { electionHistory.Add("try update record"); };
var leaderElectionConfig = new LeaderElectionConfig(mockLock)
{
LeaseDuration = TimeSpan.FromMilliseconds(1000),
RetryPeriod = TimeSpan.FromMilliseconds(200),
RenewDeadline = TimeSpan.FromMilliseconds(650),
};
var countdown = new CountdownEvent(2);
Task.Run(() =>
{
var leaderElector = new LeaderElector(leaderElectionConfig);
leaderElector.OnStartedLeading += () =>
{
leadershipHistory.Add("start leading");
countdown.Signal();
};
leaderElector.OnStoppedLeading += () =>
{
leadershipHistory.Add("stop leading");
countdown.Signal();
};
leaderElector.RunAsync().Wait();
});
countdown.Wait(TimeSpan.FromSeconds(10));
Assert.Equal(9, electionHistory.Count);
Assert.True(electionHistory.SequenceEqual(new[]
{
"create record", "try update record", "update record", "try update record", "update record",
"try update record", "try update record", "try update record", "try update record",
}));
Assert.True(leadershipHistory.SequenceEqual(new[] { "get leadership", "start leading", "stop leading" }));
}
[Fact]
public void LeaderElectionThrowException()
{
var l = new Mock<ILock>();
l.Setup(obj => obj.GetAsync(CancellationToken.None))
.Throws(new Exception("noxu"));
var leaderElector = new LeaderElector(new LeaderElectionConfig(l.Object)
{
LeaseDuration = TimeSpan.FromMilliseconds(1000),
RetryPeriod = TimeSpan.FromMilliseconds(200),
RenewDeadline = TimeSpan.FromMilliseconds(700),
});
try
{
leaderElector.RunAsync().Wait();
}
catch (Exception e)
{
Assert.Equal("noxu", e.InnerException?.Message);
return;
}
Assert.True(false, "exception not thrown");
}
[Fact]
public void LeaderElectionReportLeaderOnStart()
{
var l = new Mock<ILock>();
l.Setup(obj => obj.Identity)
.Returns("foo1");
l.SetupSequence(obj => obj.GetAsync(CancellationToken.None))
.ReturnsAsync(() =>
{
return new LeaderElectionRecord()
{
HolderIdentity = "foo2",
AcquireTime = DateTime.Now,
RenewTime = DateTime.Now,
LeaderTransitions = 1,
LeaseDurationSeconds = 60,
};
})
.ReturnsAsync(() =>
{
return new LeaderElectionRecord()
{
HolderIdentity = "foo3",
AcquireTime = DateTime.Now,
RenewTime = DateTime.Now,
LeaderTransitions = 1,
LeaseDurationSeconds = 60,
};
});
var leaderElector = new LeaderElector(new LeaderElectionConfig(l.Object)
{
LeaseDuration = TimeSpan.FromMilliseconds(1000),
RetryPeriod = TimeSpan.FromMilliseconds(200),
RenewDeadline = TimeSpan.FromMilliseconds(700),
});
var countdown = new CountdownEvent(2);
var notifications = new List<string>();
leaderElector.OnNewLeader += id =>
{
notifications.Add(id);
countdown.Signal();
};
Task.Run(() => leaderElector.RunAsync());
countdown.Wait(TimeSpan.FromSeconds(10));
Assert.True(notifications.SequenceEqual(new[]
{
"foo2", "foo3",
}));
}
[Fact]
public void LeaderElectionShouldReportLeaderItAcquiresOnStart()
{
var l = new Mock<ILock>();
l.Setup(obj => obj.Identity)
.Returns("foo1");
l.Setup(obj => obj.GetAsync(CancellationToken.None))
.ReturnsAsync(new LeaderElectionRecord()
{
HolderIdentity = "foo1",
AcquireTime = DateTime.Now,
RenewTime = DateTime.Now,
LeaderTransitions = 1,
LeaseDurationSeconds = 60,
});
var leaderElector = new LeaderElector(new LeaderElectionConfig(l.Object)
{
LeaseDuration = TimeSpan.FromMilliseconds(1000),
RetryPeriod = TimeSpan.FromMilliseconds(200),
RenewDeadline = TimeSpan.FromMilliseconds(700),
});
var countdown = new CountdownEvent(1);
var notifications = new List<string>();
leaderElector.OnNewLeader += id =>
{
notifications.Add(id);
countdown.Signal();
};
Task.Run(() => leaderElector.RunAsync());
countdown.Wait(TimeSpan.FromSeconds(10));
Assert.True(notifications.SequenceEqual(new[] { "foo1" }));
}
private class MockResourceLock : ILock
{
private static LeaderElectionRecord leaderRecord;
private static readonly object Lockobj = new object();
public static void ResetGloablRecord()
{
leaderRecord = null;
}
private readonly string id;
public event Action<LeaderElectionRecord> OnCreate;
public event Action<LeaderElectionRecord> OnUpdate;
public event Action<LeaderElectionRecord> OnChange;
public event Action<LeaderElectionRecord> OnTryUpdate;
public MockResourceLock(string id)
{
this.id = id;
}
public Func<bool> UpdateWillFail { get; set; }
public Task<LeaderElectionRecord> GetAsync(CancellationToken cancellationToken = default)
{
return Task.FromResult(leaderRecord);
}
public Task<bool> CreateAsync(
LeaderElectionRecord record,
CancellationToken cancellationToken = default)
{
lock (Lockobj)
{
if (leaderRecord != null)
{
return Task.FromResult(false);
}
leaderRecord = record;
OnCreate?.Invoke(record);
return Task.FromResult(true);
}
}
public Task<bool> UpdateAsync(LeaderElectionRecord record, CancellationToken cancellationToken = default)
{
lock (Lockobj)
{
OnTryUpdate?.Invoke(record);
if (UpdateWillFail?.Invoke() == true)
{
return Task.FromResult(false);
}
var oldRecord = leaderRecord;
leaderRecord = record;
OnUpdate?.Invoke(record);
if (oldRecord?.HolderIdentity != record.HolderIdentity)
{
OnChange?.Invoke(record);
}
return Task.FromResult(true);
}
}
public string Identity => id;
public string Describe() => id;
}
}
}