Skip to content

Commit

Permalink
finished service
Browse files Browse the repository at this point in the history
  • Loading branch information
audetv committed Jul 15, 2024
1 parent 3f13fc0 commit f8e2353
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 23 deletions.
28 changes: 16 additions & 12 deletions cmd/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import (

"github.com/terratensor/svodd-server/internal/app"
"github.com/terratensor/svodd-server/internal/config"
"github.com/terratensor/svodd-server/internal/entities/answer"
"github.com/terratensor/svodd-server/internal/lib/httpclient"
"github.com/terratensor/svodd-server/internal/qaparser/qavideo"
"github.com/terratensor/svodd-server/internal/qaparser/questionanswer"
"github.com/terratensor/svodd-server/internal/workerpool"
)

func main() {
cfg := config.MustLoad()

app := app.NewApp(cfg)

ch := make(chan *url.URL, cfg.EntryChanBuffer)

wg := &sync.WaitGroup{}
Expand All @@ -29,21 +32,22 @@ func main() {
var allTask []*workerpool.Task
pool := workerpool.NewPool(allTask, cfg.Workers)

// Создаем срез клиетнов мантикоры по количеству индексов в конфиге
var manticoreStorages []answer.Entries
for _, index := range cfg.ManticoreIndex {
manticoreStorages = append(manticoreStorages, *app.NewEntriesStorage(index.Name))
}

go func() {
for {
task := workerpool.NewTask(func(data interface{}) error {
if cfg.Env != "prod" {
return nil
}
taskID := data.(*url.URL)
time.Sleep(100 * time.Millisecond)
// e := data.(answer.Entry)
return nil
log.Printf("Task %v processed\n", taskID.String())

entry := questionanswer.NewEntry(taskID)

client := httpclient.New(nil)
err := entry.FetchData(client)
if err != nil {
return err
}

return app.Process(entry)
}, <-ch)
pool.AddTask(task)
}
Expand Down
3 changes: 2 additions & 1 deletion config/parser/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ splitter:
parsers:
# - url: https://xn----8sba0bbi0cdm.xn--p1ai/qa/question/view-50275
- url: https://xn----8sba0bbi0cdm.xn--p1ai/qa/video
pages: 13
current: true
# pages: 13
fetch_all: false
# - url: https://xn----8sba0bbi0cdm.xn--p1ai/qa/video
# pages: 0
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Index struct {

type Parser struct {
Url string `yaml:"url"`
Current bool `yaml:"current" env-default:"true"`
Previous bool `yaml:"previous" env-default:"false"`
Pages int `yaml:"pages,omitempty" env-default:"1"`
FetchAll bool `yaml:"fetch_all" env-default:"false"`
Expand Down
20 changes: 15 additions & 5 deletions internal/qaparser/qavideo/qaparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Parser struct {
Delay time.Duration
RandomDelay time.Duration
UserAgent string
Current bool
Previous bool
MaxPages int
FetchAll bool
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewParser(cfg config.Parser, delay, randomDelay time.Duration) *Parser {
Delay: delay,
RandomDelay: randomDelay,
UserAgent: userAgent,
Current: cfg.Current,
Previous: cfg.Previous,
MaxPages: cfg.Pages,
FetchAll: cfg.FetchAll,
Expand All @@ -97,8 +99,12 @@ func (p *Parser) RunBackground(output chan *url.URL, wg *sync.WaitGroup) {
if !p.FetchAll {
go func() {
for page := range qavideopage.FetchAndParsePages(p.Client, *p.Link, p.MaxPages) {
for _, entry := range page.ListQALinks() {
output <- entry
if p.Current {
output <- page.FirstQALink()
} else {
for _, entry := range page.ListQALinks() {
output <- entry
}
}
}
}()
Expand Down Expand Up @@ -129,14 +135,18 @@ func (p *Parser) RunBackground(output chan *url.URL, wg *sync.WaitGroup) {
func (p *Parser) Run(output chan *url.URL, wg *sync.WaitGroup) {

log.Printf("🚩 run parser: delay: %v, random delay: %v, url: %v", p.Delay, p.RandomDelay, p.Link.String())
// chout := make(chan *url.URL, 20)

defer wg.Done()
if !p.FetchAll {
go func() {
defer close(output)
for page := range qavideopage.FetchAndParsePages(p.Client, *p.Link, p.MaxPages) {
for _, entry := range page.ListQALinks() {
output <- entry
if p.Current {
output <- page.FirstQALink()
} else {
for _, entry := range page.ListQALinks() {
output <- entry
}
}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/manticore/manticore.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func New(tbl string) (*Client, error) {
configuration := openapiclient.NewConfiguration()
configuration.Servers = openapiclient.ServerConfigurations{
{
// URL: "http://manticore:9308", // Здесь должна быть переменная окружения manticore host:port
URL: "http://localhost:9308",
URL: "http://manticore:9308", // Здесь должна быть переменная окружения manticore host:port
// URL: "http://localhost:9308",
Description: "Default Manticore Search HTTP",
},
}
Expand Down
3 changes: 0 additions & 3 deletions internal/workerpool/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ type Task struct {
Err error
Data *url.URL
f func(interface{}) error
// ManticoreStorages *[]answer.Entries
// PsqlStorage *answer.Entries
}

func NewTask(f func(interface{}) error, data *url.URL) *Task {
return &Task{
f: f,
Data: data,
// ManticoreStorages: storages,
}
}

Expand Down

0 comments on commit f8e2353

Please sign in to comment.