在C#中运行多个任务

我正在开发一个实时软件,我有一些非实时function。

眼镜:

  • 需要实时做方法1;
  • 需要每60分钟做一次方法2。

我应该使用multithreading吗? 任务?

现在我正在使用计时器,但不认为它是一个很好的用途。

将文件立即启动文件出现在文件夹中通常是个坏主意。 例如,如果100个文件一次到达,那么最终会有100个线程全部竞争相同的物理资源,例如磁盘磁头等。在我看来,最好的方法是实时 ,或者至少是真实的使用.Net通知可以实现新文件存在的时间,然后使用生产者 – 消费者队列处理这些文件

最好使用一个线程安全集合,例如ConcurrentQueue ,其中包含活动长时间运行的数量,例如,在同步机制(例如SemaphoreSlim)控制的任何时间处理电子邮件中的线程

使用FileSystemWatcher类可以轻松实现目录中新创建的文件的通知。 理想情况下,基于时间的方式执行处理应该使用观察者模式

我在下面创建了一个简单的应用程序来演示这些应该有用的概念。

该应用程序包含许多关键类

  1. SingletonBase类为应该只实例化一次的类实现Singleton模式 ,例如filesystemwatcher类
  2. FileSystemMonitor类,用于监视正在创建的新文件的目录,并立即通知处理队列存在新文件。 当然,您可以非常轻松地修改它以开始立即处理文件,但如上所述,这通常是个坏主意
  3. FilesWorker类,用于处理队列访问和相关任务同步。

    using System; using System.Collections.Concurrent; using System.Globalization; using System.Reactive.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; using System.IO; using System.Security.Permissions; namespace ConsoleApplication9 { internal class Program { private static void Main(string[] args) { const string directorytowatch = @"d:\junk\watch\"; // the directory to watch for new files // this initiates a filesystemmonitor to watch for new files being created Task.Factory.StartNew(() => FileSystemMonitor.Instance.WatchDirectory(directorytowatch)); // initiate the processing of any new files FilesWorker.Instance.ReadQueue(); Console.ReadLine(); } } ///  /// Monitors the filesystem in "real-time" to check for new files ///  [PermissionSet(SecurityAction.Demand, Name = "FullTrust")] internal class FileSystemMonitor : SingletonBase { private FileSystemMonitor() { } internal void WatchDirectory(string dir) { var watcher = new FileSystemWatcher(dir) { NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite | NotifyFilters.LastAccess, Filter = "*.*" }; // watch all files watcher.Created += WatcherOnCreated; watcher.EnableRaisingEvents = true; } private static void WatcherOnCreated(object sender, FileSystemEventArgs fileSystemEventArgs) { Console.WriteLine(fileSystemEventArgs.FullPath + "" + fileSystemEventArgs.ChangeType); // for test purposes var fileInfo = new FileInfo(fileSystemEventArgs.FullPath); FilesWorker.Instance.AddToQueue(fileInfo); } } ///  /// handles the queue of files to be processed and the syncronisation of tasks related to the queue ///  internal class FilesWorker : SingletonBase { private FilesWorker() { } ///  /// The queue of files which still need to be processed ///  private readonly ConcurrentQueue _filesQueue = new ConcurrentQueue(); ///  /// create a semaphore to limit the number of threads which can process a file at any given time // In this case only allow 2 to be processed at any given time ///  private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(2, 2); ///  /// add new file to the queue ///  ///  internal void AddToQueue(FileInfo fileInfo) { _filesQueue.Enqueue(fileInfo); } ///  /// executes a method on a given timeframe ///  /// method to execute /// time between execution runs (seconds) internal void ExecuteMethod(Action method, double timer) { IObservable observable = Observable.Interval(TimeSpan.FromSeconds(timer)); // Token for cancelation var source = new CancellationTokenSource(); observable.Subscribe(x => { var task = new Task(method); task.Start(); }, source.Token); } ///  /// Get any new files and send for processing ///  internal void ReadQueue() { // check the queue every two seconds ExecuteMethod(ProcessQueue, 2d); } ///  /// takes files from the queue and starts processing ///  internal void ProcessQueue() { try { Semaphore.Wait(); FileInfo fileInfo; while (_filesQueue.TryDequeue(out fileInfo)) { var fileProcessor = new FileProcessor(); fileProcessor.ProcessFile(fileInfo); } } finally { Semaphore.Release(); } } } internal class FileProcessor { internal void ProcessFile(FileInfo fileInfo) { // do some long running tasks with the file } } ///  /// Implements singleton pattern on all classes which derive from it ///  /// Derived class public abstract class SingletonBase where T : class { public static T Instance { get { return SingletonFactory.Instance; } } ///  /// The singleton class factory to create the singleton instance. ///  private class SingletonFactory { static SingletonFactory() { } private SingletonFactory() { } internal static readonly T Instance = GetInstance(); private static T GetInstance() { var theType = typeof(T); T inst; try { inst = (T)theType .InvokeMember(theType.Name, BindingFlags.CreateInstance | BindingFlags.Instance | BindingFlags.NonPublic, null, null, null, CultureInfo.InvariantCulture); } catch (MissingMethodException ex) { var exception = new TypeLoadException(string.Format( CultureInfo.CurrentCulture, "The type '{0}' must have a private constructor to " + "be used in the Singleton pattern.", theType.FullName) , ex); //LogManager.LogException(LogManager.EventIdInternal, exception, "error in instantiating the singleton"); throw exception; } return inst; } } } }