Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
SQL BULK INSERT (fb-1749) (#2291)
Browse files Browse the repository at this point in the history
 SQL BULK INSERT

This change adds support for a BULK INSERT/REPLACE statement that can:
1) take its input from a file, URL, or in-line blob
2) map from the input source to the target columns
3) transform data (using SQL expressions) before inserting
4) support CSV and NDJSON formats

* improving test coverage

* increase test coverage again

* refactoring for handling transformation with types other than id and int

(cherry picked from commit 8f660a5)
  • Loading branch information
paddyjok authored and Fletcher Haynes committed Nov 15, 2022
1 parent 53170ce commit e14cde7
Show file tree
Hide file tree
Showing 16 changed files with 1,910 additions and 774 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ require (
)

require (
github.com/PaesslerAG/gval v1.0.0
github.com/PaesslerAG/jsonpath v0.1.1
github.com/google/uuid v1.3.0
github.com/jaffee/commandeer v0.5.0
github.com/linkedin/goavro/v2 v2.11.1
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PaesslerAG/gval v1.0.0 h1:GEKnRwkWDdf9dOmKcNrar9EA1bz1z9DqPIO1+iLzhd8=
github.com/PaesslerAG/gval v1.0.0/go.mod h1:y/nm5yEyTeX6av0OfKJNp9rBNj2XrGhAf5+v24IBN1I=
github.com/PaesslerAG/jsonpath v0.1.0/go.mod h1:4BzmtoM/PI8fPO4aQGIusjGxGir2BzcV0grWtFzq1Y8=
github.com/PaesslerAG/jsonpath v0.1.1 h1:c1/AToHQMVsduPAa4Vh6xp2U0evy4t8SWp8imEsylIk=
github.com/PaesslerAG/jsonpath v0.1.1/go.mod h1:lVboNxFGal/VwW6d9JzIy56bUsYAP6tH/x80vjnCseY=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down
91 changes: 88 additions & 3 deletions sql3/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const (

ErrCacheKeyNotFound errors.Code = "ErrCacheKeyNotFound"

ErrDuplicateColumn errors.Code = "ErrDuplicateColumn"
ErrUnknownType errors.Code = "ErrUnknownType"
ErrDuplicateColumn errors.Code = "ErrDuplicateColumn"
ErrUnknownType errors.Code = "ErrUnknownType"
ErrUnknownIdentifier errors.Code = "ErrUnknownIdentifier"

ErrTypeIncompatibleWithBitwiseOperator errors.Code = "ErrTypeIncompatibleWithBitwiseOperator"
ErrTypeIncompatibleWithLogicalOperator errors.Code = "ErrTypeIncompatibleWithLogicalOperator"
Expand Down Expand Up @@ -43,6 +44,7 @@ const (
ErrLiteralExpected errors.Code = "ErrLiteralExpected"
ErrIntegerLiteral errors.Code = "ErrIntegerLiteral"
ErrStringLiteral errors.Code = "ErrStringLiteral"
ErrBoolLiteral errors.Code = "ErrBoolLiteral"
ErrLiteralEmptySetNotAllowed errors.Code = "ErrLiteralEmptySetNotAllowed"
ErrLiteralEmptyTupleNotAllowed errors.Code = "ErrLiteralEmptyTupleNotAllowed"
ErrSetLiteralMustContainIntOrString errors.Code = "ErrSetLiteralMustContainIntOrString"
Expand Down Expand Up @@ -85,7 +87,18 @@ const (
ErrParameterTypeMistmatch errors.Code = "ErrParameterTypeMistmatch"
ErrCallParameterValueInvalid errors.Code = "ErrCallParameterValueInvalid"

//optimizer errors
// bulk insert errors

ErrReadingDatasource errors.Code = "ErrReadingDatasource"
ErrMappingFromDatasource errors.Code = "ErrMappingFromDatasource"
ErrFormatSpecifierExpected errors.Code = "ErrFormatSpecifierExpected"
ErrInvalidFormatSpecifier errors.Code = "ErrInvalidFormatSpecifier"
ErrInputSpecifierExpected errors.Code = "ErrInputSpecifierExpected"
ErrInvalidInputSpecifier errors.Code = "ErrInvalidInputSpecifier"
ErrInvalidBatchSize errors.Code = "ErrInvalidBatchSize"
ErrTypeConversionOnMap errors.Code = "ErrTypeConversionOnMap"

// optimizer errors
ErrAggregateNotAllowedInGroupBy errors.Code = "ErrIdPercentileNotAllowedInGroupBy"
)

Expand All @@ -103,6 +116,13 @@ func NewErrUnknownType(line int, col int, typ string) error {
)
}

// NewErrUnknownIdentifier builds an error carrying the ErrUnknownIdentifier
// code. The message starts with the given line and col as "[line:col]" and
// quotes the identifier ident that could not be resolved.
func NewErrUnknownIdentifier(line int, col int, ident string) error {
	msg := fmt.Sprintf("[%d:%d] unknown identifier '%s'", line, col, ident)
	return errors.New(ErrUnknownIdentifier, msg)
}

func NewErrInternal(msg string) error {
preamble := "internal error"
_, filename, line, ok := runtime.Caller(1)
Expand Down Expand Up @@ -186,6 +206,13 @@ func NewErrStringLiteral(line, col int) error {
)
}

// NewErrBoolLiteral builds an error carrying the ErrBoolLiteral code. The
// message starts with the given line and col as "[line:col]" and states that
// a bool literal was expected.
func NewErrBoolLiteral(line, col int) error {
	msg := fmt.Sprintf("[%d:%d] bool literal expected", line, col)
	return errors.New(ErrBoolLiteral, msg)
}

func NewErrLiteralEmptySetNotAllowed(line, col int) error {
return errors.New(
ErrLiteralEmptySetNotAllowed,
Expand Down Expand Up @@ -533,6 +560,64 @@ func NewErrCallParameterValueInvalid(line, col int, badParameterValue string, pa
)
}

// bulk insert

// NewErrReadingDatasource builds an error carrying the ErrReadingDatasource
// code. The message starts with the given line and col as "[line:col]",
// quotes dataSource, and appends errorText as the detail.
func NewErrReadingDatasource(line, col int, dataSource string, errorText string) error {
	msg := fmt.Sprintf("[%d:%d] unable to read datasource '%s': %s", line, col, dataSource, errorText)
	return errors.New(ErrReadingDatasource, msg)
}

// NewErrMappingFromDatasource builds an error carrying the
// ErrMappingFromDatasource code. The message starts with the given line and
// col as "[line:col]", quotes dataSource, and appends errorText as the detail.
func NewErrMappingFromDatasource(line, col int, dataSource string, errorText string) error {
	msg := fmt.Sprintf("[%d:%d] unable to map from datasource '%s': %s", line, col, dataSource, errorText)
	return errors.New(ErrMappingFromDatasource, msg)
}

// NewErrFormatSpecifierExpected builds an error carrying the
// ErrFormatSpecifierExpected code. The message starts with the given line and
// col as "[line:col]" and states that a format specifier was expected.
func NewErrFormatSpecifierExpected(line, col int) error {
	msg := fmt.Sprintf("[%d:%d] format specifier expected", line, col)
	return errors.New(ErrFormatSpecifierExpected, msg)
}

// NewErrInvalidFormatSpecifier builds an error carrying the
// ErrInvalidFormatSpecifier code. The message starts with the given line and
// col as "[line:col]" and quotes the rejected specifier.
func NewErrInvalidFormatSpecifier(line, col int, specifier string) error {
	msg := fmt.Sprintf("[%d:%d] invalid format specifier '%s'", line, col, specifier)
	return errors.New(ErrInvalidFormatSpecifier, msg)
}

// NewErrInputSpecifierExpected builds an error carrying the
// ErrInputSpecifierExpected code. The message starts with the given line and
// col as "[line:col]" and states that an input specifier was expected.
func NewErrInputSpecifierExpected(line, col int) error {
	msg := fmt.Sprintf("[%d:%d] input specifier expected", line, col)
	return errors.New(ErrInputSpecifierExpected, msg)
}

// NewErrInvalidInputSpecifier builds an error carrying the
// ErrInvalidInputSpecifier code. The message starts with the given line and
// col as "[line:col]" and quotes the rejected input specifier.
func NewErrInvalidInputSpecifier(line, col int, specifier string) error {
	// Use the input-specifier code here; the format-specifier code belongs to
	// NewErrInvalidFormatSpecifier and would make the two errors
	// indistinguishable to callers that match on the code.
	msg := fmt.Sprintf("[%d:%d] invalid input specifier '%s'", line, col, specifier)
	return errors.New(ErrInvalidInputSpecifier, msg)
}

// NewErrInvalidBatchSize builds an error carrying the ErrInvalidBatchSize
// code. The message starts with the given line and col as "[line:col]" and
// quotes the rejected batchSize value.
func NewErrInvalidBatchSize(line, col int, batchSize int) error {
	msg := fmt.Sprintf("[%d:%d] invalid batch size '%d'", line, col, batchSize)
	return errors.New(ErrInvalidBatchSize, msg)
}

// NewErrTypeConversionOnMap builds an error carrying the
// ErrTypeConversionOnMap code. The message starts with the given line and col
// as "[line:col]", prints value with the %v verb, and quotes typeName as the
// type the value could not be converted to.
func NewErrTypeConversionOnMap(line, col int, value interface{}, typeName string) error {
	msg := fmt.Sprintf("[%d:%d] value '%v' cannot be converted to type '%s'", line, col, value, typeName)
	return errors.New(ErrTypeConversionOnMap, msg)
}

// optimizer

func NewErrAggregateNotAllowedInGroupBy(line, col int, aggName string) error {
Expand Down
Loading

0 comments on commit e14cde7

Please sign in to comment.