From c71ce9a692204164dfcb59e38334a18bd883aa50 Mon Sep 17 00:00:00 2001 From: MxEmerson <2382413024@qq.com> Date: Sat, 23 Mar 2024 02:12:18 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E4=BD=BF=E7=94=A8rpc?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E6=93=8D=E4=BD=9C=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 基于gRPC包装了pymongo中的若干个方法,使客户端无感(真的吗)切换到通过rpc --- .env | 8 + rpc_server/pymongo_rpc.proto | 81 ++++++ rpc_server/pymongo_rpc_pb2.py | 54 ++++ rpc_server/pymongo_rpc_pb2_grpc.py | 264 +++++++++++++++++++ rpc_server/server.py | 103 ++++++++ src/common/config/__init__.py | 11 +- src/common/utils/media_cache/__init__.py | 16 +- src/common/utils/rpc/__init__.py | 105 ++++++++ src/common/utils/rpc/pymongo_rpc.proto | 81 ++++++ src/common/utils/rpc/pymongo_rpc_pb2.py | 54 ++++ src/common/utils/rpc/pymongo_rpc_pb2_grpc.py | 264 +++++++++++++++++++ src/plugins/repeater/model.py | 6 +- tests/rpc/test.py | 63 +++++ 13 files changed, 1103 insertions(+), 7 deletions(-) create mode 100644 rpc_server/pymongo_rpc.proto create mode 100644 rpc_server/pymongo_rpc_pb2.py create mode 100644 rpc_server/pymongo_rpc_pb2_grpc.py create mode 100644 rpc_server/server.py create mode 100644 src/common/utils/rpc/__init__.py create mode 100644 src/common/utils/rpc/pymongo_rpc.proto create mode 100644 src/common/utils/rpc/pymongo_rpc_pb2.py create mode 100644 src/common/utils/rpc/pymongo_rpc_pb2_grpc.py create mode 100644 tests/rpc/test.py diff --git a/.env b/.env index 5048224b..1088639c 100644 --- a/.env +++ b/.env @@ -8,8 +8,16 @@ COMMAND_START=["","/"] # 默认牛牛轮盘模式,0为踢人,1为禁言 #DEFAULT_ROULETTE_MODE=0 +# 是否使用RPC连接数据库 +# 请提前安装依赖:pip install grpcio grpcio-tools grpcio-reflection +#USE_RPC=false + +# gRPC token +#RPC_TOKEN=your_rpc_token + # mongodb 相关配置,如无特殊需求,保持注释即可 # 使用 docker-compose 部署时,请将MONGO_HOST设置为 mongodb 容器 的 service 名称,如:MONGO_HOST=mongodb +# 使用RPC连接数据库时,请将mongodb的主机和端口替换为rpc服务的主机和端口 #MONGO_HOST=127.0.0.1 #MONGO_PORT=27017 diff --git a/rpc_server/pymongo_rpc.proto b/rpc_server/pymongo_rpc.proto new file mode 100644 index 00000000..febffb72 --- /dev/null +++ b/rpc_server/pymongo_rpc.proto @@ -0,0 +1,81 @@ +syntax = "proto3"; + +package pymongoRPC; + +service MongoDBService { + rpc Find(FindRequest) returns (FindResponse); + rpc FindOne(FindOneRequest) returns (FindOneResponse); + rpc InsertOne(InsertOneRequest) returns (InsertOneResponse); + rpc InsertMany(InsertManyRequest) returns (InsertManyResponse); + rpc UpdateOne(UpdateOneRequest) returns (UpdateOneResponse); + rpc DeleteMany(DeleteManyRequest) returns (DeleteManyResponse); + rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse); +} + +message FindRequest { + string collection = 1; + string filter = 2; // JSON +} + +message FindResponse { + string documents = 1; // JSON +} + +message FindOneRequest { + string collection = 1; + string filter = 2; // JSON +} + +message FindOneResponse { + string document = 1; // JSON +} + +message InsertOneRequest { + string collection = 1; + string document = 2; // JSON +} + +message InsertOneResponse { + string insertedId = 1; +} + +message InsertManyRequest { + string collection = 1; + string documents = 2; //JSON +} + +message InsertManyResponse { + string insertedIds = 1; +} + +message UpdateOneRequest { + string collection = 1; + string filter = 2; // JSON + string update = 3; // JSON + bool upsert = 4; +} + +message UpdateOneResponse { + int32 matchedCount = 1; + int32 modifiedCount = 2; +} + +message DeleteManyRequest { + string collection = 1; + string filter = 2; // JSON +} + +message DeleteManyResponse { + int32 deletedCount = 1; +} + +message CreateIndexRequest { + string collection = 1; + string keys = 2; // JSON + string name = 3; + string default_language = 4; +} + +message CreateIndexResponse { + string indexName = 1; +} diff --git a/rpc_server/pymongo_rpc_pb2.py b/rpc_server/pymongo_rpc_pb2.py new file mode 100644 index 00000000..d9ddb403 --- /dev/null +++ b/rpc_server/pymongo_rpc_pb2.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: pymongo_rpc.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11pymongo_rpc.proto\x12\npymongoRPC\"1\n\x0b\x46indRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\"!\n\x0c\x46indResponse\x12\x11\n\tdocuments\x18\x01 \x01(\t\"4\n\x0e\x46indOneRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\"#\n\x0f\x46indOneResponse\x12\x10\n\x08\x64ocument\x18\x01 \x01(\t\"8\n\x10InsertOneRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x10\n\x08\x64ocument\x18\x02 \x01(\t\"\'\n\x11InsertOneResponse\x12\x12\n\ninsertedId\x18\x01 \x01(\t\":\n\x11InsertManyRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x11\n\tdocuments\x18\x02 \x01(\t\")\n\x12InsertManyResponse\x12\x13\n\x0binsertedIds\x18\x01 \x01(\t\"V\n\x10UpdateOneRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\x12\x0e\n\x06update\x18\x03 \x01(\t\x12\x0e\n\x06upsert\x18\x04 \x01(\x08\"@\n\x11UpdateOneResponse\x12\x14\n\x0cmatchedCount\x18\x01 \x01(\x05\x12\x15\n\rmodifiedCount\x18\x02 \x01(\x05\"7\n\x11\x44\x65leteManyRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\"*\n\x12\x44\x65leteManyResponse\x12\x14\n\x0c\x64\x65letedCount\x18\x01 \x01(\x05\"^\n\x12\x43reateIndexRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0c\n\x04keys\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x18\n\x10\x64\x65\x66\x61ult_language\x18\x04 \x01(\t\"(\n\x13\x43reateIndexResponse\x12\x11\n\tindexName\x18\x01 \x01(\t2\x8d\x04\n\x0eMongoDBService\x12\x39\n\x04\x46ind\x12\x17.pymongoRPC.FindRequest\x1a\x18.pymongoRPC.FindResponse\x12\x42\n\x07\x46indOne\x12\x1a.pymongoRPC.FindOneRequest\x1a\x1b.pymongoRPC.FindOneResponse\x12H\n\tInsertOne\x12\x1c.pymongoRPC.InsertOneRequest\x1a\x1d.pymongoRPC.InsertOneResponse\x12K\n\nInsertMany\x12\x1d.pymongoRPC.InsertManyRequest\x1a\x1e.pymongoRPC.InsertManyResponse\x12H\n\tUpdateOne\x12\x1c.pymongoRPC.UpdateOneRequest\x1a\x1d.pymongoRPC.UpdateOneResponse\x12K\n\nDeleteMany\x12\x1d.pymongoRPC.DeleteManyRequest\x1a\x1e.pymongoRPC.DeleteManyResponse\x12N\n\x0b\x43reateIndex\x12\x1e.pymongoRPC.CreateIndexRequest\x1a\x1f.pymongoRPC.CreateIndexResponseb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'pymongo_rpc_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_FINDREQUEST']._serialized_start=33 + _globals['_FINDREQUEST']._serialized_end=82 + _globals['_FINDRESPONSE']._serialized_start=84 + _globals['_FINDRESPONSE']._serialized_end=117 + _globals['_FINDONEREQUEST']._serialized_start=119 + _globals['_FINDONEREQUEST']._serialized_end=171 + _globals['_FINDONERESPONSE']._serialized_start=173 + _globals['_FINDONERESPONSE']._serialized_end=208 + _globals['_INSERTONEREQUEST']._serialized_start=210 + _globals['_INSERTONEREQUEST']._serialized_end=266 + _globals['_INSERTONERESPONSE']._serialized_start=268 + _globals['_INSERTONERESPONSE']._serialized_end=307 + _globals['_INSERTMANYREQUEST']._serialized_start=309 + _globals['_INSERTMANYREQUEST']._serialized_end=367 + _globals['_INSERTMANYRESPONSE']._serialized_start=369 + _globals['_INSERTMANYRESPONSE']._serialized_end=410 + _globals['_UPDATEONEREQUEST']._serialized_start=412 + _globals['_UPDATEONEREQUEST']._serialized_end=498 + _globals['_UPDATEONERESPONSE']._serialized_start=500 + _globals['_UPDATEONERESPONSE']._serialized_end=564 + _globals['_DELETEMANYREQUEST']._serialized_start=566 + _globals['_DELETEMANYREQUEST']._serialized_end=621 + _globals['_DELETEMANYRESPONSE']._serialized_start=623 + _globals['_DELETEMANYRESPONSE']._serialized_end=665 + _globals['_CREATEINDEXREQUEST']._serialized_start=667 + _globals['_CREATEINDEXREQUEST']._serialized_end=761 + _globals['_CREATEINDEXRESPONSE']._serialized_start=763 + _globals['_CREATEINDEXRESPONSE']._serialized_end=803 + _globals['_MONGODBSERVICE']._serialized_start=806 + _globals['_MONGODBSERVICE']._serialized_end=1331 +# @@protoc_insertion_point(module_scope) diff --git a/rpc_server/pymongo_rpc_pb2_grpc.py b/rpc_server/pymongo_rpc_pb2_grpc.py new file mode 100644 index 00000000..6447dcb2 --- /dev/null +++ b/rpc_server/pymongo_rpc_pb2_grpc.py @@ -0,0 +1,264 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import pymongo_rpc_pb2 as pymongo__rpc__pb2 + + +class MongoDBServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Find = channel.unary_unary( + '/pymongoRPC.MongoDBService/Find', + request_serializer=pymongo__rpc__pb2.FindRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.FindResponse.FromString, + ) + self.FindOne = channel.unary_unary( + '/pymongoRPC.MongoDBService/FindOne', + request_serializer=pymongo__rpc__pb2.FindOneRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.FindOneResponse.FromString, + ) + self.InsertOne = channel.unary_unary( + '/pymongoRPC.MongoDBService/InsertOne', + request_serializer=pymongo__rpc__pb2.InsertOneRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.InsertOneResponse.FromString, + ) + self.InsertMany = channel.unary_unary( + '/pymongoRPC.MongoDBService/InsertMany', + request_serializer=pymongo__rpc__pb2.InsertManyRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.InsertManyResponse.FromString, + ) + self.UpdateOne = channel.unary_unary( + '/pymongoRPC.MongoDBService/UpdateOne', + request_serializer=pymongo__rpc__pb2.UpdateOneRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.UpdateOneResponse.FromString, + ) + self.DeleteMany = channel.unary_unary( + '/pymongoRPC.MongoDBService/DeleteMany', + request_serializer=pymongo__rpc__pb2.DeleteManyRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.DeleteManyResponse.FromString, + ) + self.CreateIndex = channel.unary_unary( + '/pymongoRPC.MongoDBService/CreateIndex', + request_serializer=pymongo__rpc__pb2.CreateIndexRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.CreateIndexResponse.FromString, + ) + + +class MongoDBServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Find(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def FindOne(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def InsertOne(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def InsertMany(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def UpdateOne(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def DeleteMany(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CreateIndex(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_MongoDBServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Find': grpc.unary_unary_rpc_method_handler( + servicer.Find, + request_deserializer=pymongo__rpc__pb2.FindRequest.FromString, + response_serializer=pymongo__rpc__pb2.FindResponse.SerializeToString, + ), + 'FindOne': grpc.unary_unary_rpc_method_handler( + servicer.FindOne, + request_deserializer=pymongo__rpc__pb2.FindOneRequest.FromString, + response_serializer=pymongo__rpc__pb2.FindOneResponse.SerializeToString, + ), + 'InsertOne': grpc.unary_unary_rpc_method_handler( + servicer.InsertOne, + request_deserializer=pymongo__rpc__pb2.InsertOneRequest.FromString, + response_serializer=pymongo__rpc__pb2.InsertOneResponse.SerializeToString, + ), + 'InsertMany': grpc.unary_unary_rpc_method_handler( + servicer.InsertMany, + request_deserializer=pymongo__rpc__pb2.InsertManyRequest.FromString, + response_serializer=pymongo__rpc__pb2.InsertManyResponse.SerializeToString, + ), + 'UpdateOne': grpc.unary_unary_rpc_method_handler( + servicer.UpdateOne, + request_deserializer=pymongo__rpc__pb2.UpdateOneRequest.FromString, + response_serializer=pymongo__rpc__pb2.UpdateOneResponse.SerializeToString, + ), + 'DeleteMany': grpc.unary_unary_rpc_method_handler( + servicer.DeleteMany, + request_deserializer=pymongo__rpc__pb2.DeleteManyRequest.FromString, + response_serializer=pymongo__rpc__pb2.DeleteManyResponse.SerializeToString, + ), + 'CreateIndex': grpc.unary_unary_rpc_method_handler( + servicer.CreateIndex, + request_deserializer=pymongo__rpc__pb2.CreateIndexRequest.FromString, + response_serializer=pymongo__rpc__pb2.CreateIndexResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'pymongoRPC.MongoDBService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class MongoDBService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Find(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/Find', + pymongo__rpc__pb2.FindRequest.SerializeToString, + pymongo__rpc__pb2.FindResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def FindOne(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/FindOne', + pymongo__rpc__pb2.FindOneRequest.SerializeToString, + pymongo__rpc__pb2.FindOneResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def InsertOne(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/InsertOne', + pymongo__rpc__pb2.InsertOneRequest.SerializeToString, + pymongo__rpc__pb2.InsertOneResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def InsertMany(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/InsertMany', + pymongo__rpc__pb2.InsertManyRequest.SerializeToString, + pymongo__rpc__pb2.InsertManyResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def UpdateOne(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/UpdateOne', + pymongo__rpc__pb2.UpdateOneRequest.SerializeToString, + pymongo__rpc__pb2.UpdateOneResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def DeleteMany(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/DeleteMany', + pymongo__rpc__pb2.DeleteManyRequest.SerializeToString, + pymongo__rpc__pb2.DeleteManyResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def CreateIndex(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/CreateIndex', + pymongo__rpc__pb2.CreateIndexRequest.SerializeToString, + pymongo__rpc__pb2.CreateIndexResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/rpc_server/server.py b/rpc_server/server.py new file mode 100644 index 00000000..b465c31e --- /dev/null +++ b/rpc_server/server.py @@ -0,0 +1,103 @@ +from functools import wraps +from concurrent import futures +from bson import ObjectId, json_util +import grpc +import pymongo +import json +from grpc import ServerInterceptor +import pymongo_rpc_pb2_grpc +import pymongo_rpc_pb2 + + +class AuthenticationInterceptor(ServerInterceptor): + def __init__(self, valid_token): + self.valid_token = valid_token + + def intercept_service(self, continuation, handler_call_details): + metadata = dict(handler_call_details.invocation_metadata) + token = metadata.get('authorization') + + if token == self.valid_token: + return continuation(handler_call_details) + else: + raise grpc.RpcError("Invalid token") + + +class MongoDBService(pymongo_rpc_pb2_grpc.MongoDBServiceServicer): + _collections = {} + + def __init__(self): + self.client = pymongo.MongoClient("mongodb://localhost:27017/") + self.db = self.client["PallasBot"] + + def __del__(self): + self.client.close() + + def _get_collection(self, collection_name) -> pymongo.collection.Collection: + if collection_name not in MongoDBService._collections: + collection = self.db[collection_name] + MongoDBService._collections[collection_name] = collection + return MongoDBService._collections[collection_name] + + def Find(self, request, context): + collection = self._get_collection(request.collection) + filter = json.loads(request.filter) + cursor = collection.find(filter) + documents = list(cursor) + return pymongo_rpc_pb2.FindResponse(documents=json_util.dumps(documents, default=str)) + + def FindOne(self, request, context): + collection = self._get_collection(request.collection) + filter = json.loads(request.filter) + document = collection.find_one(filter) + return pymongo_rpc_pb2.FindOneResponse(document=json_util.dumps(document, default=str)) + + def InsertOne(self, request, context): + collection = self._get_collection(request.collection) + document = json_util.loads(request.document) + result = collection.insert_one(document) + return pymongo_rpc_pb2.InsertOneResponse(insertedId=str(result.inserted_id)) + + def InsertMany(self, request, context): + collection = self._get_collection(request.collection) + documents = json_util.loads(request.documents) + result = collection.insert_many(documents) + return pymongo_rpc_pb2.InsertManyResponse(insertedIds=json_util.dumps(result.inserted_ids, default=str)) + + def UpdateOne(self, request, context): + collection = self._get_collection(request.collection) + filter = json.loads(request.filter) + update = json.loads(request.update) + upsert = request.upsert + result = collection.update_one(filter, update, upsert=upsert) + return pymongo_rpc_pb2.UpdateOneResponse(matchedCount=result.matched_count, modifiedCount=result.modified_count) + + def DeleteMany(self, request, context): + collection = self._get_collection(request.collection) + filter = json.loads(request.filter) + result = collection.delete_many(filter) + return pymongo_rpc_pb2.DeleteManyResponse(deletedCount=result.deleted_count) + + def CreateIndex(self, request, context): + collection = self._get_collection(request.collection) + kwargs = {'keys': eval(request.keys)} + for attr in ['name', 'default_language']: + value = getattr(request, attr) + if value: + kwargs[attr] = value + result = collection.create_index(**kwargs) + return pymongo_rpc_pb2.CreateIndexResponse(indexName=result) + + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), + interceptors=[AuthenticationInterceptor(valid_token="your_rpc_token")]) + pymongo_rpc_pb2_grpc.add_MongoDBServiceServicer_to_server( + MongoDBService(), server) + server.add_insecure_port('[::]:50051') + server.start() + server.wait_for_termination() + + +if __name__ == '__main__': + serve() diff --git a/src/common/config/__init__.py b/src/common/config/__init__.py index 752a6e3d..bbc84cee 100644 --- a/src/common/config/__init__.py +++ b/src/common/config/__init__.py @@ -10,6 +10,10 @@ class PluginConfig(BaseModel, extra=Extra.ignore): + # 是否使用云端数据库 + use_rpc: bool = False + # 远程数据库token + rpc_token: str = '' # 默认轮盘模式 default_roulette_mode: int = 0 # mongodb host @@ -69,6 +73,11 @@ class PluginConfig(BaseModel, extra=Extra.ignore): plugin_config = PluginConfig.parse_obj(get_driver().config) +if plugin_config.use_rpc: + from src.common.utils.rpc import MongoClient +else: + from pymongo import MongoClient + class Config(ABC): _config_mongo: Optional[Collection] = None @@ -78,7 +87,7 @@ class Config(ABC): @classmethod def _get_config_mongo(cls) -> Collection: if cls._config_mongo is None: - mongo_client = pymongo.MongoClient( + mongo_client = MongoClient( plugin_config.mongo_host, plugin_config.mongo_port) mongo_db = mongo_client['PallasBot'] cls._config_mongo = mongo_db[cls._table] diff --git a/src/common/utils/media_cache/__init__.py b/src/common/utils/media_cache/__init__.py index 07979263..36c554f2 100644 --- a/src/common/utils/media_cache/__init__.py +++ b/src/common/utils/media_cache/__init__.py @@ -4,8 +4,14 @@ import re from datetime import datetime, timedelta from typing import Optional +from src.common.config import plugin_config -mongo_client = pymongo.MongoClient('127.0.0.1', 27017) +if plugin_config.use_rpc: + from src.common.utils.rpc import MongoClient +else: + from pymongo import MongoClient + +mongo_client = MongoClient(plugin_config.mongo_host, plugin_config.mongo_port) mongo_db = mongo_client['PallasBot'] image_cache = mongo_db['image_cache'] @@ -23,16 +29,16 @@ async def insert_image(image_seg): '$inc': {'ref_times': 1}, '$set': {'date': idate}, } - + cache = image_cache.find_one(db_filter) - + ref_times = 0 if cache: if "ref_times" in cache: ref_times = cache["ref_times"] else: ref_times = 1 - + ref_times += 1 # 不是经常收到的图不缓存,不然会占用大量空间 @@ -46,7 +52,7 @@ async def insert_image(image_seg): base64_data = base64.b64encode(rsp.content) base64_data = base64_data.decode() - + db_update['$set']['base64_data'] = base64_data image_cache.update_one(db_filter, db_update, upsert=True) diff --git a/src/common/utils/rpc/__init__.py b/src/common/utils/rpc/__init__.py new file mode 100644 index 00000000..6d968507 --- /dev/null +++ b/src/common/utils/rpc/__init__.py @@ -0,0 +1,105 @@ +import json +import grpc +from typing import List, Dict, Tuple, Optional, Type, TypeVar, Any +from src.common.utils.rpc import pymongo_rpc_pb2, pymongo_rpc_pb2_grpc +from src.common.config import plugin_config + + +T = TypeVar('T') + + +class CollectionProxy: + def __init__(self, rpc_client: Type['MongoClient'], collection_name: str): + self.rpc_client = rpc_client + self.collection_name = collection_name + + def __getitem__(self: T, collection_name: str) -> T: + return CollectionProxy(self.rpc_client, collection_name) + + def find(self, filter: Dict = {}) -> List[Dict[str, Any]]: + return self.rpc_client.find(self.collection_name, json.dumps(filter)) + + def find_one(self, filter: Dict) -> Dict[str, Any]: + return self.rpc_client.find_one(self.collection_name, json.dumps(filter)) + + def insert_one(self, document: Dict) -> str: + return self.rpc_client.insert_one(self.collection_name, json.dumps(document)) + + def insert_many(self, documents: List[Dict]) -> List[str]: + return self.rpc_client.insert_many(self.collection_name, json.dumps(documents)) + + def update_one(self, filter: Dict, update: Dict, upsert: bool = False) -> Tuple[int, int]: + return self.rpc_client.update_one(self.collection_name, json.dumps(filter), json.dumps(update), upsert) + + def delete_many(self, filter: Dict) -> int: + return self.rpc_client.delete_many(self.collection_name, json.dumps(filter)) + + def create_index(self, keys: List[Tuple], name: Optional[str] = None, default_language: Optional[str] = None) -> str: + return self.rpc_client.create_index(self.collection_name, str(keys), name, default_language) + + +class MongoClient: + def __init__(self, mongo_host: str, mongo_port: str, **kwargs): + self.channel = grpc.insecure_channel(f'{mongo_host}:{mongo_port}') + self.stub = pymongo_rpc_pb2_grpc.MongoDBServiceStub(self.channel) + self.metadata = [('authorization', plugin_config.rpc_token)] + + def __del__(self): + self.channel.close() + + def __getitem__(self, collection_name: str) -> CollectionProxy: + return CollectionProxy(self, collection_name) + + def find(self, collection: str, filter: str) -> List[Dict[str, Any]]: + request = pymongo_rpc_pb2.FindRequest( + collection=collection, filter=filter) + response = self.stub.Find(request, metadata=self.metadata) + return json.loads(response.documents) + + def find_one(self, collection: str, filter: str) -> Dict[str, Any]: + request = pymongo_rpc_pb2.FindOneRequest( + collection=collection, filter=filter) + response = self.stub.FindOne(request, metadata=self.metadata) + return json.loads(response.document) + + def insert_one(self, collection: str, document: str) -> str: + request = pymongo_rpc_pb2.InsertOneRequest( + collection=collection, document=document) + response = self.stub.InsertOne(request, metadata=self.metadata) + return response.insertedId + + def insert_many(self, collection: str, documents: str) -> List[str]: + request = pymongo_rpc_pb2.InsertManyRequest( + collection=collection, documents=documents) + response = self.stub.InsertMany(request, metadata=self.metadata) + return json.loads(response.insertedIds) + + def update_one(self, collection: str, filter: str, update: str, upsert: bool = False) -> Tuple[int, int]: + request = pymongo_rpc_pb2.UpdateOneRequest( + collection=collection, filter=filter, update=update, upsert=upsert) + response = self.stub.UpdateOne(request, metadata=self.metadata) + return response.matchedCount, response.modifiedCount + + def delete_many(self, collection: str, filter: str) -> int: + request = pymongo_rpc_pb2.DeleteManyRequest( + collection=collection, filter=filter) + response = self.stub.DeleteMany(request, metadata=self.metadata) + return response.deletedCount + + def create_index(self, collection: str, keys: str, name: Optional[str] = None, default_language: Optional[str] = None) -> str: + request = pymongo_rpc_pb2.CreateIndexRequest( + collection=collection, keys=keys, name=name, default_language=default_language) + response = self.stub.CreateIndex(request, metadata=self.metadata) + return response.indexName + + def close(self): + self.channel.close() + + +if __name__ == '__main__': + mongo_client = MongoClient('localhost', 50051) + mongo_db = mongo_client['PallasBot'] + context_mongo = mongo_db['context'] + find_key = {"keywords": "牛牛 唱歌"} + result = context_mongo.find_one(find_key) + print(result) diff --git a/src/common/utils/rpc/pymongo_rpc.proto b/src/common/utils/rpc/pymongo_rpc.proto new file mode 100644 index 00000000..febffb72 --- /dev/null +++ b/src/common/utils/rpc/pymongo_rpc.proto @@ -0,0 +1,81 @@ +syntax = "proto3"; + +package pymongoRPC; + +service MongoDBService { + rpc Find(FindRequest) returns (FindResponse); + rpc FindOne(FindOneRequest) returns (FindOneResponse); + rpc InsertOne(InsertOneRequest) returns (InsertOneResponse); + rpc InsertMany(InsertManyRequest) returns (InsertManyResponse); + rpc UpdateOne(UpdateOneRequest) returns (UpdateOneResponse); + rpc DeleteMany(DeleteManyRequest) returns (DeleteManyResponse); + rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse); +} + +message FindRequest { + string collection = 1; + string filter = 2; // JSON +} + +message FindResponse { + string documents = 1; // JSON +} + +message FindOneRequest { + string collection = 1; + string filter = 2; // JSON +} + +message FindOneResponse { + string document = 1; // JSON +} + +message InsertOneRequest { + string collection = 1; + string document = 2; // JSON +} + +message InsertOneResponse { + string insertedId = 1; +} + +message InsertManyRequest { + string collection = 1; + string documents = 2; //JSON +} + +message InsertManyResponse { + string insertedIds = 1; +} + +message UpdateOneRequest { + string collection = 1; + string filter = 2; // JSON + string update = 3; // JSON + bool upsert = 4; +} + +message UpdateOneResponse { + int32 matchedCount = 1; + int32 modifiedCount = 2; +} + +message DeleteManyRequest { + string collection = 1; + string filter = 2; // JSON +} + +message DeleteManyResponse { + int32 deletedCount = 1; +} + +message CreateIndexRequest { + string collection = 1; + string keys = 2; // JSON + string name = 3; + string default_language = 4; +} + +message CreateIndexResponse { + string indexName = 1; +} diff --git a/src/common/utils/rpc/pymongo_rpc_pb2.py b/src/common/utils/rpc/pymongo_rpc_pb2.py new file mode 100644 index 00000000..d9ddb403 --- /dev/null +++ b/src/common/utils/rpc/pymongo_rpc_pb2.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: pymongo_rpc.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11pymongo_rpc.proto\x12\npymongoRPC\"1\n\x0b\x46indRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\"!\n\x0c\x46indResponse\x12\x11\n\tdocuments\x18\x01 \x01(\t\"4\n\x0e\x46indOneRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\"#\n\x0f\x46indOneResponse\x12\x10\n\x08\x64ocument\x18\x01 \x01(\t\"8\n\x10InsertOneRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x10\n\x08\x64ocument\x18\x02 \x01(\t\"\'\n\x11InsertOneResponse\x12\x12\n\ninsertedId\x18\x01 \x01(\t\":\n\x11InsertManyRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x11\n\tdocuments\x18\x02 \x01(\t\")\n\x12InsertManyResponse\x12\x13\n\x0binsertedIds\x18\x01 \x01(\t\"V\n\x10UpdateOneRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\x12\x0e\n\x06update\x18\x03 \x01(\t\x12\x0e\n\x06upsert\x18\x04 \x01(\x08\"@\n\x11UpdateOneResponse\x12\x14\n\x0cmatchedCount\x18\x01 \x01(\x05\x12\x15\n\rmodifiedCount\x18\x02 \x01(\x05\"7\n\x11\x44\x65leteManyRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\"*\n\x12\x44\x65leteManyResponse\x12\x14\n\x0c\x64\x65letedCount\x18\x01 \x01(\x05\"^\n\x12\x43reateIndexRequest\x12\x12\n\ncollection\x18\x01 \x01(\t\x12\x0c\n\x04keys\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x18\n\x10\x64\x65\x66\x61ult_language\x18\x04 \x01(\t\"(\n\x13\x43reateIndexResponse\x12\x11\n\tindexName\x18\x01 \x01(\t2\x8d\x04\n\x0eMongoDBService\x12\x39\n\x04\x46ind\x12\x17.pymongoRPC.FindRequest\x1a\x18.pymongoRPC.FindResponse\x12\x42\n\x07\x46indOne\x12\x1a.pymongoRPC.FindOneRequest\x1a\x1b.pymongoRPC.FindOneResponse\x12H\n\tInsertOne\x12\x1c.pymongoRPC.InsertOneRequest\x1a\x1d.pymongoRPC.InsertOneResponse\x12K\n\nInsertMany\x12\x1d.pymongoRPC.InsertManyRequest\x1a\x1e.pymongoRPC.InsertManyResponse\x12H\n\tUpdateOne\x12\x1c.pymongoRPC.UpdateOneRequest\x1a\x1d.pymongoRPC.UpdateOneResponse\x12K\n\nDeleteMany\x12\x1d.pymongoRPC.DeleteManyRequest\x1a\x1e.pymongoRPC.DeleteManyResponse\x12N\n\x0b\x43reateIndex\x12\x1e.pymongoRPC.CreateIndexRequest\x1a\x1f.pymongoRPC.CreateIndexResponseb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'pymongo_rpc_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_FINDREQUEST']._serialized_start=33 + _globals['_FINDREQUEST']._serialized_end=82 + _globals['_FINDRESPONSE']._serialized_start=84 + _globals['_FINDRESPONSE']._serialized_end=117 + _globals['_FINDONEREQUEST']._serialized_start=119 + _globals['_FINDONEREQUEST']._serialized_end=171 + _globals['_FINDONERESPONSE']._serialized_start=173 + _globals['_FINDONERESPONSE']._serialized_end=208 + _globals['_INSERTONEREQUEST']._serialized_start=210 + _globals['_INSERTONEREQUEST']._serialized_end=266 + _globals['_INSERTONERESPONSE']._serialized_start=268 + _globals['_INSERTONERESPONSE']._serialized_end=307 + _globals['_INSERTMANYREQUEST']._serialized_start=309 + _globals['_INSERTMANYREQUEST']._serialized_end=367 + _globals['_INSERTMANYRESPONSE']._serialized_start=369 + _globals['_INSERTMANYRESPONSE']._serialized_end=410 + _globals['_UPDATEONEREQUEST']._serialized_start=412 + _globals['_UPDATEONEREQUEST']._serialized_end=498 + _globals['_UPDATEONERESPONSE']._serialized_start=500 + _globals['_UPDATEONERESPONSE']._serialized_end=564 + _globals['_DELETEMANYREQUEST']._serialized_start=566 + _globals['_DELETEMANYREQUEST']._serialized_end=621 + _globals['_DELETEMANYRESPONSE']._serialized_start=623 + _globals['_DELETEMANYRESPONSE']._serialized_end=665 + _globals['_CREATEINDEXREQUEST']._serialized_start=667 + _globals['_CREATEINDEXREQUEST']._serialized_end=761 + _globals['_CREATEINDEXRESPONSE']._serialized_start=763 + _globals['_CREATEINDEXRESPONSE']._serialized_end=803 + _globals['_MONGODBSERVICE']._serialized_start=806 + _globals['_MONGODBSERVICE']._serialized_end=1331 +# @@protoc_insertion_point(module_scope) diff --git a/src/common/utils/rpc/pymongo_rpc_pb2_grpc.py b/src/common/utils/rpc/pymongo_rpc_pb2_grpc.py new file mode 100644 index 00000000..632c046b --- /dev/null +++ b/src/common/utils/rpc/pymongo_rpc_pb2_grpc.py @@ -0,0 +1,264 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from . import pymongo_rpc_pb2 as pymongo__rpc__pb2 + + +class MongoDBServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Find = channel.unary_unary( + '/pymongoRPC.MongoDBService/Find', + request_serializer=pymongo__rpc__pb2.FindRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.FindResponse.FromString, + ) + self.FindOne = channel.unary_unary( + '/pymongoRPC.MongoDBService/FindOne', + request_serializer=pymongo__rpc__pb2.FindOneRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.FindOneResponse.FromString, + ) + self.InsertOne = channel.unary_unary( + '/pymongoRPC.MongoDBService/InsertOne', + request_serializer=pymongo__rpc__pb2.InsertOneRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.InsertOneResponse.FromString, + ) + self.InsertMany = channel.unary_unary( + '/pymongoRPC.MongoDBService/InsertMany', + request_serializer=pymongo__rpc__pb2.InsertManyRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.InsertManyResponse.FromString, + ) + self.UpdateOne = channel.unary_unary( + '/pymongoRPC.MongoDBService/UpdateOne', + request_serializer=pymongo__rpc__pb2.UpdateOneRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.UpdateOneResponse.FromString, + ) + self.DeleteMany = channel.unary_unary( + '/pymongoRPC.MongoDBService/DeleteMany', + request_serializer=pymongo__rpc__pb2.DeleteManyRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.DeleteManyResponse.FromString, + ) + self.CreateIndex = channel.unary_unary( + '/pymongoRPC.MongoDBService/CreateIndex', + request_serializer=pymongo__rpc__pb2.CreateIndexRequest.SerializeToString, + response_deserializer=pymongo__rpc__pb2.CreateIndexResponse.FromString, + ) + + +class MongoDBServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Find(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def FindOne(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def InsertOne(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def InsertMany(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def UpdateOne(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def DeleteMany(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CreateIndex(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_MongoDBServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Find': grpc.unary_unary_rpc_method_handler( + servicer.Find, + request_deserializer=pymongo__rpc__pb2.FindRequest.FromString, + response_serializer=pymongo__rpc__pb2.FindResponse.SerializeToString, + ), + 'FindOne': grpc.unary_unary_rpc_method_handler( + servicer.FindOne, + request_deserializer=pymongo__rpc__pb2.FindOneRequest.FromString, + response_serializer=pymongo__rpc__pb2.FindOneResponse.SerializeToString, + ), + 'InsertOne': grpc.unary_unary_rpc_method_handler( + servicer.InsertOne, + request_deserializer=pymongo__rpc__pb2.InsertOneRequest.FromString, + response_serializer=pymongo__rpc__pb2.InsertOneResponse.SerializeToString, + ), + 'InsertMany': grpc.unary_unary_rpc_method_handler( + servicer.InsertMany, + request_deserializer=pymongo__rpc__pb2.InsertManyRequest.FromString, + response_serializer=pymongo__rpc__pb2.InsertManyResponse.SerializeToString, + ), + 'UpdateOne': grpc.unary_unary_rpc_method_handler( + servicer.UpdateOne, + request_deserializer=pymongo__rpc__pb2.UpdateOneRequest.FromString, + response_serializer=pymongo__rpc__pb2.UpdateOneResponse.SerializeToString, + ), + 'DeleteMany': grpc.unary_unary_rpc_method_handler( + servicer.DeleteMany, + request_deserializer=pymongo__rpc__pb2.DeleteManyRequest.FromString, + response_serializer=pymongo__rpc__pb2.DeleteManyResponse.SerializeToString, + ), + 'CreateIndex': grpc.unary_unary_rpc_method_handler( + servicer.CreateIndex, + request_deserializer=pymongo__rpc__pb2.CreateIndexRequest.FromString, + response_serializer=pymongo__rpc__pb2.CreateIndexResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'pymongoRPC.MongoDBService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class MongoDBService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Find(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/Find', + pymongo__rpc__pb2.FindRequest.SerializeToString, + pymongo__rpc__pb2.FindResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def FindOne(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/FindOne', + pymongo__rpc__pb2.FindOneRequest.SerializeToString, + pymongo__rpc__pb2.FindOneResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def InsertOne(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/InsertOne', + pymongo__rpc__pb2.InsertOneRequest.SerializeToString, + pymongo__rpc__pb2.InsertOneResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def InsertMany(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/InsertMany', + pymongo__rpc__pb2.InsertManyRequest.SerializeToString, + pymongo__rpc__pb2.InsertManyResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def UpdateOne(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/UpdateOne', + pymongo__rpc__pb2.UpdateOneRequest.SerializeToString, + pymongo__rpc__pb2.UpdateOneResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def DeleteMany(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/DeleteMany', + pymongo__rpc__pb2.DeleteManyRequest.SerializeToString, + pymongo__rpc__pb2.DeleteManyResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def CreateIndex(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/pymongoRPC.MongoDBService/CreateIndex', + pymongo__rpc__pb2.CreateIndexRequest.SerializeToString, + pymongo__rpc__pb2.CreateIndexResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/plugins/repeater/model.py b/src/plugins/repeater/model.py index 082d7914..873b1a2e 100644 --- a/src/plugins/repeater/model.py +++ b/src/plugins/repeater/model.py @@ -21,6 +21,10 @@ from nonebot.adapters.onebot.v11 import Message, MessageSegment from src.common.config import BotConfig, plugin_config +if plugin_config.use_rpc: + from src.common.utils.rpc import MongoClient +else: + from pymongo import MongoClient try: from src.common.utils.speech.text_to_speech import text_2_speech TTS_AVAIABLE = True @@ -29,7 +33,7 @@ TTS_AVAIABLE = False -mongo_client = pymongo.MongoClient( +mongo_client = MongoClient( plugin_config.mongo_host, plugin_config.mongo_port, unicode_decode_error_handler='ignore') mongo_db = mongo_client['PallasBot'] diff --git a/tests/rpc/test.py b/tests/rpc/test.py new file mode 100644 index 00000000..5a1ebe7a --- /dev/null +++ b/tests/rpc/test.py @@ -0,0 +1,63 @@ +from pymongo import ASCENDING +from bson.objectid import ObjectId + +from src.common.utils.rpc import MongoClient, CollectionProxy + +# 连接到MongoDB +client = MongoClient('localhost:50051') + +# 选择数据库 +db = client['PallasBot'] + +# 选择集合 +collection = db['config'] + +# 保存当前集合的状态,以便之后恢复 +backup = list(collection.find()) + +# 插入一条数据 +insert_result = collection.insert_one({'name': 'Test User', 'age': 25}) +print(f"Inserted one document: {insert_result}") + +# 插入多条数据 +insert_many_result = collection.insert_many([ + {'name': 'User A', 'age': 30}, + {'name': 'User B', 'age': 35} +]) +print(f"Inserted multiple documents: {insert_many_result}") + +# 查找一条数据 +one_document = collection.find_one({'name': 'Test User'}) +print(f"Found one document: {one_document}") + +# 查找多条数据 +documents = collection.find({'age': {'$gt': 25}}) +print("Found documents:") +for document in documents: + print(document) + +# 更新一条数据 +collection.update_one({'name': 'Test User'}, {'$set': {'age': 26}}) +updated_document = collection.find_one({'name': 'Test User'}) +print(f"Updated document: {updated_document}") + +# 删除多条数据 +collection.delete_many({'age': {'$lt': 35}}) +# 验证删除 +remaining_documents = list(collection.find()) +print("Remaining documents after deletion:") +for document in remaining_documents: + print(document) + +# 创建索引 +index_result = collection.create_index([('name', ASCENDING)]) +print(f"Created index: {index_result}") + +# 恢复数据库到测试前的状态 +collection.delete_many({}) # 清空当前集合 +collection.insert_many(backup) # 恢复备份数据 + +print("Database restored to its initial state.") + +# 关闭数据库连接 +client.close()