Skip to content

Commit

Permalink
client side disconnect on any error
Browse files Browse the repository at this point in the history
  • Loading branch information
osimhi213 committed Jan 26, 2025
1 parent 36b785d commit 2edf565
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 55 deletions.
11 changes: 7 additions & 4 deletions src/createAgentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { Analytics, initializeAnalytics } from './services/mixpanel';
import { getAnaliticsInfo, getStreamAnalyticsProps } from './utils/analytics';

let messageSentTimestamp = 0;
const connectionRetryTimeoutInMs = 20 * 1000; // 20 seconds
const connectionRetryTimeoutInMs = 30 * 1000; // 30 seconds

interface AgentManagerItems {
chat?: Chat;
Expand Down Expand Up @@ -107,8 +107,11 @@ function initializeStreamAndChat(
return new Promise<{ chat?: Chat; streamingManager: StreamingManager<CreateStreamOptions> }>(
async (outerResolve, outerReject) => {
messageSentTimestamp = 0;
let streamingManager: StreamingManager<CreateStreamOptions> | null = null;

const timeoutId = setTimeout(async () => {
await streamingManager?.disconnect().catch(e => console.error('Error during disconnect:', e));

const timeoutId = setTimeout(() => {
reject(new Error('Could not connect'));
}, connectionRetryTimeoutInMs);

Expand All @@ -130,7 +133,7 @@ function initializeStreamAndChat(
});
}

const streamingManager = await createStreamingManager(
streamingManager = await createStreamingManager(
agent.id,
getAgentStreamArgs(agent, chat, options, greeting),
{
Expand Down Expand Up @@ -168,7 +171,7 @@ function initializeStreamAndChat(
},
},
}
).catch(reject);
);
}
);
}
Expand Down
108 changes: 57 additions & 51 deletions src/createStreamingManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ export async function createStreamingManager<T extends CreateStreamOptions>(
let srcObject: MediaStream | null = null;

const { startConnection, sendStreamRequest, close, createStream, addIceCandidate } =
agent.videoType === VideoType.Clip
? createClipApi(auth, baseURL, agentId, callbacks.onError)
: createTalkApi(auth, baseURL, agentId, callbacks.onError);
agent.videoType === VideoType.Clip
? createClipApi(auth, baseURL, agentId, callbacks.onError)
: createTalkApi(auth, baseURL, agentId, callbacks.onError);

const { id: streamIdFromServer, offer, ice_servers, session_id } = await createStream(agent);
const peerConnection = new actualRTCPC({ iceServers: ice_servers });
Expand All @@ -139,63 +139,71 @@ export async function createStreamingManager<T extends CreateStreamOptions>(
onConnected
);

peerConnection.onicecandidate = (event: RTCPeerConnectionIceEvent) => {
log('peerConnection.onicecandidate', event);
try{
if (event.candidate && event.candidate.sdpMid && event.candidate.sdpMLineIndex !== null) {
addIceCandidate(
streamIdFromServer,
{
candidate: event.candidate.candidate,
sdpMid: event.candidate.sdpMid,
sdpMLineIndex: event.candidate.sdpMLineIndex,
},
session_id
);
} else {
addIceCandidate(streamIdFromServer, { candidate: null }, session_id);
const establishWebRtcConnection = async () => {
peerConnection.onicecandidate = (event: RTCPeerConnectionIceEvent) => {
log('peerConnection.onicecandidate', event);
try {
if (event.candidate && event.candidate.sdpMid && event.candidate.sdpMLineIndex !== null) {
addIceCandidate(
streamIdFromServer,
{
candidate: event.candidate.candidate,
sdpMid: event.candidate.sdpMid,
sdpMLineIndex: event.candidate.sdpMLineIndex,
},
session_id
);
} else {
addIceCandidate(streamIdFromServer, { candidate: null }, session_id);
}
} catch (e: any) {
callbacks.onError?.(e, { streamId: streamIdFromServer });
}
} catch (e: any) {
callbacks.onError?.(e, { streamId: streamIdFromServer });
}
};
};

pcDataChannel.onmessage = (message: MessageEvent) => {
if (pcDataChannel.readyState === 'open') {
const [event, _] = message.data.split(':');
if (event === StreamEvents.StreamReady && !isConnected) {
onConnected();
pcDataChannel.onmessage = (message: MessageEvent) => {
if (pcDataChannel.readyState === 'open') {
const [event, _] = message.data.split(':');
if (event === StreamEvents.StreamReady && !isConnected) {
onConnected();
}
}
}
};
};


peerConnection.oniceconnectionstatechange = () => {
log('peerConnection.oniceconnectionstatechange => ' + peerConnection.iceConnectionState);
peerConnection.oniceconnectionstatechange = () => {
log('peerConnection.oniceconnectionstatechange => ' + peerConnection.iceConnectionState);

const newState = mapConnectionState(peerConnection.iceConnectionState);
const newState = mapConnectionState(peerConnection.iceConnectionState);

if (newState !== ConnectionState.Connected) {
callbacks.onConnectionStateChange?.(newState);
}
};
if (newState !== ConnectionState.Connected) {
callbacks.onConnectionStateChange?.(newState);
}
};

peerConnection.ontrack = (event: RTCTrackEvent) => {
log('peerConnection.ontrack', event);
callbacks.onSrcObjectReady?.(event.streams[0]);
};
peerConnection.ontrack = (event: RTCTrackEvent) => {
log('peerConnection.ontrack', event);
callbacks.onSrcObjectReady?.(event.streams[0]);
};

await peerConnection.setRemoteDescription(offer);
log('set remote description OK');
await peerConnection.setRemoteDescription(offer);
log('set remote description OK');

const sessionClientAnswer = await peerConnection.createAnswer();
log('create answer OK');
const sessionClientAnswer = await peerConnection.createAnswer();
log('create answer OK');

await peerConnection.setLocalDescription(sessionClientAnswer);
log('set local description OK');
await peerConnection.setLocalDescription(sessionClientAnswer);
log('set local description OK');

await startConnection(streamIdFromServer, sessionClientAnswer, session_id);
log('start connection OK');
await startConnection(streamIdFromServer, sessionClientAnswer, session_id);
log('start connection OK');
}

try {
await establishWebRtcConnection();
} catch {
await close(streamIdFromServer, session_id).catch(_ => { });
}

return {
/**
Expand Down Expand Up @@ -233,9 +241,7 @@ export async function createStreamingManager<T extends CreateStreamOptions>(
}

try {
if (state === ConnectionState.Connected) {
await close(streamIdFromServer, session_id).catch(_ => {});
}
await close(streamIdFromServer, session_id);
} catch (e) {
log('Error on close stream connection', e);
}
Expand Down

0 comments on commit 2edf565

Please sign in to comment.