Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] improve CsvLoader & clean code #3830

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 42 additions & 83 deletions packages/components/nodes/documentloaders/Csv/Csv.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { omit } from 'lodash'
import { TextSplitter } from 'langchain/text_splitter'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { getFileFromStorage, handleEscapeCharacters } from '../../../src'
import { CSVLoader } from './CsvLoader'
import { getFileFromStorage, handleDocumentLoaderDocuments, handleDocumentLoaderMetadata, handleDocumentLoaderOutput } from '../../../src'
import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'

class Csv_DocumentLoaders implements INode {
Expand All @@ -19,7 +18,7 @@ class Csv_DocumentLoaders implements INode {
constructor() {
this.label = 'Csv File'
this.name = 'csvFile'
this.version = 2.0
this.version = 3.0
this.type = 'Document'
this.icon = 'csv.svg'
this.category = 'Document Loaders'
Expand Down Expand Up @@ -82,21 +81,11 @@ class Csv_DocumentLoaders implements INode {
]
}

async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
getFiles(nodeData: INodeData) {
const csvFileBase64 = nodeData.inputs?.csvFile as string
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let docs: IDocument[] = []
let files: string[] = []
let fromStorage: boolean = true

if (csvFileBase64.startsWith('FILE-STORAGE::')) {
const fileName = csvFileBase64.replace('FILE-STORAGE::', '')
Expand All @@ -105,86 +94,56 @@ class Csv_DocumentLoaders implements INode {
} else {
files = [fileName]
}
const chatflowid = options.chatflowid

for (const file of files) {
if (!file) continue
const fileData = await getFileFromStorage(file, chatflowid)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
} else {
if (csvFileBase64.startsWith('[') && csvFileBase64.endsWith(']')) {
files = JSON.parse(csvFileBase64)
} else {
files = [csvFileBase64]
}

for (const file of files) {
if (!file) continue
const splitDataURI = file.split(',')
splitDataURI.pop()
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const blob = new Blob([bf])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
fromStorage = false
}

if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
return { files, fromStorage }
}

async getFileData(file: string, { chatflowid }: { chatflowid: string }, fromStorage?: boolean) {
if (fromStorage) {
return getFileFromStorage(file, chatflowid)
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
const splitDataURI = file.split(',')
splitDataURI.pop()
return Buffer.from(splitDataURI.pop() || '', 'base64')
}
}

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string

let docs: IDocument[] = []

const chatflowid = options.chatflowid

const { files, fromStorage } = this.getFiles(nodeData)

for (const file of files) {
if (!file) continue

const fileData = await this.getFileData(file, { chatflowid }, fromStorage)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

// use spread instead of push, because it raises RangeError: Maximum call stack size exceeded when too many docs
docs = [...docs, ...(await handleDocumentLoaderDocuments(loader, textSplitter))]
}

docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)

return handleDocumentLoaderOutput(docs, output)
}
}

Expand Down
74 changes: 74 additions & 0 deletions packages/components/nodes/documentloaders/Csv/CsvLoader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { TextLoader } from 'langchain/document_loaders/fs/text'
import Papa from 'papaparse'

type CSVLoaderOptions = {
// Return specifific column from key (string) or index (integer)
column?: string | number
// Force separator (default: auto detect)
separator?: string
}

/**
* A class that extends the TextLoader class. It represents a document
* loader that loads documents from a CSV file. It has a constructor that
* takes a `filePathOrBlob` parameter representing the path to the CSV
* file or a Blob object, and an optional `options` parameter of type
* `CSVLoaderOptions` or a string representing the column to use as the
* document's pageContent.
*/
export class CSVLoader extends TextLoader {
protected options: CSVLoaderOptions = {}

constructor(filePathOrBlob: ConstructorParameters<typeof TextLoader>[0], options?: CSVLoaderOptions | string) {
super(filePathOrBlob)

if (typeof options === 'string') {
this.options = { column: options }
} else {
this.options = options ?? this.options
}
}
/**
* A protected method that parses the raw CSV data and returns an array of
* strings representing the pageContent of each document. It uses the
* `papaparse` to parse the CSV data. If
* the `column` option is specified, it checks if the column exists in the
* CSV file and returns the values of that column as the pageContent. If
* the `column` option is not specified, it converts each row of the CSV
* data into key/value pairs and joins them with newline characters.
* @param raw The raw CSV data to be parsed.
* @returns An array of strings representing the pageContent of each document.
*/
async parse(raw: string): Promise<string[]> {
const { column, separator } = this.options

const {
data: parsed,
meta: { fields = [] }
} = Papa.parse<{ [K: string]: string }>(raw.trim(), {
delimiter: separator,
header: true
})

if (column !== undefined) {
if (!fields.length) {
throw new Error(`Unable to resolve fields from header.`)
}

let searchIdx = column

if (typeof column == 'number') {
searchIdx = fields[column]
}

if (!fields.includes(searchIdx as string)) {
throw new Error(`Column ${column} not found in CSV file.`)
}

// Note TextLoader will raise an exception if the value is null.
return parsed.map((row) => row[searchIdx])
}

return parsed.map((row) => fields.map((key) => `${key.trim() || '_0'}: ${row[key]?.trim()}`).join('\n'))
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { omit } from 'lodash'
import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils'
import {
getCredentialData,
getCredentialParam,
handleDocumentLoaderDocuments,
handleDocumentLoaderMetadata,
handleDocumentLoaderOutput
} from '../../../src/utils'
import { S3Client, GetObjectCommand, S3ClientConfig, ListObjectsV2Command, ListObjectsV2Output } from '@aws-sdk/client-s3'
import { getRegions, MODEL_TYPE } from '../../../src/modelLoader'
import { Readable } from 'node:stream'
Expand All @@ -10,12 +15,13 @@ import * as os from 'node:os'

import { DirectoryLoader } from 'langchain/document_loaders/fs/directory'
import { JSONLoader } from 'langchain/document_loaders/fs/json'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf'
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx'
import { TextLoader } from 'langchain/document_loaders/fs/text'
import { TextSplitter } from 'langchain/text_splitter'

import { CSVLoader } from '../Csv/CsvLoader'

class S3_DocumentLoaders implements INode {
label: string
name: string
Expand Down Expand Up @@ -151,11 +157,6 @@ class S3_DocumentLoaders implements INode {
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
const output = nodeData.outputs?.output as string

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let credentials: S3ClientConfig['credentials'] | undefined

if (nodeData.credential) {
Expand Down Expand Up @@ -241,11 +242,11 @@ class S3_DocumentLoaders implements INode {
'.csv': (path) => new CSVLoader(path),
'.docx': (path) => new DocxLoader(path),
'.pdf': (path) =>
pdfUsage === 'perFile'
? // @ts-ignore
new PDFLoader(path, { splitPages: false, pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') })
: // @ts-ignore
new PDFLoader(path, { pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') }),
new PDFLoader(path, {
splitPages: pdfUsage !== 'perFile',
// @ts-ignore
pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js')
}),
'.aspx': (path) => new TextLoader(path),
'.asp': (path) => new TextLoader(path),
'.cpp': (path) => new TextLoader(path), // C++
Expand Down Expand Up @@ -284,63 +285,16 @@ class S3_DocumentLoaders implements INode {
true
)

let docs = []

if (textSplitter) {
let splittedDocs = await loader.load()
splittedDocs = await textSplitter.splitDocuments(splittedDocs)
docs.push(...splittedDocs)
} else {
docs = await loader.load()
}

if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
}
let docs = await handleDocumentLoaderDocuments(loader, textSplitter)

// remove the temp directory before returning docs
fsDefault.rmSync(tempDir, { recursive: true })
docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}
return handleDocumentLoaderOutput(docs, output)
} catch (e: any) {
fsDefault.rmSync(tempDir, { recursive: true })
throw new Error(`Failed to load data from bucket ${bucketName}: ${e.message}`)
} finally {
// remove the temp directory before returning docs
fsDefault.rmSync(tempDir, { recursive: true })
}
}
}
Expand Down
Loading
Loading