From 612717be82f1413189e6383ac4b3d78b6d2658ce Mon Sep 17 00:00:00 2001
From: Wan <495709+wa0x6e@users.noreply.github.com>
Date: Wed, 28 Aug 2024 14:33:25 +0900
Subject: [PATCH] feat: add script to import statement from 3rd party services
 (#422)

* feat: add source column to schema

* feat: add statement importer

* fix: update script to use new `subgraphRequest` function now supporting variables

* fix: fix default values when argument is missing

* refactor: convert providers to use Class

* fix: start all throttled and non-throttled providers at once

* fix: ignore delegate without address

* fix: trim statement

* fix: import statement into database

* fix: remove unnecessary functions

* fix: fix missing await

* fix: update schema to reflect nullable ipfs in statements

* fix: update schema to reflect new index

* refactor: add readonly to static const

* fix: increase tally interval to 750ms

* fix: update source column length to 24

* fix: fix using wrong db connection

---------

Co-authored-by: Chaitanya <yourchaitu@gmail.com>
---
 scripts/import-statements.ts                  |  34 ++++
 src/lib/importer/statement/index.ts           |  48 ++++++
 .../importer/statement/provider/Provider.ts   |  83 ++++++++++
 src/lib/importer/statement/provider/agora.ts  |  88 ++++++++++
 src/lib/importer/statement/provider/index.ts  |   9 ++
 .../importer/statement/provider/karmahq.ts    |  91 +++++++++++
 src/lib/importer/statement/provider/tally.ts  | 153 ++++++++++++++++++
 src/writer/statement.ts                       |   3 +-
 test/schema.sql                               |   4 +-
 9 files changed, 511 insertions(+), 2 deletions(-)
 create mode 100644 scripts/import-statements.ts
 create mode 100644 src/lib/importer/statement/index.ts
 create mode 100644 src/lib/importer/statement/provider/Provider.ts
 create mode 100644 src/lib/importer/statement/provider/agora.ts
 create mode 100644 src/lib/importer/statement/provider/index.ts
 create mode 100644 src/lib/importer/statement/provider/karmahq.ts
 create mode 100644 src/lib/importer/statement/provider/tally.ts

diff --git a/scripts/import-statements.ts b/scripts/import-statements.ts
new file mode 100644
index 00000000..48fb0f1e
--- /dev/null
+++ b/scripts/import-statements.ts
@@ -0,0 +1,34 @@
+import 'dotenv/config';
+import run from '../src/lib/importer/statement';
+
+// Usage: yarn ts-node scripts/import-statements.ts --providers tally,agora --spaces s:hop.eth
+async function main() {
+  let providers: string[] | undefined = undefined;
+  let spaces: string[] | undefined = undefined;
+  const startTime = new Date().getTime();
+
+  process.argv.forEach((arg, index) => {
+    if (arg === '--providers') {
+      if (!process.argv[index + 1]) throw new Error('Providers value is missing');
+      providers = process.argv[index + 1].trim().split(',');
+    }
+
+    if (arg === '--spaces') {
+      if (!process.argv[index + 1]) throw new Error('Spaces value is missing');
+      spaces = process.argv[index + 1].trim().split(',');
+    }
+  });
+
+  await run(providers, spaces);
+  console.log(`Done! ✅ in ${(Date.now() - startTime) / 1000}s`);
+}
+
+(async () => {
+  try {
+    await main();
+    process.exit(0);
+  } catch (e) {
+    console.error(e);
+    process.exit(1);
+  }
+})();
diff --git a/src/lib/importer/statement/index.ts b/src/lib/importer/statement/index.ts
new file mode 100644
index 00000000..7ff6572f
--- /dev/null
+++ b/src/lib/importer/statement/index.ts
@@ -0,0 +1,48 @@
+import intersection from 'lodash/intersection';
+import { PROVIDERS } from './provider';
+
+const DEFAULT_PROVIDERS = Object.keys(PROVIDERS);
+
+export type Delegate = {
+  id: string;
+  delegate: string;
+  statement: string;
+  source: string;
+  space: string;
+  network: string;
+  created: number;
+  updated: number;
+};
+
+export default async function main(providers = DEFAULT_PROVIDERS, spaces?: string[]) {
+  const providerParams = buildParams(providers, spaces);
+  const providerInstances = providerParams
+    .map(({ providerId, spaceIds }) => spaceIds.map(spaceId => new PROVIDERS[providerId](spaceId)))
+    .flat();
+
+  await Promise.all([
+    ...providerInstances.filter(p => !p.throttled()).map(p => p.fetch()),
+    throttle(providerInstances.filter(p => p.throttled()))
+  ]);
+}
+
+async function throttle(instances: any): Promise<any> {
+  for (const instance of instances) {
+    await instance.fetch();
+  }
+
+  return;
+}
+
+function buildParams(providers: string[], spaces?: string[]) {
+  return providers.map(providerId => {
+    const providerClass = PROVIDERS[providerId];
+    const availableSpaces = Object.keys(providerClass.MAPPING);
+
+    if (!providerClass) throw new Error(`Unknown provider: ${providerId}`);
+
+    const spaceIds: string[] = intersection(spaces || availableSpaces, availableSpaces);
+
+    return { providerId, spaceIds };
+  });
+}
diff --git a/src/lib/importer/statement/provider/Provider.ts b/src/lib/importer/statement/provider/Provider.ts
new file mode 100644
index 00000000..3ea5e257
--- /dev/null
+++ b/src/lib/importer/statement/provider/Provider.ts
@@ -0,0 +1,83 @@
+import snapshot from '@snapshot-labs/snapshot.js';
+import { Delegate } from '../';
+import hubDB from '../../../../helpers/mysql';
+import { sha256 } from '../../../../helpers/utils';
+
+export class Provider {
+  spaceId: string;
+  delegates: Delegate[];
+
+  // Time in seconds between each request, 0 to disable
+  throttle_interval = 0;
+
+  constructor(spaceId: string) {
+    this.spaceId = spaceId;
+    this.delegates = [];
+  }
+
+  async fetch(): Promise<Delegate[]> {
+    await this._fetch();
+
+    console.log(
+      `[${this.getId()}] ${this.spaceId} - ✅ Found ${
+        Object.keys(this.delegates).length
+      } delegate(s) with statement`
+    );
+
+    return this.delegates;
+  }
+
+  async _fetch() {}
+
+  formatDelegate(result: { delegate: string; statement: string }): Delegate {
+    const [network, space] = this.spaceId.split(':');
+    const now = Math.floor(new Date().getTime() / 1000);
+
+    return {
+      id: sha256([result.delegate, result.statement, space, network].join('')),
+      delegate: snapshot.utils.getFormattedAddress(
+        result.delegate,
+        snapshot.utils.isEvmAddress(result.delegate) ? 'evm' : 'starknet'
+      ),
+      statement: result.statement,
+      source: this.getId(),
+      space,
+      network,
+      created: now,
+      updated: now
+    };
+  }
+
+  beforeFetchPage(page: number) {
+    console.log(`[${this.getId()}] ${this.spaceId} - Fetching page #${page + 1}`);
+  }
+
+  async afterFetchPage(page: number, delegates: Delegate[]) {
+    if (delegates.length) {
+      this.delegates = { ...this.delegates, ...delegates };
+
+      await this.importDelegates(delegates);
+    }
+
+    if (this.throttle_interval) {
+      await snapshot.utils.sleep(this.throttle_interval);
+    }
+  }
+
+  async importDelegates(delegates: Delegate[]) {
+    console.log(`[${this.getId()}] -- Importing ${delegates.length} delegate(s)`);
+
+    await hubDB.queryAsync(
+      `INSERT IGNORE INTO statements (id, delegate, statement, source, space, network, created, updated) VALUES ?`,
+      [delegates.map(d => Object.values(d))]
+    );
+  }
+
+  throttled(): boolean {
+    return this.throttle_interval > 0;
+  }
+
+  getId(): string {
+    return '';
+  }
+}
diff --git a/src/lib/importer/statement/provider/agora.ts b/src/lib/importer/statement/provider/agora.ts
new file mode 100644
index 00000000..331350f7
--- /dev/null
+++ b/src/lib/importer/statement/provider/agora.ts
@@ -0,0 +1,88 @@
+import snapshot from '@snapshot-labs/snapshot.js';
+// eslint-disable-next-line import/no-extraneous-dependencies
+import { VariableType } from 'json-to-graphql-query';
+import { Provider } from './Provider';
+import { Delegate } from '../';
+
+const QUERY = {
+  __variables: {
+    orderBy: 'DelegatesOrder!',
+    seed: 'String!',
+    statement: 'StatementFilter',
+    first: 'Int!'
+  },
+  delegates: {
+    __args: {
+      first: new VariableType('first'),
+      seed: new VariableType('seed'),
+      orderBy: new VariableType('orderBy'),
+      where: { statement: new VariableType('statement') }
+    },
+    edges: {
+      node: {
+        id: true,
+        address: {
+          resolvedName: {
+            address: true,
+            name: true
+          }
+        },
+        statement: {
+          summary: true,
+          twitter: true,
+          discord: true
+        }
+      },
+      cursor: true
+    },
+    pageInfo: {
+      endCursor: true,
+      hasNextPage: true
+    }
+  }
+};
+
+export default class Agora extends Provider {
+  static readonly MAPPING = {
+    // NOTE: disabling pages not using graphql api
+    // 's:ens.eth': 'https://agora.ensdao.org',
+    // 's:opcollective.eth': 'https://vote.optimism.io',
+    // 's:uniswapgovernance.eth': 'https://vote.uniswapfoundation.org',
+    's:lyra.eth': 'https://vote.lyra.finance'
+  };
+
+  static readonly ID = 'agora';
+
+  async _fetch() {
+    const page = 0;
+    const variables = {
+      orderBy: 'mostVotingPower',
+      statement: 'withStatement',
+      seed: Date.now().toString(),
+      first: 30
+    };
+
+    this.beforeFetchPage(page);
+
+    const results = await snapshot.utils.subgraphRequest(
+      `${Agora.MAPPING[this.spaceId]}/graphql`,
+      QUERY,
+      {
+        variables
+      }
+    );
+
+    const _delegates: Delegate[] = results.delegates.edges.map((edge: any) => {
+      return this.formatDelegate({
+        delegate: edge.node.address.resolvedName.address,
+        statement: edge.node.statement.summary.trim()
+      });
+    });
+
+    await this.afterFetchPage(page, _delegates);
+  }
+
+  getId(): string {
+    return Agora.ID;
+  }
+}
diff --git a/src/lib/importer/statement/provider/index.ts b/src/lib/importer/statement/provider/index.ts
new file mode 100644
index 00000000..4896096b
--- /dev/null
+++ b/src/lib/importer/statement/provider/index.ts
@@ -0,0 +1,9 @@
+import agora from './agora';
+import karmahq from './karmahq';
+import tally from './tally';
+
+export const PROVIDERS = {
+  tally,
+  karmahq,
+  agora
+};
diff --git a/src/lib/importer/statement/provider/karmahq.ts b/src/lib/importer/statement/provider/karmahq.ts
new file mode 100644
index 00000000..e08730d2
--- /dev/null
+++ b/src/lib/importer/statement/provider/karmahq.ts
@@ -0,0 +1,91 @@
+import fetch, { Response } from 'node-fetch';
+import { Provider } from './Provider';
+import { Delegate } from '../';
+
+export default class Karmahq extends Provider {
+  static readonly MAPPING = {
+    's:aave.eth': 'aave',
+    's:apecoin.eth': 'apecoin',
+    's:arbitrumfoundation.eth': 'arbitrum',
+    's:gitcoindao.eth': 'gitcoin',
+    's:moonbeam-foundation.eth': 'moonbeam',
+    's:opcollective.eth': 'optimism',
+    's:rocketpool-dao.eth': 'rocketpool',
+    'sn:0x009fedaf0d7a480d21a27683b0965c0f8ded35b3f1cac39827a25a06a8a682a4': 'starknet'
+  };
+
+  static readonly ID = 'karmahq';
+
+  async fetchWithRetry<T>(fn: () => Promise<T>, retries = 3): Promise<T> {
+    while (retries > 0) {
+      try {
+        const response: Response = await fn();
+
+        if (!response.ok) {
+          throw new Error(`Response not ok: ${response.status}`);
+        }
+
+        return response;
+      } catch (error) {
+        console.log(`Error, retrying...`);
+        if (retries > 0) {
+          this.fetchWithRetry(fn, retries - 1);
+        } else {
+          throw error;
+        }
+      }
+    }
+    throw new Error('Max retries reached');
+  }
+
+  async _fetch() {
+    const PAGE_SIZE = 1000;
+    let page = 0;
+
+    while (true) {
+      this.beforeFetchPage(page);
+
+      const response: Response = await this.fetchWithRetry(() => {
+        return fetch(
+          `https://api.karmahq.xyz/api/dao/delegates?name=${
+            Karmahq.MAPPING[this.spaceId]
+          }&offset=${page}&pageSize=${PAGE_SIZE}`
+        );
+      });
+
+      const body = await response.json();
+
+      if (!body.data.delegates.length) break;
+
+      const _delegates: Delegate[] = [];
+      body.data.delegates.forEach(delegate => {
+        const statement = delegate.delegatePitch?.customFields?.find(
+          field => field.label === 'statement'
+        )?.value;
+
+        if (
+          !statement ||
+          typeof statement !== 'string' ||
+          delegate.publicAddress === '0x0000000000000000000000000000000000000000'
+        ) {
+          return;
+        }
+
+        _delegates.push(
+          this.formatDelegate({
+            delegate: delegate.publicAddress,
+            statement: statement.trim()
+          })
+        );
+      });
+
+      await this.afterFetchPage(page, _delegates);
+
+      page++;
+    }
+  }
+
+  getId(): string {
+    return Karmahq.ID;
+  }
+}
diff --git a/src/lib/importer/statement/provider/tally.ts b/src/lib/importer/statement/provider/tally.ts
new file mode 100644
index 00000000..d6ef6a4c
--- /dev/null
+++ b/src/lib/importer/statement/provider/tally.ts
@@ -0,0 +1,153 @@
+import snapshot from '@snapshot-labs/snapshot.js';
+// eslint-disable-next-line import/no-extraneous-dependencies
+import { VariableType } from 'json-to-graphql-query';
+import { Provider } from './Provider';
+import { Delegate } from '../';
+
+const API_URL = 'https://api.tally.xyz/query';
+
+const DELEGATES_QUERY = {
+  __variables: {
+    input: 'DelegatesInput!'
+  },
+  delegates: {
+    __args: {
+      input: new VariableType('input')
+    },
+    nodes: {
+      __on: {
+        __typeName: 'Delegate',
+        id: true,
+        account: {
+          address: true
+        },
+        statement: {
+          statement: true
+        }
+      }
+    },
+    pageInfo: {
+      firstCursor: true,
+      lastCursor: true
+    }
+  }
+};
+
+const ORGANIZATION_QUERY = {
+  __variables: {
+    input: 'OrganizationInput!'
+  },
+  organization: {
+    __args: {
+      input: new VariableType('input')
+    },
+    governorIds: true,
+    id: true
+  }
+};
+
+export default class Tally extends Provider {
+  static readonly MAPPING = {
+    's:arbitrumfoundation.eth': 'arbitrum',
+    's:uniswapgovernance.eth': 'uniswap',
+    's:dopedao.eth': 'dopewars',
+    's:opcollective.eth': 'optimism',
+    's:ens.eth': 'ens',
+    's:aave.eth': 'aave',
+    's:gitcoindao.eth': 'gitcoin',
+    's:hop.eth': 'hop',
+    's:gmx.eth': 'gmx',
+    's:yam.eth': 'yam-finance',
+    's:idlefinance.eth': 'idle'
+    // Spaces below do not have delegates with statement
+    // 's:fei.eth': 'fei',
+    // 's:eulerdao.eth': 'euler',
+    // 's:fuse.eth': 'rari-capital',
+    // 's:truefigov.eth': 'truefi',
+    // 's:instadapp-gov.eth': 'instadapp'
+    // 's:anglegovernance.eth': 'angle'
+  };
+
+  static readonly ID = 'tally';
+
+  throttle_interval = 750;
+
+  async _fetch() {
+    const spaceMeta = await this.spaceMeta();
+
+    let afterCursor: string | undefined;
+    let page = 0;
+
+    while (true) {
+      this.beforeFetchPage(page);
+
+      const variables: Record<string, any> = {
+        input: {
+          filters: {
+            governorId: spaceMeta.governorId,
+            organizationId: spaceMeta.organizationId
+          },
+          sort: {
+            isDescending: true,
+            sortBy: 'votes'
+          },
+          page: {
+            limit: 20
+          }
+        }
+      };
+
+      if (afterCursor) variables.input.page.afterCursor = afterCursor;
+
+      const results = await snapshot.utils.subgraphRequest(API_URL, DELEGATES_QUERY, {
+        variables,
+        headers: { 'Api-Key': process.env.TALLY_API_KEY }
+      });
+
+      if (!results.delegates.nodes.length) break;
+
+      const _delegates: Delegate[] = [];
+      results.delegates.nodes.forEach((node: any) => {
+        const statement = node.statement.statement.trim();
+
+        if (!statement) return;
+
+        _delegates.push(
+          this.formatDelegate({
+            delegate: node.account.address,
+            statement
+          })
+        );
+      });
+
+      if (!results.delegates.pageInfo.lastCursor) break;
+
+      afterCursor = results.delegates.pageInfo.lastCursor;
+      page++;
+
+      await this.afterFetchPage(page, _delegates);
+    }
+  }
+
+  private async spaceMeta(): Promise<{ governorId: string; organizationId: string }> {
+    const variables = {
+      input: {
+        slug: Tally.MAPPING[this.spaceId]
+      }
+    };
+
+    const result = await snapshot.utils.subgraphRequest(API_URL, ORGANIZATION_QUERY, {
+      variables,
+      headers: { 'Api-Key': process.env.TALLY_API_KEY }
+    });
+
+    return {
+      organizationId: result.organization.id,
+      governorId: result.organization.governorIds[0]
+    };
+  }
+
+  getId(): string {
+    return Tally.ID;
+  }
+}
diff --git a/src/writer/statement.ts b/src/writer/statement.ts
index 2d9cc277..2a88bd09 100644
--- a/src/writer/statement.ts
+++ b/src/writer/statement.ts
@@ -34,7 +34,8 @@ export async function action(body, ipfs, receipt, id): Promise<void> {
     discourse: msg.payload.discourse || '',
     status: msg.payload.status || 'INACTIVE',
     created,
-    updated: created
+    updated: created,
+    source: null
   };
 
   const query =
diff --git a/test/schema.sql b/test/schema.sql
index 21ca4647..4af3abe4 100644
--- a/test/schema.sql
+++ b/test/schema.sql
@@ -148,13 +148,14 @@ CREATE TABLE users (
 
 CREATE TABLE statements (
   id VARCHAR(66) NOT NULL,
-  ipfs VARCHAR(64) NOT NULL,
+  ipfs VARCHAR(64) DEFAULT NULL,
   delegate VARCHAR(100) NOT NULL,
   space VARCHAR(100) NOT NULL,
   about TEXT,
   statement TEXT,
   network VARCHAR(24) NOT NULL DEFAULT 's',
   discourse VARCHAR(64),
+  source VARCHAR(24) DEFAULT NULL,
   status VARCHAR(24) NOT NULL DEFAULT 'INACTIVE',
   created INT(11) NOT NULL,
   updated INT(11) NOT NULL,
@@ -164,6 +165,7 @@ CREATE TABLE statements (
   INDEX network (network),
   INDEX created (created),
   INDEX updated (updated),
+  INDEX source (source),
   INDEX status (status)
 );