diff --git a/.gitignore b/.gitignore index 1fe5aad..f5281a3 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ # Added by cargo **/target **/plugins_dev.toml -.vscode \ No newline at end of file +.vscode +rr \ No newline at end of file diff --git a/github/pool.go b/github/pool.go new file mode 100644 index 0000000..fe46a61 --- /dev/null +++ b/github/pool.go @@ -0,0 +1,193 @@ +package github + +import ( + "bufio" + "context" + "fmt" + "net/http" + "path" + "strings" + "sync" + "time" + + "github.com/google/go-github/v49/github" + "github.com/roadrunner-server/velox" + "go.uber.org/zap" +) + +type processor struct { + maxWorkers int + errs []error + wg sync.WaitGroup + mu sync.Mutex + log *zap.Logger + queueCh chan *pcfg + modinfo []*velox.ModulesInfo + client *github.Client +} + +type pcfg struct { + pluginCfg *velox.PluginConfig + name string +} + +func newPool(log *zap.Logger, client *github.Client) *processor { + p := &processor{ + maxWorkers: 10, + log: log, + client: client, + modinfo: make([]*velox.ModulesInfo, 0, 10), + queueCh: make(chan *pcfg, 1), + wg: sync.WaitGroup{}, + mu: sync.Mutex{}, + errs: make([]error, 0, 1), + } + + // start the processor + p.run() + + return p +} + +func (p *processor) run() { + for i := 0; i < p.maxWorkers; i++ { + go func() { + for v := range p.queueCh { + modInfo := new(velox.ModulesInfo) + p.log.Debug("[FETCHING PLUGIN DATA]", + zap.String("repository", v.pluginCfg.Repo), + zap.String("owner", v.pluginCfg.Owner), + zap.String("folder", v.pluginCfg.Folder), + zap.String("plugin", v.name), + zap.String("ref", v.pluginCfg.Ref), + ) + + if v.pluginCfg.Ref == "" { + p.mu.Lock() + p.errs = append(p.errs, fmt.Errorf("ref can't be empty")) + p.mu.Unlock() + p.wg.Done() + continue + } + + rc, resp, err := p.client.Repositories.DownloadContents(context.Background(), + v.pluginCfg.Owner, + v.pluginCfg.Repo, + path.Join(v.pluginCfg.Folder, "go.mod"), &github.RepositoryContentGetOptions{Ref: v.pluginCfg.Ref}, + ) + if err != nil { + p.mu.Lock() + p.errs = append(p.errs, err) + p.mu.Unlock() + p.wg.Done() + continue + } + + if resp.StatusCode != http.StatusOK { + p.mu.Lock() + p.errs = append(p.errs, fmt.Errorf("bad response status: %d", resp.StatusCode)) + p.mu.Unlock() + p.wg.Done() + continue + } + + rdr := bufio.NewReader(rc) + ret, err := rdr.ReadString('\n') + if err != nil { + p.mu.Lock() + p.errs = append(p.errs, err) + p.mu.Unlock() + p.wg.Done() + continue + } + + p.log.Debug("[READING MODULE INFO]", zap.String("plugin", v.name), zap.String("mod", ret)) + + // module github.com/roadrunner-server/logger/v2, we split and get the second part + retMod := strings.Split(ret, " ") + if len(retMod) < 2 { + p.mu.Lock() + p.errs = append(p.errs, fmt.Errorf("failed to parse module info for the plugin: %s", ret)) + p.mu.Unlock() + p.wg.Done() + continue + } + + err = resp.Body.Close() + if err != nil { + p.mu.Lock() + p.errs = append(p.errs, err) + p.mu.Unlock() + p.wg.Done() + continue + } + + modInfo.ModuleName = strings.TrimRight(retMod[1], "\n") + + p.log.Debug("[REQUESTING COMMIT SHA-1]", zap.String("plugin", v.name), zap.String("ref", v.pluginCfg.Ref)) + commits, rsp, err := p.client.Repositories.ListCommits(context.Background(), v.pluginCfg.Owner, v.pluginCfg.Repo, &github.CommitsListOptions{ + SHA: v.pluginCfg.Ref, + Until: time.Now(), + ListOptions: github.ListOptions{ + Page: 1, + PerPage: 1, + }, + }) + if err != nil { + p.mu.Lock() + p.errs = append(p.errs, err) + p.mu.Unlock() + p.wg.Done() + continue + } + + if rsp.StatusCode != http.StatusOK { + p.mu.Lock() + p.errs = append(p.errs, fmt.Errorf("bad response status: %d", rsp.StatusCode)) + p.mu.Unlock() + p.wg.Done() + continue + } + + for j := 0; j < len(commits); j++ { + modInfo.Version = *commits[j].SHA + } + + if v.pluginCfg.Replace != "" { + p.log.Debug("[REPLACE REQUESTED]", zap.String("plugin", v.name), zap.String("path", v.pluginCfg.Replace)) + } + + p.mu.Lock() + p.modinfo = append(p.modinfo, modInfo) + p.mu.Unlock() + + p.wg.Done() + } + }() + } +} + +func (p *processor) add(pjob *pcfg) { + p.wg.Add(1) + p.queueCh <- pjob +} + +func (p *processor) errors() []error { + p.mu.Lock() + defer p.mu.Unlock() + return p.errs +} + +func (p *processor) moduleinfo() []*velox.ModulesInfo { + p.mu.Lock() + defer p.mu.Unlock() + return p.modinfo +} + +func (p *processor) wait() { + p.wg.Wait() +} + +func (p *processor) stop() { + close(p.queueCh) +} diff --git a/github/repo.go b/github/repo.go index 4153f45..d472ead 100644 --- a/github/repo.go +++ b/github/repo.go @@ -2,7 +2,6 @@ package github import ( "archive/zip" - "bufio" "bytes" "context" "errors" @@ -13,7 +12,6 @@ import ( "path" "path/filepath" "strings" - "time" "github.com/google/go-github/v49/github" "github.com/roadrunner-server/velox" @@ -30,6 +28,7 @@ const ( GHRepo represents template repository */ type GHRepo struct { + pool *processor client *github.Client config *velox.Config log *zap.Logger @@ -46,6 +45,7 @@ func NewGHRepoInfo(cfg *velox.Config, log *zap.Logger) *GHRepo { } return &GHRepo{ + pool: newPool(log, github.NewClient(client)), log: log, config: cfg, client: github.NewClient(client), @@ -198,74 +198,21 @@ func extract(dest string, zf *zip.File) error { // https://github.com/spiral/roadrunner-binary/archive/refs/tags/v2.7.0.zip func (r *GHRepo) GetPluginsModData() ([]*velox.ModulesInfo, error) { - modInfoRet := make([]*velox.ModulesInfo, 0, 5) - for k, v := range r.config.GitHub.Plugins { - modInfo := new(velox.ModulesInfo) - r.log.Debug("[FETCHING PLUGIN DATA]", zap.String("repository", v.Repo), zap.String("owner", v.Owner), zap.String("folder", v.Folder), zap.String("plugin", k), zap.String("ref", v.Ref)) - - if v.Ref == "" { - return nil, errors.New("ref can't be empty") - } - - rc, resp, err := r.client.Repositories.DownloadContents(context.Background(), v.Owner, v.Repo, path.Join(v.Folder, "go.mod"), &github.RepositoryContentGetOptions{Ref: v.Ref}) - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("bad response status: %d", resp.StatusCode) - } - - rdr := bufio.NewReader(rc) - ret, err := rdr.ReadString('\n') - if err != nil { - return nil, err - } - - r.log.Debug("[READING MODULE INFO]", zap.String("plugin", k), zap.String("mod", ret)) - - // module github.com/roadrunner-server/logger/v2, we split and get the second part - retMod := strings.Split(ret, " ") - if len(retMod) < 2 { - return nil, fmt.Errorf("failed to parse module info for the plugin: %s", ret) - } - - err = resp.Body.Close() - if err != nil { - return nil, err - } - - modInfo.ModuleName = strings.TrimRight(retMod[1], "\n") - - r.log.Debug("[REQUESTING COMMIT SHA-1]", zap.String("plugin", k), zap.String("ref", v.Ref)) - commits, rsp, err := r.client.Repositories.ListCommits(context.Background(), v.Owner, v.Repo, &github.CommitsListOptions{ - SHA: v.Ref, - Until: time.Now(), - ListOptions: github.ListOptions{ - Page: 1, - PerPage: 1, - }, + r.pool.add(&pcfg{ + pluginCfg: v, + name: k, }) - if err != nil { - return nil, err - } - - if rsp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("bad response status: %d", rsp.StatusCode) - } - - for i := 0; i < len(commits); i++ { - modInfo.Version = *commits[i].SHA - } + } - if v.Replace != "" { - r.log.Debug("[REPLACE REQUESTED]", zap.String("plugin", k), zap.String("path", v.Replace)) - } + r.pool.wait() - modInfo.Replace = v.Replace - modInfoRet = append(modInfoRet, modInfo) + if len(r.pool.errors()) != 0 { + return nil, errors.Join(r.pool.errors()...) } - return modInfoRet, nil + mi := r.pool.moduleinfo() + r.pool.stop() + + return mi, nil } diff --git a/velox_rr_v2023.toml b/velox_rr_v2023.toml index 85372fa..2e1ed7c 100644 --- a/velox_rr_v2023.toml +++ b/velox_rr_v2023.toml @@ -86,5 +86,5 @@ ref = "v2023.3.0" # test_plugin_2 = { ref = "main", owner = "rustatian", repository = "36405235" } # [log] -level = "debug" +level = "error" mode = "development"