Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds streaming query #48

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

feat: adds streaming query #48

wants to merge 1 commit into from

Conversation

nathanielc
Copy link
Collaborator

No description provided.

@nathanielc
Copy link
Collaborator Author

Here is an example using the new SDK streamQuery function.

import { CeramicClient } from '@ceramic-sdk/http-client'
import type { ModelDefinition } from '@ceramic-sdk/model-protocol'
import { ModelClient } from '@ceramic-sdk/model-client'
import { ModelInstanceClient } from '@ceramic-sdk/model-instance-client'
import { getAuthenticatedDID } from '@didtools/key-did'
import { ClientOptions, createFlightSqlClient } from '@ceramic-sdk/flight-sql-client';
import { tableFromIPC } from 'apache-arrow';



async function main() {
  await Promise.all([read(), write()])
}

async function write() {
  const client = new CeramicClient({ url: "http://localhost:5101" })

  // Create an authenticated DID
  const authenticatedDID = await getAuthenticatedDID(new Uint8Array(32))

  // Create a model client
  const modelClient = new ModelClient({
    ceramic: client,
    did: authenticatedDID,
  })

  const model: ModelDefinition = {
    version: '2.0',
    name: 'Post',
    description: 'test model',
    accountRelation: { type: 'list' },
    interface: false,
    implements: [],
    schema: {
      type: 'object',
      properties: {
        title: { type: 'string', maxLength: 12 },
        body: { type: 'string', maxLength: 1024 },
        nonce: { type: 'number' },
      },
      required: ['userName'],
      additionalProperties: false,
    },
  };
  const modelStream = await modelClient.postDefinition(model);
  // Create a model instance client
  const modelInstanceClient = new ModelInstanceClient({
    ceramic: client,
    did: authenticatedDID,
  })
  while (true) {
    // creating a MID using a `list` model
    await modelInstanceClient.postSignedInit({
      model: modelStream,
      content: { title: 'This is a new post', body: 'This is the body for a new post', nonce: Math.random() },
      shouldIndex: true,
    })
    await sleep(1000)
  }
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function read() {
  const options: ClientOptions = {
    username: undefined,
    password: undefined,
    tls: false,
    host: '127.0.0.1',
    port: 5102,
    headers: [],
  };
  const client = await createFlightSqlClient(options);
  const stream = await client.streamQuery(`
    SELECT
      "index",
      cid_string(event_cid) as event_cid,
      data::varchar as data
    FROM conclusion_events_stream
    WHERE
    "index" > 43
  `);
  while (true) {
    const batch = await stream.next();
    if (batch == null) {
      break;
    }
    const table = tableFromIPC(batch);
    for (let i = 0; i < table.numRows; i++) {
      const row = table.get(i);
      console.log(row);
    }
  }
  console.log('read finished');
}

main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant