Redis通过锁定分布增量

我需要生成一个计数器,它将被发送到一些api调用。 我的应用程序在多个节点上运行,所以我想要生成一个独特的计数器。 我试过以下代码

public static long GetTransactionCountForUser(int telcoId) { long valreturn = 0; string key = "TelcoId:" + telcoId + ":Sequence"; if (Muxer != null && Muxer.IsConnected && (Muxer.GetDatabase()) != null) { IDatabase db = Muxer.GetDatabase(); var val = db.StringGet(key); int maxVal = 999; if (Convert.ToInt32(val) < maxVal) { valreturn = db.StringIncrement(key); } else { bool isdone = db.StringSet(key, valreturn); //db.SetAdd(key,new RedisValue) .StringIncrement(key, Convert.ToDouble(val)) } } return valreturn; } 

并通过Task Parallel libray运行测试。 当我有边界值时,我看到的是设置了多个时间0条目

请让我知道我需要做哪些更正

更新:我的最终逻辑如下

  public static long GetSequenceNumberForTelcoApiCallViaLuaScript(int telcoId) { long valreturn = 0; int maxIncrement = 9999;//todo via configuration if (true)//todo via configuration { IDatabase db; string key = "TelcoId:" + telcoId + ":SequenceNumber"; if (Muxer != null && Muxer.IsConnected && (db = Muxer.GetDatabase()) != null) { valreturn = (int)db.ScriptEvaluate(@" local result = redis.call('incr', KEYS[1]) if result > tonumber(ARGV[1]) then result = 1 redis.call('set', KEYS[1], result) end return result", new RedisKey[] { key }, flags: CommandFlags.HighPriority, values: new RedisValue[] { maxIncrement }); } } return valreturn; } 

实际上,您的代码在翻转边界周围并不安全,因为您正在执行“获取”,(延迟和思考),“设置” – 而不检查“获取”中的条件是否仍然适用。 如果服务器在项目1000周围繁忙,则可能会获得各种疯狂​​的输出,包括:

 1 2 ... 999 1000 // when "get" returns 998, so you do an incr 1001 // ditto 1002 // ditto 0 // when "get" returns 999 or above, so you do a set 0 // ditto 0 // ditto 1 

选项:

  1. 使用事务和约束API使您的逻辑兼容安全
  2. 通过ScriptEvaluate将您的逻辑重写为Lua脚本

现在,redis事务(每个选项1) 很难 。 就个人而言,我使用“2” – 除了更简单的代码和调试,它意味着你只有1次往返和操作,而不是“获取,观看,获取,多,incr /设置,exec /丢弃“,并从”开始重试“循环以考虑中止场景。 如果你愿意,我可以试着把它写成Lua – 它应该是4行左右。


这是Lua的实现:

 string key = ... for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc { int result = (int) db.ScriptEvaluate(@" local result = redis.call('incr', KEYS[1]) if result > 999 then result = 0 redis.call('set', KEYS[1], result) end return result", new RedisKey[] { key }); Console.WriteLine(result); } 

注意:如果您需要参数化最大值,您可以使用:

 if result > tonumber(ARGV[1]) then 

和:

 int result = (int)db.ScriptEvaluate(..., new RedisKey[] { key }, new RedisValue[] { max }); 

(所以ARGV[1]max

有必要了解eval / evalsha (这是ScriptEvaluate调用的) 不与其他服务器请求竞争 ,因此incr和可能的set之间没有任何变化。 这意味着我们不需要复杂的watch等逻辑。

通过事务/约束API,这是相同的(我认为!):

 static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max) { int result; bool success; do { RedisValue current = db.StringGet(key); var tran = db.CreateTransaction(); // assert hasn't changed - note this handles "not exists" correctly tran.AddCondition(Condition.StringEqual(key, current)); if(((int)current) > max) { result = 0; tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget); } else { result = ((int)current) + 1; tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget); } success = tran.Execute(); // if assertion fails, returns false and aborts } while (!success); // and if it aborts, we need to redo return result; } 

复杂,嗯? 这里简单的成功案例是:

 GET {key} # get the current value WATCH {key} # assertion stating that {key} should be guarded GET {key} # used by the assertion to check the value MULTI # begin a block INCR {key} # increment {key} EXEC # execute the block *if WATCH is happy* 

这是……相当多的工作,并涉及多路复用器上的管道停顿。 更复杂的情况(断言失败,监视失败,环绕)会产生略微不同的输出,但应该有效。

您可以使用WATCH命令 – 这样,如果值发生变化,您将收到通知