MongoDB C# Driver Change Stream Extension using Rx.NET
$ dotnet add package MongoDB.ReactiveChangeStreamExtension for MongoDB C# Driver to handle MongoDB Change Streams as an IObservale
MongoDB Change Streams are a feature that allow the database to notify subscribers for any change, more about Change Streams
First, install the MongoDB.ReactiveChangeStream Nuget package into your app
Install-Package MongoDB.ReactiveChangeStream
using System.Reactive.Linq;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.ReactiveChangeStream;
var client = new MongoClient("mongodb://localhost:27017/foo?replicaSet=rs0");
var database = client.GetDatabase("foo");
var collection = database.GetCollection<BsonDocument>("bar");
var changeObservable = collection.CreateObservableChangeStream(filter =>
filter.OperationType == ChangeStreamOperationType.Delete ||
filter.OperationType == ChangeStreamOperationType.Insert ||
filter.OperationType == ChangeStreamOperationType.Update ||
filter.OperationType == ChangeStreamOperationType.Replace);
changeObservable
.Buffer(TimeSpan.FromSeconds(5))
.Do(changes =>
{
// Do something
})
.Subscribe();
Make sure you connecting to a MongoDB instance with Replica Set Enabled, as Change Streams require it. read more about how to initialize a Replica Set here...