Skip to content

Commit

Permalink
unsubcribe from realtime on refine contacts (#1713)
Browse files Browse the repository at this point in the history
* refactor contactsStore & stop realtime when refining contacts

* fix supabase local dev env

* refactor and fix bug
  • Loading branch information
maleksal authored Sep 11, 2024
1 parent c642b31 commit 73f0282
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 95 deletions.
67 changes: 31 additions & 36 deletions frontend/src/components/Mining/Table/MiningTable.vue
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@
:enrichment-realtime-callback="emptyFunction"
:enrichment-request-response-callback="emptyFunction"
:contacts-to-enrich="
$contactsStore.selected ?? (contacts as unknown as string[])
$contactsStore.selectedEmails ?? (contacts as unknown as string[])
"
:enrich-all-contacts="$contactsStore.selected === undefined"
:enrich-all-contacts="$contactsStore.selectedEmails === undefined"
/>
</div>
<div class="ml-2">
Expand Down Expand Up @@ -655,7 +655,6 @@

<script setup lang="ts">
import type { User } from '@supabase/supabase-js';
import type DataTable from 'primevue/datatable';
import type {
DataTableFilterEvent,
DataTableSelectAllChangeEvent,
Expand Down Expand Up @@ -701,16 +700,15 @@ const emptyFunction = () => {};
const $toast = useToast();
const $user = useSupabaseUser() as Ref<User>;
const $supabaseClient = useSupabaseClient();
const $contactsStore = useContactsStore();
const $leadminerStore = useLeadminerStore();
const $contactInformationSidebar = useMiningContactInformationSidebar();
const isLoading = ref(true);
const loadingLabel = ref('');
const contacts = computed(() => $contactsStore.contacts);
const contactsLength = computed(() => $contactsStore.contactsLength);
const contacts = computed(() => $contactsStore.contactsList);
const contactsLength = computed(() => $contactsStore.contactCount);
const activeMiningTask = computed(
() => $leadminerStore.miningTask !== undefined,
Expand All @@ -737,38 +735,32 @@ function onFilter(event: DataTableFilterEvent) {
filteredContacts.value = event.filteredValue;
}
async function refineContacts() {
loadingLabel.value = t('refining_contacts');
const user = $user.value;
// @ts-expect-error: Issue with @nuxt/supabase typing
const refine = await $supabaseClient.rpc('refine_persons', {
userid: user?.id,
});
if (refine.error) {
throw refine.error;
}
}
async function syncTable() {
$contactsStore.clearSyncInterval();
loadingLabel.value = t('syncing');
const user = $user.value;
$contactsStore.setContacts(await getContacts(user.id));
$contactsStore.setSyncInterval();
}
watch(activeMiningTask, async (isActive) => {
if (isActive) {
$leadminerStore.cleaningFinished = false;
filtersStore.clearFilter();
} else {
isLoading.value = true;
await refineContacts();
await syncTable();
/**
* Disable realtime; protects table from rendering multiple times
*/
await $contactsStore.unsubscribeFromRealtimeUpdates();
loadingLabel.value = t('refining_contacts');
await $contactsStore.refineContacts();
loadingLabel.value = t('syncing');
await $contactsStore.reloadContacts();
filtersStore.toggleFilters();
isLoading.value = false;
$leadminerStore.cleaningFinished = true;
/**
* Subscribe again after the table is rendered
*/
$contactsStore.subscribeToRealtimeUpdates();
}
});
Expand Down Expand Up @@ -833,13 +825,13 @@ const contactsToExport = computed<string[] | undefined>(() =>
watch(
contactsToExport,
() => {
$contactsStore.selected = contactsToExport.value;
$contactsStore.selectedEmails = contactsToExport.value;
},
{ deep: true, immediate: true },
);
watch(implicitlySelectedContactsLength, () => {
$contactsStore.selectedLength = implicitlySelectedContactsLength.value;
$contactsStore.selectedContactsCount = implicitlySelectedContactsLength.value;
});
/* *** Export CSV *** */
Expand Down Expand Up @@ -1026,14 +1018,17 @@ onNuxtReady(async () => {
...($screenStore.width > 950 ? ['status'] : []),
];
const $stepper = useMiningStepper();
const fetchedContacts = await getContacts($user.value.id);
if ($stepper.index === 1 && fetchedContacts.length > 0) {
await $contactsStore.loadContacts();
if (
$stepper.index === 1 &&
$contactsStore.contactsList &&
$contactsStore.contactsList.length > 0
) {
$stepper.hide();
}
$contactsStore.setContacts(fetchedContacts);
$contactsStore.subscribeRealtime($user.value);
$contactsStore.subscribeToRealtimeUpdates();
isLoading.value = false;
});
Expand Down
181 changes: 123 additions & 58 deletions frontend/src/stores/contacts.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,60 @@
import type {
RealtimeChannel,
RealtimePostgresChangesPayload,
User,
} from '@supabase/supabase-js';
import { defineStore } from 'pinia';
import { ref } from 'vue';

import type { Contact } from '@/types/contact';
import { convertDates, getOrganization } from '~/utils/contacts';

export const useContactsStore = defineStore('contacts-store', () => {
let syncInterval: ReturnType<typeof setInterval>;
let subscription: RealtimeChannel;
const $user = useSupabaseUser();
const $supabase = useSupabaseClient();
const $leadminerStore = useLeadminerStore();

const contacts = ref<Contact[] | undefined>(undefined);
const contactsLength = computed(() => contacts.value?.length);
const selected = ref<string[] | undefined>(undefined);
const selectedLength = ref<number>(0);
const contactsList = ref<Contact[] | undefined>(undefined);
const cachedContactsList = ref<Contact[]>([]);

const $leadminerStore = useLeadminerStore();
const activeTask = computed(() => $leadminerStore.activeTask);
const cachedContacts = ref<Contact[]>([]);
const selectedEmails = ref<string[] | undefined>(undefined);
const selectedContactsCount = ref<number>(0);

const contactCount = computed(() => contactsList.value?.length);
const isMiningTaskActive = computed(() => $leadminerStore.activeTask);

let realtimeChannel: RealtimeChannel | null = null;
let syncIntervalId: ReturnType<typeof setInterval> | null = null;

/**
* Load contacts from database to store.
*/
async function loadContacts() {
const { data, error } = await $supabase.rpc(
'get_contacts_table',
// @ts-expect-error: Issue with @nuxt/supabase typing
{ userid: $user.value?.id },
);

function setContacts(newContacts: Contact[]) {
contacts.value = newContacts;
if (error) throw error;
contactsList.value = convertDates(data);
}

/**
* Loads contacts from db and restarts SyncInterval.
*/
async function reloadContacts() {
clearSyncInterval();
await loadContacts();
startSyncInterval();
}

/**
* Refines contacts in database.
*/
async function refineContacts() {
const { error } = await $supabase.rpc(
'refine_persons',
// @ts-expect-error: Issue with @nuxt/supabase typing
{ userid: $user.value?.id },
);
if (error) throw error;
}

function upsertTop(newContact: Contact, oldContacts: Contact[]) {
Expand All @@ -38,91 +69,125 @@ export const useContactsStore = defineStore('contacts-store', () => {
oldContacts.unshift({ ...oldContact, ...newContact });
}

async function handleNewContact(newContact: Contact) {
if (!contacts.value) return;
newContact = convertDates([newContact])[0];
if (newContact.works_for) {
const org = await getOrganization({ id: newContact.works_for }, ['name']);
newContact.works_for = org ? org.name : newContact.works_for;
/**
* Handles a new contact from realtime.
* @param contact - The new contact object.
*/
async function processNewContact(contact: Contact) {
if (!contactsList.value) return;

const [newContact] = convertDates([contact]);
const { works_for: organizationId } = newContact;

if (organizationId) {
const organization = await getOrganization({ id: organizationId }, [
'name',
]);
newContact.works_for = organization ? organization.name : organizationId;
}
if (activeTask.value) {
// Cache to render periodically
if (cachedContacts.value.length === 0)
cachedContacts.value = JSON.parse(JSON.stringify(contacts.value));
upsertTop(newContact, cachedContacts.value);

// If a mining task is active, cache the contacts for periodic rendering
if (isMiningTaskActive.value) {
if (cachedContactsList.value.length === 0) {
cachedContactsList.value = JSON.parse(
JSON.stringify(contactsList.value),
);
}
upsertTop(newContact, cachedContactsList.value);
} else {
// Render instantly
upsertTop(newContact, contacts.value);
// Otherwise, update the contacts instantly
upsertTop(newContact, contactsList.value);
}
}

/**
* Applies cached contacts to the main contacts list.
*/
function applyCachedContacts() {
if (cachedContacts.value.length > 0) {
contacts.value = cachedContacts.value;
cachedContacts.value = [];
if (cachedContactsList.value.length > 0) {
contactsList.value = cachedContactsList.value;
cachedContactsList.value = [];
}
}

function subscribeRealtime(user: User) {
subscription = useSupabaseClient()
/**
* Subscribes to real-time updates for contacts.
*/
function subscribeToRealtimeUpdates() {
realtimeChannel = useSupabaseClient()
.channel('contacts-table')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'persons',
filter: `user_id=eq.${user?.id}`,
filter: `user_id=eq.${$user.value?.id}`,
},
async (payload: RealtimePostgresChangesPayload<Contact>) => {
const newContact = payload.new as Contact;
await handleNewContact(newContact);
await processNewContact(newContact);
},
);

setSyncInterval();
subscription.subscribe();
startSyncInterval();
realtimeChannel.subscribe();
}

function unsubscribeRealtime() {
if (subscription) {
subscription.unsubscribe();
/**
* Unsubscribes from real-time updates and clears the sync interval.
*/
async function unsubscribeFromRealtimeUpdates() {
if (realtimeChannel) {
await realtimeChannel.unsubscribe();
await $supabase.removeChannel(realtimeChannel);
}
if (syncInterval) {
if (syncIntervalId) {
applyCachedContacts();
clearSyncInterval();
}
}

function setSyncInterval() {
cachedContacts.value = [];
syncInterval = setInterval(() => {
/**
* Starts the sync interval to periodically apply cached contacts.
*/
function startSyncInterval() {
cachedContactsList.value = [];
syncIntervalId = setInterval(() => {
applyCachedContacts();
}, 2000);
}

/**
* Clears the sync interval.
*/
function clearSyncInterval() {
clearInterval(syncInterval);
if (syncIntervalId) clearInterval(syncIntervalId);
}

/**
* Resets the store.
*/
function $reset() {
unsubscribeRealtime();
contacts.value = undefined;
selected.value = undefined;
selectedLength.value = 0;
cachedContacts.value = [];
unsubscribeFromRealtimeUpdates();
contactsList.value = undefined;
selectedEmails.value = undefined;
selectedContactsCount.value = 0;
}
cachedContactsList.value = [];

return {
contacts,
selected,
selectedLength,
contactsLength,
contactsList,
selectedEmails,
selectedContactsCount,
contactCount,
$reset,
setContacts,
subscribeRealtime,
unsubscribeRealtime,
setSyncInterval,
loadContacts,
reloadContacts,
refineContacts,
subscribeToRealtimeUpdates,
unsubscribeFromRealtimeUpdates,
startSyncInterval,
clearSyncInterval,
};
});
2 changes: 1 addition & 1 deletion supabase/functions/.env.dev
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
LEADMINER_PROJECT_URL = http://host.docker.internal:54321/
LEADMINER_ANON_KEY = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0
LEADMINER_SECRET_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU
LEADMINER_HASH_SECRET = 'change-me'
LEADMINER_HASH_SECRET = 'change_me'

0 comments on commit 73f0282

Please sign in to comment.