Redis failover with StackExchange / Sentinel from C#

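We're currently using Redis 2.8.4 and StackExchange.Redis (and loving it), but at the moment we have no protection against hardware failures and the like. I've been trying to get a solution working with master/slaves and sentinel monitoring, but I can't quite get there, and after searching I haven't found any real pointers.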

So far we have got this far:

We have 3 redis servers and a sentinel on each node (set up by the Linux guys):

  • devredis01:6383 (master)
  • devredis02:6383 (slave)
  • devredis03:6383 (slave)
  • devredis01:26379 (sentinel)
  • devredis02:26379 (sentinel)
  • devredis03:26379 (sentinel)

I am able to connect the StackExchange client to the redis servers, write and read, and verify with Redis Desktop Manager that the data is being replicated across all the redis instances.

I can also connect to the sentinel services using a different ConnectionMultiplexer, query the config, ask for the master redis node, ask for the slaves, etc.
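For reference, `IServer.SentinelSlaves("master")` returns each slave as an array of key/value pairs (`name`, `ip`, `port`, `flags`, ...). Here is a minimal sketch of turning that into typed endpoints, assuming you only need host, port and an up/down flag. `SentinelReplicaInfo` and `SentinelReplicaParser` are hypothetical names, and treating `s_down` as "down" is my own assumption (the wrapper further down only checks `disconnected`):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical typed view of one slave entry reported by a sentinel.
public sealed record SentinelReplicaInfo(string Host, int Port, bool IsUp);

public static class SentinelReplicaParser
{
    // Accepts the shape returned by IServer.SentinelSlaves(...):
    // one KeyValuePair<string, string>[] per slave.
    public static List<SentinelReplicaInfo> Parse(IEnumerable<KeyValuePair<string, string>[]> replicas)
    {
        var result = new List<SentinelReplicaInfo>();
        foreach (var fields in replicas)
        {
            // Look fields up by name rather than position; returns null if absent.
            string Get(string key) => fields.FirstOrDefault(f => f.Key == key).Value;

            string host = Get("ip");
            if (host == null || !int.TryParse(Get("port"), out int port))
            {
                continue; // skip entries we cannot interpret
            }

            // Sentinel flags are comma separated, e.g. "slave" or "s_down,slave,disconnected".
            string[] flags = (Get("flags") ?? string.Empty).Split(',');
            bool isUp = !flags.Contains("disconnected") && !flags.Contains("s_down");

            result.Add(new SentinelReplicaInfo(host, port, isUp));
        }
        return result;
    }
}
```

Because it takes plain key/value arrays, you can pass it `server.SentinelSlaves("master")` from the sentinel ConnectionMultiplexer in production and test it without a live sentinel.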

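We can also kill the master redis node and verify that one of the slaves is promoted to master and that replication to the other slave keeps working. We can see the redis connection trying to reconnect to the old master, and if I recreate the ConnectionMultiplexer I can write to and read from the newly promoted master again, and read from the slave.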

So far so good!

What I'm missing is how you bring it all together in a production system.

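Should I be getting the redis endpoints from sentinel and using 2 ConnectionMultiplexers? What exactly do I have to do to detect that a node has gone down? Can StackExchange do this for me automatically, or does it raise an event so that I can reconnect my redis ConnectionMultiplexer? Should I handle the ConnectionFailed event and then reconnect, so that the ConnectionMultiplexer finds out which server is the new master? Presumably any attempt to write while I am reconnecting will be lost?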

I hope I'm not missing something really obvious; I'm just struggling to put it all together.

Thanks in advance!

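I had only just asked this question when I found a similar one to yours and mine, which I believe answers how our code (the client) knows which server is the new master when the current master goes down: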

How to tell a Client where the new Redis master is using Sentinel

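Apparently you just have to subscribe to and listen for events from the Sentinels. Makes sense... I just figured there would be a more streamlined way.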
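The event that matters here is `+switch-master`, which Sentinel publishes with a payload of the form `<master name> <old ip> <old port> <new ip> <new port>`. Here is a sketch of parsing it, assuming a plain space-separated payload. `SwitchMaster` is a hypothetical type:

```csharp
using System;

// Hypothetical parsed form of a Sentinel "+switch-master" message.
public sealed record SwitchMaster(string MasterName, string OldHost, int OldPort, string NewHost, int NewPort)
{
    public static bool TryParse(string payload, out SwitchMaster result)
    {
        result = null;
        if (string.IsNullOrWhiteSpace(payload))
        {
            return false;
        }

        // Expected: "<master name> <old ip> <old port> <new ip> <new port>"
        string[] parts = payload.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != 5 || !TryPort(parts[2], out int oldPort) || !TryPort(parts[4], out int newPort))
        {
            return false;
        }

        result = new SwitchMaster(parts[0], parts[1], oldPort, parts[3], newPort);
        return true;
    }

    // Accept only valid TCP port numbers.
    private static bool TryPort(string text, out int port) =>
        int.TryParse(text, out port) && port > 0 && port <= 65535;
}
```

Wiring it up could look like `sentinel.GetSubscriber().Subscribe(new RedisChannel("+switch-master", RedisChannel.PatternMode.Literal), (channel, message) => { if (SwitchMaster.TryParse(message, out var change)) redis.Configure(); });`. Here `sentinel` is the ConnectionMultiplexer created with `CommandMap.Sentinel` and `redis` is the data connection.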

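I read something about Twemproxy on Linux acting as a proxy, which could possibly do this for you? But I'm using redis for Windows and was trying to find a Windows option. We may move to Linux if that's the approved way of doing it.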

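Last week I spent some time with the Linux guys testing scenarios and working on the C# side of this implementation, and I am using the following approach: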

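  • Read the sentinel addresses from config and create a ConnectionMultiplexer to connect to them
  • Subscribe to the +switch-master channel
  • Ask each sentinel server in turn which redis servers it thinks are the master and the slaves, and compare the answers to make sure they all agree
  • Create a new ConnectionMultiplexer with the redis server addresses read from sentinel, connect, and add event handlers for ConnectionFailed and ConnectionRestored
  • When I receive a +switch-master message, call Configure() on the redis ConnectionMultiplexer
  • As a belt-and-braces approach, always call Configure() on the redis ConnectionMultiplexer 12 seconds after receiving a ConnectionFailed or ConnectionRestored event whose connection type is ConnectionType.Interactive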
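The "make sure they all agree" step can be kept as a small pure function. This sketch assumes you collect each sentinel's answer to `SentinelGetMasterAddressByName("master")` into a list, using null for any sentinel that could not be reached. It also assumes you want a minimum number of answers before trusting the result. `SentinelConsensus` and `minimumAnswers` are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;

public static class SentinelConsensus
{
    // Returns the master only if at least `minimumAnswers` sentinels answered
    // and every answer is the same endpoint; otherwise null (caller retries later).
    public static EndPoint AgreedMaster(IEnumerable<EndPoint> reportedMasters, int minimumAnswers)
    {
        List<EndPoint> answers = reportedMasters.Where(e => e != null).ToList();
        if (answers.Count == 0 || answers.Count < minimumAnswers)
        {
            return null;
        }

        // IPEndPoint.Equals compares address and port.
        EndPoint first = answers[0];
        return answers.All(e => e.Equals(first)) ? first : null;
    }
}
```

With three sentinels, requiring two answers (a majority) seems a reasonable default. Note that an `IPEndPoint` and a `DnsEndPoint` for the same server do not compare equal, so normalise the answers first if your sentinels report hostnames.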

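I find that I'm generally working again, and reconfigured, about 5 seconds after losing the redis master. During that time I can't write, but I can read (since you can read from a slave). 5 seconds is fine for us, because our data updates very quickly, goes stale after a few seconds and is then overwritten.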
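If a write during that window must not simply be dropped, one option is to retry transient failures for a bounded period. This is not something the wrapper below does. Here is a minimal sketch; `FailoverRetry`, the window and delay values, and the choice of which exceptions count as transient are all assumptions:

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class FailoverRetry
{
    // Calls `write` until it succeeds. A transient failure is retried after `delay`
    // as long as the next attempt would still start inside `window`; any other
    // exception, or a transient one after the window, propagates to the caller.
    public static T Execute<T>(Func<T> write, TimeSpan window, TimeSpan delay, Func<Exception, bool> isTransient)
    {
        Stopwatch elapsed = Stopwatch.StartNew();
        while (true)
        {
            try
            {
                return write();
            }
            catch (Exception ex) when (isTransient(ex) && elapsed.Elapsed + delay < window)
            {
                Thread.Sleep(delay);
            }
        }
    }
}
```

With StackExchange.Redis, a plausible `isTransient` would accept `RedisConnectionException`, `TimeoutException`, and any `RedisServerException` whose message starts with `READONLY`. A window of around 10 seconds would cover the roughly 5 seconds observed here. Retrying is only safe for writes that can be repeated (a plain SET, but not an INCR).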

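One thing I wasn't sure about was whether I should remove a redis server from the redis ConnectionMultiplexer when the instance goes down, or let it keep retrying the connection. I decided to let it retry, since the server rejoins as a slave as soon as it comes back up. I did some performance testing with and without a connection being retried, and it seemed to make little difference. Maybe someone can clarify whether this is the right approach.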

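Every now and then, bringing back an instance that had previously been the master did seem to cause some confusion: a few seconds after it came back up I would get a "READONLY" exception on writes, meaning I can't write to a slave. This was rare, but I found that my "catch-all" approach of calling Configure() 12 seconds after a connection state change caught the problem. Calling Configure() seems very cheap, so calling it twice, whether or not it's necessary, seemed fine.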
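The 12-second catch-all can be expressed as a restartable one-shot timer. This sketch assumes that several events arriving close together should collapse into a single `Configure()` call, whereas the description above schedules one call per event. `DelayedReconfigure` and `RedisErrors.IsReadOnlyError` are hypothetical names. Matching on the `READONLY` prefix of the server error text is also an assumption about how the error surfaces:

```csharp
using System;
using System.Threading;

// Runs `reconfigure` once, a fixed delay after the most recent Trigger().
public sealed class DelayedReconfigure : IDisposable
{
    private readonly Timer _timer;
    private readonly TimeSpan _delay;

    public DelayedReconfigure(Action reconfigure, TimeSpan delay)
    {
        _delay = delay;
        // Created disarmed; Trigger() arms it. Exceptions are swallowed because an
        // unhandled exception on a timer thread would terminate the process
        // (log them in real code).
        _timer = new Timer(_ =>
        {
            try { reconfigure(); } catch (Exception) { }
        }, null, Timeout.Infinite, Timeout.Infinite);
    }

    // Each call restarts the countdown, so a burst of events yields one call.
    public void Trigger() => _timer.Change(_delay, Timeout.InfiniteTimeSpan);

    public void Dispose() => _timer.Dispose();
}

public static class RedisErrors
{
    // Redis rejects writes on a replica with an error reply starting "READONLY".
    public static bool IsReadOnlyError(string message) =>
        message != null && message.StartsWith("READONLY", StringComparison.Ordinal);
}
```

You could hook it up with something like `var reconfigure = new DelayedReconfigure(() => redis.Configure(), TimeSpan.FromSeconds(12));` and `redis.ConnectionFailed += (s, e) => { if (e.ConnectionType == ConnectionType.Interactive) reconfigure.Trigger(); };`, doing the same for `ConnectionRestored`. You could also call `Trigger()` when a write fails with `IsReadOnlyError(ex.Message)`.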

Now that I have slaves, I have moved some of my data cleanup code, which does key scans, onto the slaves, which makes me happy.

All in all I'm very pleased. It isn't perfect, but for something that should happen very rarely it's more than good enough.

I'm including our Redis wrapper. For various reasons it has changed somewhat from the original answer:

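  • We wanted to use pub/sub
  • Sentinel didn't always seem to give us the master changed message at the "right" time (i.e. we called Configure() and ended up thinking a slave was the master)
  • The ConnectionMultiplexer didn't always seem to restore connections, which affected pub/sub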

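I rather suspect this comes down to our sentinel/redis configuration. Either way, despite destructive testing, it just wasn't completely reliable. On top of that, the master changed message was taking a long time to arrive, because we had to increase the timeouts: sentinel was "too sensitive" and was calling failovers when there was no failure. I think running in a virtual environment also made the problem worse.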

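Instead of listening for subscription messages, we now simply attempt a test write every 5 seconds, and also run a "last message received" check on pub/sub. If we hit any problem, we completely tear down the connections and rebuild them. It seems like overkill, but in practice it is much quicker and more reliable than waiting for the master changed message from sentinel...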
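The write probe plus "last message received" check could be factored out as shown below. The sketch rests on several assumptions. The probe, the rebuild action and the clock are injected delegates (all names are hypothetical). A probe that returns false or throws counts as unhealthy. Channel timestamps are reset after a rebuild so the check does not fire again immediately, which mirrors the wrapper resetting `LastUsed` on reconnect:

```csharp
using System;
using System.Collections.Concurrent;

public sealed class ConnectionHealthCheck
{
    private readonly Func<bool> _writeProbe;
    private readonly Action _rebuild;
    private readonly Func<DateTime> _clock;
    private readonly ConcurrentDictionary<string, (DateTime LastSeen, TimeSpan MaxSilence)> _channels =
        new ConcurrentDictionary<string, (DateTime LastSeen, TimeSpan MaxSilence)>();

    public ConnectionHealthCheck(Func<bool> writeProbe, Action rebuild, Func<DateTime> clock = null)
    {
        _writeProbe = writeProbe;
        _rebuild = rebuild;
        _clock = clock ?? (() => DateTime.UtcNow);
    }

    // Start tracking a pub/sub channel that should receive something within maxSilence.
    public void Watch(string channel, TimeSpan maxSilence) => _channels[channel] = (_clock(), maxSilence);

    // Call from the subscription handler whenever a message arrives.
    public void MessageReceived(string channel)
    {
        if (_channels.TryGetValue(channel, out var entry))
        {
            _channels[channel] = (_clock(), entry.MaxSilence);
        }
    }

    // Returns true if healthy; otherwise rebuilds the connections and returns false.
    public bool CheckOnce()
    {
        bool healthy;
        try { healthy = _writeProbe(); }
        catch (Exception) { healthy = false; }

        DateTime now = _clock();
        foreach (var channel in _channels)
        {
            if (now - channel.Value.LastSeen > channel.Value.MaxSilence)
            {
                healthy = false;
            }
        }

        if (!healthy)
        {
            _rebuild();
            // Give every channel a fresh grace period after the rebuild.
            foreach (var key in _channels.Keys)
            {
                _channels[key] = (now, _channels[key].MaxSilence);
            }
        }
        return healthy;
    }
}
```

To use it, drive `CheckOnce()` from a `System.Threading.Timer` every 5 seconds and call `MessageReceived(channel)` from each subscription handler. For the probe, something like `() => { db.HashSet(key, field, DateTime.UtcNow.Ticks); return true; }` works, and it is worth relying on exceptions here. `IDatabase.HashSet` returns whether the field was newly created, not whether the write succeeded.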

This won't compile without the various extension methods and other classes, but you get the idea.

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Net;
    using System.Security;
    using System.Text;
    using System.Threading;
    using StackExchange.Redis;

    namespace Smartodds.Framework.Redis
    {
        public class RedisClient : IDisposable
        {
            // Field declarations were not included in the original post; the types are inferred from usage.
            private readonly Int32 m_ConnectTimeout;
            private readonly Int32 m_Timeout;
            private readonly Int32 m_DatabaseId;
            private readonly Int32 m_ReconnectTime;
            private readonly Int32 m_CheckSubscriptionsTime;
            private readonly Nullable<Int32> m_CheckWriteTime;
            private readonly SecureString m_Password = new SecureString();
            private readonly List<RedisConnection> m_RedisServers = new List<RedisConnection>();
            private readonly List<RedisConnection> m_SentinelServers = new List<RedisConnection>();
            private readonly Dictionary<string, RedisSubscription> m_Subscriptions = new Dictionary<string, RedisSubscription>();
            private readonly Guid m_CheckWriteGuid = Guid.NewGuid();
            private readonly object m_ConnectionLocker = new object();
            private readonly object m_ReconnectLocker = new object();
            private volatile bool m_Connecting;
            private ConnectionMultiplexer m_Redis;
            private ConnectionMultiplexer m_Sentinel;
            private Timer m_ReconnectTimer;
            private Timer m_CheckSubscriptionsTimer;
            private Timer m_CheckWriteTimer;

            public RedisClient(RedisEnvironmentElement environment, Int32 databaseId)
            {
                m_ConnectTimeout = environment.ConnectTimeout;
                m_Timeout = environment.Timeout;
                m_DatabaseId = databaseId;
                m_ReconnectTime = environment.ReconnectTime;
                m_CheckSubscriptionsTime = environment.CheckSubscriptions;

                if (environment.TestWrite == true)
                {
                    m_CheckWriteTime = environment.TestWriteTime;
                }

                environment.Password.ToCharArray().ForEach((c) => m_Password.AppendChar(c));

                foreach (var server in environment.Servers)
                {
                    if (server.Type == ServerType.Redis)
                    {
                        // will be ignored if sentinel servers are used
                        m_RedisServers.Add(new RedisConnection { Address = server.Host, Port = server.Port });
                    }
                    else
                    {
                        m_SentinelServers.Add(new RedisConnection { Address = server.Host, Port = server.Port });
                    }
                }
            }

            public bool IsSentinel
            {
                get { return m_SentinelServers.Count > 0; }
            }

            public IDatabase Database
            {
                get { return _Redis.GetDatabase(m_DatabaseId); }
            }

            private ConnectionMultiplexer _Redis
            {
                get
                {
                    if (m_Connecting == true)
                    {
                        throw new RedisConnectionNotReadyException();
                    }

                    ConnectionMultiplexer redis = m_Redis;
                    if (redis == null)
                    {
                        throw new RedisConnectionNotReadyException();
                    }

                    return redis;
                }
            }

            private ConnectionMultiplexer _Sentinel
            {
                get
                {
                    if (m_Connecting == true)
                    {
                        throw new RedisConnectionNotReadyException("Sentinel connection not ready");
                    }

                    ConnectionMultiplexer sentinel = m_Sentinel;
                    if (sentinel == null)
                    {
                        throw new RedisConnectionNotReadyException("Sentinel connection not ready");
                    }

                    return sentinel;
                }
            }

            public void RegisterSubscription(string channel, Action<string, RedisValue> handler, Int32 maxNoReceiveSeconds)
            {
                m_Subscriptions.Add(channel, new RedisSubscription
                {
                    Channel = channel,
                    Handler = handler,
                    MaxNoReceiveSeconds = maxNoReceiveSeconds,
                    LastUsed = DateTime.UtcNow,
                });
            }

            public void Connect()
            {
                _Connect(true);
            }

            private void _Connect(object state)
            {
                bool throwException = (bool)state;

                // if a reconnect is already being attempted, don't hang around waiting
                if (Monitor.TryEnter(m_ConnectionLocker) == false)
                {
                    return;
                }

                // we took the lock, notify everything we are connecting
                m_Connecting = true;

                try
                {
                    Stopwatch sw = Stopwatch.StartNew();
                    LoggerQueue.Debug(">>>>>> REDIS CONNECTING... >>>>>>");

                    // if this is a reconnect, make absolutely sure everything is cleaned up first
                    _KillTimers();
                    _KillRedisClient();

                    if (this.IsSentinel == true && m_Sentinel == null)
                    {
                        LoggerQueue.Debug(">>>>>> CONNECTING TO SENTINEL >>>>>> - " + sw.Elapsed);

                        // we'll be getting the redis servers from sentinel
                        ConfigurationOptions sentinelConnection = _CreateRedisConfiguration(CommandMap.Sentinel, null, m_SentinelServers);
                        m_Sentinel = ConnectionMultiplexer.Connect(sentinelConnection);
                        LoggerQueue.Debug(">>>>>> CONNECTED TO SENTINEL >>>>>> - " + sw.Elapsed);

                        _OutputConfigurationFromSentinel();

                        // get all the redis servers from sentinel and ignore any set by caller
                        m_RedisServers.Clear();
                        m_RedisServers.AddRange(_GetAllRedisServersFromSentinel());

                        if (m_RedisServers.Count == 0)
                        {
                            throw new RedisException("Sentinel found no redis servers");
                        }
                    }

                    LoggerQueue.Debug(">>>>>> CONNECTING TO REDIS >>>>>> - " + sw.Elapsed);

                    // try to connect to all redis servers
                    ConfigurationOptions connection = _CreateRedisConfiguration(CommandMap.Default, _SecureStringToString(m_Password), m_RedisServers);
                    m_Redis = ConnectionMultiplexer.Connect(connection);
                    LoggerQueue.Debug(">>>>>> CONNECTED TO REDIS >>>>>> - " + sw.Elapsed);

                    // register subscription channels
                    m_Subscriptions.ForEach(s =>
                    {
                        m_Redis.GetSubscriber().Subscribe(s.Key, (channel, value) => _SubscriptionHandler(channel, value));
                        s.Value.LastUsed = DateTime.UtcNow;
                    });

                    if (this.IsSentinel == true)
                    {
                        // check subscriptions have been sending messages
                        if (m_Subscriptions.Count > 0)
                        {
                            m_CheckSubscriptionsTimer = new Timer(_CheckSubscriptions, null, 30000, m_CheckSubscriptionsTime);
                        }

                        if (m_CheckWriteTime != null)
                        {
                            // check that we can write to redis
                            m_CheckWriteTimer = new Timer(_CheckWrite, null, 32000, m_CheckWriteTime.Value);
                        }

                        // monitor for connection status change to any redis servers
                        m_Redis.ConnectionFailed += _ConnectionFailure;
                        m_Redis.ConnectionRestored += _ConnectionRestored;
                    }

                    LoggerQueue.Debug(string.Format(">>>>>> ALL REDIS CONNECTED ({0}) >>>>>>", sw.Elapsed));
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error(">>>>>> REDIS CONNECT FAILURE >>>>>>", ex);

                    if (throwException == true)
                    {
                        throw;
                    }
                    else
                    {
                        // internal reconnect, the reconnect has failed so might as well clean everything and try again
                        _KillTimers();
                        _KillRedisClient();

                        // faster than usual reconnect if failure
                        _ReconnectTimer(1000);
                    }
                }
                finally
                {
                    // finished connection attempt, notify everything and remove lock
                    m_Connecting = false;
                    Monitor.Exit(m_ConnectionLocker);
                }
            }

            private ConfigurationOptions _CreateRedisConfiguration(CommandMap commandMap, string password, List<RedisConnection> connections)
            {
                ConfigurationOptions connection = new ConfigurationOptions
                {
                    CommandMap = commandMap,
                    AbortOnConnectFail = true,
                    AllowAdmin = true,
                    ConnectTimeout = m_ConnectTimeout,
                    SyncTimeout = m_Timeout,
                    ServiceName = "master",
                    TieBreaker = string.Empty,
                    Password = password,
                };

                connections.ForEach(s =>
                {
                    connection.EndPoints.Add(s.Address, s.Port);
                });

                return connection;
            }

            private void _OutputConfigurationFromSentinel()
            {
                m_SentinelServers.ForEach(s =>
                {
                    try
                    {
                        IServer server = m_Sentinel.GetServer(s.Address, s.Port);

                        if (server.IsConnected == true)
                        {
                            try
                            {
                                IPEndPoint master = server.SentinelGetMasterAddressByName("master") as IPEndPoint;
                                var slaves = server.SentinelSlaves("master");

                                StringBuilder sb = new StringBuilder();
                                sb.Append(">>>>>> _OutputConfigurationFromSentinel Server ");
                                sb.Append(s.Address);
                                sb.Append(" thinks that master is ");
                                sb.Append(master);
                                sb.Append(" and slaves are ");

                                foreach (var slave in slaves)
                                {
                                    string name = slave.Where(i => i.Key == "name").Single().Value;
                                    bool up = slave.Where(i => i.Key == "flags").Single().Value.Contains("disconnected") == false;

                                    sb.Append(name);
                                    sb.Append("(");
                                    sb.Append(up == true ? "connected" : "down");
                                    sb.Append(") ");
                                }

                                sb.Append(">>>>>>");
                                LoggerQueue.Debug(sb.ToString());
                            }
                            catch (Exception ex)
                            {
                                LoggerQueue.Error(string.Format(">>>>>> _OutputConfigurationFromSentinel Could not get configuration from sentinel server ({0}) >>>>>>", s.Address), ex);
                            }
                        }
                        else
                        {
                            LoggerQueue.Error(string.Format(">>>>>> _OutputConfigurationFromSentinel Sentinel server {0} was not connected", s.Address));
                        }
                    }
                    catch (Exception ex)
                    {
                        LoggerQueue.Error(string.Format(">>>>>> _OutputConfigurationFromSentinel Could not get IServer from sentinel ({0}) >>>>>>", s.Address), ex);
                    }
                });
            }

            private RedisConnection[] _GetAllRedisServersFromSentinel()
            {
                // ask each sentinel server for its configuration
                List<RedisConnection> redisServers = new List<RedisConnection>();

                m_SentinelServers.ForEach(s =>
                {
                    try
                    {
                        IServer server = m_Sentinel.GetServer(s.Address, s.Port);

                        if (server.IsConnected == true)
                        {
                            try
                            {
                                // store master in list
                                IPEndPoint master = server.SentinelGetMasterAddressByName("master") as IPEndPoint;
                                redisServers.Add(new RedisConnection { Address = master.Address.ToString(), Port = master.Port });

                                var slaves = server.SentinelSlaves("master");

                                foreach (var slave in slaves)
                                {
                                    string address = slave.Where(i => i.Key == "ip").Single().Value;
                                    string port = slave.Where(i => i.Key == "port").Single().Value;

                                    redisServers.Add(new RedisConnection { Address = address, Port = Convert.ToInt32(port) });
                                }
                            }
                            catch (Exception ex)
                            {
                                LoggerQueue.Error(string.Format(">>>>>> _GetAllRedisServersFromSentinel Could not get redis servers from sentinel server ({0}) >>>>>>", s.Address), ex);
                            }
                        }
                        else
                        {
                            LoggerQueue.Error(string.Format(">>>>>> _GetAllRedisServersFromSentinel Sentinel server {0} was not connected", s.Address));
                        }
                    }
                    catch (Exception ex)
                    {
                        LoggerQueue.Error(string.Format(">>>>>> _GetAllRedisServersFromSentinel Could not get IServer from sentinel ({0}) >>>>>>", s.Address), ex);
                    }
                });

                return redisServers.Distinct().ToArray();
            }

            private IServer _GetRedisMasterFromSentinel()
            {
                // ask each sentinel server for its configuration
                foreach (RedisConnection sentinel in m_SentinelServers)
                {
                    IServer sentinelServer = _Sentinel.GetServer(sentinel.Address, sentinel.Port);

                    if (sentinelServer.IsConnected == true)
                    {
                        try
                        {
                            IPEndPoint master = sentinelServer.SentinelGetMasterAddressByName("master") as IPEndPoint;
                            return _Redis.GetServer(master);
                        }
                        catch (Exception ex)
                        {
                            LoggerQueue.Error(string.Format(">>>>>> Could not get redis master from sentinel server ({0}) >>>>>>", sentinel.Address), ex);
                        }
                    }
                }

                throw new InvalidOperationException("No sentinel server available to get master");
            }

            private void _ReconnectTimer(Nullable<Int32> reconnectMilliseconds)
            {
                try
                {
                    lock (m_ReconnectLocker)
                    {
                        if (m_ReconnectTimer != null)
                        {
                            m_ReconnectTimer.Dispose();
                            m_ReconnectTimer = null;
                        }

                        // since a reconnect will definitely occur we can stop the check timers for now until reconnect succeeds (where they are recreated)
                        _KillTimers();

                        LoggerQueue.Warn(">>>>>> REDIS STARTING RECONNECT TIMER >>>>>>");

                        m_ReconnectTimer = new Timer(_Connect, false, reconnectMilliseconds.GetValueOrDefault(m_ReconnectTime), Timeout.Infinite);
                    }
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error("Error during _ReconnectTimer", ex);
                }
            }

            private void _CheckSubscriptions(object state)
            {
                if (Monitor.TryEnter(m_ConnectionLocker, TimeSpan.FromSeconds(1)) == false)
                {
                    return;
                }

                try
                {
                    DateTime now = DateTime.UtcNow;

                    foreach (RedisSubscription subscription in m_Subscriptions.Values)
                    {
                        if ((now - subscription.LastUsed) > TimeSpan.FromSeconds(subscription.MaxNoReceiveSeconds))
                        {
                            try
                            {
                                EndPoint endpoint = m_Redis.GetSubscriber().IdentifyEndpoint(subscription.Channel);
                                EndPoint subscribedEndpoint = m_Redis.GetSubscriber().SubscribedEndpoint(subscription.Channel);

                                LoggerQueue.Warn(string.Format(">>>>>> REDIS Channel '{0}' has not been used for longer than {1}s, IsConnected: {2}, IsConnectedChannel: {3}, EndPoint: {4}, SubscribedEndPoint: {5}, reconnecting...",
                                    subscription.Channel,
                                    subscription.MaxNoReceiveSeconds,
                                    m_Redis.GetSubscriber().IsConnected(),
                                    m_Redis.GetSubscriber().IsConnected(subscription.Channel),
                                    endpoint != null ? endpoint.ToString() : "null",
                                    subscribedEndpoint != null ? subscribedEndpoint.ToString() : "null"));
                            }
                            catch (Exception ex)
                            {
                                LoggerQueue.Error(string.Format(">>>>>> REDIS Error logging out details of Channel '{0}' reconnect", subscription.Channel), ex);
                            }

                            _ReconnectTimer(null);
                            return;
                        }
                    }
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error(">>>>>> REDIS Exception ERROR during _CheckSubscriptions", ex);
                }
                finally
                {
                    Monitor.Exit(m_ConnectionLocker);
                }
            }

            private void _CheckWrite(object state)
            {
                if (Monitor.TryEnter(m_ConnectionLocker, TimeSpan.FromSeconds(1)) == false)
                {
                    return;
                }

                try
                {
                    this.Database.HashSet(Environment.MachineName + "SmartoddsWriteCheck", m_CheckWriteGuid.ToString(), DateTime.UtcNow.Ticks);
                }
                catch (RedisConnectionNotReadyException)
                {
                    LoggerQueue.Warn(">>>>>> REDIS RedisConnectionNotReadyException ERROR DURING _CheckWrite");
                }
                catch (RedisServerException ex)
                {
                    LoggerQueue.Warn(">>>>>> REDIS RedisServerException ERROR DURING _CheckWrite, reconnecting... - " + ex.Message);
                    _ReconnectTimer(null);
                }
                catch (RedisConnectionException ex)
                {
                    LoggerQueue.Warn(">>>>>> REDIS RedisConnectionException ERROR DURING _CheckWrite, reconnecting... - " + ex.Message);
                    _ReconnectTimer(null);
                }
                catch (TimeoutException ex)
                {
                    LoggerQueue.Warn(">>>>>> REDIS TimeoutException ERROR DURING _CheckWrite - " + ex.Message);
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error(">>>>>> REDIS Exception ERROR during _CheckWrite", ex);
                }
                finally
                {
                    Monitor.Exit(m_ConnectionLocker);
                }
            }

            private void _ConnectionFailure(object sender, ConnectionFailedEventArgs e)
            {
                LoggerQueue.Warn(string.Format(">>>>>> REDIS CONNECTION FAILURE, {0}, {1}, {2} >>>>>>", e.ConnectionType, e.EndPoint.ToString(), e.FailureType));
            }

            private void _ConnectionRestored(object sender, ConnectionFailedEventArgs e)
            {
                LoggerQueue.Warn(string.Format(">>>>>> REDIS CONNECTION RESTORED, {0}, {1}, {2} >>>>>>", e.ConnectionType, e.EndPoint.ToString(), e.FailureType));
            }

            private void _SubscriptionHandler(string channel, RedisValue value)
            {
                // get handler lookup
                RedisSubscription subscription = null;
                if (m_Subscriptions.TryGetValue(channel, out subscription) == false || subscription == null)
                {
                    return;
                }

                // update last used
                subscription.LastUsed = DateTime.UtcNow;

                // call handler
                subscription.Handler(channel, value);
            }

            public Int64 Publish(string channel, RedisValue message)
            {
                try
                {
                    return _Redis.GetSubscriber().Publish(channel, message);
                }
                catch (RedisConnectionNotReadyException)
                {
                    LoggerQueue.Error("REDIS RedisConnectionNotReadyException ERROR DURING Publish");
                    throw;
                }
                catch (RedisServerException ex)
                {
                    LoggerQueue.Error("REDIS RedisServerException ERROR DURING Publish - " + ex.Message);
                    throw;
                }
                catch (RedisConnectionException ex)
                {
                    LoggerQueue.Error("REDIS RedisConnectionException ERROR DURING Publish - " + ex.Message);
                    throw;
                }
                catch (TimeoutException ex)
                {
                    LoggerQueue.Error("REDIS TimeoutException ERROR DURING Publish - " + ex.Message);
                    throw;
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error("REDIS Exception ERROR DURING Publish", ex);
                    throw;
                }
            }

            public bool LockTake(RedisKey key, RedisValue value, TimeSpan expiry)
            {
                return _Execute(() => this.Database.LockTake(key, value, expiry));
            }

            public bool LockExtend(RedisKey key, RedisValue value, TimeSpan extension)
            {
                return _Execute(() => this.Database.LockExtend(key, value, extension));
            }

            public bool LockRelease(RedisKey key, RedisValue value)
            {
                return _Execute(() => this.Database.LockRelease(key, value));
            }

            private void _Execute(Action action)
            {
                try
                {
                    action.Invoke();
                }
                catch (RedisServerException ex)
                {
                    LoggerQueue.Error("REDIS RedisServerException ERROR DURING _Execute - " + ex.Message);
                    throw;
                }
                catch (RedisConnectionException ex)
                {
                    LoggerQueue.Error("REDIS RedisConnectionException ERROR DURING _Execute - " + ex.Message);
                    throw;
                }
                catch (TimeoutException ex)
                {
                    LoggerQueue.Error("REDIS TimeoutException ERROR DURING _Execute - " + ex.Message);
                    throw;
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error("REDIS Exception ERROR DURING _Execute", ex);
                    throw;
                }
            }

            private TResult _Execute<TResult>(Func<TResult> function)
            {
                try
                {
                    return function.Invoke();
                }
                catch (RedisServerException ex)
                {
                    LoggerQueue.Error("REDIS RedisServerException ERROR DURING _Execute - " + ex.Message);
                    throw;
                }
                catch (RedisConnectionException ex)
                {
                    LoggerQueue.Error("REDIS RedisConnectionException ERROR DURING _Execute - " + ex.Message);
                    throw;
                }
                catch (TimeoutException ex)
                {
                    LoggerQueue.Error("REDIS TimeoutException ERROR DURING _Execute - " + ex.Message);
                    throw;
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error("REDIS ERROR DURING _Execute", ex);
                    throw;
                }
            }

            public string[] GetAllKeys(string pattern)
            {
                if (m_Sentinel != null)
                {
                    // _GetAnyRedisSlaveFromSentinel() is not shown in the original post
                    return _GetAnyRedisSlaveFromSentinel().Keys(m_DatabaseId, pattern).Select(k => (string)k).ToArray();
                }
                else
                {
                    return _Redis.GetServer(_Redis.GetEndPoints().First()).Keys(m_DatabaseId, pattern).Select(k => (string)k).ToArray();
                }
            }

            private void _KillSentinelClient()
            {
                try
                {
                    if (m_Sentinel != null)
                    {
                        LoggerQueue.Debug(">>>>>> KILLING SENTINEL CONNECTION >>>>>>");

                        ConnectionMultiplexer sentinel = m_Sentinel;
                        m_Sentinel = null;

                        sentinel.Close(false);
                        sentinel.Dispose();
                    }
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error(">>>>>> Error during _KillSentinelClient", ex);
                }
            }

            private void _KillRedisClient()
            {
                try
                {
                    if (m_Redis != null)
                    {
                        Stopwatch sw = Stopwatch.StartNew();
                        LoggerQueue.Debug(">>>>>> KILLING REDIS CONNECTION >>>>>>");

                        ConnectionMultiplexer redis = m_Redis;
                        m_Redis = null;

                        if (this.IsSentinel == true)
                        {
                            redis.ConnectionFailed -= _ConnectionFailure;
                            redis.ConnectionRestored -= _ConnectionRestored;
                        }

                        redis.Close(false);
                        redis.Dispose();

                        LoggerQueue.Debug(">>>>>> KILLED REDIS CONNECTION >>>>>> " + sw.Elapsed);
                    }
                }
                catch (Exception ex)
                {
                    LoggerQueue.Error(">>>>>> Error during _KillRedisClient", ex);
                }
            }

            private void _KillClients()
            {
                lock (m_ConnectionLocker)
                {
                    _KillSentinelClient();
                    _KillRedisClient();
                }
            }

            private void _KillTimers()
            {
                if (m_CheckSubscriptionsTimer != null)
                {
                    m_CheckSubscriptionsTimer.Dispose();
                    m_CheckSubscriptionsTimer = null;
                }

                if (m_CheckWriteTimer != null)
                {
                    m_CheckWriteTimer.Dispose();
                    m_CheckWriteTimer = null;
                }
            }

            public void Dispose()
            {
                _KillClients();
                _KillTimers();
            }
        }
    }
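Some of the missing pieces can be reconstructed from how they are used. Below is a hypothetical sketch of three of them, not the original author's code:

  • `RedisConnection` as a record, so that the `Distinct()` call in `_GetAllRedisServersFromSentinel` actually removes duplicates by address and port
  • the `RedisConnectionNotReadyException` thrown by the property getters
  • the `ForEach` extension used on arrays and dictionaries

It assumes C# 9 or later for `record` and `init`. `RedisEnvironmentElement`, `RedisSubscription`, `LoggerQueue` and the remaining helpers are still left out:

```csharp
using System;
using System.Collections.Generic;

// Value equality over Address and Port, so Distinct()/HashSet treat equal endpoints as one.
public sealed record RedisConnection
{
    public string Address { get; init; }
    public int Port { get; init; }
}

// Thrown while a (re)connect is in progress or no multiplexer exists yet.
public class RedisConnectionNotReadyException : Exception
{
    public RedisConnectionNotReadyException() : base("Redis connection not ready") { }
    public RedisConnectionNotReadyException(string message) : base(message) { }
}

public static class EnumerableExtensions
{
    // List<T> already has an instance ForEach, which takes precedence;
    // this covers arrays, dictionaries and other sequences.
    public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (T item in source)
        {
            action(item);
        }
    }
}
```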