Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the response text parsing and encoding issue when the moly and mofa agents communicate locally. #336

Closed
wants to merge 10 commits into from
92 changes: 81 additions & 11 deletions moly-mofa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use moly_protocol::open_ai::{
use serde::{Deserialize, Deserializer, Serialize};
use std::sync::mpsc::{self, channel, Sender};
use tokio::task::JoinHandle;
use serde_json::Value;

#[derive(Debug, Serialize, Deserialize)]
pub struct MofaResponseReasoner {
Expand Down Expand Up @@ -105,6 +106,13 @@ pub enum MofaAgentCommand {
FetchAgentsFromServer(Sender<MofaServerResponse>),
}

#[derive(Debug, Serialize, Deserialize)]
struct MofaContent {
step_name: String,
node_results: String,
dataflow_status: bool,
}

#[derive(Clone, Debug)]
pub struct MofaClient {
command_sender: Sender<MofaAgentCommand>,
Expand Down Expand Up @@ -176,23 +184,86 @@ impl MofaClient {
content: task,
}],
};
let client = reqwest::Client::new();

// If access to OpenAI or Claude is restricted in certain regions, a VPN is typically used,
// so communication with the local MoFA service here must bypass the proxy.
let client = reqwest::Client::builder()
.no_proxy()
.build()
.expect("Failed to build client");

let req = client
.post(format!("{}/v1/chat/completions", &address))
.header("Content-Type", "application/json")
.json(&data);

current_request = Some(rt.spawn(async move {
let resp = req.send().await.expect("Failed to send request");

let resp: Result<ChatResponseData, reqwest::Error> = resp.json().await;
match resp {
Ok(resp) => {
let _ = tx.send(ChatResponse::ChatFinalResponseData(resp.clone()));
}
Err(e) => {
eprintln!("{e}");
match req.send().await {
Ok(response) => {
if response.status().is_success() {
match response.text().await {
Ok(text) => {
match serde_json::from_str::<Value>(&text) {
Ok(value) => {
if let Some(content) = value
.get("choices")
.and_then(|choices| choices.get(0))
.and_then(|choice| choice.get("message"))
.and_then(|message| message.get("content"))
.and_then(|content| content.as_str())
{
// parsing inner json
match serde_json::from_str::<MofaContent>(content) {
Ok(mofa_content) => {
let response_data = ChatResponseData {
id: value.get("id").and_then(|id| id.as_str()).unwrap_or("").to_string(),
choices: vec![ChoiceData {
finish_reason: StopReason::Stop,
index: 0,
message: MessageData {
content: mofa_content.node_results,
role: Role::Assistant,
},
logprobs: None,
}],
created: value.get("created")
.and_then(|c| c.as_i64())
.unwrap_or(0),
model: value.get("model").and_then(|m| m.as_str()).unwrap_or("").to_string(),
system_fingerprint: "".to_string(),
usage: UsageData {
completion_tokens: 0,
prompt_tokens: 0,
total_tokens: 0,
},
object: value.get("object").and_then(|o| o.as_str()).unwrap_or("").to_string(),
};

let _ = tx.send(ChatResponse::ChatFinalResponseData(response_data));
}
Err(e) => {
eprintln!("Failed to parse content JSON: {}", e);
eprintln!("Content: {}", content);
}
}
}
}
Err(e) => {
eprintln!("Failed to parse response JSON: {}", e);
eprintln!("Response: {}", text);
}
}
}
Err(e) => eprintln!("Failed to get response text: {}", e),
}
} else {
eprintln!("HTTP error: {}", response.status());
if let Ok(error_text) = response.text().await {
eprintln!("Error details: {}", error_text);
}
}
}
Err(e) => eprintln!("Request failed: {}", e),
}
}));
}
Expand Down Expand Up @@ -254,7 +325,6 @@ impl MofaClient {

fn new_fake(address: String) -> Self {
let (command_sender, command_receiver) = channel();

let address_clone = address.clone();
std::thread::spawn(move || {
while let Ok(command) = command_receiver.recv() {
Expand Down
2 changes: 1 addition & 1 deletion moly-protocol/src/open_ai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub struct UsageData {
pub struct ChatResponseData {
pub id: String,
pub choices: Vec<ChoiceData>,
pub created: u32,
pub created: i64,
pub model: ModelID,
#[serde(default)]
pub system_fingerprint: String,
Expand Down
5 changes: 3 additions & 2 deletions src/data/chats/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,12 @@ impl Chat {
std::thread::spawn(move || '_loop: loop {
match rx.recv() {
Ok(moly_mofa::ChatResponse::ChatFinalResponseData(data)) => {
let node_results = serde_json::from_str::<MofaAgentResponse>(&data.choices[0].message.content).unwrap();
// The original JSON data of node_results has been parsed and can be used directly.
let node_results = data.choices[0].clone().message.content;
Cx::post_action(ChatEntityAction {
chat_id,
kind: ChatEntityActionKind::MofaAgentResult(
node_results.node_results
node_results
),
});

Expand Down
1 change: 0 additions & 1 deletion src/data/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use anyhow::Result;
use chrono::{DateTime, Utc};
use makepad_widgets::{Action, ActionDefaultRef, DefaultNone};
use moly_backend::Backend;

use moly_mofa::MofaServerResponse;
use moly_protocol::data::{Author, DownloadedFile, File, FileID, Model, ModelID, PendingDownload};
use std::rc::Rc;
Expand Down