Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Nov 13, 2024
1 parent 63aa1e8 commit 7165e29
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 125 deletions.
34 changes: 34 additions & 0 deletions go/data/logReaderState.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package data

import (
"bufio"
"errors"
"os"
"regexp"
"sync"
)

// logReaderState holds the state shared by the line-oriented query readers:
// the input being read, the scanner over it, and iteration bookkeeping.
//
// It embeds a sync.Mutex, so it must be used through a pointer and must not
// be copied after first use.
type logReaderState struct {
	// fd is the open input file. It is nil when the scanner reads from a
	// source that is not a file, and is reset to nil once Close has
	// released it.
	fd *os.File

	// scanner yields the input one line at a time.
	scanner *bufio.Scanner

	// reg is the pattern used to recognise entries in the input; readers
	// that do not need one leave it nil.
	// NOTE(review): presumably the expression compiled by the loader that
	// builds this state; confirm against the constructors.
	reg *regexp.Regexp

	// mu serialises access to the mutable fields of this struct.
	mu sync.Mutex

	// lineNumber counts the lines consumed from scanner so far.
	lineNumber int

	// closed reports that no further queries will be produced. Iterators
	// set it themselves when they reach the end of the input, so it must
	// not be read as "fd has been closed".
	closed bool

	// err accumulates errors seen while reading and while closing.
	err error
}

// Close marks the reader as closed and releases the underlying file, if
// there is one. It is safe to call more than once: the file is closed only
// on the first call that finds it open.
//
// It returns every error recorded on the reader so far, joined with any
// error from closing the file, or nil when there is none.
func (s *logReaderState) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Mark the reader closed unconditionally so that readers without a file
	// (fd == nil) also stop producing queries after Close.
	s.closed = true

	if s.fd != nil {
		if ferr := s.fd.Close(); ferr != nil {
			s.err = errors.Join(s.err, ferr)
		}
		// Dropping the handle, not the closed flag, is what prevents a double
		// close: the flag may already be true because an iterator hit the end
		// of the input, and the file must still be released in that case.
		s.fd = nil
	}

	return s.err
}
36 changes: 0 additions & 36 deletions go/data/query_log_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,13 @@ package data

import (
"bufio"
"errors"
"os"
"regexp"
"sync"
)

type (
MySQLLogLoader struct{}

logReaderState struct {
fd *os.File
scanner *bufio.Scanner
reg *regexp.Regexp
mu sync.Mutex
lineNumber int
closed bool
err error
}

mysqlLogReaderState struct {
logReaderState
prevQuery string
Expand Down Expand Up @@ -127,30 +115,6 @@ func (s *mysqlLogReaderState) Next() (Query, bool) {
return Query{}, false
}

func (s *logReaderState) Close() error {
s.mu.Lock()
defer s.mu.Unlock()

if !s.closed {
ferr := s.fd.Close()
if ferr != nil {
s.err = errors.Join(s.err, ferr)
}
s.closed = true
}

return s.err
}

func (s *logReaderState) NextLine() (string, bool) {
more := s.scanner.Scan()
if !more {
return "", false
}

return s.scanner.Text(), true
}

func (MySQLLogLoader) Loadit(fileName string) IteratorLoader {
reg := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z)\s+(\d+)\s+(\w+)\s+(.*)`)

Expand Down
4 changes: 4 additions & 0 deletions go/data/query_log_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package data

import (
"github.com/google/go-cmp/cmp"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -70,5 +71,8 @@ LIMIT 1`,
},
}

if diff := cmp.Diff(expected, gotQueries); diff != "" {
t.Errorf("unexpected queries (-want +got):\n%s", diff)
}
require.Equal(t, expected, gotQueries)
}
210 changes: 121 additions & 89 deletions go/data/sql_script_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package data

import (
"bytes"
"bufio"
"fmt"
"io"
"net/http"
Expand All @@ -27,118 +27,150 @@ import (

type SQLScriptLoader struct{}

func readData(url string) ([]byte, error) {
if strings.HasPrefix(url, "http") {
client := http.Client{}
res, err := client.Get(url)
func (SQLScriptLoader) Load(fileName string) ([]Query, error) {
loader := SQLScriptLoader{}.Loadit(fileName)
return makeSlice(loader)
}

func (SQLScriptLoader) Loadit(fileName string) IteratorLoader {
var fd *os.File
var err error

if strings.HasPrefix(fileName, "http") {
// Read from URL
data, err := readData(fileName)
if err != nil {
return nil, err
return &errLoader{err}
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to get data from %s, status code %d", url, res.StatusCode)
scanner := bufio.NewScanner(strings.NewReader(string(data)))
return &sqlScriptReaderState{
logReaderState: logReaderState{
scanner: scanner,
},
newStmt: true,
}
defer res.Body.Close()
return io.ReadAll(res.Body)
}
return os.ReadFile(url)
}

func (SQLScriptLoader) Load(url string) ([]Query, error) {
data, err := readData(url)
// Read from file
fd, err = os.OpenFile(fileName, os.O_RDONLY, 0)
if err != nil {
return nil, err
return &errLoader{err}
}
seps := bytes.Split(data, []byte("\n"))
queries := make([]Query, 0, len(seps))
newStmt := true
for i, v := range seps {
v := bytes.TrimSpace(v)
s := string(v)
// Skip comments and empty lines
switch {
case strings.HasPrefix(s, "#"):
newStmt = true
continue
case strings.HasPrefix(s, "--"):
queries = append(queries, Query{Query: s, Line: i + 1})
newStmt = true
continue
case len(s) == 0:
continue
}

if newStmt {
queries = append(queries, Query{Query: s, Line: i + 1})
} else {
lastQuery := queries[len(queries)-1]
lastQuery.Query = fmt.Sprintf("%s\n%s", lastQuery.Query, s)
queries[len(queries)-1] = lastQuery
}

// Treat new line as a new statement if line ends with ';'
newStmt = strings.HasSuffix(s, ";")
scanner := bufio.NewScanner(fd)
return &sqlScriptReaderState{
logReaderState: logReaderState{
fd: fd,
scanner: scanner,
lineNumber: 0,
},
newStmt: true,
}
}

// Process queries directly without calling ParseQueries
finalQueries := make([]Query, 0, len(queries))
for _, rs := range queries {
q, err := parseQuery(rs)
if err != nil {
return nil, err
}
if q != nil {
finalQueries = append(finalQueries, *q)
}
}
return finalQueries, nil
type sqlScriptReaderState struct {
logReaderState
prevQuery string
queryStart int
newStmt bool
}

// Helper function to parse individual queries
func parseQuery(rs Query) (*Query, error) {
realS := rs.Query
s := rs.Query
q := Query{Line: rs.Line, Type: Unknown}
func (s *sqlScriptReaderState) Next() (Query, bool) {
s.mu.Lock()
defer s.mu.Unlock()

if len(s) < 3 {
return nil, nil
if s.closed {
return Query{}, false
}

switch {
case strings.HasPrefix(s, "#"):
q.Type = Comment
return &q, nil
case strings.HasPrefix(s, "--"):
q.Type = CommentWithCommand
if len(s) > 2 && s[2] == ' ' {
s = s[3:]
for s.scanner.Scan() {
line := s.scanner.Text()
line = strings.TrimSpace(line)
s.lineNumber++

// Skip empty lines and comments
if len(line) == 0 {
continue
}
switch {
case strings.HasPrefix(line, "#"):
s.newStmt = true
continue
case strings.HasPrefix(line, "--"):
// Return previous query before processing the comment
if s.prevQuery != "" {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
}
s.prevQuery = ""
s.queryStart = 0
s.newStmt = true
// Store current comment line as new query
s.prevQuery = line
s.queryStart = s.lineNumber
return query, true
} else {
s.prevQuery = line
s.queryStart = s.lineNumber
s.newStmt = true
continue
}
}

if s.newStmt {
s.prevQuery = line
s.queryStart = s.lineNumber
} else {
s = s[2:]
s.prevQuery += "\n" + line
}
case s[0] == '\n':
q.Type = EmptyLine
return &q, nil
}

i := findFirstWord(s)
if i > 0 {
q.FirstWord = s[:i]
// Check if the line ends with a semicolon
if strings.HasSuffix(line, ";") {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
}
s.prevQuery = ""
s.queryStart = 0
s.newStmt = true
return query, true
} else {
s.newStmt = false
}
}
q.Query = s[i:]

if q.Type == Unknown || q.Type == CommentWithCommand {
if err := q.getQueryType(realS); err != nil {
return nil, err
s.closed = true

// Return the last query if we have one
if s.prevQuery != "" {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
}
s.prevQuery = ""
return query, true
}

return &q, nil
s.err = s.scanner.Err()
return Query{}, false
}

// findFirstWord calculates the length of the first word in the string
func findFirstWord(s string) int {
i := 0
for i < len(s) && s[i] != '(' && s[i] != ' ' && s[i] != ';' && s[i] != '\n' {
i++
func readData(url string) ([]byte, error) {
if strings.HasPrefix(url, "http") {
client := http.Client{}
res, err := client.Get(url)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to get data from %s, status code %d", url, res.StatusCode)
}
defer res.Body.Close()
return io.ReadAll(res.Body)
}
return i
return os.ReadFile(url)
}

0 comments on commit 7165e29

Please sign in to comment.