如何在Web API中维护请求的状态或队列

我有情况,我必须在Web API方法中接收请求,排队这些请求,然后将批量发送到数据库(Solr实例)。

我不确定如何维护来自多个来源的一批请求。 现在我将每个请求数据以json格式写入磁盘上的文件,稍后我将有一个Windows服务,浏览文件夹读取所有文件,更新数据库并删除这些文件。

这是我在Web API中所做的

public void Post(LogEntry value) { value.EventID = Guid.NewGuid(); value.ServerTime = DateTime.UtcNow; string json = JsonConvert.SerializeObject(value); using(StreamWriter sw = new StreamWriter(value.EventID.ToString())) { sw.Write(json); } } 

(这里的EventID是GUID)

这个过程看起来不正确,必须有一种维护请求队列的方法,但我不确定如何在多个请求期间维护队列。

我这样做的原因是,在solr实例中批量插入比通过SolrNet插入单个记录更快。 我希望在Web API上每秒至少获得100个请求。 我想创建一批1000个请求并每10秒更新一次solr实例。 请不要认为我需要代码,只需知道我应采用什么策略来维护请求/状态队列。

如果您使用的是.NET 4.0或更高版本,则可以使用并发队列:

并发队列(MSDN)

这是一种使用队列的线程安全方式,然后可以在所需的时间访问该队列。

编辑:

例:

这将是队列的包装:

 public static class RequestQueue { private static ConcurrentQueue _queue; public static ConcurrentQueue Queue { get { if (_queue == null) { _queue = new ConcurrentQueue(); } return _queue; } } } 

然后你可以像这样设置你的web api(这个例子为了简洁起见存储整数):

 public class ValuesController : ApiController { public string Get() { var sb = new StringBuilder(); foreach (var item in RequestQueue.Queue) { sb.Append(item.ToString()); } return sb.ToString(); } public void Post(int id) { RequestQueue.Queue.Enqueue(id); } } 

如果您使用此示例,您将看到队列包含多个请求中的值。 但是,由于它存在于内存中,如果应用程序池被回收(例如),那些排队的项目将会消失。

现在,您可以检查队列何时保存10个项目,然后将这些项目保存到数据库,同时创建另一个队列来存储传入的值。

像这样:

 public static class RequestQueue { private static ConcurrentQueue _queue; public static ConcurrentQueue Queue { get { if (_queue == null) { _queue = new ConcurrentQueue(); } if (_queue.Count >= 10) { SaveToDB(_queue); _queue = new ConcurrentQueue(); } return _queue; } } public static void SaveToDB(ConcurrentQueue queue) { foreach (var item in queue) { SaveItemToDB(item); } } } 

你需要稍微清理一下,但这种设置应该有效。 此外,您可能需要一些锁定机制来绕过将队列转储到数据库并创建新实例。 我会编写一个控制台应用程序,其中有多个线程可以访问此队列来测试它。

这是使用MSMQ的一个非常好的场景。 对于每个请求,只需将项目发布到MSMQ队列。 在同一个webapp或任何其他应用程序中,只需从队列中读取多个项目并将其发布到solr。 无论您的应用程序崩溃或被回收,MSMQ都会安全地保存您的数据,以便您以后检索它。

MSMQfunction强大,可靠且可扩展。 它非常适合您的问题。

您可以将请求排入内存中的队列,也可以使用Quartz .Net定期将它们发送到数据库。 您可以在Global.asax.cs中完成此操作,如下所示:

 public class RequestQueue { private readonly Queue _requestHistory; private RequestQueue() { _requestHistory = new Queue(); } private static RequestQueue _singleton; public static RequestQueue Instance() { if (_singleton == null) _singleton = new RequestQueue(); return _singleton; } public void Enqueue(HttpRequest request) { _requestHistory.Enqueue(request); } public void Flush() { while (_requestHistory.Count > 0) { var request = _requestHistory.Dequeue(); try { //Write request To Db } catch (Exception) { _requestHistory.Enqueue(request); } } } } public class WebApiApplication : System.Web.HttpApplication { public WebApiApplication() { base.BeginRequest += delegate { RequestQueue.Instance().Enqueue(HttpContext.Current.Request); }; } private void InitializeQuartz() { ISchedulerFactory sf = new StdSchedulerFactory(); IScheduler sched = sf.GetScheduler(); DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.UtcNow); DateTimeOffset startTime = DateBuilder.NextGivenSecondDate(null, 5); IJobDetail job = JobBuilder.Create() .WithIdentity("job1", "group1") .Build(); ITrigger trigger = TriggerBuilder.Create() .WithIdentity("trigger1", "group1") .StartAt(runTime) .WithCronSchedule("5 0/1 * * * ?") .Build(); sched.ScheduleJob(job, trigger); sched.Start(); } public class QueueConsumer : IJob { public void Execute(IJobExecutionContext context) { RequestQueue.Instance().Flush(); } } protected void Application_Start() { InitializeQuartz(); 

另一种解决方案是将记录保存在与WebApi不在同一进程中的内存队列中。 例如:MemcacheQueue https://github.com/coderrr/memcache_queue

其中一些队列实现具有持久性function,因此在任何情况下都不会丢失数据。

您应该尝试实现NServiceBus能够在将来安排消息和发送消息,从服务总线文档中,您可以调度任务或动作/ lambda,在给定的时间间隔内重复执行。

这意味着您可以拥有一个记忆缓存,并且每10分钟将数组的内容写入您的solr / lucene impl,例如,这更容易:

 Schedule.Every(TimeSpan.FromMinutes(10)).Action(() => { < task to be executed > }) 

如果您需要更灵活地设置schedueler,您可以将它集成到quartz.net

案件应如下:

  • 作为Windows服务的WCF和NServiceBus应该共享相同的上下文或实现可以在系统的这两个不同部分之间共享的cacheManager。
  • 每次你做一个请求你调用你的wcf传递参数并在你的wcf中添加一个内存行到数组(它可能是一个字符串数组,你写入磁盘的json值相同)
  • ServiceBus将处理将操作的队列管理到arrays中,并避免arrays操作的任何冲突,例如:

    • 将项添加到数组中
    • 清空数组
    • 写入您的数据库
 public class ThresholdBuffer { private ConcurrentBag _buffer; private int _threshold; public ThresholdBuffer(int threshold) { _threshold = threshold; _buffer = new ConcurrentBag(); } public void Add(T item) { _buffer.Add(item); if(_buffer.Count >= _threshold) { Recycle(); } } public void Recycle() { var value = Interlocked.Exchange>(ref _buffer, new ConcurrentBag()); //flush value } } 
  1. 创建刷新逻辑
  2. 在Application_Start(Global.asax)事件中创建ThresholdBuffer并将其存储在Application,Static字段等中
  3. 调用Add方法
  4. 在Application_End手动调用Recycle

您可以在Recycle中添加锁定逻辑,以防止创建多个ConcurrentBag并刷新几乎空的包。 但我认为这比锁定更不邪恶。

更新。 无需附加的ConcurrentBag创建即可锁定

 public class ThresholdBuffer { private ConcurrentBag _buffer; private int _copacity; private int _threshold; public ThresholdBuffer(int threshold) { _threshold = threshold; _copacity = 0; _buffer = new ConcurrentBag(); } public void Add(T item) { _buffer.Add(item); if (Interlocked.Increment(ref _copacity) == _threshold) { Recycle(); } } public void Recycle() { var value4flasshing = Interlocked.Exchange>(ref _buffer, new ConcurrentBag()); Thread.VolatileWrite(ref _copacity, 0); } } 

ps您可以使用任何ConcurrentCollection而不是ConcurrentBag