ongoDB 是一個介于關系數據庫和非關系數據庫之間的產品,是非關系數據庫當中功能最豐富,最像關系數據庫的。他支持的數據結構非常松散,是類似json的bson格式,因此可以存儲比較復雜的數據類型。Mongo最大的特點是他支持的查詢語言非常強大,其語法有點類似于面向對象的查詢語言,幾乎可以實現類似關系數據庫單表查詢的絕大部分功能,而且還支持對數據建立索引。 |
1.下載Linux安裝包
如下圖,放到本地的某個角落,要記得位置哦~
2.連接你的服務器
ssh root@你的IP //回車輸入密碼
3.安裝包上傳
另開ssh窗口(command+n),如果是windows就打開新的cmd窗口,因為我們要操作本地文件,之前的窗口我們已經登了服務器了。傳的方法很多,我只演示其中一種。
cd "安裝包所在文件夾" // 去你剛才安裝包放的位置
scp "本地文件" root@你的IP // 回車輸入密碼開始傳送
如下圖,我是傳送到服務器的根目錄下。其實不建議,你可以放在一個固有目錄,比如/usr/local
ls -l //以列表的形式展開,我們可以看到已經下載成功了。
4.解壓安裝包
tar -zxvf mongodb-linux-x86_64-3.4.6.tgz //解壓 不同文件有不同的解壓命令,自己百度即可
5.填坑的一步
剛才說大家可以把每次安裝的東東放到固定的文件夾,所以我把解壓好的文件移到了 /usr/local/mongodb目錄了,如果你已經放到了你喜歡的位置,可以跳過此步驟。如果你不知道怎么移,可以看下。
cd /user/local //進入local
mkdir mongodb //創建mongodb文件夾
cd / //進入根目錄
mv mongodb-linux-x86_64-3.4.6 /usr/local/mongodb /*將剛才的解壓包移入 /usr/local/mongodb*/
6.配置mongodb運行環境
mkdir data //創建data文件夾,存放數據庫db文件
mkdir logs //創建logs文件夾,存放日志文件
cd logs //進入logs
touch mongo.log //創建log文件
cd .. //返回上一級
mkdir etc //創建配置文件夾
cd etc // 進入etc
vim mongo.conf //編輯同時創建mongo.conf 文件
以下是mongo.conf文件里的代碼,大家看好不要寫錯,進入文件之后要按一下鍵盤i才能開始編寫代碼。
dbpath = /usr/local/mongodb/data //路徑一定要輸入絕對的
logpath = /usr/local/mongodb/logs/mongo.log //路徑一定要輸入絕對的
logappend = true
journal = true
quiet = true
port = 27017 //端口
寫好了按esc鍵退出,然后按shift+:會在最下面出現:然后輸入wq,保存并退出的意思。
到了這一步就接近尾聲了。
7.啟動mongodb
cd mongodb/mongodb-linux-x86_64-3.4.6/bin //進入安裝包的bin目錄下
mongod -f /usr/local/mongodb/etc/mongo.conf //啟動1方法 或 ./mongod -f /usr/local/mongodb/etc/mongo.conf //啟動2方法
如果輸入這個命令,出現如下圖,沒什么變化,只有光標閃爍。
8.本地測試是否成功
我用的可視化工具 Robo 3t,新建鏈接,address輸入服務器地址,端口填你mongo.conf里配置的,基本都是27027。
點擊保存,如下圖操作,進行 Connect。
如果到了這一步基本就是成功啦。
9.小坑警示
我之前在mongo.conf 里 路徑寫的是相對的,就出現了下面的問題。
然后就是這樣子的。
10.總結
方法有很多,我寫的只是其中一種,自己也是第一次安裝。其中也有點坑坑,遇到問題不要認輸,總會解決的,寫的不對不好的地方,希望大家不吝賜教~
本文地址:https://www.linuxprobe.com/linux-mongodb.html
光鬧鐘app開發者,請關注我,后續分享更精彩!
堅持原創,共同進步!
MongoDB是一個可擴展,高性能,開源,面向文檔的,基于鍵/值類型的數據庫。非常適合保存大對象及json格式的數據。本文將介紹MongoDB的單機和集群副本安裝,記錄下來以備后用。希望對有需要的朋友有所幫助和參考。
# 歷史產品和版本選擇
https://www.mongodb.com/docs/legacy/
# 3.6版本文檔地址
https://www.mongodb.com/docs/v3.6/
- Primary掛掉之后,會選舉出一個Secondary作為Primary,與zookeeper類似。
- Arbitry(仲裁節點)上面不存數據,只是為了湊數。選舉算法要求節點數必須是奇數個,如果Primary+Secondary不是奇數個,就要用Arbitry湊數。
- 寫數據只能在Primary,讀數據默認也在Primary,可以配置成從Secondary讀,可以選最近的節點。
- 數據在Primary上寫成功之后,會將操作記錄在oplog中,Secondary將oplog拷貝過去,然后照著操作一遍,就有數據了。
- Primary和Secondary上面的數據保證最終一致性,可以為寫操作配置write concern,有幾個級別:在Primary上寫完就認為寫成功;寫到oplog后認為寫成功;寫到一個/多個/某個/某幾個Secondary之后認為寫成功,等等。
centos 系統 yum源創建
cat >/etc/yum.repos.d/mongodb-org-3.6.repo<<eof
[mongodb-org-3.6]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/6Server/mongodb-org/3.6/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-3.6.asc
eof
根據具體情況,選擇版本安裝
# 安裝最近版本
# sudo yum install -y mongodb-org
# 安裝指定版本
sudo yum install --disablerepo=kubernetes -y mongodb-org-3.6.8 mongodb-org-server-3.6.8 mongodb-org-shell-3.6.8 mongodb-org-mongos-3.6.8 mongodb-org-tools-3.6.8
默認配置文件/etc/mongod.conf
- storage.dbPath 數據存儲目錄
- systemLog.path 系統日志目錄
- net.bindIp 訪問ip綁定,默認127.0.0.1。需要外部機器連接時,需調整成新人的訪問源ip,或者測試所有ip放開0.0.0.0
啟動MongoDB
sudo systemctl start mongod
# 如果啟動報如下報錯
# Failed to start mongod.service: Unit mongod.service not found.
# 啟動以下命令
# sudo systemctl daemon-reload
查看MongoDB狀態
sudo systemctl status mongod
# 如果想要重啟系統MongoDB服務自動啟動,可設置以下命令
# sudo systemctl enable mongod
停止MongoDB
sudo systemctl stop mongod
重啟MongoDB
sudo systemctl restart mongod
連接MongoDB
mongo --host 127.0.0.1:27017
參考文檔
官方文檔:https://www.mongodb.com/docs/v3.6/administration/replica-set-deployment/
環境
3個節點分別安裝MongoDB實例
修改/etc/mongod.conf以下配置項:
#replica set的名字
replication:
replSetName: "rs0"
#client訪問ip綁定,開發測試0.0.0.0所有ip
net:
bindIp: localhost,<ip address>
分別在3個節點上啟動MongoDB服務
sudo systemctl start mongod
# 如果啟動報如下報錯
# Failed to start mongod.service: Unit mongod.service not found.
# 啟動以下命令
# sudo systemctl daemon-reload
Initiate the replica set(初始化副本集)
任意一個節點控制臺登錄MongoDB
[root@dev2 ~]# mongo
MongoDB shell version v3.6.8
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 3.6.8
Server has startup warnings:
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten]
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten] ** WARNING: Access control is not enabled for the database.
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten] ** Read and write access to data and configuration is unrestricted.
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten]
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten]
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/enabled is 'always'.
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten] ** We suggest setting it to 'never'
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten]
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/defrag is 'always'.
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten] ** We suggest setting it to 'never'
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten]
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten] ** WARNING: soft rlimits too low. rlimits set to 4096 processes, 64000 files. Number of processes should be at least 32000 : 0.5 times number of files.
2022-05-11T09:36:02.035+0800 I CONTROL [initandlisten]
>
初始化副本集。 注:只需在集群一臺機器上執行初始化命令。
rs.initiate( {
_id : "rs0",
members: [
{ _id: 0, host: "dev2:27017" },
{ _id: 1, host: "dev3:27017" },
{ _id: 2, host: "dev4:27017" }
]
})
執行結果如下:
> rs.initiate( {
... _id : "rs0",
... members: [
... { _id: 0, host: "dev2:27017" },
... { _id: 1, host: "dev3:27017" },
... { _id: 2, host: "dev4:27017" }
... ]
... })
{
"ok" : 1,
"operationTime" : Timestamp(1653010782, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1653010782, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
查看配置結果:控制臺執行以下指令
rs.conf()
家好,我是Edison。
最近工作中需要用到MongoDB的事務操作,因此參考了一些資料封裝了一個小的組件,提供基礎的CRUD Repository基類 和 UnitOfWork工作單元模式。今天,就來簡單介紹一下這個小組件。
MongoDB在4.2版本開始全面支持了多文檔事務,至今已過了四年了,雖然我們可能沒有在項目中用MongoDB來替代傳統關系型數據庫如MySQL/SQL Server,但是不能否認MongoDB已經在事務能力上愈發成熟了。
在MongoDB中,所謂的事務主要指的是多個文檔的事務,其使用方式和傳統關系型數據庫差不多。但我們需要注意的是:多文檔事務只能應用在副本集 或 mongos 節點上。如果你只是一個單點的mongo實例,是無法進行多文檔事務實踐的。
畫外音:如果你對MongoDB感興趣,不妨看看我的這個系列博客:《MongoDB入門到實踐學習之旅》
那么,如何快速進行事務操作呢?
var session = db.getMongo().startSession();
session.startTransaction({readConcern: { level: 'majority' },writeConcern: { w: 'majority' }});
var coll1 = session.getDatabase('students').getCollection('teams');
coll1.update({name: 'yzw-football-team'}, {$set: {members: 20}});
var coll2 = session.getDatabase('students').getCollection('records');
coll1.update({name: 'Edison'}, {$set: {gender: 'Female'}});
// 成功提交事務
session.commitTransaction();
// 失敗事務回滾
session.abortTransaction();
using (var clientSession = mongoClient.StartSession())
{
try
{
var contacts = clientSession.Client.GetDatabase("testDB").GetCollection<Contact>("contacts");
contacts.ReplaceOne(contact => contact.Id == "1234455", contact);
var books = clientSession.Client.GetDatabase("testDB").GetCollection<Book>("books");
books.DeleteOne(book => book.Id == "1234455");
clientSession.CommitTransaction();
}
catch (Exception ex)
{
// to do some logging
clientSession.AbortTransaction();
}
}
public class MongoDbConnection : IMongoDbConnection
{
public IMongoClient DatabaseClient { get; }
public string DatabaseName { get; }
public MongoDbConnection(MongoDatabaseConfigs configs, IConfiguration configuration)
{
DatabaseClient = new MongoClient(configs.GetMongoClientSettings(configuration));
DatabaseName = configs.DatabaseName;
}
}
/** Config Example
"MongoDatabaseConfigs": {
"Servers": "xxx01.edisontalk.net,xxx02.edisontalk.net,xxx03.edisontalk.net",
"Port": 27017,
"ReplicaSetName": "edt-replica",
"DatabaseName": "EDT_Practices",
"AuthDatabaseName": "admin",
"ApplicationName": "Todo",
"UserName": "service_testdev",
"Password": "xxxxxxxxxxxxxxxxxxxxxxxx",
"UseTLS": true,
"AllowInsecureTLS": true,
"SslCertificatePath": "/etc/pki/tls/certs/EDT_CA.cer",
"UseEncryption": true
}
**/
public class MongoDatabaseConfigs
{
private const string DEFAULT_AUTH_DB = "admin"; // Default AuthDB: admin
public string Servers { get; set; }
public int Port { get; set; } = 27017; // Default Port: 27017
public string ReplicaSetName { get; set; }
public string DatabaseName { get; set; }
public string DefaultCollectionName { get; set; } = string.Empty;
public string ApplicationName { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public string AuthDatabaseName { get; set; } = DEFAULT_AUTH_DB; // Default AuthDB: admin
public string CustomProperties { get; set; } = string.Empty;
public bool UseTLS { get; set; } = false;
public bool AllowInsecureTLS { get; set; } = true;
public string SslCertificatePath { get; set; } = string.Empty;
public bool StoreCertificateInKeyStore { get; set; } = false;
public MongoClientSettings GetMongoClientSettings(IConfiguration configuration = )
{
if (string.IsOrWhiteSpace(Servers))
throw new ArgumentException("Mongo Servers Configuration is Missing!");
if (string.IsOrWhiteSpace(UserName) || string.IsOrWhiteSpace(Password))
throw new ArgumentException("Mongo Account Configuration is Missing!");
// Base Configuration
MongoClientSettings settings = new MongoClientSettings
{
ApplicationName = ApplicationName,
ReplicaSetName = ReplicaSetName
};
// Credential
settings.Credential = MongoCredential.CreateCredential(AuthDatabaseName, UserName, Password);
// Servers
var mongoServers = Servers.Split(",", StringSplitOptions.RemoveEmptyEntries).ToList();
if (mongoServers.Count == 1) // Standalone
{
settings.Server = new MongoServerAddress(mongoServers.First(), Port);
settings.DirectConnection = true;
}
if (mongoServers.Count > 1) // Cluster
{
var mongoServerAddresses = new List<MongoServerAddress>();
foreach (var mongoServer in mongoServers)
{
var mongoServerAddress = new MongoServerAddress(mongoServer, Port);
mongoServerAddresses.Add(mongoServerAddress);
}
settings.Servers = mongoServerAddresses;
settings.DirectConnection = false;
}
// SSL
if (UseTLS)
{
settings.UseTls = true;
settings.AllowInsecureTls = AllowInsecureTLS;
if (string.IsOrWhiteSpace(SslCertificatePath))
throw new ArgumentException("SslCertificatePath is Missing!");
if (StoreCertificateInKeyStore)
{
var localTrustStore = new X509Store(StoreName.Root);
var certificateCollection = new X509Certificate2Collection();
certificateCollection.Import(SslCertificatePath);
try
{
localTrustStore.Open(OpenFlags.ReadWrite);
localTrustStore.AddRange(certificateCollection);
}
catch (Exception ex)
{
throw;
}
finally
{
localTrustStore.Close();
}
}
var certs = new List<X509Certificate> { new X509Certificate2(SslCertificatePath) };
settings.SslSettings = new SslSettings();
settings.SslSettings.ClientCertificates = certs;
settings.SslSettings.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls13;
}
return settings;
}
}
public class MongoDbContext : IMongoDbContext
{
private readonly IMongoDatabase _database;
private readonly IMongoClient _mongoClient;
private readonly IList<Func<IClientSessionHandle, Task>> _commands
= new List<Func<IClientSessionHandle, Task>>();
public MongoDbContext(IMongoDbConnection dbClient)
{
_mongoClient = dbClient.DatabaseClient;
_database = _mongoClient.GetDatabase(dbClient.DatabaseName);
}
public void AddCommand(Func<IClientSessionHandle, Task> func)
{
_commands.Add(func);
}
public async Task AddCommandAsync(Func<IClientSessionHandle, Task> func)
{
_commands.Add(func);
await Task.CompletedTask;
}
/// <summary>
/// NOTES: Only works in Cluster mode
/// </summary>
public int Commit(IClientSessionHandle session)
{
try
{
session.StartTransaction();
foreach (var command in _commands)
{
command(session);
}
session.CommitTransaction();
return _commands.Count;
}
catch (Exception ex)
{
session.AbortTransaction();
return 0;
}
finally
{
_commands.Clear();
}
}
/// <summary>
/// NOTES: Only works in Cluster mode
/// </summary>
public async Task<int> CommitAsync(IClientSessionHandle session)
{
try
{
session.StartTransaction();
foreach (var command in _commands)
{
await command(session);
}
await session.CommitTransactionAsync();
return _commands.Count;
}
catch (Exception ex)
{
await session.AbortTransactionAsync();
return 0;
}
finally
{
_commands.Clear();
}
}
public IClientSessionHandle StartSession()
{
var session = _mongoClient.StartSession();
return session;
}
public async Task<IClientSessionHandle> StartSessionAsync()
{
var session = await _mongoClient.StartSessionAsync();
return session;
}
public IMongoCollection<T> GetCollection<T>(string name)
{
return _database.GetCollection<T>(name);
}
public void Dispose()
{
GC.SuppressFinalize(this);
}
}
數據倉儲:MongoRepositoryBase
在實際項目中,我們都希望有一個基礎的RepositoryBase類,將CRUD的方法都封裝了,我們實際中就只需要創建一個對應的Repository集成這個RepositoryBase就行了,無需再重復編寫CRUD的方法。那么,也就有了這個MongoRepositoryBase類:
public class MongoRepositoryBase<TEntity> : IMongoRepositoryBase<TEntity>
where TEntity : MongoEntityBase, new()
{
protected readonly IMongoDbContext _dbContext;
protected readonly IMongoCollection<TEntity> _dbSet;
private readonly string _collectionName;
private const string _keyField = "_id";
public MongoRepositoryBase(IMongoDbContext mongoDbContext)
{
_dbContext = mongoDbContext;
_collectionName = typeof(TEntity).GetAttributeValue((TableAttribute m) => m.Name)
?? typeof(TEntity).Name;
if (string.IsOrWhiteSpace(_collectionName))
throw new ArgumentException("Mongo DatabaseName can't be ! Please set the attribute Table in your entity class.");
_dbSet = mongoDbContext.GetCollection<TEntity>(_collectionName);
}
#region Create Part
public async Task AddAsync(TEntity entity, IClientSessionHandle session = )
{
if (session == )
await _dbSet.InsertOneAsync(entity);
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.InsertOneAsync(entity));
}
public async Task AddManyAsync(IEnumerable<TEntity> entityList, IClientSessionHandle session = )
{
if (session == )
await _dbSet.InsertManyAsync(entityList);
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.InsertManyAsync(entityList));
}
#endregion
# region Delete Part
public async Task DeleteAsync(string id, IClientSessionHandle session = )
{
if (session == )
await _dbSet.DeleteOneAsync(Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id)));
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteOneAsync(Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id))));
}
public async Task DeleteAsync(Expression<Func<TEntity, bool>> expression, IClientSessionHandle session = )
{
if (session == )
await _dbSet.DeleteOneAsync(expression);
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteOneAsync(expression));
}
public async Task<DeleteResult> DeleteManyAsync(FilterDefinition<TEntity> filter, IClientSessionHandle session = )
{
if (session == )
return await _dbSet.DeleteManyAsync(filter);
await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteManyAsync(filter));
return new DeleteResult.Acknowledged(10);
}
public async Task<DeleteResult> DeleteManyAsync(Expression<Func<TEntity, bool>> expression, IClientSessionHandle session = )
{
if (session == )
return await _dbSet.DeleteManyAsync(expression);
await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteManyAsync(expression));
return new DeleteResult.Acknowledged(10);
}
#endregion
#region Update Part
public async Task UpdateAsync(TEntity entity, IClientSessionHandle session = )
{
if (session == )
await _dbSet.ReplaceOneAsync(item => item.Id == entity.Id, entity);
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.ReplaceOneAsync(item => item.Id == entity.Id, entity));
}
public async Task UpdateAsync(Expression<Func<TEntity, bool>> expression, Expression<Action<TEntity>> entity, IClientSessionHandle session = )
{
var fieldList = new List<UpdateDefinition<TEntity>>();
if (entity.Body is MemberInitExpression param)
{
foreach (var item in param.Bindings)
{
var propertyName = item.Member.Name;
object propertyValue = ;
if (item is not MemberAssignment memberAssignment) continue;
if (memberAssignment.Expression.NodeType == ExpressionType.Constant)
{
if (memberAssignment.Expression is ConstantExpression constantExpression)
propertyValue = constantExpression.Value;
}
else
{
propertyValue = Expression.Lambda(memberAssignment.Expression, ).Compile().DynamicInvoke();
}
if (propertyName != _keyField)
{
fieldList.Add(Builders<TEntity>.Update.Set(propertyName, propertyValue));
}
}
}
if (session == )
await _dbSet.UpdateOneAsync(expression, Builders<TEntity>.Update.Combine(fieldList));
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateOneAsync(expression, Builders<TEntity>.Update.Combine(fieldList)));
}
public async Task UpdateAsync(FilterDefinition<TEntity> filter, UpdateDefinition<TEntity> update, IClientSessionHandle session = )
{
if (session == )
await _dbSet.UpdateOneAsync(filter, update);
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateOneAsync(filter, update));
}
public async Task UpdateManyAsync(Expression<Func<TEntity, bool>> expression, UpdateDefinition<TEntity> update, IClientSessionHandle session = )
{
if (session == )
await _dbSet.UpdateManyAsync(expression, update);
else
await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateManyAsync(expression, update));
}
public async Task<UpdateResult> UpdateManayAsync(Dictionary<string, string> dic, FilterDefinition<TEntity> filter, IClientSessionHandle session = )
{
var t = new TEntity();
// Fields to be updated
var list = new List<UpdateDefinition<TEntity>>();
foreach (var item in t.GetType().GetProperties())
{
if (!dic.ContainsKey(item.Name)) continue;
var value = dic[item.Name];
list.Add(Builders<TEntity>.Update.Set(item.Name, value));
}
var updatefilter = Builders<TEntity>.Update.Combine(list);
if (session == )
return await _dbSet.UpdateManyAsync(filter, updatefilter);
await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateManyAsync(filter, updatefilter));
return new UpdateResult.Acknowledged(10, 10, );
}
#endregion
#region Read Part
public async Task<TEntity> GetAsync(Expression<Func<TEntity, bool>> expression, bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
var queryData = await _dbSet.WithReadPreference(readPreference)
.Find(expression)
.FirstOrDefaultAsync();
return queryData;
}
public async Task<TEntity> GetAsync(string id, bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
var queryData = await _dbSet.WithReadPreference(readPreference).FindAsync(Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id)));
return queryData.FirstOrDefault();
}
public async Task<IEnumerable<TEntity>> GetAllAsync(bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
var queryAllData = await _dbSet.WithReadPreference(readPreference).FindAsync(Builders<TEntity>.Filter.Empty);
return queryAllData.ToList();
}
public async Task<long> CountAsync(Expression<Func<TEntity, bool>> expression, bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
return await _dbSet.WithReadPreference(readPreference).CountDocumentsAsync(expression);
}
public async Task<long> CountAsync(FilterDefinition<TEntity> filter, bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
return await _dbSet.WithReadPreference(readPreference).CountDocumentsAsync(filter);
}
public async Task<bool> ExistsAsync(Expression<Func<TEntity, bool>> predicate, bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
return await Task.FromResult(_dbSet.WithReadPreference(readPreference).AsQueryable().Any(predicate));
}
public async Task<List<TEntity>> FindListAsync(FilterDefinition<TEntity> filter, string[]? field = , SortDefinition<TEntity>? sort = , bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
if (field == || field.Length == 0)
{
if (sort == )
return await _dbSet.WithReadPreference(readPreference).Find(filter).ToListAsync();
return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).ToListAsync();
}
var fieldList = new List<ProjectionDefinition<TEntity>>();
for (int i = 0; i < field.Length; i++)
{
fieldList.Add(Builders<TEntity>.Projection.Include(field[i].ToString()));
}
var projection = Builders<TEntity>.Projection.Combine(fieldList);
fieldList?.Clear();
if (sort == )
return await _dbSet.WithReadPreference(readPreference).Find(filter).Project<TEntity>(projection).ToListAsync();
return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).Project<TEntity>(projection).ToListAsync();
}
public async Task<List<TEntity>> FindListByPageAsync(FilterDefinition<TEntity> filter, int pageIndex, int pageSize, string[]? field = , SortDefinition<TEntity>? sort = , bool readFromPrimary = true)
{
var readPreference = GetReadPreference(readFromPrimary);
if (field == || field.Length == 0)
{
if (sort == )
return await _dbSet.WithReadPreference(readPreference).Find(filter).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
var fieldList = new List<ProjectionDefinition<TEntity>>();
for (int i = 0; i < field.Length; i++)
{
fieldList.Add(Builders<TEntity>.Projection.Include(field[i].ToString()));
}
var projection = Builders<TEntity>.Projection.Combine(fieldList);
fieldList?.Clear();
if (sort == )
return await _dbSet.WithReadPreference(readPreference).Find(filter).Project<TEntity>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).Project<TEntity>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
#endregion
#region Protected Methods
protected ReadPreference GetReadPreference(bool readFromPrimary)
{
if (readFromPrimary)
return ReadPreference.PrimaryPreferred;
else
return ReadPreference.SecondaryPreferred;
}
#endregion
}
工作單元:UnitOfWork
在實際項目中,在對多個Repository操作之后,我們希望有一個統一的提交操作來實現事務的原子性。因此,我們可以有一個UnitOfWork來作為代理:
public class UnitOfWork : IUnitOfWork
{
private readonly IMongoDbContext _context;
public UnitOfWork(IMongoDbContext context)
{
_context = context;
}
public bool SaveChanges(IClientSessionHandle session)
{
return _context.Commit(session) > 0;
}
public async Task<bool> SaveChangesAsync(IClientSessionHandle session)
{
return await _context.CommitAsync(session) > 0;
}
public IClientSessionHandle BeginTransaction()
{
return _context.StartSession();
}
public async Task<IClientSessionHandle> BeginTransactionAsync()
{
return await _context.StartSessionAsync();
}
public void Dispose()
{
_context.Dispose();
}
}
封裝注入:ServiceCollectionExtensions
為了便于應用中快速注入,我們可以簡單封裝一個擴展方法,快速注入相關的核心組成部分:
public static class ServiceCollectionExtensions
{
/// <summary>
/// MongoDB Config Injection
/// </summary>
public static IServiceCollection AddMongoProxy(this IServiceCollection services, IConfiguration configuration)
{
if (!configuration.GetSection(nameof(MongoDatabaseConfigs)).Exists())
return services;
services.Configure<MongoDatabaseConfigs>(configuration.GetSection(nameof(MongoDatabaseConfigs)));
services.AddSingleton(sp => sp.GetRequiredService<IOptions<MongoDatabaseConfigs>>().Value);
services.AddSingleton<IMongoDbConnection, MongoDbConnection>();
services.AddScoped<IMongoDbContext, MongoDbContext>();
services.AddScoped<IUnitOfWork, UnitOfWork>();
return services;
}
}
如何使用:三步上籃
第一步:注入MongoProxy核心部分
在appsettings中配置MongoDB的連接信息:
"MongoDatabaseConfigs": {
"Servers": "xxx01.edisontalk.net,xxx02.edisontalk.net,xxx03.edisontalk.net",
"Port": 27017,
"ReplicaSetName": "edt-replica",
"DatabaseName": "EDT_Practices",
"UserName": "xxxxxxxxxxxxx",
"Password": "xxxxxxxxxxxxx"
}
然后通過擴展方法注入MongoProxy相關部分:
builder.Services.AddMongoProxy(builder.Configuration);
第二步:添加Entity 和 Repository
示例Entity:
[ ]
public class OrderEntity : MongoEntityBase, IEntity
{
public string OrderNumber { get; set; }
public List<TransmissionEntity> Transmissions { get; set; }
}
示例Repository:
public interface ITodoItemRepository : IMongoRepositoryBase<TodoItem>
{
}
public class TodoItemRepository : MongoRepositoryBase<TodoItem>, ITodoItemRepository
{
public TodoItemRepository(IMongoDbContext mongoDbContext)
: base(mongoDbContext)
{
}
}
services.AddScoped<ITodoItemRepository, TodoItemRepository>();
services.AddScoped<IOrderRepository, OrderRepository>();
第三步:使用Repository 和 UnitOfWork
await _taskRepository.AddManyAsync(newTasks);
private readonly IUnitOfWork _unitOfWork;
public OrderService(IUnitOfWork unitOfWork, ......)
{
_unitOfWork = unitOfWork;
......
}
public async Task Example()
{
using var session = await _unitOfWork.BeginTransactionAsync())
await _taskRepository.AddManyAsync(newTasks, session);
await _orderRepository.AddAsync(newOrder, session);
await _unitOfWork.SaveChangesAsync(session);
}
小結
本文介紹了MongoDB事務的基本概念和如何通過.NET操作事務,重點介紹了EDT.MongoProxy這個小組件的設計,讓我們可以在ASP.NET 6應用中通過數據倉儲(Repository)和工作單元(UnitOfWork)的模式來快速方便地操作MongoDB的事務。
參考代碼
本文代碼并未提供所有的,如需查看,請至下面的代碼倉庫中查看,也可以點個贊給點鼓勵。
GitHub:https://github.com/Coder-EdisonZhou/EDT.MongoProxy
參考資料
追逐時光者,《.NET Core MongoDB數據倉儲和工作單元實操》:https://www.cnblogs.com/Can-daydayup/p/17157135.html
*本文主要設計參考自這篇文章,值得一讀!
TheCodeBuzz,《MongoDB Repository Implementation in .NET Core》:
https://www.thecodebuzz.com/mongodb-repository-implementation-unit-testing-net-core-example
Bryan Avery, 《ASP.NET Core - MongoDB Repository Pattern & Unit Of Work》:
https://bryanavery.co.uk/asp-net-core-mongodb-repository-pattern-unit-of-work
*請認真填寫需求信息,我們會在24小時內與您取得聯系。