diff --git a/PROJECT b/PROJECT index 218deb688..1ced70153 100644 --- a/PROJECT +++ b/PROJECT @@ -110,6 +110,15 @@ resources: kind: Application path: github.com/kubeagi/arcadia/api/base/v1alpha1 version: v1alpha1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: kubeagi.k8s.com.cn + group: arcadia + kind: DocumentLoader + path: github.com/kubeagi/arcadia/api/app-node/documentloader/v1alpha1 + version: v1alpha1 - api: crdVersion: v1 controller: true diff --git a/api/app-node/chain/v1alpha1/llmchain_types.go b/api/app-node/chain/v1alpha1/llmchain_types.go index 442d66533..9df9f6865 100644 --- a/api/app-node/chain/v1alpha1/llmchain_types.go +++ b/api/app-node/chain/v1alpha1/llmchain_types.go @@ -33,6 +33,12 @@ type LLMChainSpec struct { type CommonChainConfig struct { Tools []agent.Tool `json:"tools,omitempty"` + + // MaxNumberOfConcurrent represents the max number of concurrent calls done simultaneously to + // the llm chain.Only 1 by default + // +kubebuilder:default=1 + MaxNumberOfConccurent int `json:"maxNumberOfConccurent,omitempty"` + // for memory Memory Memory `json:"memory,omitempty"` diff --git a/api/base/v1alpha1/application.go b/api/base/v1alpha1/application.go index f1a946633..00a23b955 100644 --- a/api/base/v1alpha1/application.go +++ b/api/base/v1alpha1/application.go @@ -16,6 +16,8 @@ limitations under the License. 
package v1alpha1 +import "fmt" + const ( InputNode = "Input" OutputNode = "Output" @@ -26,3 +28,8 @@ const ( // AppPublicLabelKey will add to app which is public AppPublicLabelKey = Group + "/app-is-public" ) + +// ConversationFilePath is the path in system storage for file within a conversation +func ConversationFilePath(appName string, conversationID string, fileName string) string { + return fmt.Sprintf("application/%s/conversation/%s/%s", appName, conversationID, fileName) +} diff --git a/apiserver/docs/docs.go b/apiserver/docs/docs.go index 820199333..ed5512547 100644 --- a/apiserver/docs/docs.go +++ b/apiserver/docs/docs.go @@ -882,9 +882,9 @@ const docTemplate = `{ } } }, - "/chat/conversations/docs": { + "/chat/conversations/file": { "post": { - "description": "receive and process uploaded documents(pdf, docx) for one conversation", + "description": "receive conversational files for one conversation", "consumes": [ "multipart/form-data" ], @@ -894,7 +894,7 @@ const docTemplate = `{ "tags": [ "application" ], - "summary": "receive and process uploaded documents(pdf, docx) for one conversation", + "summary": "receive conversational files for one conversation", "parameters": [ { "type": "string", @@ -912,49 +912,23 @@ const docTemplate = `{ }, { "type": "string", - "description": "The query to this document", - "name": "query", - "in": "formData", - "required": true - }, - { - "type": "string", - "description": "The conversation id for this document", + "description": "The conversation id for this file", "name": "conversation_id", "in": "formData" }, - { - "type": "string", - "description": "The response mode to this conversation", - "name": "response_mode", - "in": "formData", - "required": true - }, { "type": "file", - "description": "This is the docs for the conversation", - "name": "docs", + "description": "This is the file for the conversation", + "name": "file", "in": "formData", "required": true - }, - { - "type": "integer", - "description": "The chunk 
size when load and split the document", - "name": "chunk_size", - "in": "formData" - }, - { - "type": "integer", - "description": "The chunk overlap when load and split the document", - "name": "chunk_overlap", - "in": "formData" } ], "responses": { "200": { "description": "OK", "schema": { - "$ref": "#/definitions/chat.ConversationDocsRespBody" + "$ref": "#/definitions/chat.ChatRespBody" } }, "400": { @@ -1399,66 +1373,11 @@ const docTemplate = `{ "chat.ChatRespBody": { "type": "object", "properties": { - "conversation_id": { - "type": "string", - "example": "5a41f3ca-763b-41ec-91c3-4bbbb00736d0" - }, - "created_at": { - "description": "CreatedAt is the time when the message is created", - "type": "string", - "example": "2023-12-21T10:21:06.389359092+08:00" - }, - "latency": { - "description": "Latency(ms) is how much time the server cost to process a certain request.", - "type": "integer", - "example": 1000 - }, - "message": { - "description": "Message is what AI say", - "type": "string", - "example": "旷工最小计算单位为0.5天。" - }, - "message_id": { + "action": { + "description": "Action indicates what is this chat for", "type": "string", - "example": "4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24" - }, - "references": { - "description": "References is the list of references", - "type": "array", - "items": { - "$ref": "#/definitions/retriever.Reference" - } - } - } - }, - "chat.ConversatioSingleDocRespBody": { - "type": "object", - "properties": { - "file_name": { - "type": "string" - }, - "number_of_documents": { - "type": "integer" - }, - "summary": { - "description": "Summary info", - "type": "string" - }, - "timecost_for_embedding": { - "description": "Embedding info", - "type": "number" - }, - "timecost_for_summarization": { - "type": "number" + "example": "CHAT" }, - "total_time_cost": { - "type": "number" - } - } - }, - "chat.ConversationDocsRespBody": { - "type": "object", - "properties": { "conversation_id": { "type": "string", "example": 
"5a41f3ca-763b-41ec-91c3-4bbbb00736d0" @@ -1468,14 +1387,6 @@ const docTemplate = `{ "type": "string", "example": "2023-12-21T10:21:06.389359092+08:00" }, - "doc": { - "description": "Docs are the responbody for each document", - "allOf": [ - { - "$ref": "#/definitions/chat.ConversatioSingleDocRespBody" - } - ] - }, "latency": { "description": "Latency(ms) is how much time the server cost to process a certain request.", "type": "integer", @@ -1991,6 +1902,11 @@ const docTemplate = `{ "storage.Message": { "type": "object", "properties": { + "action": { + "description": "Action indicates what is this message for\nChat(by default),UPLOAD,etc...", + "type": "string", + "example": "UPLOAD" + }, "answer": { "type": "string", "example": "旷工最小计算单位为0.5天。" diff --git a/apiserver/docs/swagger.json b/apiserver/docs/swagger.json index 739a0054c..6a081928d 100644 --- a/apiserver/docs/swagger.json +++ b/apiserver/docs/swagger.json @@ -876,9 +876,9 @@ } } }, - "/chat/conversations/docs": { + "/chat/conversations/file": { "post": { - "description": "receive and process uploaded documents(pdf, docx) for one conversation", + "description": "receive conversational files for one conversation", "consumes": [ "multipart/form-data" ], @@ -888,7 +888,7 @@ "tags": [ "application" ], - "summary": "receive and process uploaded documents(pdf, docx) for one conversation", + "summary": "receive conversational files for one conversation", "parameters": [ { "type": "string", @@ -906,49 +906,23 @@ }, { "type": "string", - "description": "The query to this document", - "name": "query", - "in": "formData", - "required": true - }, - { - "type": "string", - "description": "The conversation id for this document", + "description": "The conversation id for this file", "name": "conversation_id", "in": "formData" }, - { - "type": "string", - "description": "The response mode to this conversation", - "name": "response_mode", - "in": "formData", - "required": true - }, { "type": "file", - "description": "This 
is the docs for the conversation", - "name": "docs", + "description": "This is the file for the conversation", + "name": "file", "in": "formData", "required": true - }, - { - "type": "integer", - "description": "The chunk size when load and split the document", - "name": "chunk_size", - "in": "formData" - }, - { - "type": "integer", - "description": "The chunk overlap when load and split the document", - "name": "chunk_overlap", - "in": "formData" } ], "responses": { "200": { "description": "OK", "schema": { - "$ref": "#/definitions/chat.ConversationDocsRespBody" + "$ref": "#/definitions/chat.ChatRespBody" } }, "400": { @@ -1393,66 +1367,11 @@ "chat.ChatRespBody": { "type": "object", "properties": { - "conversation_id": { - "type": "string", - "example": "5a41f3ca-763b-41ec-91c3-4bbbb00736d0" - }, - "created_at": { - "description": "CreatedAt is the time when the message is created", - "type": "string", - "example": "2023-12-21T10:21:06.389359092+08:00" - }, - "latency": { - "description": "Latency(ms) is how much time the server cost to process a certain request.", - "type": "integer", - "example": 1000 - }, - "message": { - "description": "Message is what AI say", - "type": "string", - "example": "旷工最小计算单位为0.5天。" - }, - "message_id": { + "action": { + "description": "Action indicates what is this chat for", "type": "string", - "example": "4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24" - }, - "references": { - "description": "References is the list of references", - "type": "array", - "items": { - "$ref": "#/definitions/retriever.Reference" - } - } - } - }, - "chat.ConversatioSingleDocRespBody": { - "type": "object", - "properties": { - "file_name": { - "type": "string" - }, - "number_of_documents": { - "type": "integer" - }, - "summary": { - "description": "Summary info", - "type": "string" - }, - "timecost_for_embedding": { - "description": "Embedding info", - "type": "number" - }, - "timecost_for_summarization": { - "type": "number" + "example": "CHAT" }, - 
"total_time_cost": { - "type": "number" - } - } - }, - "chat.ConversationDocsRespBody": { - "type": "object", - "properties": { "conversation_id": { "type": "string", "example": "5a41f3ca-763b-41ec-91c3-4bbbb00736d0" @@ -1462,14 +1381,6 @@ "type": "string", "example": "2023-12-21T10:21:06.389359092+08:00" }, - "doc": { - "description": "Docs are the responbody for each document", - "allOf": [ - { - "$ref": "#/definitions/chat.ConversatioSingleDocRespBody" - } - ] - }, "latency": { "description": "Latency(ms) is how much time the server cost to process a certain request.", "type": "integer", @@ -1985,6 +1896,11 @@ "storage.Message": { "type": "object", "properties": { + "action": { + "description": "Action indicates what is this message for\nChat(by default),UPLOAD,etc...", + "type": "string", + "example": "UPLOAD" + }, "answer": { "type": "string", "example": "旷工最小计算单位为0.5天。" diff --git a/apiserver/docs/swagger.yaml b/apiserver/docs/swagger.yaml index 64d844c13..d9e86ab00 100644 --- a/apiserver/docs/swagger.yaml +++ b/apiserver/docs/swagger.yaml @@ -56,50 +56,10 @@ definitions: type: object chat.ChatRespBody: properties: - conversation_id: - example: 5a41f3ca-763b-41ec-91c3-4bbbb00736d0 + action: + description: Action indicates what is this chat for + example: CHAT type: string - created_at: - description: CreatedAt is the time when the message is created - example: "2023-12-21T10:21:06.389359092+08:00" - type: string - latency: - description: Latency(ms) is how much time the server cost to process a certain - request. 
- example: 1000 - type: integer - message: - description: Message is what AI say - example: 旷工最小计算单位为0.5天。 - type: string - message_id: - example: 4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24 - type: string - references: - description: References is the list of references - items: - $ref: '#/definitions/retriever.Reference' - type: array - type: object - chat.ConversatioSingleDocRespBody: - properties: - file_name: - type: string - number_of_documents: - type: integer - summary: - description: Summary info - type: string - timecost_for_embedding: - description: Embedding info - type: number - timecost_for_summarization: - type: number - total_time_cost: - type: number - type: object - chat.ConversationDocsRespBody: - properties: conversation_id: example: 5a41f3ca-763b-41ec-91c3-4bbbb00736d0 type: string @@ -107,10 +67,6 @@ definitions: description: CreatedAt is the time when the message is created example: "2023-12-21T10:21:06.389359092+08:00" type: string - doc: - allOf: - - $ref: '#/definitions/chat.ConversatioSingleDocRespBody' - description: Docs are the responbody for each document latency: description: Latency(ms) is how much time the server cost to process a certain request. @@ -468,6 +424,12 @@ definitions: type: object storage.Message: properties: + action: + description: |- + Action indicates what is this message for + Chat(by default),UPLOAD,etc... 
+ example: UPLOAD + type: string answer: example: 旷工最小计算单位为0.5天。 type: string @@ -1077,11 +1039,11 @@ paths: summary: delete one conversation tags: - application - /chat/conversations/docs: + /chat/conversations/file: post: consumes: - multipart/form-data - description: receive and process uploaded documents(pdf, docx) for one conversation + description: receive conversational files for one conversation parameters: - description: The app namespace for this conversation in: formData @@ -1093,40 +1055,22 @@ paths: name: app_name required: true type: string - - description: The query to this document - in: formData - name: query - required: true - type: string - - description: The conversation id for this document + - description: The conversation id for this file in: formData name: conversation_id type: string - - description: The response mode to this conversation - in: formData - name: response_mode - required: true - type: string - - description: This is the docs for the conversation + - description: This is the file for the conversation in: formData - name: docs + name: file required: true type: file - - description: The chunk size when load and split the document - in: formData - name: chunk_size - type: integer - - description: The chunk overlap when load and split the document - in: formData - name: chunk_overlap - type: integer produces: - application/json responses: "200": description: OK schema: - $ref: '#/definitions/chat.ConversationDocsRespBody' + $ref: '#/definitions/chat.ChatRespBody' "400": description: Bad Request schema: @@ -1135,7 +1079,7 @@ paths: description: Internal Server Error schema: $ref: '#/definitions/chat.ErrorResp' - summary: receive and process uploaded documents(pdf, docx) for one conversation + summary: receive conversational files for one conversation tags: - application /chat/messages: diff --git a/apiserver/pkg/application/application.go b/apiserver/pkg/application/application.go index f37a00ba6..51fe4603d 100644 --- 
a/apiserver/pkg/application/application.go +++ b/apiserver/pkg/application/application.go @@ -31,6 +31,7 @@ import ( agent "github.com/kubeagi/arcadia/api/app-node/agent/v1alpha1" apichain "github.com/kubeagi/arcadia/api/app-node/chain/v1alpha1" + apidocumentloader "github.com/kubeagi/arcadia/api/app-node/documentloader/v1alpha1" apiprompt "github.com/kubeagi/arcadia/api/app-node/prompt/v1alpha1" apiretriever "github.com/kubeagi/arcadia/api/app-node/retriever/v1alpha1" "github.com/kubeagi/arcadia/api/base/v1alpha1" @@ -367,6 +368,28 @@ func UpdateApplicationConfig(ctx context.Context, c client.Client, input generat return nil, err } + // 3. create or update documentloader + documentLoader := &apidocumentloader.DocumentLoader{ + ObjectMeta: metav1.ObjectMeta{ + Name: input.Name, + Namespace: input.Namespace, + }, + Spec: apidocumentloader.DocumentLoaderSpec{ + CommonSpec: v1alpha1.CommonSpec{ + DisplayName: "documentloader", + Description: "documentloader", + }, + ChunkSize: 1024, + ChunkOverlap: 50, + LoaderConfig: apidocumentloader.LoaderConfig{}, + }, + } + if _, err := controllerutil.CreateOrUpdate(ctx, c, documentLoader, func() error { + return nil + }); err != nil { + return nil, err + } + // 3. 
create or update chain var ( chainConfig *apichain.CommonChainConfig @@ -533,6 +556,19 @@ func redefineNodes(knowledgebase *string, name, llmName string) (nodes []v1alpha Name: name, }, }, + NextNodeName: []string{"documentloader-node"}, + }, + { + NodeConfig: v1alpha1.NodeConfig{ + Name: "documentloader-node", + DisplayName: "documentloader", + Description: "文档加载,可选", + Ref: &v1alpha1.TypedObjectReference{ + APIGroup: pointer.String("arcadia.kubeagi.k8s.com.cn"), + Kind: "DocumentLoader", + Name: name, + }, + }, NextNodeName: []string{"chain-node"}, }, { diff --git a/apiserver/pkg/chat/chat_docs.go b/apiserver/pkg/chat/chat_docs.go index bbb7909ac..4e24f6543 100644 --- a/apiserver/pkg/chat/chat_docs.go +++ b/apiserver/pkg/chat/chat_docs.go @@ -19,51 +19,27 @@ package chat import ( "bytes" "context" - "errors" + "crypto/sha256" + "encoding/hex" "fmt" "io" "mime/multipart" "path/filepath" - "sync" "time" - "github.com/tmc/langchaingo/documentloaders" - langchainllms "github.com/tmc/langchaingo/llms" - "github.com/tmc/langchaingo/schema" - "github.com/tmc/langchaingo/textsplitter" + "github.com/jackc/pgx/v5/pgconn" + "github.com/minio/minio-go/v7" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/klog/v2" + arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1" "github.com/kubeagi/arcadia/apiserver/pkg/auth" "github.com/kubeagi/arcadia/apiserver/pkg/chat/storage" "github.com/kubeagi/arcadia/apiserver/pkg/common" - runtimebase "github.com/kubeagi/arcadia/pkg/appruntime/base" - runtimechain "github.com/kubeagi/arcadia/pkg/appruntime/chain" - runtimellm "github.com/kubeagi/arcadia/pkg/appruntime/llm" - "github.com/kubeagi/arcadia/pkg/langchainwrap" - "github.com/kubeagi/arcadia/pkg/utils" - "github.com/kubeagi/arcadia/pkg/vectorstore" -) - -var ( - ErrNoLLMProvidedInApplication = errors.New("llm not provided in application") -) - -const ( - DefaultPromptTemplateForMap = ` - {{.context}} - - With above content, please summarize it with only half content size of it. 
- ` - DefaultPromptTemplatForReduce = `"{{.context}}"` - DefaultSummaryMaxNumberOfConcurrent = 2 - - DefaultDocumentChunkSize = 1024 - DefaultDocumentChunkOverlap = 100 ) // ReceiveConversationDocs receive and process docs for a conversation -func (cs *ChatServer) ReceiveConversationDoc(ctx context.Context, messageID string, req ConversationDocsReqBody, doc *multipart.FileHeader, respStream chan string) (*ConversationDocsRespBody, error) { +func (cs *ChatServer) ReceiveConversationFile(ctx context.Context, messageID string, req ConversationFilesReqBody, file *multipart.FileHeader) (*ChatRespBody, error) { if messageID == "" { messageID = string(uuid.NewUUID()) } @@ -75,7 +51,6 @@ func (cs *ChatServer) ReceiveConversationDoc(ctx context.Context, messageID stri search := []storage.SearchOption{ storage.WithAppName(req.APPName), storage.WithAppNamespace(req.AppNamespace), - storage.WithDebug(req.Debug), } if currentUser != "" { search = append(search, storage.WithUser(currentUser)) @@ -96,240 +71,80 @@ func (cs *ChatServer) ReceiveConversationDoc(ctx context.Context, messageID stri } } - // process document with map-reduce - message := storage.Message{ - ID: messageID, - Query: req.Query, - Answer: "", - Documents: make([]storage.Document, 0), - } - - // summarize conversation doc - resp, err := cs.SummarizeConversationDoc(ctx, req, doc, respStream) + // upload files to system datasource + ds, err := common.SystemDatasourceOSS(ctx, cs.cli) if err != nil { - return nil, err - } - - message.Answer = resp.Summary - message.Latency = int64(resp.TimecostForSummarization) - message.Documents = append(message.Documents, storage.Document{ - ID: string(uuid.NewUUID()), - MessageID: messageID, - Name: doc.Filename, - Summary: resp.Summary, - }) - - // update conversat ion - conversation.Messages = append(conversation.Messages, message) - conversation.UpdatedAt = req.StartTime - // update the conversation with new message - if err := 
cs.Storage().UpdateConversation(conversation); err != nil { - return nil, err + klog.Errorf("no storage service found with err %s", err) + return nil, fmt.Errorf("no storage service found with err %s", err.Error()) } - return &ConversationDocsRespBody{ - ChatRespBody: ChatRespBody{ - ConversationID: req.ConversationID, - MessageID: messageID, - CreatedAt: time.Now(), - Message: resp.Summary, - }, - Doc: resp, - }, nil -} - -// SummarizeConversationDoc receive a single document,then generate embeddings and summary for this document -func (cs *ChatServer) SummarizeConversationDoc(ctx context.Context, req ConversationDocsReqBody, doc *multipart.FileHeader, respStream chan string) (*ConversatioSingleDocRespBody, error) { - klog.V(5).Infof("Load and split the document %s size:%s for conversation %s", doc.Filename, utils.BytesToSizedStr(doc.Size), req.ConversationID) - resp := &ConversatioSingleDocRespBody{ - FileName: doc.Filename, - } - var summarizationStart = time.Now() - defer func() { - resp.TotalTimecost = time.Since(summarizationStart).Seconds() - }() - - src, err := doc.Open() + src, err := file.Open() if err != nil { return nil, err } defer src.Close() - data, err := io.ReadAll(src) if err != nil { return nil, err } - dataReader := bytes.NewReader(data) - - var documents []schema.Document - var loader documentloaders.Loader - switch ext := filepath.Ext(doc.Filename); ext { - case ".pdf": - loader = documentloaders.NewPDF(dataReader, doc.Size) - case ".txt": - loader = documentloaders.NewText(dataReader) - case ".html", ".htm": - loader = documentloaders.NewHTML(dataReader) - default: - return nil, fmt.Errorf("file with extension %s not supported yet", ext) - } - - // set document chunk parameters - if req.ChunkSize == 0 { - req.ChunkSize = DefaultDocumentChunkSize - } - if req.ChunkOverlap == 0 { - req.ChunkOverlap = DefaultDocumentChunkOverlap - } - - split := textsplitter.NewRecursiveCharacter( - textsplitter.WithChunkSize(req.ChunkSize), - 
textsplitter.WithChunkOverlap(req.ChunkOverlap), - ) - documents, err = loader.LoadAndSplit(ctx, split) + // use sha256 as the object name so we can avoid overwrite files with same name but different content + hash := sha256.Sum256(data) + objectName := hex.EncodeToString(hash[:]) + objectPath := arcadiav1alpha1.ConversationFilePath(req.APPName, req.ConversationID, fmt.Sprintf("%s%s", objectName, filepath.Ext(file.Filename))) + _, err = ds.Client.PutObject( + ctx, req.AppNamespace, + objectPath, + bytes.NewReader(data), + int64(len(data)), + minio.PutObjectOptions{ + UserTags: map[string]string{ + "FILE_NAME": file.Filename, + }, + }) if err != nil { - return nil, err + klog.Errorf("failed to store file %s with error %s", file.Filename, err.Error()) + return nil, fmt.Errorf("failed to store file %s with error %s", file.Filename, err.Error()) } - var wg sync.WaitGroup - semaphore := make(chan struct{}, 2) - - var errStr string - // For embedding generation - wg.Add(1) - var errEmbedding error - go func() { - start := time.Now() - defer wg.Done() - semaphore <- struct{}{} - defer func() { - resp.TimecostForEmbedding = time.Since(start).Seconds() - <-semaphore - }() - klog.V(5).Infof("Generate embeddings from file %s to vectorstore for conversation %s", doc.Filename, req.ConversationID) - errEmbedding = cs.GenerateSingleDocEmbeddings(ctx, req, doc, documents) - if errEmbedding != nil { - klog.V(1).ErrorS(errEmbedding, "ErrEmbedding", "document", doc.Filename, "conversation", req.ConversationID) - errStr += fmt.Sprintf(" ErrEmbedding: %s", errEmbedding.Error()) - // break once error occurs - ctx.Done() - return - } - klog.V(5).Infof("Generate embeddings for doc %s is successful in conversation %s!", doc.Filename, req.ConversationID) - }() - - // For summary generation - wg.Add(1) - var errSummary error - var summary string - go func() { - start := time.Now() - defer wg.Done() - semaphore <- struct{}{} - defer func() { - resp.TimecostForSummarization = 
time.Since(start).Seconds() - <-semaphore - }() - klog.V(5).Infof("Generate summarization from file %s for conversation %s", doc.Filename, req.ConversationID) - summary, errSummary = cs.GenerateSingleDocSummary(ctx, req, documents, respStream) - if errSummary != nil { - // break once error occurs - errStr += fmt.Sprintf(" ErrSummary: %s", errSummary.Error()) - ctx.Done() - klog.V(1).ErrorS(errSummary, "ErrSummary", "document", doc.Filename, "conversation", req.ConversationID) - return - } - klog.V(5).Infof("Generate summarization for doc %s in conversation %s is done! Summary: %s", doc.Filename, req.ConversationID, summary) - }() - - // wait until all finished - wg.Wait() - - if errEmbedding != nil || errSummary != nil { - return nil, errors.New(errStr) - } - - // TODO: Save document to system datasource: ns/applications/:appname/conversations/:id/{filename} - - resp.NumberOfDocuments = len(documents) - resp.Summary = summary - - return resp, nil -} - -// GenerateSingleDocEmbeddings -func (cs *ChatServer) GenerateSingleDocEmbeddings(ctx context.Context, req ConversationDocsReqBody, doc *multipart.FileHeader, documents []schema.Document) error { - // get the built-in system embedder and vectorstore - embedder, vectorStore, err := common.SystemEmbeddingSuite(ctx, cs.cli) - if err != nil { - return err - } - langchainEmbedder, err := langchainwrap.GetLangchainEmbedder(ctx, embedder, cs.cli, "") - if err != nil { - return err - } - err = vectorstore.AddDocuments(ctx, klog.FromContext(ctx), vectorStore, langchainEmbedder, req.ConversationID, cs.cli, documents) - if err != nil { - return err - } - return nil -} - -// GenerateSingleDocSummary generate the summary of single document -func (cs *ChatServer) GenerateSingleDocSummary(ctx context.Context, req ConversationDocsReqBody, documents []schema.Document, respStream chan string) (string, error) { - app, c, err := cs.getApp(ctx, req.APPName, req.AppNamespace) - if err != nil { - return "", fmt.Errorf("failed to get app 
due to %s", err.Error()) + // record the uploaded file as an UPLOAD message in the conversation + message := storage.Message{ + ID: messageID, + Action: "UPLOAD", + Query: "UPLOAD", + Answer: "DONE", + Latency: int64(time.Since(req.StartTime).Milliseconds()), + Documents: []storage.Document{ + { + ID: string(uuid.NewUUID()), + MessageID: messageID, + ConversationID: req.ConversationID, + Name: file.Filename, + Object: objectPath, + }, + }, } - var llm langchainllms.Model - var mpChainNode runtimebase.BaseNode - // find LLM along with chain call options - for _, n := range app.Spec.Nodes { - baseNode := runtimebase.NewBaseNode(app.Namespace, n.Name, *n.Ref) - switch kind := baseNode.Kind(); kind { - case "llm": - l := runtimellm.NewLLM(baseNode) - if err := l.Init(ctx, c, nil); err != nil { - return "", fmt.Errorf("failed init llm due to %s", err.Error()) - } - llm = l.LLM - case "llmchain": - mpChainNode = baseNode - case "retrievalqachain": - mpChainNode = baseNode + // update conversation + conversation.Messages = append(conversation.Messages, message) + conversation.UpdatedAt = time.Now() + // update the conversation with new message + if err := cs.Storage().UpdateConversation(conversation); err != nil { + pgErr, ok := err.(*pgconn.PgError) + if !ok { + return nil, err + } + // 42P10 is treated as a conflict on object (primary key in pg) and ignored; NOTE(review): PostgreSQL documents 42P10 as invalid_column_reference (unique violations are 23505), confirm this code + if pgErr.Code != "42P10" { + return nil, err + } } } - // If no LLM provided, we can't generate the summary - if llm == nil { - return "", ErrNoLLMProvidedInApplication - } - out := map[string]any{ - "question": req.Query, - "_answer_stream": respStream, - "llm": llm, - "documents": documents, - } - if req.ResponseMode == "streaming" { - out["_need_stream"] = true - } - // initialize MapReduceChain - mpChain := runtimechain.NewMapReduceChain(mpChainNode) - if err = mpChain.Init(ctx, cs.cli, out); err != nil { - return "", err - } - out, err = mpChain.Run(ctx, cs.cli, out) - if err != nil { - return "", fmt.Errorf("failed to generate summary due to %s",
err.Error()) - } - a, ok := out["_answer"] - if !ok { - return "", errors.New("empty answer") - } - answer, ok := a.(string) - if !ok && len(answer) > 0 { - return "", errors.New("invalid answer.not a string") - } - return answer, nil + return &ChatRespBody{ + ConversationID: req.ConversationID, + CreatedAt: time.Now(), + MessageID: messageID, + Action: "UPLOAD", + Message: "Done", + Latency: message.Latency, + }, nil } diff --git a/apiserver/pkg/chat/chat_server.go b/apiserver/pkg/chat/chat_server.go index 8aaac3d6e..8471d4703 100644 --- a/apiserver/pkg/chat/chat_server.go +++ b/apiserver/pkg/chat/chat_server.go @@ -139,6 +139,7 @@ func (cs *ChatServer) AppRun(ctx context.Context, req ChatReqBody, respStream ch } conversation.Messages = append(conversation.Messages, storage.Message{ ID: messageID, + Action: "CHAT", Query: req.Query, Answer: "", }) @@ -162,6 +163,7 @@ func (cs *ChatServer) AppRun(ctx context.Context, req ChatReqBody, respStream ch return &ChatRespBody{ ConversationID: conversation.ID, MessageID: messageID, + Action: "CHAT", Message: out.Answer, CreatedAt: time.Now(), References: out.References, diff --git a/apiserver/pkg/chat/rest_type.go b/apiserver/pkg/chat/rest_type.go index 0e9a96589..1342e82e3 100644 --- a/apiserver/pkg/chat/rest_type.go +++ b/apiserver/pkg/chat/rest_type.go @@ -48,13 +48,11 @@ type ConversationReqBody struct { ConversationID string `json:"conversation_id" form:"conversation_id" example:"5a41f3ca-763b-41ec-91c3-4bbbb00736d0"` } -type ConversationDocsReqBody struct { - ChatReqBody `json:",inline" form:",inline"` - - // ChunkSize for text splitter - ChunkSize int `json:"chunk_size" form:"chunk_size" example:"2048"` - // ChunkOverlap for text splitter - ChunkOverlap int `json:"chunk_overlap" form:"chunk_overlap" example:"200"` +type ConversationFilesReqBody struct { + ConversationReqBody `json:",inline"` + Debug bool `json:"-"` + NewChat bool `json:"-"` + StartTime time.Time `json:"-"` } type MessageReqBody struct { @@ -81,6 
+79,8 @@ type ChatReqBody struct { type ChatRespBody struct { ConversationID string `json:"conversation_id" example:"5a41f3ca-763b-41ec-91c3-4bbbb00736d0"` MessageID string `json:"message_id" example:"4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24"` + // Action indicates what is this chat for + Action string `json:"action,omitempty" example:"CHAT"` // Message is what AI say Message string `json:"message" example:"旷工最小计算单位为0.5天。"` // CreatedAt is the time when the message is created diff --git a/apiserver/pkg/chat/storage/storage.go b/apiserver/pkg/chat/storage/storage.go index ebf913275..d9b140944 100644 --- a/apiserver/pkg/chat/storage/storage.go +++ b/apiserver/pkg/chat/storage/storage.go @@ -44,7 +44,11 @@ type Conversation struct { // Message represent a message in storage type Message struct { - ID string `gorm:"column:id;primaryKey;type:uuid;comment:message id" json:"id" example:"4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24"` + ID string `gorm:"column:id;primaryKey;type:uuid;comment:message id" json:"id" example:"4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24"` + // Action indicates what is this message for + // Chat(by default),UPLOAD,etc... 
+ Action string `gorm:"column:action;type:string;comment:user action" json:"action" example:"UPLOAD"` + Query string `gorm:"column:query;type:string;comment:user input" json:"query" example:"旷工最小计算单位为多少天?"` Answer string `gorm:"column:answer;type:string;comment:ai response" json:"answer" example:"旷工最小计算单位为0.5天。"` References References `gorm:"column:references;type:json;comment:references" json:"references,omitempty"` @@ -56,10 +60,12 @@ type Message struct { } type Document struct { - ID string `gorm:"column:id;primaryKey;type:uuid;comment:document id" json:"id" example:"4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24"` - Name string `gorm:"column:name;type:string;comment:document name" json:"name" example:"kaoqin.pdf"` - MessageID string `gorm:"column:message_id;type:uuid;comment:message id" json:"-"` - Summary string `gorm:"column:summary;type:string;comment:document summary" json:"summary" example:"kaoqin.pdf"` + ID string `gorm:"column:id;primaryKey;type:uuid;comment:document id" json:"id" example:"4f3546dd-5404-4bf8-a3bc-4fa3f9a7ba24"` + Name string `gorm:"column:name;type:string;comment:document name" json:"name" example:"kaoqin.pdf"` + Object string `gorm:"column:object;primaryKey;type:string;comment:object name in oss with sha256(content)" json:"object" example:"kaoqin.pdf"` + ConversationID string `gorm:"column:conversation_id;type:uuid;comment:conversation id" json:"-"` + MessageID string `gorm:"column:message_id;type:uuid;comment:message id" json:"-"` + Summary string `gorm:"column:summary;type:string;comment:document summary" json:"summary" example:"kaoqin.pdf"` } type References []retriever.Reference diff --git a/apiserver/pkg/chat/storage/storage_postgresql.go b/apiserver/pkg/chat/storage/storage_postgresql.go index f08fd30ed..33f831f3c 100644 --- a/apiserver/pkg/chat/storage/storage_postgresql.go +++ b/apiserver/pkg/chat/storage/storage_postgresql.go @@ -157,7 +157,7 @@ func (p *PostgreSQLStorage) FindExistingConversation(conversationID string, opts 
conversationQuery.Debug = false conversationQuery.DeletedAt.Valid = false res := &Conversation{} - tx := p.db.Preload("Messages").First(res, conversationQuery) + tx := p.db.Preload("Messages.Documents").First(res, conversationQuery) if tx.Error != nil { return nil, tx.Error } @@ -208,7 +208,11 @@ func (p *PostgreSQLStorage) FindExistingMessage(conversationID string, messageID conversationQuery.DeletedAt.Valid = false conversation := &Conversation{} message := &Message{} - tx := p.db.First(conversation, conversationQuery) + tx := p.db.Preload("Documents").First(message, Message{ID: messageID}) + if tx.Error != nil { + return nil, tx.Error + } + tx = p.db.First(conversation, conversationQuery) if tx.Error != nil { return nil, tx.Error } @@ -223,11 +227,13 @@ func (p *PostgreSQLStorage) FindExistingMessage(conversationID string, messageID } func (p *PostgreSQLStorage) FindExistingDocument(conversationID, messageID string, documentID string, opts ...SearchOption) (*Document, error) { - message, err := p.FindExistingMessage(conversationID, messageID, opts...) 
- if err != nil { - return nil, err - } + messageQuery := Message{ID: messageID} + message := &Message{} document := &Document{} + tx := p.db.First(message, messageQuery) + if tx.Error != nil { + return nil, tx.Error + } association := p.db.Model(message).Association("Documents") if association.Error != nil { return nil, association.Error diff --git a/apiserver/service/chat.go b/apiserver/service/chat.go index c778e9c87..99aa3e902 100644 --- a/apiserver/service/chat.go +++ b/apiserver/service/chat.go @@ -182,35 +182,29 @@ func (cs *ChatService) ChatHandler() gin.HandlerFunc { } } -// @Summary receive and process uploaded documents(pdf, docx) for one conversation +// @Summary receive conversational files for one conversation // @Schemes -// @Description receive and process uploaded documents(pdf, docx) for one conversation +// @Description receive conversational files for one conversation // @Tags application // @Accept multipart/form-data // @Produce json // // @Param app_namespace formData string true "The app namespace for this conversation" // @Param app_name formData string true "The app name for this conversation" -// @Param query formData string true "The query to this document" -// @Param conversation_id formData string false "The conversation id for this document" -// @Param response_mode formData string true "The response mode to this conversation" -// @Param docs formData file true "This is the docs for the conversation" -// @Param chunk_size formData int false "The chunk size when load and split the document" -// @Param chunk_overlap formData int false "The chunk overlap when load and split the document" +// @Param conversation_id formData string false "The conversation id for this file" +// @Param file formData file true "This is the file for the conversation" // -// @Success 200 {object} chat.ConversationDocsRespBody +// @Success 200 {object} chat.ChatRespBody // @Failure 400 {object} chat.ErrorResp // @Failure 500 {object} chat.ErrorResp -// @Router 
/chat/conversations/docs [post] -func (cs *ChatService) ChatDocs() gin.HandlerFunc { +// @Router /chat/conversations/file [post] +func (cs *ChatService) ChatFile() gin.HandlerFunc { return func(c *gin.Context) { - req := chat.ConversationDocsReqBody{ - ChatReqBody: chat.ChatReqBody{ - StartTime: time.Now(), - }, + req := chat.ConversationFilesReqBody{ + StartTime: time.Now(), } if err := c.ShouldBind(&req); err != nil { - klog.FromContext(c.Request.Context()).Error(err, "conversationDocsHandler: error binding json") + klog.FromContext(c.Request.Context()).Error(err, "conversationFileHandler: error binding json") c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()}) return } @@ -221,90 +215,23 @@ func (cs *ChatService) ChatDocs() gin.HandlerFunc { req.ConversationID = string(uuid.NewUUID()) } - // TODO: allow multiple files - doc, err := c.FormFile("docs") + file, err := c.FormFile("file") if err != nil { - klog.FromContext(c.Request.Context()).Error(err, "error receive and process uploaded documents(pdf, docx)") + klog.FromContext(c.Request.Context()).Error(err, "error receive conversational file") c.JSON(http.StatusBadRequest, chat.ErrorResp{Err: err.Error()}) return } messageID := string(uuid.NewUUID()) - var response *chat.ConversationDocsRespBody - if req.ResponseMode.IsStreaming() { - buf := strings.Builder{} - // handle chat streaming mode - respStream := make(chan string, 1) - go func() { - defer func() { - if e := recover(); e != nil { - err, ok := e.(error) - if ok { - klog.FromContext(c.Request.Context()).Error(err, "A panic occurred when run chat.ReceiveConversationDoc") - } else { - klog.FromContext(c.Request.Context()).Error(fmt.Errorf("get err:%#v", e), "A panic occurred when run chat.ReceiveConversationDoc") - } - } - }() - - response, err = cs.server.ReceiveConversationDoc(c, messageID, req, doc, respStream) - if err != nil { - c.SSEvent("error", chat.ChatRespBody{ - MessageID: messageID, - ConversationID: req.ConversationID, - Message: 
err.Error(), - CreatedAt: time.Now(), - Latency: time.Since(req.StartTime).Milliseconds(), - }) - // c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()}) - klog.FromContext(c.Request.Context()).Error(err, "error resp") - close(respStream) - return - } - if response != nil { - if str := buf.String(); response.Message == str || strings.TrimSpace(str) == strings.TrimSpace(response.Message) { - close(respStream) - } - } - }() - - c.Writer.Header().Set("Content-Type", "text/event-stream") - c.Writer.Header().Set("Cache-Control", "no-cache") - c.Writer.Header().Set("Connection", "keep-alive") - c.Writer.Header().Set("Transfer-Encoding", "chunked") - klog.FromContext(c.Request.Context()).Info("start to receive messages...") - clientDisconnected := c.Stream(func(w io.Writer) bool { - if msg, ok := <-respStream; ok { - c.SSEvent("", chat.ConversationDocsRespBody{ - ChatRespBody: chat.ChatRespBody{ - MessageID: messageID, - ConversationID: req.ConversationID, - Message: msg, - CreatedAt: time.Now(), - Latency: time.Since(req.StartTime).Milliseconds(), - }, - }) - buf.WriteString(msg) - return true - } - return false - }) - if clientDisconnected { - klog.FromContext(c.Request.Context()).Info("chatHandler: the client is disconnected") - } - klog.FromContext(c.Request.Context()).Info("end to receive messages") - } else { - // Upload the file to specific dst. - resp, err := cs.server.ReceiveConversationDoc(c, messageID, req, doc, nil) - if err != nil { - klog.FromContext(c.Request.Context()).Error(err, "error receive and process uploaded documents(pdf, docx)") - c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()}) - return - } - - c.JSON(http.StatusOK, resp) + // Upload the file to specific dst. 
+ resp, err := cs.server.ReceiveConversationFile(c, messageID, req, file) + if err != nil { + klog.FromContext(c.Request.Context()).Error(err, "error receive conversational file") + c.JSON(http.StatusInternalServerError, chat.ErrorResp{Err: err.Error()}) + return } - klog.FromContext(c.Request.Context()).V(3).Info("receive and process uploaded documents(pdf, docx) done", "req", req) + c.JSON(http.StatusOK, resp) + klog.FromContext(c.Request.Context()).V(3).Info("receive conversational file done", "req", req) } } @@ -492,7 +419,7 @@ func registerChat(g *gin.RouterGroup, conf config.ServerConfig) { g.POST("", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, v1alpha1.GroupVersion, "get", "applications"), requestid.RequestIDInterceptor(), chatService.ChatHandler()) // chat with bot - g.POST("/conversations/docs", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, v1alpha1.GroupVersion, "get", "applications"), requestid.RequestIDInterceptor(), chatService.ChatDocs()) // upload docs for conversation + g.POST("/conversations/file", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, v1alpha1.GroupVersion, "get", "applications"), requestid.RequestIDInterceptor(), chatService.ChatFile()) // upload files for conversation g.POST("/conversations", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, v1alpha1.GroupVersion, "get", "applications"), requestid.RequestIDInterceptor(), chatService.ListConversationHandler()) // list conversations g.DELETE("/conversations/:conversationID", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, v1alpha1.GroupVersion, "get", "applications"), requestid.RequestIDInterceptor(), chatService.DeleteConversationHandler()) // delete conversation diff --git a/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml b/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml index d3ddb5c8f..c7cfa46ab 100644 --- a/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml +++
b/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml @@ -54,6 +54,11 @@ spec: in a llm call. minimum: 10 type: integer + maxNumberOfConccurent: + default: 1 + description: MaxNumberOfConcurrent represents the max number of concurrent + calls done simultaneously to the llm chain.Only 1 by default + type: integer maxTokens: default: 2048 description: MaxTokens is the maximum number of tokens to generate diff --git a/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml b/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml index 9fe511543..fd59a16d3 100644 --- a/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml +++ b/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml @@ -50,6 +50,11 @@ spec: in a llm call. minimum: 10 type: integer + maxNumberOfConccurent: + default: 1 + description: MaxNumberOfConcurrent represents the max number of concurrent + calls done simultaneously to the llm chain.Only 1 by default + type: integer maxTokens: default: 2048 description: MaxTokens is the maximum number of tokens to generate diff --git a/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml b/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml index 148fe177a..32ea90b19 100644 --- a/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml +++ b/config/crd/bases/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml @@ -53,6 +53,11 @@ spec: in a llm call. 
minimum: 10 type: integer + maxNumberOfConccurent: + default: 1 + description: MaxNumberOfConcurrent represents the max number of concurrent + calls done simultaneously to the llm chain.Only 1 by default + type: integer maxTokens: default: 2048 description: MaxTokens is the maximum number of tokens to generate diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index a1a8b264d..f956dc1af 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -13,6 +13,7 @@ resources: - bases/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml - bases/arcadia.kubeagi.k8s.com.cn_vectorstores.yaml - bases/arcadia.kubeagi.k8s.com.cn_applications.yaml +- bases/arcadia.kubeagi.k8s.com.cn_documentloaders.yaml - bases/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml - bases/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml - bases/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml diff --git a/config/samples/app_llmchain_abstract.yaml b/config/samples/app_llmchain_abstract.yaml new file mode 100644 index 000000000..572841305 --- /dev/null +++ b/config/samples/app_llmchain_abstract.yaml @@ -0,0 +1,98 @@ +apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1 +kind: Application +metadata: + name: base-chat-document-assistant + namespace: kubeagi-system +spec: + displayName: "AI文档对话助手" + description: "最简单的AI文档对话助手" + prologue: "Hello, I am a document assistant 🤖" + nodes: + - name: Input + displayName: "用户输入" + description: "用户输入节点,必须" + ref: + kind: Input + name: Input + nextNodeName: ["prompt-node"] + - name: prompt-node + displayName: "prompt" + description: "设定prompt,template中可以使用{{xx}}来替换变量" + ref: + apiGroup: prompt.arcadia.kubeagi.k8s.com.cn + kind: Prompt + name: base-chat-document-assistant + nextNodeName: ["chain-node"] + - name: documentloader-node + displayName: "documentloader" + description: "设定prompt,template中可以使用{{xx}}来替换变量" + ref: + apiGroup: arcadia.kubeagi.k8s.com.cn + kind: DocumentLoader + name: base-chat-document-assistant + 
nextNodeName: ["chain-node"] + - name: llm-node + displayName: "大模型服务" + description: "设定大模型的访问信息" + ref: + apiGroup: arcadia.kubeagi.k8s.com.cn + kind: LLM + name: qwen-0-5b-external + nextNodeName: ["chain-node"] + - name: chain-node + displayName: "llm chain" + description: "chain是langchain的核心概念,llmChain用于连接prompt和llm" + ref: + apiGroup: chain.arcadia.kubeagi.k8s.com.cn + kind: LLMChain + name: base-chat-document-assistant + nextNodeName: ["Output"] + - name: Output + displayName: "最终输出" + description: "最终输出节点,必须" + ref: + kind: Output + name: Output +--- +apiVersion: prompt.arcadia.kubeagi.k8s.com.cn/v1alpha1 +kind: Prompt +metadata: + name: base-chat-document-assistant + namespace: kubeagi-system + annotations: + arcadia.kubeagi.k8s.com.cn/input-rules: '[{"kind":"Input","length":1}]' + arcadia.kubeagi.k8s.com.cn/output-rules: '[{"length":1}]' +spec: + displayName: "设定AI文档助手的prompt" + description: "设定AI文档助手的prompt" + userMessage: | + Help me + '{{.question}}' +--- +apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1 +kind: DocumentLoader +metadata: + name: base-chat-document-assistant + namespace: kubeagi-system +spec: + displayName: "llm chain" + description: "llm chain" + chunkSize: 500 + chunkOverlap: 100 +--- +apiVersion: chain.arcadia.kubeagi.k8s.com.cn/v1alpha1 +kind: LLMChain +metadata: + name: base-chat-document-assistant + namespace: kubeagi-system + annotations: + arcadia.kubeagi.k8s.com.cn/input-rules: '[{"kind":"LLM","group":"arcadia.kubeagi.k8s.com.cn","length":1},{"kind":"prompt","group":"prompt.arcadia.kubeagi.k8s.com.cn","length":1}]' + arcadia.kubeagi.k8s.com.cn/output-rules: '[{"kind":"Output","length":1}]' +spec: + displayName: "llm chain" + description: "llm chain" + maxTokens: 20480 + maxLength: 20480 + model: "6ac7baa2-71e7-4ffc-bd49-9356e743ecbb" + memory: + maxTokenLimit: 20480 diff --git a/deploy/charts/arcadia/Chart.yaml b/deploy/charts/arcadia/Chart.yaml index da8554211..60d43e6c9 100644 --- a/deploy/charts/arcadia/Chart.yaml +++ 
b/deploy/charts/arcadia/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 name: arcadia description: A Helm chart(Also a KubeBB Component) for KubeAGI Arcadia type: application -version: 0.3.6 +version: 0.3.7 appVersion: "0.2.0" keywords: diff --git a/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml b/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml index d3ddb5c8f..c7cfa46ab 100644 --- a/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml +++ b/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_apichains.yaml @@ -54,6 +54,11 @@ spec: in a llm call. minimum: 10 type: integer + maxNumberOfConccurent: + default: 1 + description: MaxNumberOfConcurrent represents the max number of concurrent + calls done simultaneously to the llm chain.Only 1 by default + type: integer maxTokens: default: 2048 description: MaxTokens is the maximum number of tokens to generate diff --git a/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml b/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml index 9fe511543..fd59a16d3 100644 --- a/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml +++ b/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_llmchains.yaml @@ -50,6 +50,11 @@ spec: in a llm call. 
minimum: 10 type: integer + maxNumberOfConccurent: + default: 1 + description: MaxNumberOfConcurrent represents the max number of concurrent + calls done simultaneously to the llm chain.Only 1 by default + type: integer maxTokens: default: 2048 description: MaxTokens is the maximum number of tokens to generate diff --git a/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml b/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml index 148fe177a..32ea90b19 100644 --- a/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml +++ b/deploy/charts/arcadia/crds/chain.arcadia.kubeagi.k8s.com.cn_retrievalqachains.yaml @@ -53,6 +53,11 @@ spec: in a llm call. minimum: 10 type: integer + maxNumberOfConccurent: + default: 1 + description: MaxNumberOfConcurrent represents the max number of concurrent + calls done simultaneously to the llm chain.Only 1 by default + type: integer maxTokens: default: 2048 description: MaxTokens is the maximum number of tokens to generate diff --git a/pkg/appruntime/chain/llmchain.go b/pkg/appruntime/chain/llmchain.go index ad82d99dd..06e2fa588 100644 --- a/pkg/appruntime/chain/llmchain.go +++ b/pkg/appruntime/chain/llmchain.go @@ -46,7 +46,7 @@ func NewLLMChain(baseNode base.BaseNode) *LLMChain { } } -func (l *LLMChain) Init(ctx context.Context, cli client.Client, _ map[string]any) error { +func (l *LLMChain) Init(ctx context.Context, cli client.Client, args map[string]any) error { instance := &v1alpha1.LLMChain{} if err := cli.Get(ctx, types.NamespacedName{Namespace: l.RefNamespace(), Name: l.Ref.Name}, instance); err != nil { return fmt.Errorf("can't find the chain in cluster: %w", err) @@ -72,6 +72,29 @@ func (l *LLMChain) Run(ctx context.Context, _ client.Client, args map[string]any if !ok { return args, errors.New("prompt not prompts.FormatPrompter") } + + instance := l.Instance + options := GetChainOptions(instance.Spec.CommonChainConfig) + + // Check if 
have files as input + v3, ok := args["documents"] + if ok { + docs, ok := v3.([]langchaingoschema.Document) + if ok && len(docs) != 0 { + args["max_number_of_conccurent"] = instance.Spec.MaxNumberOfConccurent + mpChain := NewMapReduceChain(l.BaseNode, options...) + err = mpChain.Init(ctx, nil, args) + if err != nil { + return args, err + } + _, err = mpChain.Run(ctx, nil, args) + if err != nil { + return args, err + } + // TODO: save this summary to document with a callback handler??? + } + } + // _history is optional // if set ,only ChatMessageHistory allowed var history langchaingoschema.ChatMessageHistory @@ -81,8 +104,7 @@ func (l *LLMChain) Run(ctx context.Context, _ client.Client, args map[string]any return args, errors.New("history not memory.ChatMessageHistory") } } - instance := l.Instance - options := GetChainOptions(instance.Spec.CommonChainConfig) + // Add the answer to the context if it's not empty if args["_answer"] != nil { klog.Infoln("get answer from upstream:", args["_answer"]) diff --git a/pkg/appruntime/chain/mpchain.go b/pkg/appruntime/chain/mpchain.go index 05a1644f8..ec9262d3f 100644 --- a/pkg/appruntime/chain/mpchain.go +++ b/pkg/appruntime/chain/mpchain.go @@ -34,28 +34,17 @@ import ( const ( // For map-reduce DefaultPromptTemplateForMap = ` - {{.context}} + Content: {{.context}} - With above content, please summarize it with only half content size of it. + With above content, please summarize it with only 1/5 size of the content.Please remind that your answer must use same language(中文或English) of the content. ` - DefaultPromptTemplatForReduce = `"{{.context}}"` - - // For post process the map-reduced summary - DefaultPromptTemplateForPostMapReduce = ` - Here is the map-reduced summary of a document: - - Summary: {{.summary}} - - Now please answer the following question based on the above document summary. 
Make sure the answer is using same language with the question: + DefaultPromptTemplatForReduce = ` + Below is the sub-summaries that each is based on a piece of a complete document: - Question: {{.question}} + {{.context}} - Answer: + Please generate a single summary based on above sub-summaries. ` - - DefaultSummaryMaxNumberOfConcurrent = 2 - DefaultDocumentChunkSize = 1024 - DefaultDocumentChunkOverlap = 100 ) type MapReduceChain struct { @@ -77,59 +66,29 @@ type MapReduceChain struct { chainCallOptions []chains.ChainCallOption } -func NewMapReduceChain(baseNode base.BaseNode) *MapReduceChain { +func NewMapReduceChain(baseNode base.BaseNode, chainCallOptions ...chains.ChainCallOption) *MapReduceChain { return &MapReduceChain{ BaseNode: baseNode, MapReduceDocuments: chains.MapReduceDocuments{}, + chainCallOptions: chainCallOptions, } } -func (l *MapReduceChain) Init(ctx context.Context, cli client.Client, args map[string]any) error { - if args == nil { - return errors.New("no arguments provided for MapReduceChain") - } +func (l *MapReduceChain) Init(ctx context.Context, _ client.Client, _ map[string]any) error { + return nil +} + +func (l *MapReduceChain) Run(ctx context.Context, _ client.Client, args map[string]any) (outArgs map[string]any, err error) { // initialize the LLM v1, ok := args["llm"] if !ok { - return errors.New("no llm") + return args, errors.New("no llm") } llm, ok := v1.(llms.Model) if !ok { - return errors.New("llm not llms.Model") + return args, errors.New("llm not llms.Model") } - // only group `chain` is allowed - if l.BaseNode.Group() != "chain" { - return fmt.Errorf("invalid base node with group %s.must be in group chain", l.BaseNode.Group()) - } - // initialize call options - var chainCallOptions []chains.ChainCallOption - switch kind := l.BaseNode.Kind(); kind { - case "llmchain": - llmchain := NewLLMChain(l.BaseNode) - if err := llmchain.Init(ctx, cli, nil); err != nil { - return err - } - l.isReady, l.message = llmchain.Ready() - if 
!l.isReady { - return fmt.Errorf("llmchain is not ready with %s", l.message) - } - chainCallOptions = GetChainOptions(llmchain.Instance.Spec.CommonChainConfig) - case "retrievalqachain": - retrivalQAChain := NewRetrievalQAChain(l.BaseNode) - if err := retrivalQAChain.Init(ctx, cli, nil); err != nil { - return err - } - l.isReady, l.message = retrivalQAChain.Ready() - if !l.isReady { - return fmt.Errorf("retrivalQAChain is not ready with %s", l.message) - } - chainCallOptions = GetChainOptions(retrivalQAChain.Instance.Spec.CommonChainConfig) - default: - return fmt.Errorf("invalid base node kind %s for MapReduceChain.not supported yet", kind) - } - l.chainCallOptions = append(l.chainCallOptions, chainCallOptions...) - // initialize MapReduceDocuments l.MapReduceDocuments = chains.NewMapReduceDocuments( chains.NewLLMChain(llm, prompts.NewPromptTemplate(DefaultPromptTemplateForMap, []string{"context"})), @@ -140,45 +99,34 @@ func (l *MapReduceChain) Init(ctx context.Context, cli client.Client, args map[s ), ), ) + v2, ok := args["max_number_of_conccurent"] + if ok { + maxNumberOfConcurrent, ok := v2.(int) + if ok { + l.MapReduceDocuments.MaxNumberOfConcurrent = maxNumberOfConcurrent + } + } - l.LLMChain = *chains.NewLLMChain(llm, prompts.NewPromptTemplate(DefaultPromptTemplateForPostMapReduce, []string{"summary", "question"})) - - return nil -} - -func (l *MapReduceChain) Run(ctx context.Context, cli client.Client, args map[string]any) (outArgs map[string]any, err error) { - v1, ok := args["documents"] + v3, ok := args["documents"] if !ok { - return args, errors.New("no documents") + // skip if no documents + klog.V(5).Infof("skip MapReduceChain due to no documents found") + return args, nil } - documents, ok := v1.([]schema.Document) + documents, ok := v3.([]schema.Document) if !ok { - return args, errors.New("llm not llms.LanguageModel") + // skip if no documents + klog.V(5).Infof("skip MapReduceChain due to no documents found") + return args, nil } + // run 
MapReduceDocuments out, err := chains.Run(ctx, l.MapReduceDocuments, documents, l.chainCallOptions...) if err != nil { return args, fmt.Errorf("failed to run MapReduceChain due to %s", err.Error()) } - // set the summary with the output of MapReduceDocuments - args["summary"] = out - - // run LLMChain - needStream := false - needStream, ok = args["_need_stream"].(bool) - if ok && needStream { - l.chainCallOptions = append(l.chainCallOptions, chains.WithStreamingFunc(stream(args))) - } - // call llmchain - out, err = chains.Predict(ctx, l.LLMChain, args, l.chainCallOptions...) - // handler out & error - out, err = handleNoErrNoOut(ctx, needStream, out, err, l.LLMChain, args, l.chainCallOptions) - klog.FromContext(ctx).V(5).Info("use MapReduceChain, blocking out:" + out) - if err == nil { - args["_answer"] = out - return args, nil - } - return args, fmt.Errorf("mapreaducechain run error: %w", err) + args["_answer"] = fmt.Sprintf("Here is the document summary: %s \n", out) + return args, nil } func (l *MapReduceChain) Ready() (bool, string) { diff --git a/pkg/appruntime/chain/retrievalqachain.go b/pkg/appruntime/chain/retrievalqachain.go index 0653de904..083beceb1 100644 --- a/pkg/appruntime/chain/retrievalqachain.go +++ b/pkg/appruntime/chain/retrievalqachain.go @@ -93,6 +93,25 @@ func (l *RetrievalQAChain) Run(ctx context.Context, _ client.Client, args map[st instance := l.Instance options := GetChainOptions(instance.Spec.CommonChainConfig) + // Check if have files as input + v5, ok := args["documents"] + if ok { + docs, ok := v5.([]langchainschema.Document) + if ok && len(docs) != 0 { + args["max_number_of_conccurent"] = instance.Spec.MaxNumberOfConccurent + mpChain := NewMapReduceChain(l.BaseNode, options...) 
+ err = mpChain.Init(ctx, nil, args) + if err != nil { + return args, err + } + _, err = mpChain.Run(ctx, nil, args) + if err != nil { + return args, err + } + // TODO: save the output as a reference for the following answer + } + } + args = runTools(ctx, args, instance.Spec.Tools) llmChain := chains.NewLLMChain(llm, prompt) if history != nil { diff --git a/pkg/appruntime/documentloader/documentloader.go b/pkg/appruntime/documentloader/documentloader.go index 70e6f6e51..0e72bca88 100644 --- a/pkg/appruntime/documentloader/documentloader.go +++ b/pkg/appruntime/documentloader/documentloader.go @@ -19,7 +19,6 @@ package documentloader import ( "bytes" "context" - "errors" "fmt" "io" "path/filepath" @@ -64,16 +63,17 @@ func (dl *DocumentLoader) Run(ctx context.Context, cli client.Client, args map[s // Check if have docs as input v1, ok := args["files"] if !ok { - return args, errors.New("no input docs") + // skip if no files provided + return args, nil } files, ok := v1.([]string) if !ok || len(files) == 0 { - return args, errors.New("empty file list") + // skip if no files provided + return args, nil } if err := cli.Get(ctx, types.NamespacedName{Namespace: dl.RefNamespace(), Name: dl.Ref.Name}, dl.Instance); err != nil { return args, fmt.Errorf("can't find the documentloader in cluster: %w", err) } - system, err := config.GetSystemDatasource(ctx, cli) if err != nil { return nil, err @@ -88,11 +88,12 @@ func (dl *DocumentLoader) Run(ctx context.Context, cli client.Client, args map[s } var allDocs []schema.Document - var textArray []string + var allDocsContent []string + // TODO: skip if the document has already been processed, just return an abstract summary for _, file := range files { ossInfo := &arcadiav1alpha1.OSS{Bucket: dl.RefNamespace()} - ossInfo.Object = filepath.Join("upload", file) + ossInfo.Object = file klog.Infoln("handling file", ossInfo.Object) fileHandler, err := ossDatasource.ReadFile(ctx, ossInfo) if err != nil { @@ -140,15 +141,16 @@ func (dl *DocumentLoader) Run(ctx
context.Context, cli client.Client, args map[s klog.Errorln("failed to load and split content", err) return nil, err } + allDocs = append(allDocs, docs...) for _, doc := range docs { - textArray = append(textArray, doc.PageContent) + allDocsContent = append(allDocsContent, doc.PageContent) } } // Set both docs and context for latter usage - args["docs"] = allDocs - args["context"] = strings.Join(textArray, "\n") + args["documents"] = allDocs + args["documents_content"] = strings.Join(allDocsContent, "\n") return args, nil }