Skip to content

Commit

Permalink
Merge pull request #287 from OpenFn/extend-parseCsv
Browse files Browse the repository at this point in the history
Extend `parseCsv` to `http` and `bigquery`
  • Loading branch information
josephjclark authored Jul 14, 2023
2 parents 7034947 + 5781e97 commit f327c9c
Show file tree
Hide file tree
Showing 11 changed files with 564 additions and 258 deletions.
6 changes: 6 additions & 0 deletions .changeset/odd-geese-joke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/language-bigquery': major
'@openfn/language-http': major
---

use parseCsv from common
6 changes: 6 additions & 0 deletions .changeset/yellow-pets-march.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/language-common': patch
---

- Update `parseCsv` to await the callback
- Add documentation for `splitKeys`
127 changes: 74 additions & 53 deletions packages/bigquery/ast.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,59 +92,6 @@
]
},
"valid": true
},
{
"name": "parseCSV",
"params": [
"target",
"config"
],
"docs": {
"description": "CSV-Parse for CSV conversion to JSON",
"tags": [
{
"title": "public",
"description": null,
"type": null
},
{
"title": "example",
"description": "parseCSV(\"/home/user/someData.csv\", {\n\t quote: '\"',\n\t columns: false,\n\t});"
},
{
"title": "function",
"description": null,
"name": null
},
{
"title": "param",
"description": "string or local file with CSV data",
"type": {
"type": "NameExpression",
"name": "String"
},
"name": "target"
},
{
"title": "param",
"description": "csv-parse config object",
"type": {
"type": "NameExpression",
"name": "Object"
},
"name": "config"
},
{
"title": "returns",
"description": null,
"type": {
"type": "NameExpression",
"name": "Operation"
}
}
]
},
"valid": true
}
],
"exports": [],
Expand Down Expand Up @@ -565,6 +512,80 @@
]
},
"valid": true
},
{
"name": "parseCsv",
"params": [
"csvData",
"parsingOptions",
"callback"
],
"docs": {
"description": "The function `parseCsv` takes a CSV file string or stream and parsing options as input, and returns a promise that\nresolves to the parsed CSV data.\nOptions for `parsingOptions` include:\n- `delimiter` {string/Buffer/[string/Buffer]} - Defines the character(s) used to delimitate the fields inside a record. Default: `','`\n- `quote` {string/Buffer/[string/Buffer]} - Defines the characters used to surround a field. Default: `'\"'`\n- `escape` {Buffer/string/null/boolean} - Set the escape character as one character/byte only. Default: `\"`\n- `columns` {boolean / array / function} - Generates record in the form of object literals. Default: `true`\n- `bom` {boolean} - Strips the {@link https://en.wikipedia.org/wiki/Byte_order_mark byte order mark (BOM)} from the input string or buffer. Default: `true`\n- `trim` {boolean} - Ignore whitespace characters immediately around the `delimiter`. Default: `true`\n- `ltrim` {boolean} - Ignore whitespace characters from the left side of a CSV field. Default: `true`\n- `rtrim` {boolean} - Ignore whitespace characters from the right side of a CSV field. Default: `true`\n- `chunkSize` {number} - The size of each chunk of CSV data. Default: `Infinity`\n- `skip_empty_lines` {boolean} - The `skip_empty_lines` skips any line which is empty. Default: `true`",
"tags": [
{
"title": "public",
"description": null,
"type": null
},
{
"title": "function",
"description": null,
"name": null
},
{
"title": "param",
"description": "A CSV string or a readable stream",
"type": {
"type": "UnionType",
"elements": [
{
"type": "NameExpression",
"name": "String"
},
{
"type": "NameExpression",
"name": "Stream"
}
]
},
"name": "csvData"
},
{
"title": "param",
"description": "Optional. Parsing options for converting CSV to JSON.",
"type": {
"type": "OptionalType",
"expression": {
"type": "NameExpression",
"name": "Object"
}
},
"name": "parsingOptions"
},
{
"title": "param",
"description": "(Optional) callback function. If used it will be called with state and an array of rows.",
"type": {
"type": "OptionalType",
"expression": {
"type": "NameExpression",
"name": "function"
}
},
"name": "callback"
},
{
"title": "returns",
"description": "The function returns a Promise that resolves to the result of parsing `csvData`.",
"type": {
"type": "NameExpression",
"name": "Operation"
}
}
]
},
"valid": true
}
]
}
48 changes: 1 addition & 47 deletions packages/bigquery/src/Adaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
composeNextState,
} from '@openfn/language-common';
import fs from 'fs';
import parse from 'csv-parse';
import { BigQuery } from '@google-cloud/bigquery';

/**
Expand Down Expand Up @@ -120,52 +119,6 @@ export function load(
};
}

/**
 * CSV-Parse for CSV conversion to JSON.
 *
 * If `target` is the path of an existing regular file, the file is streamed
 * through csv-parse. Otherwise `target` is treated as the CSV content itself:
 * a string is parsed as-is, and any other value is first resolved against
 * state with `expandReferences`.
 *
 * The returned operation resolves with `composeNextState(state, rows)` and
 * rejects if reading or parsing fails.
 * @public
 * @example
 * parseCSV("/home/user/someData.csv", {
 *   quote: '"',
 *   columns: false,
 * });
 * @function
 * @param {String} target - string or local file with CSV data
 * @param {Object} config - csv-parse config object
 * @returns {Operation}
 */
export function parseCSV(target, config) {
  return state =>
    new Promise((resolve, reject) => {
      // Probe with stat instead of reading the whole file just to learn
      // whether `target` names a file.
      let isFile = false;
      try {
        isFile = fs.statSync(target).isFile();
      } catch {
        // Not a usable path (missing file, or `target` is not a path at all):
        // fall through and treat `target` as CSV content.
      }

      if (isFile) {
        const rows = [];
        fs.createReadStream(target)
          // pipe() does not forward source errors, so listen on both streams.
          .on('error', reject)
          .pipe(parse(config))
          .on('data', row => {
            rows.push(row);
          })
          .on('error', reject)
          .on('end', () => {
            resolve(composeNextState(state, rows));
          });
        return;
      }

      const csvString =
        typeof target === 'string' ? target : expandReferences(target)(state);

      parse(csvString, config, (err, output) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(composeNextState(state, output));
      });
    });
}

export {
alterState,
dataPath,
Expand All @@ -178,4 +131,5 @@ export {
lastReferenceValue,
merge,
sourceValue,
parseCsv,
} from '@openfn/language-common';
141 changes: 141 additions & 0 deletions packages/common/ast.json
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,73 @@
},
"valid": true
},
{
"name": "splitKeys",
"params": [
"obj",
"keys"
],
"docs": {
"description": "Splits an object into two objects based on a list of keys.\nThe first object contains the keys that are not in the list,\nand the second contains the keys that are.",
"tags": [
{
"title": "public",
"description": null,
"type": null
},
{
"title": "function",
"description": null,
"name": null
},
{
"title": "param",
"description": "The object to split.",
"type": {
"type": "NameExpression",
"name": "Object"
},
"name": "obj"
},
{
"title": "param",
"description": "List of keys to split on.",
"type": {
"type": "TypeApplication",
"expression": {
"type": "NameExpression",
"name": "Array"
},
"applications": [
{
"type": "NameExpression",
"name": "string"
}
]
},
"name": "keys"
},
{
"title": "returns",
"description": "Tuple of objects, first object contains keys not in list, second contains keys that are.",
"type": {
"type": "TypeApplication",
"expression": {
"type": "NameExpression",
"name": "Array"
},
"applications": [
{
"type": "NameExpression",
"name": "Object"
}
]
}
}
]
},
"valid": true
},
{
"name": "scrubEmojis",
"params": [
Expand Down Expand Up @@ -1171,6 +1238,80 @@
},
"valid": true
},
{
"name": "parseCsv",
"params": [
"csvData",
"parsingOptions",
"callback"
],
"docs": {
"description": "The function `parseCsv` takes a CSV file string or stream and parsing options as input, and returns a promise that\nresolves to the parsed CSV data.\nOptions for `parsingOptions` include:\n- `delimiter` {string/Buffer/[string/Buffer]} - Defines the character(s) used to delimitate the fields inside a record. Default: `','`\n- `quote` {string/Buffer/[string/Buffer]} - Defines the characters used to surround a field. Default: `'\"'`\n- `escape` {Buffer/string/null/boolean} - Set the escape character as one character/byte only. Default: `\"`\n- `columns` {boolean / array / function} - Generates record in the form of object literals. Default: `true`\n- `bom` {boolean} - Strips the {@link https://en.wikipedia.org/wiki/Byte_order_mark byte order mark (BOM)} from the input string or buffer. Default: `true`\n- `trim` {boolean} - Ignore whitespace characters immediately around the `delimiter`. Default: `true`\n- `ltrim` {boolean} - Ignore whitespace characters from the left side of a CSV field. Default: `true`\n- `rtrim` {boolean} - Ignore whitespace characters from the right side of a CSV field. Default: `true`\n- `chunkSize` {number} - The size of each chunk of CSV data. Default: `Infinity`\n- `skip_empty_lines` {boolean} - The `skip_empty_lines` skips any line which is empty. Default: `true`",
"tags": [
{
"title": "public",
"description": null,
"type": null
},
{
"title": "function",
"description": null,
"name": null
},
{
"title": "param",
"description": "A CSV string or a readable stream",
"type": {
"type": "UnionType",
"elements": [
{
"type": "NameExpression",
"name": "String"
},
{
"type": "NameExpression",
"name": "Stream"
}
]
},
"name": "csvData"
},
{
"title": "param",
"description": "Optional. Parsing options for converting CSV to JSON.",
"type": {
"type": "OptionalType",
"expression": {
"type": "NameExpression",
"name": "Object"
}
},
"name": "parsingOptions"
},
{
"title": "param",
"description": "(Optional) callback function. If used it will be called with state and an array of rows.",
"type": {
"type": "OptionalType",
"expression": {
"type": "NameExpression",
"name": "function"
}
},
"name": "callback"
},
{
"title": "returns",
"description": "The function returns a Promise that resolves to the result of parsing `csvData`.",
"type": {
"type": "NameExpression",
"name": "Operation"
}
}
]
},
"valid": true
},
{
"name": "map",
"params": [
Expand Down
Loading

0 comments on commit f327c9c

Please sign in to comment.