From cf7ff0c352249d4128c869b4171517701add15c5 Mon Sep 17 00:00:00 2001
From: Akash
Date: Sat, 23 Nov 2024 22:46:38 +0530
Subject: [PATCH] feat: Add parser and lexer

---
 Readme.MD                |  38 ++++++++--------
 config.yaml              |  14 ++++--
 config/config.go         |  99 ++++++++++++++++++++++++++++++++-----
 integrations/csv.go      |  94 ++++++++++++++++++++++++++-----
 interfaces/interfaces.go |   7 ++-
 language/lexer.go        |  85 +++++++++++++++++++++++++++++
 language/parser.go       |  92 ++++++++++++++++++++++++++++++
 test.csv                 | Bin 0 -> 199 bytes
 8 files changed, 379 insertions(+), 50 deletions(-)
 create mode 100644 language/lexer.go
 create mode 100644 language/parser.go
 create mode 100644 test.csv

diff --git a/Readme.MD b/Readme.MD
index 05da785..522281a 100644
--- a/Readme.MD
+++ b/Readme.MD
@@ -155,26 +155,26 @@ Depending on the data source, fields can be named using:
 The rules can be embedded into the YAML configuration for pipelines:
 
 ```yaml
-pipeline:
-  - name: "CSV to Firebase Pipeline"
-    input:
-      type: "CSV"
-      source: "/data/input.csv"
-    validation:
-      rules: |
-        FIELD("age") TYPE(INT) RANGE(18, 65)
-        FIELD("email") MATCHES(EMAIL_REGEX)
-    transformation:
-      rules: |
-        RENAME("old_field", "new_field")
-        IF FIELD("age") > 50 THEN ADD_FIELD("senior_discount", TRUE)
-    output:
-      type: "Firebase"
-      destination: "firebase://project_name/collection"
-    error_handling: |
-      ON_ERROR(LOG_AND_CONTINUE)
+inputconfig:
+  url: wss://echo.websocket.org
+inputmethod: WebSocket
+
+outputconfig:
+  url: wss://echo.websocket.org
+outputmethod: WebSocket
+
+validations: |
+  FIELD("message") REQUIRED
+  FIELD("timestamp") TYPE(INT) RANGE(1609459200, 1704067200)
+
+transformations: |
+  ADD_FIELD("processed_at", CURRENT_TIME())
+  IF FIELD("message") MATCHES("^hello") THEN RENAME("message", "greeting")
+  MAP("status", {"0": "inactive", "1": "active"})
+
+errorhandling:
+  strategy: LOG_AND_CONTINUE
 ```
-
 ---
 
 # Adding a New Integration
diff --git a/config.yaml b/config.yaml
index d9a854d..180b2df 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,6 +1,12 @@
+errorhandling:
+  strategy: LOG_AND_CONTINUE
 inputconfig:
-  url: wss://echo.websocket.org
-inputmethod: WebSocket
+  csvsourcefilename: sample.csv
+inputmethod: CSV
 outputconfig:
-  url: wss://echo.websocket.org
-outputmethod: WebSocket
+  csvdestinationfilename: test.csv
+outputmethod: CSV
+transformations: |
+  ADD_FIELD("processed_at", CURRENT_TIME())
+validations: |
+  FIELD("age") TYPE(INT) RANGE(18, 65)
diff --git a/config/config.go b/config/config.go
index f94d5ee..40a3ad8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -10,6 +10,29 @@ import (
 	"github.com/spf13/viper"
 )
 
+// Config represents the entire configuration structure
+type Config struct {
+	InputMethod     string                 `yaml:"inputmethod"`
+	OutputMethod    string                 `yaml:"outputmethod"`
+	InputConfig     map[string]interface{} `yaml:"inputconfig"`
+	OutputConfig    map[string]interface{} `yaml:"outputconfig"`
+	Validations     string                 `yaml:"validations"`
+	Transformations string                 `yaml:"transformations"`
+	ErrorHandling   ErrorHandling          `yaml:"errorhandling"`
+}
+
+// ErrorHandling represents the error handling configuration
+type ErrorHandling struct {
+	Strategy         string           `yaml:"strategy"`
+	QuarantineOutput QuarantineOutput `yaml:"quarantineoutput"`
+}
+
+// QuarantineOutput represents the quarantine output configuration
+type QuarantineOutput struct {
+	Type     string `yaml:"type"`
+	Location string `yaml:"location"`
+}
+
 // AskForMode prompts the user to select between starting the HTTP server or using the CLI
 func AskForMode() (string, error) {
 	modePrompt := promptui.Select{
@@ -33,10 +56,13 @@ func LoadConfig(configFile string) (map[string]interface{}, error) {
 	}
 
 	config := map[string]interface{}{
-		"inputMethod":  viper.GetString("inputMethod"),
-		"outputMethod": viper.GetString("outputMethod"),
-		"inputconfig":  viper.GetStringMap("inputconfig"),
-		"outputconfig": viper.GetStringMap("outputconfig"),
+		"inputMethod":     viper.GetString("inputMethod"),
+		"outputMethod":    viper.GetString("outputMethod"),
+		"inputconfig":     viper.GetStringMap("inputconfig"),
+		"outputconfig":    viper.GetStringMap("outputconfig"),
+		"errorhandling":   viper.GetStringMap("errorhandling"),
+		"validations":     viper.GetString("validations"),
+		"transformations": viper.GetString("transformations"),
 	}
 
 	return config, nil
@@ -81,19 +107,34 @@ func SetupConfigInteractively() (map[string]interface{}, error) {
 		return nil, fmt.Errorf("failed to get fields for output method: %w", err)
 	}
 
+	// Read validations and transformations
+	validations, err := readRules("validations")
+	if err != nil {
+		return nil, fmt.Errorf("failed to read validation rules: %w", err)
+	}
+	transformations, err := readRules("transformations")
+	if err != nil {
+		return nil, fmt.Errorf("failed to read transformation rules: %w", err)
+	}
+
+	// Read error handling
+	errorhandling, err := readErrorHandlingConfig()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read error handling configuration: %w", err)
+	}
+
 	// Combine all configurations
 	config := map[string]interface{}{
-		"inputMethod":  inputMethod,
-		"outputMethod": outputMethod,
-		"inputconfig":  inputconfig,
-		"outputconfig": outputconfig,
+		"inputMethod":     inputMethod,
+		"outputMethod":    outputMethod,
+		"inputconfig":     inputconfig,
+		"outputconfig":    outputconfig,
+		"validations":     validations,
+		"transformations": transformations,
+		"errorhandling":   errorhandling,
 	}
 
-	//TODO : FIX THIS BUG OF MISSING INPUT CONFIG IN CONFIGURATION
 	saveConfig(config)
-	//wait for 2
-	// time.Sleep(5 * time.Second)
-
 	return config, nil
 }
 
@@ -144,6 +185,40 @@ func readIntegrationFields(method string, isSource bool) (map[string]interface{}
 	return config, nil
 }
 
+// readRules reads validation or transformation rules interactively
+func readRules(ruleType string) (string, error) {
+	prompt := promptui.Prompt{
+		Label: fmt.Sprintf("Enter %s rules (multiline, finish with empty line):", ruleType),
+	}
+	rules := ""
+	for {
+		line, err := prompt.Run()
+		if err != nil {
+			return "", fmt.Errorf("failed to read %s: %w", ruleType, err)
+		}
+		if line == "" {
+			break
+		}
+		rules += line + "\n"
+	}
+	return rules, nil
+}
+
+// readErrorHandlingConfig prompts for the error handling strategy
+func readErrorHandlingConfig() (map[string]interface{}, error) {
+	prompt := promptui.Prompt{
+		Label: "Enter Error Handling Strategy (e.g., LOG_AND_CONTINUE, STOP_ON_ERROR):",
+	}
+	strategy, err := prompt.Run()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read error handling strategy: %w", err)
+	}
+
+	return map[string]interface{}{
+		"strategy": strategy,
+	}, nil
+}
+
 // saveConfig writes the configuration to a config.yaml file
 func saveConfig(config map[string]interface{}) {
 	for key, value := range config {
diff --git a/integrations/csv.go b/integrations/csv.go
index a558c02..137d3cb 100644
--- a/integrations/csv.go
+++ b/integrations/csv.go
@@ -3,10 +3,12 @@ package integrations
 import (
 	"encoding/csv"
 	"errors"
+	"fmt"
 	"os"
 	"strings"
 
 	"github.com/SkySingh04/fractal/interfaces"
+	"github.com/SkySingh04/fractal/language"
 	"github.com/SkySingh04/fractal/logger"
 	"github.com/SkySingh04/fractal/registry"
 )
 
@@ -34,6 +36,7 @@ func ReadCSV(fileName string) ([]byte, error) {
 	return data, nil
 }
 
+// WriteCSV writes data to a CSV file.
 func WriteCSV(fileName string, data []byte) error {
 	file, err := os.Create(fileName)
 	if err != nil {
@@ -42,7 +45,7 @@ func WriteCSV(fileName string, data []byte) error {
 	defer file.Close()
 
 	writer := csv.NewWriter(file)
-	records := strings.Split(string(data), "\n")
+	records := strings.Split(strings.TrimSpace(string(data)), "\n")
 	for _, record := range records {
 		fields := strings.Split(record, ",")
 		err := writer.Write(fields)
@@ -79,18 +82,17 @@ func (r CSVSource) FetchData(req interfaces.Request) (interface{}, error) {
 	}
 	// Validate the data
-	validatedData, err := validateCSVData(data)
+	validatedData, err := validateCSVData(data, req.ValidationRules)
 	if err != nil {
 		return nil, err
 	}
 
 	// Transform the data
-	transformedData, err := transformCSVData(validatedData)
+	transformedData, err := transformCSVData(validatedData, req.TransformationRules)
 	if err != nil {
 		return nil, err
 	}
 
 	return transformedData, nil
-
 }
 
 // SendData connects to CSV and publishes data to the specified queue.
@@ -115,24 +117,90 @@ func init() {
 	registry.RegisterDestination("CSV", CSVDestination{})
 }
 
-// validateCSVData ensures the input data meets the required criteria.
-func validateCSVData(data []byte) ([]byte, error) {
+// validateCSVData ensures the input data meets the required criteria using validation rules.
+func validateCSVData(data []byte, validationRules string) ([]byte, error) {
+
 	logger.Infof("Validating data: %s", data)
 
-	// Example: Check if data is non-empty
 	if len(data) == 0 {
 		return nil, errors.New("data is empty")
 	}
 
-	// Add custom validation logic here
+	// Initialize lexer and tokenize the validation rules
+	lexer := language.NewLexer(validationRules)
+	tokens, err := lexer.Tokenize()
+	if err != nil {
+		return nil, fmt.Errorf("failed to tokenize validation rules: %v", err)
+	}
+
+	// Parse the tokens into an AST
+	parser := language.NewParser(tokens)
+	rulesAST, err := parser.ParseRules()
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse validation rules: %v", err)
+	}
+
+	// Apply validation rules to data
+	records := strings.Split(strings.TrimSpace(string(data)), "\n")
+	for _, record := range records {
+		for _, ruleNode := range rulesAST.Children {
+			err := applyValidationRule(record, ruleNode)
+			if err != nil {
+				return nil, err // Return the first validation error encountered
+			}
+		}
+	}
+
 	return data, nil
+
 }
 
-// transformCSVData modifies the input data as per business logic.
-func transformCSVData(data []byte) ([]byte, error) {
+// transformCSVData modifies the input data as per business logic using transformation rules.
+func transformCSVData(data []byte, transformationRules string) ([]byte, error) {
 	logger.Infof("Transforming data: %s", data)
 
-	// Example: Convert data to uppercase (modify as needed)
-	transformed := []byte(strings.ToUpper(string(data)))
-	return transformed, nil
+	// Initialize lexer and tokenize the transformation rules
+	lexer := language.NewLexer(transformationRules)
+	tokens, err := lexer.Tokenize()
+	if err != nil {
+		return nil, fmt.Errorf("failed to tokenize transformation rules: %v", err)
+	}
+
+	// Parse the tokens into an AST
+	parser := language.NewParser(tokens)
+	rulesAST, err := parser.ParseRules()
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse transformation rules: %v", err)
+	}
+
+	// Apply transformation rules to data
+	var transformedRecords []string
+	records := strings.Split(strings.TrimSpace(string(data)), "\n")
+	for _, record := range records {
+		for _, ruleNode := range rulesAST.Children {
+			transformedRecord, err := applyTransformationRule(record, ruleNode)
+			if err != nil {
+				return nil, err
+			}
+			record = transformedRecord // Apply each rule sequentially
+		}
+		transformedRecords = append(transformedRecords, record)
+	}
+
+	return []byte(strings.Join(transformedRecords, "\n")), nil
 }
+
+// applyValidationRule processes a single record against a validation rule AST node.
+func applyValidationRule(record string, ruleNode *language.Node) error {
+	// Implementation details based on your business rules
+	// Validate the record using the information from ruleNode
+	// Example: Check if a specific field meets the condition
+	return nil // Replace with actual validation logic
+}
+
+// applyTransformationRule processes a single record against a transformation rule AST node.
+func applyTransformationRule(record string, ruleNode *language.Node) (string, error) {
+	// Implementation details based on your business logic
+	// Transform the record using the information from ruleNode
+	return record, nil // Replace with actual transformation logic
+}
diff --git a/interfaces/interfaces.go b/interfaces/interfaces.go
index 3dd09e5..a49e60d 100644
--- a/interfaces/interfaces.go
+++ b/interfaces/interfaces.go
@@ -10,8 +10,11 @@ type DataDestination interface {
 
 // Request struct to hold migration request data
 type Request struct {
-	Input  string `json:"input"`  // List of input types (Kafka, SQL, MongoDB, etc.)
-	Output string `json:"output"` // List of output types (CSV, MongoDB, etc.)
+	Input               string `json:"input"`                // Input type (Kafka, SQL, MongoDB, etc.)
+	Output              string `json:"output"`               // Output type (CSV, MongoDB, etc.)
+	ValidationRules     string `json:"validation_rules"`     // Validation rules to apply to incoming data
+	TransformationRules string `json:"transformation_rules"` // Transformation rules to apply to validated data
+	ErrorHandling       string `json:"error_handling"`       // Error handling strategy (e.g., LOG_AND_CONTINUE)
 	ConsumerURL   string `json:"consumer_url"`   // URL for Kafka
 	ConsumerTopic string `json:"consumer_topic"` // Topic for Kafka
 	ProducerURL   string `json:"producer_url"`
diff --git a/language/lexer.go b/language/lexer.go
new file mode 100644
index 0000000..2187361
--- /dev/null
+++ b/language/lexer.go
@@ -0,0 +1,85 @@
+package language
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+)
+
+// TokenType represents the type of a token
+type TokenType string
+
+const (
+	TokenField     TokenType = "FIELD"
+	TokenCondition TokenType = "CONDITION"
+	TokenOperator  TokenType = "OPERATOR"
+	TokenValue     TokenType = "VALUE"
+	TokenLogical   TokenType = "LOGICAL"
+	TokenSeparator TokenType = "SEPARATOR"
+	TokenTransform TokenType = "TRANSFORM"
+	TokenInvalid   TokenType = "INVALID"
+)
+
+// Token represents a single token
+type Token struct {
+	Type  TokenType
+	Value string
+}
+
+// Lexer for parsing rules
+type Lexer struct {
+	input string
+	pos   int
+}
+
+// NewLexer initializes a lexer with the input string
+func NewLexer(input string) *Lexer {
+	return &Lexer{
+		input: strings.TrimSpace(input),
+		pos:   0,
+	}
+}
+
+// Tokenize splits the input into tokens
+func (l *Lexer) Tokenize() ([]Token, error) {
+	var tokens []Token
+	// Patterns are tried in a fixed order, most specific first, so that
+	// keywords such as AND and numeric values are never swallowed by the
+	// catch-all field pattern (iterating a map would be random order).
+	patterns := []struct {
+		Type  TokenType
+		Regex *regexp.Regexp
+	}{
+		{TokenTransform, regexp.MustCompile(`^TRANSFORM\(`)},       // Matches the transform keyword
+		{TokenLogical, regexp.MustCompile(`^(AND|OR|NOT)\b`)},      // Matches logical operators
+		{TokenCondition, regexp.MustCompile(`^(==|!=|>=|<=|>|<)`)}, // Matches conditions
+		{TokenOperator, regexp.MustCompile(`^(->|=>)`)},            // Matches transformation operators
+		{TokenValue, regexp.MustCompile(`^("[^"]*"|'[^']*'|\d+)`)}, // Matches strings or numbers
+		{TokenSeparator, regexp.MustCompile(`^,`)},                 // Matches separators
+		{TokenField, regexp.MustCompile(`^[a-zA-Z0-9_\.]+`)},       // Matches field names
+	}
+
+	for l.pos < len(l.input) {
+		// Skip whitespace
+		l.input = strings.TrimSpace(l.input[l.pos:])
+		l.pos = 0
+
+		matched := false
+		for _, p := range patterns {
+			loc := p.Regex.FindStringIndex(l.input)
+			if loc != nil && loc[0] == 0 {
+				value := l.input[loc[0]:loc[1]]
+				tokens = append(tokens, Token{Type: p.Type, Value: value})
+				l.pos += len(value)
+				matched = true
+				break
+			}
+		}
+
+		if !matched {
+			return nil, fmt.Errorf("unexpected token at: %s", l.input)
+		}
+	}
+
+	return tokens, nil
+}
diff --git a/language/parser.go b/language/parser.go
new file mode 100644
index 0000000..31952c0
--- /dev/null
+++ b/language/parser.go
@@ -0,0 +1,92 @@
+package language
+
+import (
+	"errors"
+	"fmt"
+)
+
+// Node represents a node in the Abstract Syntax Tree (AST)
+type Node struct {
+	Type     TokenType
+	Value    string
+	Children []*Node
+}
+
+// Parser for validation and transformation rules
+type Parser struct {
+	tokens []Token
+	pos    int
+}
+
+// NewParser initializes a parser with tokens
+func NewParser(tokens []Token) *Parser {
+	return &Parser{
+		tokens: tokens,
+		pos:    0,
+	}
+}
+
+// ParseRules parses the tokens into an AST
+func (p *Parser) ParseRules() (*Node, error) {
+	root := &Node{Type: "ROOT", Children: []*Node{}}
+
+	for p.pos < len(p.tokens) {
+		node, err := p.parseExpression()
+		if err != nil {
+			return nil, err
+		}
+		root.Children = append(root.Children, node)
+	}
+
+	return root, nil
+}
+
+func (p *Parser) parseExpression() (*Node, error) {
+	// Example rule: FIELD CONDITION VALUE [LOGICAL FIELD CONDITION VALUE]
+	field := p.consume(TokenField)
+	if field == nil {
+		return nil, errors.New("expected field")
+	}
+
+	condition := p.consume(TokenCondition)
+	if condition == nil {
+		return nil, fmt.Errorf("expected condition after field %s", field.Value)
+	}
+
+	value := p.consume(TokenValue)
+	if value == nil {
+		return nil, fmt.Errorf("expected value after condition %s", condition.Value)
+	}
+
+	node := &Node{Type: "EXPRESSION", Children: []*Node{
+		{Type: field.Type, Value: field.Value},
+		{Type: condition.Type, Value: condition.Value},
+		{Type: value.Type, Value: value.Value},
+	}}
+
+	// Check for logical operators (AND, OR, NOT)
+	logical := p.consume(TokenLogical)
+	if logical != nil {
+		rightExpr, err := p.parseExpression()
+		if err != nil {
+			return nil, err
+		}
+		node.Children = append(node.Children, &Node{
+			Type:     logical.Type,
+			Value:    logical.Value,
+			Children: []*Node{rightExpr},
+		})
+	}
+
+	return node, nil
+}
+
+// consume retrieves the next token if it matches the expected type
+func (p *Parser) consume(expected TokenType) *Token {
+	if p.pos < len(p.tokens) && p.tokens[p.pos].Type == expected {
+		token := p.tokens[p.pos]
+		p.pos++
+		return &token
+	}
+	return nil
+}
diff --git a/test.csv b/test.csv
new file mode 100644
index 0000000000000000000000000000000000000000..e6290072d93fdb0aa7300d74d351b44bc72ba96d
GIT binary patch
literal 199
zcmX|*%?dzJ6okLE@)Gw6L>>TJ*;^tZe}$5l$GJN0?wgtBoH5T`P%+cuV?0Ab=Qv~F
z;Bv_%x^Zblph+R#sa&$MF{(;M)yg5UQZDYUoUnW!C%N%^8NF=Uu>byDk!#!3?x>`<
Hepm2bWON&D

literal 0
HcmV?d00001
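
Example: driving the new `language` package end to end. The program below is a minimal sketch, not part of the patch; it exercises only the APIs introduced above (`NewLexer`, `Tokenize`, `NewParser`, `ParseRules`) the same way `validateCSVData` and `transformCSVData` do with `req.ValidationRules` and `req.TransformationRules`. The rule string is hypothetical, written in the `FIELD CONDITION VALUE` shape that `parseExpression` accepts:

```go
package main

import (
	"fmt"
	"log"

	"github.com/SkySingh04/fractal/language"
)

func main() {
	// Illustrative rule in the FIELD CONDITION VALUE form the parser expects.
	rule := `status == "active"`

	// Tokenize the rule text into FIELD/CONDITION/VALUE tokens.
	lexer := language.NewLexer(rule)
	tokens, err := lexer.Tokenize()
	if err != nil {
		log.Fatalf("failed to tokenize rule: %v", err)
	}

	// Parse the token stream into an AST rooted at a ROOT node.
	parser := language.NewParser(tokens)
	ast, err := parser.ParseRules()
	if err != nil {
		log.Fatalf("failed to parse rule: %v", err)
	}

	// Each child of the root is one EXPRESSION node whose children are
	// the FIELD, CONDITION, and VALUE tokens of that rule.
	for _, expr := range ast.Children {
		for _, child := range expr.Children {
			fmt.Printf("%s: %s\n", child.Type, child.Value)
		}
	}
}
```

Against this input the program prints `FIELD: status`, `CONDITION: ==`, and `VALUE: "active"`, which is exactly the AST shape handed to `applyValidationRule` and `applyTransformationRule` once those stubs are implemented.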