Skip to content

Commit

Permalink
feat : Added Parser and Lexer
Browse files Browse the repository at this point in the history
  • Loading branch information
SkySingh04 committed Nov 23, 2024
1 parent 7d34dd0 commit cf7ff0c
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 50 deletions.
40 changes: 21 additions & 19 deletions Readme.MD
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,28 @@ Depending on the data source, fields can be named using:
The rules can be embedded into the YAML configuration for pipelines:

```yaml
pipeline:
- name: "CSV to Firebase Pipeline"
input:
type: "CSV"
source: "/data/input.csv"
validation:
rules: |
FIELD("age") TYPE(INT) RANGE(18, 65)
FIELD("email") MATCHES(EMAIL_REGEX)
transformation:
rules: |
RENAME("old_field", "new_field")
IF FIELD("age") > 50 THEN ADD_FIELD("senior_discount", TRUE)
output:
type: "Firebase"
destination: "firebase://project_name/collection"
error_handling: |
ON_ERROR(LOG_AND_CONTINUE)
inputconfig:
url: wss://echo.websocket.org
inputmethod: WebSocket

outputconfig:
url: wss://echo.websocket.org
outputmethod: WebSocket

validation:
rules: |
FIELD("message") REQUIRED
FIELD("timestamp") TYPE(INT) RANGE(1609459200, 1704067200)
transformation:
rules: |
ADD_FIELD("processed_at", CURRENT_TIME())
IF FIELD("message") MATCHES("^hello") THEN RENAME("message", "greeting")
MAP("status", {"0": "inactive", "1": "active"})
error_handling:
on_error: LOG_AND_CONTINUE
```
---
# Adding a New Integration
Expand Down
14 changes: 10 additions & 4 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
errorhandling:
strategy: LOG_AND_CONTINUE
inputconfig:
url: wss://echo.websocket.org
inputmethod: WebSocket
csvsourcefilename: sample.csv
inputmethod: CSV
outputconfig:
url: wss://echo.websocket.org
outputmethod: WebSocket
csvdestinationfilename: test.csv
outputmethod: CSV
transformations: |
ADD_FIELD("processed_at", CURRENT_TIME())
validations: |
FIELD("age") TYPE(INT) RANGE(18, 65)
99 changes: 87 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@ import (
"github.com/spf13/viper"
)

// Config represents the entire configuration structure: the selected input
// and output integrations, their integration-specific settings, the rule
// definitions, and the error handling policy.
//
// NOTE(review): LoadConfig and SetupConfigInteractively in this file return a
// map[string]interface{} rather than this struct — confirm where Config is
// actually decoded before relying on the tags below.
type Config struct {
// InputMethod is the name of the source integration.
InputMethod string `yaml:"inputMethod"`
// OutputMethod is the name of the destination integration.
OutputMethod string `yaml:"outputMethod"`
// InputConfig holds the free-form settings for the source integration.
InputConfig map[string]interface{} `yaml:"inputconfig"`
// OutputConfig holds the free-form settings for the destination integration.
OutputConfig map[string]interface{} `yaml:"outputconfig"`
// Validations holds the validation rules.
// NOTE(review): the repository's config.yaml writes `validations: |` as a
// single block-scalar string, which would not decode into []string —
// confirm the intended shape.
Validations []string `yaml:"validations"`
// Transformations holds the transformation rules.
// NOTE(review): same string-vs-[]string question as Validations.
Transformations []string `yaml:"transformations"`
// ErrorHandling is the error handling policy.
ErrorHandling ErrorHandling `yaml:"errorhandling"`
}

// ErrorHandling represents the error handling configuration, stored under the
// "errorhandling" key of the configuration.
type ErrorHandling struct {
// Strategy names the error handling strategy; the interactive prompt in this
// file suggests values such as LOG_AND_CONTINUE and STOP_ON_ERROR.
Strategy string `yaml:"strategy"`
// QuarantineOutput describes the quarantine output.
// NOTE(review): presumably where failing records are sent — confirm; nothing
// visible in this file populates it.
QuarantineOutput QuarantineOutput `yaml:"quarantineoutput"`
}

// QuarantineOutput represents the quarantine output configuration nested
// inside ErrorHandling.
type QuarantineOutput struct {
// Type is the kind of quarantine output.
// NOTE(review): accepted values are not visible here — TODO confirm.
Type string `yaml:"type"`
// Location identifies the quarantine output.
// NOTE(review): presumably a path or URL depending on Type — TODO confirm.
Location string `yaml:"location"`
}

// AskForMode prompts the user to select between starting the HTTP server or using the CLI
func AskForMode() (string, error) {
modePrompt := promptui.Select{
Expand All @@ -33,10 +56,13 @@ func LoadConfig(configFile string) (map[string]interface{}, error) {
}

config := map[string]interface{}{
"inputMethod": viper.GetString("inputMethod"),
"outputMethod": viper.GetString("outputMethod"),
"inputconfig": viper.GetStringMap("inputconfig"),
"outputconfig": viper.GetStringMap("outputconfig"),
"inputMethod": viper.GetString("inputMethod"),
"outputMethod": viper.GetString("outputMethod"),
"inputconfig": viper.GetStringMap("inputconfig"),
"outputconfig": viper.GetStringMap("outputconfig"),
"errorhandling": viper.GetStringMap("errorhandling"),
"validations": viper.GetStringMap("validations"),
"transformations": viper.GetStringMap("transformations"),
}

return config, nil
Expand Down Expand Up @@ -81,19 +107,34 @@ func SetupConfigInteractively() (map[string]interface{}, error) {
return nil, fmt.Errorf("failed to get fields for output method: %w", err)
}

// Read validations and transformations
validations, err := readRules("validations")
if err != nil {
return nil, fmt.Errorf("failed to read validation rules: %w", err)
}
transformations, err := readRules("transformations")
if err != nil {
return nil, fmt.Errorf("failed to read transformation rules: %w", err)
}

// Read error handling
errorhandling, err := readErrorHandlingConfig()
if err != nil {
return nil, fmt.Errorf("failed to read error handling configuration: %w", err)
}

// Combine all configurations
config := map[string]interface{}{
"inputMethod": inputMethod,
"outputMethod": outputMethod,
"inputconfig": inputconfig,
"outputconfig": outputconfig,
"inputMethod": inputMethod,
"outputMethod": outputMethod,
"inputconfig": inputconfig,
"outputconfig": outputconfig,
"validations": validations,
"transformations": transformations,
"errorhandling": errorhandling,
}
//TODO : FIX THIS BUG OF MISSING INPUT CONFIG IN CONFIGURATION
saveConfig(config)

//wait for 2
// time.Sleep(5 * time.Second)

return config, nil
}

Expand Down Expand Up @@ -144,6 +185,40 @@ func readIntegrationFields(method string, isSource bool) (map[string]interface{}
return config, nil
}

// readRules interactively collects rule text of the given kind (ruleType is
// only used in the prompt label and in error messages).
//
// The user is prompted repeatedly; each non-empty answer is appended to the
// result followed by a newline, and the first empty answer ends the input.
// If a prompt fails, an empty string and the wrapped prompt error are returned.
func readRules(ruleType string) (string, error) {
	label := fmt.Sprintf("Enter %s rules (multiline, finish with empty line):", ruleType)
	input := promptui.Prompt{Label: label}

	collected := ""
	for {
		entry, runErr := input.Run()
		switch {
		case runErr != nil:
			return "", fmt.Errorf("failed to read %s: %w", ruleType, runErr)
		case entry == "":
			// An empty line terminates the multi-line input.
			return collected, nil
		}
		collected += entry + "\n"
	}
}

// readErrorHandlingConfig asks the user for an error handling strategy and
// returns it as a one-entry map under the "strategy" key.
//
// Only the strategy is collected; no quarantine settings are prompted for.
// If the prompt fails, nil and the wrapped prompt error are returned.
func readErrorHandlingConfig() (map[string]interface{}, error) {
	strategyPrompt := promptui.Prompt{
		Label: "Enter Error Handling Strategy (e.g., LOG_AND_CONTINUE, STOP_ON_ERROR):",
	}

	chosen, runErr := strategyPrompt.Run()
	if runErr != nil {
		return nil, fmt.Errorf("failed to read error handling strategy: %w", runErr)
	}

	result := make(map[string]interface{}, 1)
	result["strategy"] = chosen
	return result, nil
}

// saveConfig writes the configuration to a config.yaml file
func saveConfig(config map[string]interface{}) {
for key, value := range config {
Expand Down
94 changes: 81 additions & 13 deletions integrations/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package integrations
import (
"encoding/csv"
"errors"
"fmt"
"os"
"strings"

"github.com/SkySingh04/fractal/interfaces"
"github.com/SkySingh04/fractal/language"
"github.com/SkySingh04/fractal/logger"
"github.com/SkySingh04/fractal/registry"
)
Expand Down Expand Up @@ -34,6 +36,7 @@ func ReadCSV(fileName string) ([]byte, error) {
return data, nil
}

// WriteCSV writes data to a CSV file.
func WriteCSV(fileName string, data []byte) error {
file, err := os.Create(fileName)
if err != nil {
Expand All @@ -42,7 +45,7 @@ func WriteCSV(fileName string, data []byte) error {
defer file.Close()

writer := csv.NewWriter(file)
records := strings.Split(string(data), "\n")
records := strings.Split(strings.TrimSpace(string(data)), "\n")
for _, record := range records {
fields := strings.Split(record, ",")
err := writer.Write(fields)
Expand Down Expand Up @@ -79,18 +82,17 @@ func (r CSVSource) FetchData(req interfaces.Request) (interface{}, error) {
}

// Validate the data
validatedData, err := validateCSVData(data)
validatedData, err := validateCSVData(data, req.ValidationRules)
if err != nil {
return nil, err
}

// Transform the data
transformedData, err := transformCSVData(validatedData)
transformedData, err := transformCSVData(validatedData, req.TransformationRules)
if err != nil {
return nil, err
}
return transformedData, nil

}

// SendData connects to CSV and publishes data to the specified queue.
Expand All @@ -115,24 +117,90 @@ func init() {
registry.RegisterDestination("CSV", CSVDestination{})
}

// validateCSVData ensures the input data meets the required criteria.
func validateCSVData(data []byte) ([]byte, error) {
// validateCSVData checks raw CSV content against the supplied validation rules.
//
// data is the CSV payload and validationRules is the rule text. The rules are
// tokenized and parsed with the language package, and every newline-separated
// record of data is then checked against every parsed rule, in order.
//
// On success the input is returned unchanged. An error is returned when data
// is empty, when the rules cannot be tokenized or parsed (the underlying error
// is wrapped with %w so callers can use errors.Is / errors.As), or when a
// record fails a rule (the first such error is returned as-is).
func validateCSVData(data []byte, validationRules string) ([]byte, error) {
	logger.Infof("Validating data: %s", data)

	if len(data) == 0 {
		return nil, errors.New("data is empty")
	}

	// Tokenize the rule text.
	lexer := language.NewLexer(validationRules)
	tokens, err := lexer.Tokenize()
	if err != nil {
		return nil, fmt.Errorf("failed to tokenize validation rules: %w", err)
	}

	// Parse the tokens into an AST whose children are the individual rules.
	parser := language.NewParser(tokens)
	rulesAST, err := parser.ParseRules()
	if err != nil {
		return nil, fmt.Errorf("failed to parse validation rules: %w", err)
	}

	// Surrounding whitespace is trimmed first so a trailing newline does not
	// produce a spurious empty record.
	records := strings.Split(strings.TrimSpace(string(data)), "\n")
	for _, record := range records {
		for _, ruleNode := range rulesAST.Children {
			if err := applyValidationRule(record, ruleNode); err != nil {
				return nil, err // stop at the first validation failure
			}
		}
	}

	return data, nil
}

// transformCSVData modifies the input data as per business logic.
func transformCSVData(data []byte) ([]byte, error) {
// transformCSVData rewrites raw CSV content according to the supplied
// transformation rules.
//
// data is the CSV payload and transformationRules is the rule text. The rules
// are tokenized and parsed with the language package; each newline-separated
// record of data is then passed through every parsed rule in order, with the
// output of one rule feeding the next. The transformed records are joined
// back together with "\n".
//
// An error is returned when the rules cannot be tokenized or parsed (the
// underlying error is wrapped with %w so callers can use errors.Is /
// errors.As) or when a rule fails on a record (that error is returned as-is).
//
// The earlier placeholder that upper-cased the data and returned immediately
// has been removed: it made the rule pipeline below unreachable and left
// transformationRules unused.
func transformCSVData(data []byte, transformationRules string) ([]byte, error) {
	logger.Infof("Transforming data: %s", data)

	// Tokenize the rule text.
	lexer := language.NewLexer(transformationRules)
	tokens, err := lexer.Tokenize()
	if err != nil {
		return nil, fmt.Errorf("failed to tokenize transformation rules: %w", err)
	}

	// Parse the tokens into an AST whose children are the individual rules.
	parser := language.NewParser(tokens)
	rulesAST, err := parser.ParseRules()
	if err != nil {
		return nil, fmt.Errorf("failed to parse transformation rules: %w", err)
	}

	// Surrounding whitespace is trimmed first so a trailing newline does not
	// produce a spurious empty record.
	records := strings.Split(strings.TrimSpace(string(data)), "\n")
	transformedRecords := make([]string, 0, len(records))
	for _, record := range records {
		for _, ruleNode := range rulesAST.Children {
			transformedRecord, err := applyTransformationRule(record, ruleNode)
			if err != nil {
				return nil, err
			}
			record = transformedRecord // rules are applied sequentially
		}
		transformedRecords = append(transformedRecords, record)
	}

	return []byte(strings.Join(transformedRecords, "\n")), nil
}

// applyValidationRule checks one record against one validation rule AST node.
//
// It is currently a placeholder: both arguments are ignored and nil is always
// returned, so every record is accepted.
// TODO: evaluate ruleNode against the fields of record.
func applyValidationRule(record string, ruleNode *language.Node) error {
	return nil
}

// applyTransformationRule applies one transformation rule AST node to one
// record and returns the resulting record.
//
// It is currently a placeholder: ruleNode is ignored and record is returned
// unchanged with a nil error.
// TODO: rewrite record according to ruleNode.
func applyTransformationRule(record string, ruleNode *language.Node) (string, error) {
	return record, nil
}
7 changes: 5 additions & 2 deletions interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ type DataDestination interface {

// Request struct to hold migration request data
type Request struct {
Input string `json:"input"` // List of input types (Kafka, SQL, MongoDB, etc.)
Output string `json:"output"` // List of output types (CSV, MongoDB, etc.)
Input string `json:"input"` // List of input types (Kafka, SQL, MongoDB, etc.)
Output string `json:"output"` // List of output types (CSV, MongoDB, etc.)
ValidationRules string `json:"validation_rules"` // Validation rules
TransformationRules string `json:"transformation_rules"`
ErrorHandling string `json:"error_handling"`
ConsumerURL string `json:"consumer_url"` // URL for Kafka
ConsumerTopic string `json:"consumer_topic"` // Topic for Kafka
ProducerURL string `json:"producer_url"`
Expand Down
Loading

0 comments on commit cf7ff0c

Please sign in to comment.