如何在C#中复制F#区分联合类型?

我创建了一个名为Actor的新类,它处理传递给它的消息。 我遇到的问题是弄清楚将相关但不同的消息传递给Actor的最优雅方式是什么。 我的第一个想法是使用inheritance,但它似乎是如此膨胀,但它是强烈的类型,这是一个明确的要求。

有什么想法吗?

private abstract class QueueMessage { } private class ClearMessage : QueueMessage { public static readonly ClearMessage Instance = new ClearMessage(); private ClearMessage() { } } private class TryDequeueMessage : QueueMessage { public static readonly TryDequeueMessage Instance = new TryDequeueMessage(); private TryDequeueMessage() { } } private class EnqueueMessage : QueueMessage { public TValue Item { get; private set; } private EnqueueMessage(TValue item) { Item = item; } } 

演员class

 /// Represents a callback method to be executed by an Actor. /// The type of reply. /// The reply made by the actor. public delegate void ActorReplyCallback(TReply reply); /// Represents an Actor which receives and processes messages in concurrent applications. /// The type of message this actor accepts. /// The type of reply made by this actor. public abstract class Actor : IDisposable { /// The default total number of threads to process messages. private const Int32 DefaultThreadCount = 1; /// Used to serialize access to the message queue. private readonly Locker Locker; /// Stores the messages until they can be processed. private readonly System.Collections.Generic.Queue MessageQueue; /// Signals the actor thread to process a new message. private readonly ManualResetEvent PostEvent; /// This tells the actor thread to stop reading from the queue. private readonly ManualResetEvent DisposeEvent; /// Processes the messages posted to the actor. private readonly List ActorThreads; /// Initializes a new instance of the Genex.Concurrency<TRequest, TResponse> class. public Actor() : this(DefaultThreadCount) { } /// Initializes a new instance of the Genex.Concurrency<TRequest, TResponse> class. ///  public Actor(Int32 thread_count) { if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater."); Locker = new Locker(); MessageQueue = new System.Collections.Generic.Queue(); EnqueueEvent = new ManualResetEvent(true); PostEvent = new ManualResetEvent(false); DisposeEvent = new ManualResetEvent(true); ActorThreads = new List(); for (Int32 i = 0; i < thread_count; i++) { var thread = new Thread(ProcessMessages); thread.IsBackground = true; thread.Start(); ActorThreads.Add(thread); } } /// Posts a message and waits for the reply. /// The message to post to the actor. /// The reply from the actor. public TReply PostWithReply(TMessage message) { using (var wrapper = new Message(message)) { lock (Locker) MessageQueue.Enqueue(wrapper); PostEvent.Set(); wrapper.Channel.CompleteEvent.WaitOne(); return wrapper.Channel.Value; } } /// Posts a message to the actor and executes the callback when the reply is received. /// The message to post to the actor. /// The callback that will be invoked once the replay is received. public void PostWithAsyncReply(TMessage value, ActorReplyCallback callback) { if (callback == null) throw new ArgumentNullException("callback"); ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value))); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. public void Dispose() { if (DisposeEvent.WaitOne(10)) { DisposeEvent.Reset(); PostEvent.Set(); foreach (var thread in ActorThreads) { thread.Join(); } ((IDisposable)PostEvent).Dispose(); ((IDisposable)DisposeEvent).Dispose(); } } /// Processes a message posted to the actor. /// The message to be processed. protected abstract void ProcessMessage(Message message); /// Dequeues the messages passes them to ProcessMessage. private void ProcessMessages() { while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10)) { var message = (Message)null; while (true) { lock (Locker) { message = MessageQueue.Count > 0 ? MessageQueue.Dequeue() : null; if (message == null) { PostEvent.Reset(); break; } } try { ProcessMessage(message); } catch { } } } } /// Represents a message that is passed to an actor. protected class Message : IDisposable { /// The actual value of this message. public TMessage Value { get; private set; } /// The channel used to give a reply to this message. public Channel Channel { get; private set; } /// Initializes a new instance of Genex.Concurrency.Message class. /// The actual value of the message. public Message(TMessage value) { Value = value; Channel = new Channel(); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. public void Dispose() { Channel.Dispose(); } } /// Represents a channel used by an actor to reply to a message. protected class Channel : IDisposable { /// The value of the reply. public TReply Value { get; private set; } /// Signifies that the message has been replied to. public ManualResetEvent CompleteEvent { get; private set; } /// Initializes a new instance of Genex.Concurrency.Channel class. public Channel() { CompleteEvent = new ManualResetEvent(false); } /// Reply to the message received. /// The value of the reply. public void Reply(TReply value) { Value = value; CompleteEvent.Set(); } /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. public void Dispose() { ((IDisposable)CompleteEvent).Dispose(); } } } 

在您的示例代码中,您根据PostWithReply实现PostWithReply 。 这并不理想,因为这意味着当你调用PostWithAsyncReply并且actor需要一段时间来处理它时,实际上有两个线程被绑定:执行actor的那个和等待它完成的一个。 让一个线程执行actor然后在异步情况下调用回调会更好。 (显然在同步的情况下,没有避免捆绑两个线程)。

更新:

更多关于上述内容:您构造一个带有参数的actor,告诉它要运行多少个线程。 为简单起见,假设每个actor都运行一个线程(实际上是非常好的情况,因为actor可以具有内部状态而没有锁定,因为只有一个线程直接访问它)。

演员A调用演员B,等待回应。 为了处理请求,actor B需要调用actor C.所以现在A和B的唯一线程正在等待,而C是唯一一个实际给CPU做任何工作的线程。 这么multithreading! 但如果你一直等待答案,这就是你得到的。

好的,你可以增加你在每个actor中开始的线程数。 但是你要开始他们所以他们可以无所事事地坐着。 堆栈占用大量内存,上下文切换可能很昂贵。

因此,最好使用回调机制异步发送消息,以便您可以获取完成的结果。 你的实现的问题是你从线程池中获取另一个线程,纯粹是坐下来等待。 所以你基本上应用了增加线程数的解决方法。 您将线程分配给从不运行的任务。

PostWithReply方面实现PostWithReply会更好,即相反的方式。 异步版本是低级别的。 基于我的基于委托的示例(因为它涉及较少的代码输入!):

 private bool InsertCoinImpl(int value) { // only accept dimes/10p/whatever it is in euros return (value == 10); } public void InsertCoin(int value, Action accepted) { Submit(() => accepted(InsertCoinImpl(value))); } 

所以私有实现返回一个bool。 公共异步方法接受将接收返回值的操作; 私有实现和回调操作都在同一个线程上执行。

希望同步等待的需要将成为少数情况。 但是当你需要它时,它可以由一个辅助方法提供,完全是通用的,不依赖于任何特定的actor或消息类型:

 public static T Wait(Action> activity) { T result = default(T); var finished = new EventWaitHandle(false, EventResetMode.AutoReset); activity(r => { result = r; finished.Set(); }); finished.WaitOne(); return result; } 

所以现在在其他演员中我们可以说:

 bool accepted = Helpers.Wait(r => chocMachine.InsertCoin(5, r)); 

Wait的类型参数可能是不必要的,还没有尝试编译任何这些。 但是Wait基本上为你召唤回调,所以你可以把它传递给一些异步方法,而在外面你只需要把回传给回调的东西作为你的返回值。 请注意,传递给Wait的lambda实际上仍然在调用Wait的同一个线程上执行。

我们现在回到我们的常规计划……

至于您询问的实际问题,您向演员发送消息以让它做某事。 代表们在这里很有帮助。 它们让你有效地让编译器生成一个包含一些数据的类,一个你甚至不需要显式调用的构造函数以及一个方法。 如果你不得不写一堆小class,请切换到代表。

 abstract class Actor { Queue _messages = new Queue(); protected void Submit(Action action) { // take out a lock of course _messages.Enqueue(action); } // also a "run" that reads and executes the // message delegates on background threads } 

现在,特定的派生actor遵循以下模式:

 class ChocolateMachineActor : Actor { private void InsertCoinImpl(int value) { // whatever... } public void InsertCoin(int value) { Submit(() => InsertCoinImpl(value)); } } 

因此,要向actor发送消息,您只需调用公共方法即可。 私有的Impl方法可以完成真正的工作。 无需手动编写一堆消息类。

显然我已经省略了有关回复的内容,但这可以通过更多参数来完成。 (见上面的更新)。

Steve Gilham总结了编译器如何实际处理受歧视的联合。 对于您自己的代码,您可以考虑它的简化版本。 鉴于以下F#:

 type QueueMessage = ClearMessage | TryDequeueMessage | EnqueueMessage of T 

这是在C#中模拟它的一种方法:

 public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage } public abstract class QueueMessage { // prevents unwanted subclassing private QueueMessage() { } public abstract MessageType MessageType { get; } ///  /// Only applies to EnqueueMessages ///  public abstract T Item { get; } public static QueueMessage MakeClearMessage() { return new ClearMessage(); } public static QueueMessage MakeTryDequeueMessage() { return new TryDequeueMessage(); } public static QueueMessage MakeEnqueueMessage(T item) { return new EnqueueMessage(item); } private sealed class ClearMessage : QueueMessage { public ClearMessage() { } public override MessageType MessageType { get { return MessageType.ClearMessage; } } ///  /// Not implemented by this subclass ///  public override T Item { get { throw new NotImplementedException(); } } } private sealed class TryDequeueMessage : QueueMessage { public TryDequeueMessage() { } public override MessageType MessageType { get { return MessageType.TryDequeueMessage; } } ///  /// Not implemented by this subclass ///  public override T Item { get { throw new NotImplementedException(); } } } private sealed class EnqueueMessage : QueueMessage { private T item; public EnqueueMessage(T item) { this.item = item; } public override MessageType MessageType { get { return MessageType.EnqueueMessage; } } ///  /// Gets the item to be enqueued ///  public override T Item { get { return item; } } } } 

现在,在给定QueueMessage代码中,您可以打开MessageType属性来代替模式匹配,并确保仅在EnqueueMessage上访问Item属性。

编辑

这是另一种基于朱丽叶代码的替代方案。 我试图简化一些事情,以便它从C#获得更有用的界面。 这比以前的版本更好,因为您无法获得MethodNotImplementedexception。

 public abstract class QueueMessage { // prevents unwanted subclassing private QueueMessage() { } public abstract TReturn Match(Func clearCase, Func tryDequeueCase, Func enqueueCase); public static QueueMessage MakeClearMessage() { return new ClearMessage(); } public static QueueMessage MakeTryDequeueMessage() { return new TryDequeueMessage(); } public static QueueMessage MakeEnqueueMessage(T item) { return new EnqueueMessage(item); } private sealed class ClearMessage : QueueMessage { public ClearMessage() { } public override TReturn Match(Func clearCase, Func tryDequeueCase, Func enqueueCase) { return clearCase(); } } private sealed class TryDequeueMessage : QueueMessage { public TryDequeueMessage() { } public override TReturn Match(Func clearCase, Func tryDequeueCase, Func enqueueCase) { return tryDequeueCase(); } } private sealed class EnqueueMessage : QueueMessage { private T item; public EnqueueMessage(T item) { this.item = item; } public override TReturn Match(Func clearCase, Func tryDequeueCase, Func enqueueCase) { return enqueueCase(item); } } } 

您可以像这样使用此代码:

 public class MessageUserTest { public void Use() { // your code to get a message here... QueueMessage msg = null; // emulate pattern matching, but without constructor names int i = msg.Match( clearCase: () => -1, tryDequeueCase: () => -2, enqueueCase: s => s.Length); } } 

联盟类型和模式匹配非常直接映射到访问者模式,我之前已经发布了几次:

因此,如果您想传递包含许多不同类型的邮件,那么您将无法实现访问者模式。

(警告,未经测试的代码,但应该让你知道它是如何完成的)

假设我们有这样的事情:

 type msg = | Add of int | Sub of int | Query of ReplyChannel let rec counts = function | [] -> (0, 0, 0) | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c) | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c) | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1) 

你最终得到了这个庞大的C#代码:

 interface IMsgVisitor { T Visit(Add msg); T Visit(Sub msg); T Visit(Query msg); } abstract class Msg { public abstract T Accept(IMsgVistor visitor) } class Add : Msg { public readonly int Value; public Add(int value) { this.Value = value; } public override T Accept(IMsgVisitor visitor) { return visitor.Visit(this); } } class Sub : Msg { public readonly int Value; public Add(int value) { this.Value = value; } public override T Accept(IMsgVisitor visitor) { return visitor.Visit(this); } } class Query : Msg { public readonly ReplyChannel Value; public Add(ReplyChannel value) { this.Value = value; } public override T Accept(IMsgVisitor visitor) { return visitor.Visit(this); } } 

现在,只要您想对消息执行某些操作,就需要实现访问者:

 class MsgTypeCounter : IMsgVisitor { public readonly Tuple State; public MsgTypeCounter(Tuple state) { this.State = state; } public MsgTypeCounter Visit(Add msg) { Console.WriteLine("got Add of " + msg.Value); return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3)); } public MsgTypeCounter Visit(Sub msg) { Console.WriteLine("got Sub of " + msg.Value); return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3)); } public MsgTypeCounter Visit(Query msg) { Console.WriteLine("got Query of " + msg.Value); return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3)); } } 

最后你可以像这样使用它:

 var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) }; var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)), (acc, x) => x.Accept(acc)).State; 

是的,它看起来很钝,但这就是你以类型安全的方式传递多个消息的方式,这也是我们不用C#实现联合的原因;)

远射,但无论如何……

我假设区分联合是ADT的F#(抽象数据类型)。 这意味着类型可能是几件事之一。

如果有两个,你可以尝试将它放在一个带有两个类型参数的简单generics类中:

  public struct DiscriminatedUnion { public DiscriminatedUnion(T1 t1) { value = t1; } public DiscriminatedUnion(T2 t1) { value = t2; } public static implicit operator T1(DiscriminatedUnion du) {return (T1)du.value; } public static implicit operator T2(DiscriminatedUnion du) {return (T2)du.value; } object value; } 

为了使它适用于3个或更多,我们需要多次复制这个类。 根据运行时类型,任何人都有一个函数重载的解决方案?

如果你有这个

 type internal Either<'a, 'b> = | Left of 'a | Right of 'b 

在F#中,然后为类A生成的CLR的C#等价物Either<'a, 'b>具有类似的内部类型

 internal class _Left : Either { internal readonly a left1; internal _Left(a left1); } 

每个都有标签,吸气剂和工厂方法

 internal const int tag_Left = 0; internal static Either Left(a Left1); internal a Left1 { get; } 

加上一个鉴别者

 internal int Tag { get; } 

以及一系列实现IStructuralEquatable, IComparable, IStructuralComparable接口的方法

在C#中的Discriminated union中有一个编译时检查的区别联合类型

 private class ClearMessage { public static readonly ClearMessage Instance = new ClearMessage(); private ClearMessage() { } } private class TryDequeueMessage { public static readonly TryDequeueMessage Instance = new TryDequeueMessage(); private TryDequeueMessage() { } } private class EnqueueMessage { public TValue Item { get; private set; } private EnqueueMessage(TValue item) { Item = item; } } 

使用受歧视的联盟可以如下进行:

 // New file // Create an alias using Message = Union; int ProcessMessage(Message msg) { return Message.Match( clear => 1, dequeue => 2, enqueue => 3); }