-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMongoCollectionSet.cs
125 lines (101 loc) · 4.09 KB
/
MongoCollectionSet.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
using Btms.Model.Data;
using MongoDB.Bson.Serialization.IdGenerators;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System.Collections;
using System.Linq.Expressions;
namespace Btms.Backend.Data.Mongo;
/// <summary>
/// Queryable wrapper around a single MongoDB collection of <typeparamref name="T"/>.
/// Queries run against the collection directly; inserts and updates are queued in
/// memory and are only written when <see cref="PersistAsync"/> is called.
/// </summary>
/// <typeparam name="T">Entity type stored in the collection.</typeparam>
/// <param name="dbContext">Supplies the database and the optional active transaction.</param>
/// <param name="collectionName">
/// Name of the collection to use; when null or empty, <c>typeof(T).Name</c> is used.
/// </param>
public class MongoCollectionSet<T>(MongoDbContext dbContext, string? collectionName = null)
    : IMongoCollectionSet<T> where T : class, IDataEntity
{
    private readonly IMongoCollection<T> _collection = dbContext.Database.GetCollection<T>(
        string.IsNullOrEmpty(collectionName) ? typeof(T).Name : collectionName);

    // Entities queued by Insert, written by PersistAsync.
    private readonly List<T> _entitiesToInsert = [];

    // Entities queued by Update, each paired with the etag the stored document must still carry.
    private readonly List<(T Item, string Etag)> _entitiesToUpdate = [];

    private IMongoQueryable<T> EntityQueryable => _collection.AsQueryable();

    /// <summary>Enumerates the collection through its queryable.</summary>
    public IEnumerator<T> GetEnumerator() => EntityQueryable.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    /// <inheritdoc />
    public Type ElementType => EntityQueryable.ElementType;

    /// <inheritdoc />
    public Expression Expression => EntityQueryable.Expression;

    /// <inheritdoc />
    public IQueryProvider Provider => EntityQueryable.Provider;

    /// <summary>Finds the entity whose <c>Id</c> equals <paramref name="id"/>.</summary>
    /// <param name="id">Identifier to look up.</param>
    /// <returns>The matching entity, or <c>null</c> when there is none.</returns>
    public async Task<T?> Find(string id)
    {
        return await EntityQueryable.SingleOrDefaultAsync(x => x.Id == id);
    }

    /// <summary>Finds the first entity that satisfies <paramref name="query"/>.</summary>
    /// <param name="query">Predicate applied to the collection.</param>
    /// <returns>The first matching entity, or <c>null</c> when there is none.</returns>
    public async Task<T?> Find(Expression<Func<T, bool>> query)
    {
        return await EntityQueryable.FirstOrDefaultAsync(query);
    }

    /// <summary>
    /// Writes all queued inserts, then all queued updates, to the collection. When the
    /// context has an active transaction its session is used for every write; otherwise
    /// the writes are issued without a session.
    /// </summary>
    /// <param name="cancellationToken">Token passed to every driver call.</param>
    /// <exception cref="ConcurrencyException">
    /// Thrown when a queued update modifies no document, i.e. no stored document has both
    /// the entity's id and the etag that was recorded when the update was queued.
    /// </exception>
    public async Task PersistAsync(CancellationToken cancellationToken)
    {
        // Null when no transaction is active. The driver's session overloads must not be
        // handed a null session, so both loops below pick the overload explicitly.
        var session = dbContext.ActiveTransaction?.Session;

        foreach (var entity in _entitiesToInsert)
        {
            entity._Etag = BsonObjectIdGenerator.Instance.GenerateId(null, null).ToString()!;
            entity.Created = entity.UpdatedEntity = DateTime.UtcNow;

            if (session is not null)
            {
                await _collection.InsertOneAsync(session, entity, cancellationToken: cancellationToken);
            }
            else
            {
                await _collection.InsertOneAsync(entity, cancellationToken: cancellationToken);
            }
        }

        _entitiesToInsert.Clear();

        var builder = Builders<T>.Filter;
        foreach (var (entity, expectedEtag) in _entitiesToUpdate)
        {
            // Build the filter from the expected etag before the entity gets its new one.
            var filter = builder.Eq(x => x.Id, entity.Id) & builder.Eq(x => x._Etag, expectedEtag);

            entity._Etag = BsonObjectIdGenerator.Instance.GenerateId(null, null).ToString()!;
            entity.UpdatedEntity = DateTime.UtcNow;

            var updateResult = session is not null
                ? await _collection.ReplaceOneAsync(session, filter, entity,
                    cancellationToken: cancellationToken)
                : await _collection.ReplaceOneAsync(filter, entity,
                    cancellationToken: cancellationToken);

            if (updateResult.ModifiedCount == 0)
            {
                throw new ConcurrencyException(entity.Id!, expectedEtag);
            }
        }

        _entitiesToUpdate.Clear();
    }

    /// <summary>Queues <paramref name="item"/> for insertion on the next <see cref="PersistAsync"/>.</summary>
    /// <param name="item">Entity to insert.</param>
    /// <param name="cancellationToken">Unused; nothing is written until <see cref="PersistAsync"/>.</param>
    public Task Insert(T item, CancellationToken cancellationToken = default)
    {
        _entitiesToInsert.Add(item);
        return Task.CompletedTask;
    }

    /// <summary>Queues <paramref name="item"/> for update, using its current <c>_Etag</c> as the expected etag.</summary>
    /// <param name="item">Entity to update.</param>
    /// <param name="cancellationToken">Forwarded to the etag overload.</param>
    public async Task Update(T item, CancellationToken cancellationToken = default)
    {
        await Update(item, item._Etag, cancellationToken);
    }

    /// <summary>Queues every entity in <paramref name="items"/> for update, in order.</summary>
    /// <param name="items">Entities to update.</param>
    /// <param name="cancellationToken">Forwarded to the single-item overload.</param>
    public async Task Update(List<T> items, CancellationToken cancellationToken = default)
    {
        foreach (var item in items)
        {
            await Update(item, cancellationToken);
        }
    }

    /// <summary>
    /// Queues <paramref name="item"/> for update on the next <see cref="PersistAsync"/>,
    /// replacing any update already queued for the same id.
    /// </summary>
    /// <param name="item">Entity to update.</param>
    /// <param name="etag">Etag the stored document is expected to carry when the update is written.</param>
    /// <param name="cancellationToken">Unused; nothing is written until <see cref="PersistAsync"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="etag"/> is null and no insert is queued for the same id.
    /// </exception>
    public Task Update(T item, string etag, CancellationToken cancellationToken = default)
    {
        // An entity with this id is already queued for insert, so no update is queued.
        // NOTE(review): presumably the pending insert is expected to carry the latest
        // state; confirm callers always pass the same instance. This check deliberately
        // runs before the etag guard, so a not-yet-persisted entity may have a null etag.
        if (_entitiesToInsert.Exists(x => x.Id == item.Id))
        {
            return Task.CompletedTask;
        }

        ArgumentNullException.ThrowIfNull(etag);

        // Keep at most one queued update per id; the latest call wins.
        _entitiesToUpdate.RemoveAll(x => x.Item.Id == item.Id);
        _entitiesToUpdate.Add((item, etag));
        return Task.CompletedTask;
    }

    /// <summary>Starts an aggregation pipeline on the underlying collection.</summary>
    public IAggregateFluent<T> Aggregate() => _collection.Aggregate();
}