Skip to content

Commit

Permalink
INTG-2821 configure the concurrency (#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
MickStanciu authored May 30, 2023
1 parent fce5a92 commit d7e8867
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 18 deletions.
1 change: 1 addition & 0 deletions cmd/safetyculture-exporter/cmd/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func NewSafetyCultureExporter(v *viper.Viper) *exporterAPI.SafetyCultureExporter
// MapViperConfigToExporterConfiguration maps Viper config to ExporterConfiguration structure
func MapViperConfigToExporterConfiguration(v *viper.Viper, cfg *exporterAPI.ExporterConfiguration) {
cfg.AccessToken = v.GetString("access_token")
cfg.API.MaxConcurrency = v.GetInt("api.max_concurrency")
cfg.SheqsyUsername = v.GetString("sheqsy_username")
cfg.SheqsyCompanyID = v.GetString("sheqsy_company_id")
cfg.Db.Dialect = v.GetString("db.dialect")
Expand Down
2 changes: 2 additions & 0 deletions cmd/safetyculture-exporter/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func configFlags() {
connectionFlags.Bool("tls-skip-verify", false, "Skip verification of API TLS certificates")
connectionFlags.String("tls-cert", "", "Custom root CA certificate to use when making API requests")
connectionFlags.String("proxy-url", "", "Proxy URL for making API requests through")
connectionFlags.Int("max-concurrency", 10, "Maximum number of concurrent API requests (defaults to max 10)")

dbFlags = flag.NewFlagSet("db", flag.ContinueOnError)
dbFlags.String("db-dialect", "mysql", "Database dialect. mysql, postgres and sqlserver are the only valid options.")
Expand Down Expand Up @@ -161,6 +162,7 @@ func bindFlags() {
util.Check(viper.BindPFlag("api.tls_skip_verify", connectionFlags.Lookup("tls-skip-verify")), "while binding flag")
util.Check(viper.BindPFlag("api.tls_cert", connectionFlags.Lookup("tls-cert")), "while binding flag")
util.Check(viper.BindPFlag("api.proxy_url", connectionFlags.Lookup("proxy-url")), "while binding flag")
util.Check(viper.BindPFlag("api.max_concurrency", connectionFlags.Lookup("max-concurrency")), "while binding flag")

util.Check(viper.BindPFlag("db.dialect", dbFlags.Lookup("db-dialect")), "while binding flag")
util.Check(viper.BindPFlag("db.connection_string", dbFlags.Lookup("db-connection-string")), "while binding flag")
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/configuration_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
type ExporterConfiguration struct {
AccessToken string `yaml:"access_token"`
API struct {
ProxyURL string `yaml:"proxy_url"`
SheqsyURL string `yaml:"sheqsy_url"`
TLSCert string `yaml:"tls_cert"`
TLSSkipVerify bool `yaml:"tls_skip_verify"`
URL string `yaml:"url"`
ProxyURL string `yaml:"proxy_url"`
SheqsyURL string `yaml:"sheqsy_url"`
TLSCert string `yaml:"tls_cert"`
TLSSkipVerify bool `yaml:"tls_skip_verify"`
URL string `yaml:"url"`
MaxConcurrency int `yaml:"max_concurrency"`
} `yaml:"api"`
Csv struct {
MaxRowsPerFile int `yaml:"max_rows_per_file"`
Expand Down Expand Up @@ -249,6 +250,7 @@ func BuildConfigurationWithDefaults() *ExporterConfiguration {
cfg := &ExporterConfiguration{}
cfg.API.SheqsyURL = "https://app.sheqsy.com"
cfg.API.URL = "https://api.safetyculture.io"
cfg.API.MaxConcurrency = 10
cfg.Csv.MaxRowsPerFile = 1000000
cfg.Db.Dialect = "mysql"
cfg.Export.Tables = []string{}
Expand Down Expand Up @@ -339,6 +341,7 @@ func (ec *ExporterConfiguration) ToExporterConfig() *feed.ExporterFeedCfg {
ExportSiteIncludeFullHierarchy: ec.Export.Site.IncludeFullHierarchy,
ExportIssueLimit: ec.Export.Issue.Limit,
ExportAssetLimit: ec.Export.Asset.Limit,
MaxConcurrentGoRoutines: ec.API.MaxConcurrency,
}
}

Expand Down
32 changes: 19 additions & 13 deletions pkg/internal/feed/feed_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/MickStanciu/go-fn/fn"
"github.com/SafetyCulture/safetyculture-exporter/pkg/httpapi"
"github.com/SafetyCulture/safetyculture-exporter/pkg/internal/events"
"github.com/SafetyCulture/safetyculture-exporter/pkg/logger"
Expand Down Expand Up @@ -59,6 +60,7 @@ type ExporterFeedCfg struct {
ExportSiteIncludeFullHierarchy bool
ExportIssueLimit int
ExportAssetLimit int
MaxConcurrentGoRoutines int
}

func NewExporterApp(scApiClient *httpapi.Client, sheqsyApiClient *httpapi.Client, cfg *ExporterFeedCfg) *ExporterFeedClient {
Expand Down Expand Up @@ -99,8 +101,12 @@ func (e *ExporterFeedClient) ExportFeeds(exporter Exporter, ctx context.Context)
tablesMap[table] = true
}

maxConcurrentRoutines := fn.GetOrElse(e.configuration.MaxConcurrentGoRoutines, maxConcurrentGoRoutines, func(i int) bool {
return i > 0
})

var wg sync.WaitGroup
semaphore := make(chan int, maxConcurrentGoRoutines)
semaphore := make(chan int, maxConcurrentRoutines)

atLeastOneRun := false

Expand Down Expand Up @@ -227,17 +233,7 @@ func (e *ExporterFeedClient) ExportFeeds(exporter Exporter, ctx context.Context)
func (e *ExporterFeedClient) GetFeeds() []Feed {
return []Feed{
e.getInspectionFeed(),
&InspectionItemFeed{
SkipIDs: e.configuration.ExportInspectionSkipIds,
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
TemplateIDs: e.configuration.ExportTemplateIds,
Archived: e.configuration.ExportInspectionArchived,
Completed: e.configuration.ExportInspectionCompleted,
IncludeInactive: e.configuration.ExportInspectionIncludedInactiveItems,
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportInspectionLimit,
ExportMedia: e.configuration.ExportMedia,
},
&UserFeed{},
&TemplateFeed{
Incremental: e.configuration.ExportIncremental,
},
Expand All @@ -249,7 +245,6 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
IncludeFullHierarchy: e.configuration.ExportSiteIncludeFullHierarchy,
},
&SiteMemberFeed{},
&UserFeed{},
&GroupFeed{},
&GroupUserFeed{},
&ScheduleFeed{
Expand All @@ -275,6 +270,17 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportActionLimit,
},
&InspectionItemFeed{
SkipIDs: e.configuration.ExportInspectionSkipIds,
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
TemplateIDs: e.configuration.ExportTemplateIds,
Archived: e.configuration.ExportInspectionArchived,
Completed: e.configuration.ExportInspectionCompleted,
IncludeInactive: e.configuration.ExportInspectionIncludedInactiveItems,
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportInspectionLimit,
ExportMedia: e.configuration.ExportMedia,
},
&IssueFeed{
Incremental: false, // this was disabled on request. Issues API doesn't support modified After filters
Limit: e.configuration.ExportIssueLimit,
Expand Down

0 comments on commit d7e8867

Please sign in to comment.