Skip to content

Commit

Permalink
Prettify
Browse files Browse the repository at this point in the history
  • Loading branch information
Lun4m committed Dec 2, 2024
1 parent 760722a commit 5b9ac44
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 28 deletions.
19 changes: 12 additions & 7 deletions migrations/kdvh/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
var INVALID_COLUMNS = []string{"dato", "stnr", "typeid", "season", "xxx"}

func DumpTable(table *db.Table, pool *pgxpool.Pool, config *Config) {
fmt.Printf("Dumping %s...\n", table.TableName)
defer fmt.Println(strings.Repeat("- ", 50))

if err := os.MkdirAll(filepath.Join(config.Path, table.Path), os.ModePerm); err != nil {
slog.Error(err.Error())
return
Expand All @@ -40,23 +43,26 @@ func DumpTable(table *db.Table, pool *pgxpool.Pool, config *Config) {
// Used to limit connections to the database
semaphore := make(chan struct{}, config.MaxConn)

bar := utils.NewBar(len(stations), table.TableName)
bar.RenderBlank()
for _, station := range stations {
path := filepath.Join(config.Path, table.Path, string(station))
path := filepath.Join(config.Path, table.Path, station)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
slog.Error(err.Error())
return
}

bar := utils.NewBar(len(elements), fmt.Sprint(" "+station))

var wg sync.WaitGroup
for _, element := range elements {
wg.Add(1)

// This blocks if the channel is full
semaphore <- struct{}{}

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
wg.Done()
bar.Add(1)
}()

err := dumpFunc(
path,
Expand All @@ -78,7 +84,6 @@ func DumpTable(table *db.Table, pool *pgxpool.Pool, config *Config) {
}()
}
wg.Wait()
bar.Add(1)
}
}

Expand Down
6 changes: 3 additions & 3 deletions migrations/kdvh/dump/dump_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func fileExists(filename string, overwrite bool) error {
if _, err := os.Stat(filename); err == nil && !overwrite {
return errors.New(
fmt.Sprintf(
"Skipping dump of '%s' because dumped file already exists and the --overwrite flag was not provided",
"Skipping dump of %q because dumped file already exists and the --overwrite flag was not provided",
filename,
))
}
Expand All @@ -59,12 +59,12 @@ func fetchYearRange(tableName, station string, pool *pgxpool.Pool) (int64, int64

begin, err := strconv.ParseInt(beginStr, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("Could not parse year '%s': %s", beginStr, err)
return 0, 0, fmt.Errorf("Could not parse year %q: %s", beginStr, err)
}

end, err := strconv.ParseInt(endStr, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("Could not parse year '%s': %s", endStr, err)
return 0, 0, fmt.Errorf("Could not parse year %q: %s", endStr, err)
}

return begin, end, nil
Expand Down
31 changes: 18 additions & 13 deletions migrations/kdvh/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
var INVALID_ELEMENTS = []string{"TYPEID", "TAM_NORMAL_9120", "RRA_NORMAL_9120", "OT", "OTN", "OTX", "DD06", "DD12", "DD18"}

func ImportTable(table *db.Table, cache *cache.Cache, pool *pgxpool.Pool, config *Config) (rowsInserted int64) {
fmt.Printf("Importing %s...\n", table.TableName)
defer fmt.Println(strings.Repeat("- ", 50))

stations, err := os.ReadDir(filepath.Join(config.Path, table.Path))
if err != nil {
slog.Warn(err.Error())
Expand All @@ -49,21 +52,23 @@ func ImportTable(table *db.Table, cache *cache.Cache, pool *pgxpool.Pool, config
continue
}

bar := utils.NewBar(len(elements), stationDir)
bar := utils.NewBar(len(elements), fmt.Sprint(" "+station.Name()))
var wg sync.WaitGroup
for _, element := range elements {
bar.Add(1)
elemCode, err := getElementCode(element, config.Elements)
if err != nil {
if config.Verbose {
slog.Info(err.Error())
}
continue
}

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
wg.Done()
bar.Add(1)
}()

elemCode, err := getElementCode(element, config.Elements)
if err != nil {
if config.Verbose {
slog.Info(err.Error())
}
return
}

tsInfo, err := cache.NewTsInfo(table.TableName, elemCode, stnr, pool)
if err != nil {
Expand Down Expand Up @@ -132,11 +137,11 @@ func getElementCode(element os.DirEntry, elementList []string) (string, error) {
elemCode := strings.ToUpper(strings.TrimSuffix(element.Name(), ".csv"))

if len(elementList) > 0 && !slices.Contains(elementList, elemCode) {
return "", errors.New(fmt.Sprintf("Element '%s' not in the list, skipping", elemCode))
return "", errors.New(fmt.Sprintf("Element %q not in the list, skipping", elemCode))
}

if elemcodeIsInvalid(elemCode) {
return "", errors.New(fmt.Sprintf("Element '%s' not set for import, skipping", elemCode))
return "", errors.New(fmt.Sprintf("Element %q not set for import, skipping", elemCode))
}
return elemCode, nil
}
Expand Down
2 changes: 1 addition & 1 deletion migrations/kvalobs/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func getStationLabelMap(labels []*db.Label) map[int32][]*db.Label {
}

func dumpTable[S db.DataSeries | db.TextSeries](table db.Table[S], pool *pgxpool.Pool, config *Config) {
fmt.Println("Importing from " + table.Path)
fmt.Printf("Dumping to %q...\n", table.Path)
defer fmt.Println(strings.Repeat("- ", 50))

labels, err := getLabels(table, pool, config)
Expand Down
4 changes: 2 additions & 2 deletions migrations/kvalobs/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func ImportTable[S db.DataSeries | db.TextSeries](table db.Table[S], cache *cache.Cache, pool *pgxpool.Pool, config *Config) (int64, error) {
fmt.Println("Importing from " + table.Path)
fmt.Printf("Importing from %q...\n", table.Path)
defer fmt.Println(strings.Repeat("- ", 50))

stations, err := os.ReadDir(table.Path)
Expand Down Expand Up @@ -93,7 +93,7 @@ func ImportTable[S db.DataSeries | db.TextSeries](table db.Table[S], cache *cach

count, err := table.Import(ts, pool, labelStr)
if err != nil {
slog.Error(labelStr + "Failed bulk insertion: " + err.Error())
slog.Error(labelStr + "Failed bulk insertion - " + err.Error())
return
}

Expand Down
4 changes: 2 additions & 2 deletions migrations/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func FilterSlice[T comparable](slice, reference []T, formatMsg string) []T {
}

if formatMsg == "" {
formatMsg = "Value '%s' not present in reference slice, skipping"
formatMsg = "Value '%v' not present in reference slice, skipping"
}

// I hate this so much
Expand Down Expand Up @@ -72,7 +72,7 @@ func SetLogFile(table, procedure string) {
filename := fmt.Sprintf("%s_%s_log.txt", table, procedure)
fh, err := os.Create(filename)
if err != nil {
slog.Error(fmt.Sprintf("Could not create log '%s': %s", filename, err))
slog.Error(fmt.Sprintf("Could not create log %q: %s", filename, err))
return
}
log.SetOutput(fh)
Expand Down

0 comments on commit 5b9ac44

Please sign in to comment.