Microsoft Orleans - Observables

As we continue exploring Microsoft Orleans, the virtual actor model framework, we happen upon observers. Observers can be used to “observe” events, for example in the form of notifications.

Microsoft Orleans — Observers

Orleans observers are built by creating an interface that extends the IGrainObserver interface from the Orleans namespace. Observer methods must have a void return type, and a grain invokes them as ordinary method calls when “something” happens.

The Orleans Observers documentation can be found here:

Observers | Microsoft Orleans Documentation

What we’ll need

There are a few steps to setting up an observer and a grain that can manage observers:

  • New interface that extends IGrainObserver, and a class that implements the new interface.
  • A new grain interface and grain that provide subscribe, unsubscribe, and notify-like methods.
  • A class to manage registered observers.
  • A method of using the above observer.

New interface implementing IGrainObserver

To keep the observer as straightforward as possible, we’ll create an interface (and eventually a class) that simply takes in a string message. This interface will look like:

public interface IObserverSample : IGrainObserver
{
    void ReceiveMessage(string message);
}

In the above, we have a single method that takes in a string named message. This interface will act as our “observer” interface. As you can see, this interface is quite simple. The only constraints are that observer methods must have a return type of void and the interface itself must extend the built-in Orleans type IGrainObserver.

New grain interface for observables

Next, we’ll need a grain interface that can handle registering and unregistering observers, along with a method used to “notify” the registered observers of the event being observed.

public interface IObservableManager : IGrainWithIntegerKey, IGrainInterfaceMarker
{
    Task Subscribe(IObserverSample observer);
    Task Unsubscribe(IObserverSample observer);
    Task SendMessageToObservers(string message);
}

Again pretty straightforward — we have a Subscribe and Unsubscribe method that take in an IObserverSample (the interface from the previous step), and a SendMessageToObservers, which, strangely enough, can be used to send messages to registered observers.

A class to manage registered observers

The documentation called out using a built-in class, ObserverSubscriptionManager, to assist with managing observers; however, this class was apparently moved into a legacy assembly. A version of the class could still be found in some of the Orleans samples, and here is that class with a few tweaks:

public class GrainObserverManager<T> : IEnumerable<T> where T : IAddressable
{
    private readonly Dictionary<T, DateTime> observers = new Dictionary<T, DateTime>();

    public GrainObserverManager()
    {
        this.GetDateTime = () => DateTime.UtcNow;
    }

    public Func<DateTime> GetDateTime { get; set; }

    public TimeSpan ExpirationDuration { get; set; } = TimeSpan.FromMinutes(1);

    public int Count => this.observers.Count;

    public void Clear()
    {
        this.observers.Clear();
    }

    public void Subscribe(T observer)
    {
        // Add or update the subscription.
        this.observers[observer] = this.GetDateTime();
    }

    public void Unsubscribe(T observer)
    {
        this.observers.Remove(observer);
    }

    public async Task Notify(Func<T, Task> notification, Func<T, bool> predicate = null)
    {
        var now = this.GetDateTime();
        var defunct = default(List<T>);
        foreach (var observer in this.observers)
        {
            if (observer.Value + this.ExpirationDuration < now)
            {
                // Expired observers will be removed.
                defunct = defunct ?? new List<T>();
                defunct.Add(observer.Key);
                continue;
            }

            // Skip observers which don't match the provided predicate.
            if (predicate != null && !predicate(observer.Key))
            {
                continue;
            }

            try
            {
                await notification(observer.Key);
            }
            catch (Exception)
            {
                // Failing observers are considered defunct and will be removed.
                defunct = defunct ?? new List<T>();
                defunct.Add(observer.Key);
            }
        }

        // Remove defunct observers.
        if (defunct != default(List<T>))
        {
            foreach (var observer in defunct)
            {
                this.observers.Remove(observer);
            }
        }
    }

    public void Notify(Action<T> notification, Func<T, bool> predicate = null)
    {
        var now = this.GetDateTime();
        var defunct = default(List<T>);
        foreach (var observer in this.observers)
        {
            if (observer.Value + this.ExpirationDuration < now)
            {
                // Expired observers will be removed.
                defunct = defunct ?? new List<T>();
                defunct.Add(observer.Key);
                continue;
            }

            // Skip observers which don't match the provided predicate.
            if (predicate != null && !predicate(observer.Key))
            {
                continue;
            }

            try
            {
                notification(observer.Key);
            }
            catch (Exception)
            {
                // Failing observers are considered defunct and will be removed.
                defunct = defunct ?? new List<T>();
                defunct.Add(observer.Key);
            }
        }

        // Remove defunct observers.
        if (defunct != default(List<T>))
        {
            foreach (var observer in defunct)
            {
                this.observers.Remove(observer);
            }
        }
    }

    public void ClearExpired()
    {
        var now = this.GetDateTime();
        var defunct = default(List<T>);
        foreach (var observer in this.observers)
        {
            if (observer.Value + this.ExpirationDuration < now)
            {
                // Expired observers will be removed.
                defunct = defunct ?? new List<T>();
                defunct.Add(observer.Key);
            }
        }

        // Remove defunct observers.
        if (defunct != default(List<T>))
        {
            foreach (var observer in defunct)
            {
                this.observers.Remove(observer);
            }
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return this.observers.Keys.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.observers.Keys.GetEnumerator();
    }
}

Note: I found the original class in the Orleans GitHub repo (under their samples):

dotnet/orleans
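To make the expiration rule in the class above easier to see, here is a minimal sketch that does not depend on Orleans. MiniObserverManager and ExpiryDemo are hypothetical names invented for this illustration. The manager is a trimmed-down stand-in rather than the class from the samples: it keeps only the timestamp dictionary, the injectable clock, and the Action-based Notify. Plain strings stand in for observer references.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for GrainObserverManager<T>.
// Same expiry rule, but no IAddressable constraint and no Orleans dependency.
public class MiniObserverManager<T>
{
    private readonly Dictionary<T, DateTime> observers = new Dictionary<T, DateTime>();

    // Injectable clock, mirroring the GetDateTime property of the real class.
    public Func<DateTime> GetDateTime { get; set; } = () => DateTime.UtcNow;

    public TimeSpan ExpirationDuration { get; set; } = TimeSpan.FromMinutes(1);

    public int Count => observers.Count;

    // Subscribing again overwrites the timestamp, which keeps the observer alive.
    public void Subscribe(T observer) => observers[observer] = GetDateTime();

    public void Notify(Action<T> notification)
    {
        var now = GetDateTime();
        var defunct = new List<T>();
        foreach (var entry in observers)
        {
            if (entry.Value + ExpirationDuration < now)
            {
                defunct.Add(entry.Key); // expired: not refreshed recently enough
                continue;
            }

            try
            {
                notification(entry.Key);
            }
            catch (Exception)
            {
                defunct.Add(entry.Key); // failing observers are dropped as well
            }
        }

        // Removal happens after the loop so the dictionary is not modified mid-enumeration.
        foreach (var observer in defunct)
        {
            observers.Remove(observer);
        }
    }
}

public static class ExpiryDemo
{
    public static List<string> Run()
    {
        // A fake clock that the test advances by hand.
        var clock = new DateTime(2019, 1, 16, 12, 0, 0, DateTimeKind.Utc);
        var manager = new MiniObserverManager<string> { GetDateTime = () => clock };
        var notified = new List<string>();

        manager.Subscribe("stale");
        clock = clock.AddSeconds(50);
        manager.Subscribe("fresh");   // subscribed 50 seconds after "stale"
        clock = clock.AddSeconds(20); // "stale" is now 70s old, "fresh" is 20s old

        manager.Notify(name => notified.Add(name));
        return notified;              // contains only "fresh"
    }
}
```

With the default one-minute ExpirationDuration, the observer that was last refreshed 70 seconds ago is skipped and removed, while the one refreshed 20 seconds ago is notified. This is why an observer has to re-subscribe periodically if it wants to keep receiving notifications.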

New Grain Implementation

We now have all the groundwork and abstractions created for our observer/observable. Next, we need concretions for those interfaces.

The one new grain being introduced handles subscribing and unsubscribing, as well as sending the notification “event” to the subscribed observers. This grain should look relatively familiar:

public class ObservableManager : Grain, IObservableManager, IGrainMarker
{
    private GrainObserverManager<IObserverSample> _subsManager;

    public override async Task OnActivateAsync()
    {
        _subsManager = new GrainObserverManager<IObserverSample>();
        await base.OnActivateAsync();
    }

    public Task SendMessageToObservers(string message)
    {
        _subsManager.Notify(n => n.ReceiveMessage(message));
        return Task.CompletedTask;
    }

    public Task Subscribe(IObserverSample observer)
    {
        _subsManager.Subscribe(observer);
        return Task.CompletedTask;
    }

    public Task Unsubscribe(IObserverSample observer)
    {
        _subsManager.Unsubscribe(observer);
        return Task.CompletedTask;
    }
}

In the above, the only new thing not covered before (pretty sure) is the overriding of OnActivateAsync. In this method, we’re newing up the _subsManager and proceeding with the base implementation.

The Subscribe and Unsubscribe methods register or remove the passed-in IObserverSample with the GrainObserverManager, while SendMessageToObservers calls the manager’s Notify method to send the event notification to all subscribed observers.

New IOrleansFunctions

In this demo, two new IOrleansFunctions are to be introduced. One of the functions will be used as an observer, and the other will be used to send messages to that observer.

Starting with the simpler of the two, the event sender:

public class GrainObserverEventSender : IOrleansFunction
{
    public string Description => "This function can be used to send a message to subscribed observers.";

    public async Task PerformFunction(IClusterClient clusterClient)
    {
        var grain = clusterClient.GetGrain<IObservableManager>(0);

        Console.WriteLine("Enter a message to send to subscribed observers.");
        var message = Console.ReadLine();

        await grain.SendMessageToObservers(message);

        ConsoleHelpers.ReturnToMenu();
    }
}

In the above, we’re using one of the three methods from the grain interface defined earlier. From here, we’re just utilizing the function to send user entered messages to our subscribed observers (if any exist).

How do we get observers to exist? That can be accomplished with the second IOrleansFunction.

public class GrainObserverReceiver : IOrleansFunction, IObserverSample
{
    private bool _shouldBreakLoop;

    public string Description => "Acts as a receiver of observed messages. When the observer manager notifies subscribed observers like this class, they take action on the notification.";

    public async Task PerformFunction(IClusterClient clusterClient)
    {
        Console.WriteLine("Observing for behavior, stops once behavior observed.");

        var observerManager = clusterClient.GetGrain<IObservableManager>(0);
        var observerRef = await clusterClient
            .CreateObjectReference<IObserverSample>(this);

        while (!_shouldBreakLoop)
        {
            await observerManager.Subscribe(observerRef);
            await Task.Delay(5000);
        }

        await observerManager.Unsubscribe(observerRef);

        ConsoleHelpers.ReturnToMenu();
    }

    public void ReceiveMessage(string message)
    {
        ConsoleHelpers.LineSeparator();
        Console.WriteLine("Observed Behavior:");
        Console.WriteLine(message);

        _shouldBreakLoop = true;
    }
}

A few new things are happening in the above IOrleansFunction. First, our PerformFunction method re-subscribes to our observer manager grain every five seconds. This acts as a sort of “heartbeat” to keep the observer alive, since the manager drops any observer that has not subscribed within its ExpirationDuration (one minute by default). I don’t think it has to be done this way, but working with the sample code from the documentation, this seemed to work out OK. I guess the alternative is not having observers expire, and keeping them around indefinitely? Second, in addition to our normal GetGrain call, we’re passing this to CreateObjectReference to get an observer reference that can be registered with the observer manager.

The other method ReceiveMessage is the method being implemented from the IObserverSample. This method is the handler for what happens when the Notify from the observer manager is called.
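The heartbeat loop above polls a flag and can wait up to five seconds after a message arrives before it notices. As one possible alternative (not something the Orleans documentation prescribes), the loop can wait on a task that completes when the message is received. The sketch below has no Orleans dependency. HeartbeatSketch and HeartbeatDemo are hypothetical names, and the subscribe and unsubscribe delegates stand in for the observerManager.Subscribe(observerRef) and observerManager.Unsubscribe(observerRef) calls.

```csharp
using System;
using System.Threading.Tasks;

public static class HeartbeatSketch
{
    // Re-subscribes every `interval` until `stopSignal` completes, then unsubscribes.
    // Returns the number of heartbeats sent, purely for illustration.
    public static async Task<int> RunAsync(
        Func<Task> subscribe,
        Func<Task> unsubscribe,
        Task stopSignal,
        TimeSpan interval)
    {
        var heartbeats = 0;
        while (!stopSignal.IsCompleted)
        {
            await subscribe();
            heartbeats++;

            // Wake up when the interval elapses or the signal fires, whichever comes first.
            await Task.WhenAny(stopSignal, Task.Delay(interval));
        }

        await unsubscribe();
        return heartbeats;
    }
}

public static class HeartbeatDemo
{
    public static async Task<(int Heartbeats, bool Unsubscribed)> RunAsync()
    {
        // In the real observer, ReceiveMessage would call received.TrySetResult(true).
        var received = new TaskCompletionSource<bool>();
        var unsubscribed = false;

        var loop = HeartbeatSketch.RunAsync(
            subscribe: () => Task.CompletedTask,
            unsubscribe: () => { unsubscribed = true; return Task.CompletedTask; },
            stopSignal: received.Task,
            interval: TimeSpan.FromSeconds(30));

        // Simulates the grain invoking ReceiveMessage on the observer.
        received.TrySetResult(true);

        var heartbeats = await loop;
        return (heartbeats, unsubscribed);
    }
}
```

The trade-off is a little more plumbing (a TaskCompletionSource instead of a bool field) in exchange for the loop ending as soon as the notification arrives, regardless of how long the heartbeat interval is.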

Demoing the observer

Now all that’s left is to run the application and make sure it works! For this demo we’ll run the silohost and client as usual, though this time we’ll actually be running two clients. One client will be used as the observer, and the other will be used to send a message to the observer.

Code as of this post can be found here:

Kritner-Blogs/OrleansGettingStarted

Author: Russ Hammett

Posted on: 2019-01-16

Updated on: 2022-10-13
