Skip to content

Commit

Permalink
refactor: integrated database services into task data insert
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Oct 8, 2024
1 parent 0ed2150 commit 4f52936
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 264 deletions.
60 changes: 2 additions & 58 deletions core/controllers/spider_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controllers
import (
"errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/fs"
"github.com/crawlab-team/crawlab/core/interfaces"
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
Expand Down Expand Up @@ -48,8 +47,8 @@ func GetSpiderById(c *gin.Context) {
}
}

// data collection
if !s.ColId.IsZero() {
// data collection (compatible with old versions) # TODO: remove in the future
if s.ColName == "" && !s.ColId.IsZero() {
col, err := service.NewModelServiceV2[models2.DataCollectionV2]().GetById(s.ColId)
if err != nil {
if !errors.Is(err, mongo2.ErrNoDocuments) {
Expand Down Expand Up @@ -252,12 +251,6 @@ func PostSpider(c *gin.Context) {
return
}

// upsert data collection
if err := upsertSpiderDataCollection(&s); err != nil {
HandleErrorInternalServerError(c, err)
return
}

// user
u := GetUserFromContextV2(c)

Expand Down Expand Up @@ -311,12 +304,6 @@ func PutSpiderById(c *gin.Context) {
return
}

// upsert data collection
if err := upsertSpiderDataCollection(&s); err != nil {
HandleErrorInternalServerError(c, err)
return
}

u := GetUserFromContextV2(c)

modelSvc := service.NewModelServiceV2[models2.SpiderV2]()
Expand Down Expand Up @@ -773,49 +760,6 @@ func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsServiceV2, err
return getSpiderFsSvc(s)
}

// upsertSpiderDataCollection makes sure the spider points at a data collection
// record and writes that record's id into s.ColId.
//
// If s.ColId is already set, the data collection is fetched by id and any
// lookup error is returned. Otherwise s.ColName is required: the data
// collection is looked up by name and inserted when no document is found, and
// indexes on constants.TaskKey and constants.HashKey are then requested on the
// collection named after it.
func upsertSpiderDataCollection(s *models2.SpiderV2) (err error) {
	dcSvc := service.NewModelServiceV2[models2.DataCollectionV2]()

	// with id: the referenced data collection has to be retrievable
	if !s.ColId.IsZero() {
		existing, getErr := dcSvc.GetById(s.ColId)
		if getErr != nil {
			return getErr
		}
		s.ColId = existing.Id
		return nil
	}

	// no id: validate that a name was provided
	if s.ColName == "" {
		return errors.New("data collection name is required")
	}

	found, findErr := dcSvc.GetOne(bson.M{"name": s.ColName}, nil)
	switch {
	case findErr == nil:
		// already exists, reuse it
	case errors.Is(findErr, mongo2.ErrNoDocuments):
		// not exists, add new
		found = &models2.DataCollectionV2{Name: s.ColName}
		newId, insertErr := dcSvc.InsertOne(*found)
		if insertErr != nil {
			return insertErr
		}
		found.SetId(newId)
	default:
		// any other lookup error
		return findErr
	}
	s.ColId = found.Id

	// create indexes; the results are deliberately discarded, as in the
	// original implementation, so index creation never fails the upsert
	for _, key := range []string{constants.TaskKey, constants.HashKey} {
		_ = mongo.GetMongoCol(found.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{key: 1}})
	}

	return nil
}

// UpsertSpiderDataCollection is the exported entry point for
// upsertSpiderDataCollection. It passes s through unchanged and returns
// whatever error the unexported function returns.
func UpsertSpiderDataCollection(s *models2.SpiderV2) (err error) {
return upsertSpiderDataCollection(s)
}

func getSpiderRootPath(c *gin.Context) (rootPath string, err error) {
// spider id
id, err := primitive.ObjectIDFromHex(c.Param("id"))
Expand Down
44 changes: 22 additions & 22 deletions core/controllers/task_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
log2 "github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/interfaces"
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/result"
"github.com/crawlab-team/crawlab/core/spider/admin"
Expand Down Expand Up @@ -34,7 +34,7 @@ func GetTaskById(c *gin.Context) {
}

// task
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
if errors.Is(err, mongo2.ErrNoDocuments) {
HandleErrorNotFound(c, err)
return
Expand All @@ -45,7 +45,7 @@ func GetTaskById(c *gin.Context) {
}

// spider
t.Spider, _ = service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId)
t.Spider, _ = service.NewModelServiceV2[models.SpiderV2]().GetById(t.SpiderId)

// skip if task status is pending
if t.Status == constants.TaskStatusPending {
Expand All @@ -54,15 +54,15 @@ func GetTaskById(c *gin.Context) {
}

// task stat
t.Stat, _ = service.NewModelServiceV2[models2.TaskStatV2]().GetById(id)
t.Stat, _ = service.NewModelServiceV2[models.TaskStatV2]().GetById(id)

HandleSuccessWithData(c, t)
}

func GetTaskList(c *gin.Context) {
withStats := c.Query("stats")
if withStats == "" {
NewControllerV2[models2.TaskV2]().GetList(c)
NewControllerV2[models.TaskV2]().GetList(c)
return
}

Expand All @@ -72,7 +72,7 @@ func GetTaskList(c *gin.Context) {
sort := MustGetSortOption(c)

// get tasks
tasks, err := service.NewModelServiceV2[models2.TaskV2]().GetMany(query, &mongo.FindOptions{
tasks, err := service.NewModelServiceV2[models.TaskV2]().GetMany(query, &mongo.FindOptions{
Sort: sort,
Skip: pagination.Size * (pagination.Page - 1),
Limit: pagination.Size,
Expand Down Expand Up @@ -101,14 +101,14 @@ func GetTaskList(c *gin.Context) {
}

// total count
total, err := service.NewModelServiceV2[models2.TaskV2]().Count(query)
total, err := service.NewModelServiceV2[models.TaskV2]().Count(query)
if err != nil {
HandleErrorInternalServerError(c, err)
return
}

// stat list
stats, err := service.NewModelServiceV2[models2.TaskStatV2]().GetMany(bson.M{
stats, err := service.NewModelServiceV2[models.TaskStatV2]().GetMany(bson.M{
"_id": bson.M{
"$in": taskIds,
},
Expand All @@ -119,13 +119,13 @@ func GetTaskList(c *gin.Context) {
}

// cache stat list to dict
statsDict := map[primitive.ObjectID]models2.TaskStatV2{}
statsDict := map[primitive.ObjectID]models.TaskStatV2{}
for _, s := range stats {
statsDict[s.Id] = s
}

// spider list
spiders, err := service.NewModelServiceV2[models2.SpiderV2]().GetMany(bson.M{
spiders, err := service.NewModelServiceV2[models.SpiderV2]().GetMany(bson.M{
"_id": bson.M{
"$in": spiderIds,
},
Expand All @@ -136,7 +136,7 @@ func GetTaskList(c *gin.Context) {
}

// cache spider list to dict
spiderDict := map[primitive.ObjectID]models2.SpiderV2{}
spiderDict := map[primitive.ObjectID]models.SpiderV2{}
for _, s := range spiders {
spiderDict[s.Id] = s
}
Expand Down Expand Up @@ -170,22 +170,22 @@ func DeleteTaskById(c *gin.Context) {
// delete in db
if err := mongo.RunTransaction(func(context mongo2.SessionContext) (err error) {
// delete task
_, err = service.NewModelServiceV2[models2.TaskV2]().GetById(id)
_, err = service.NewModelServiceV2[models.TaskV2]().GetById(id)
if err != nil {
return err
}
err = service.NewModelServiceV2[models2.TaskV2]().DeleteById(id)
err = service.NewModelServiceV2[models.TaskV2]().DeleteById(id)
if err != nil {
return err
}

// delete task stat
_, err = service.NewModelServiceV2[models2.TaskStatV2]().GetById(id)
_, err = service.NewModelServiceV2[models.TaskStatV2]().GetById(id)
if err != nil {
log2.Warnf("delete task stat error: %s", err.Error())
return nil
}
err = service.NewModelServiceV2[models2.TaskStatV2]().DeleteById(id)
err = service.NewModelServiceV2[models.TaskStatV2]().DeleteById(id)
if err != nil {
log2.Warnf("delete task stat error: %s", err.Error())
return nil
Expand Down Expand Up @@ -217,7 +217,7 @@ func DeleteList(c *gin.Context) {

if err := mongo.RunTransaction(func(context mongo2.SessionContext) error {
// delete tasks
if err := service.NewModelServiceV2[models2.TaskV2]().DeleteMany(bson.M{
if err := service.NewModelServiceV2[models.TaskV2]().DeleteMany(bson.M{
"_id": bson.M{
"$in": payload.Ids,
},
Expand All @@ -226,7 +226,7 @@ func DeleteList(c *gin.Context) {
}

// delete task stats
if err := service.NewModelServiceV2[models2.TaskV2]().DeleteMany(bson.M{
if err := service.NewModelServiceV2[models.TaskV2]().DeleteMany(bson.M{
"_id": bson.M{
"$in": payload.Ids,
},
Expand Down Expand Up @@ -261,7 +261,7 @@ func DeleteList(c *gin.Context) {

func PostTaskRun(c *gin.Context) {
// task
var t models2.TaskV2
var t models.TaskV2
if err := c.ShouldBindJSON(&t); err != nil {
HandleErrorBadRequest(c, err)
return
Expand All @@ -274,7 +274,7 @@ func PostTaskRun(c *gin.Context) {
}

// spider
s, err := service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId)
s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(t.SpiderId)
if err != nil {
HandleErrorInternalServerError(c, err)
return
Expand Down Expand Up @@ -319,7 +319,7 @@ func PostTaskRestart(c *gin.Context) {
}

// task
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
if err != nil {
HandleErrorInternalServerError(c, err)
return
Expand Down Expand Up @@ -363,7 +363,7 @@ func PostTaskCancel(c *gin.Context) {
}

// task
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
if err != nil {
HandleErrorInternalServerError(c, err)
return
Expand Down Expand Up @@ -446,7 +446,7 @@ func GetTaskData(c *gin.Context) {
}

// task
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
if err != nil {
HandleErrorInternalServerError(c, err)
return
Expand Down
File renamed without changes.
11 changes: 11 additions & 0 deletions core/database/interfaces/database_registry_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package interfaces

import (
"go.mongodb.org/mongo-driver/bson/primitive"
)

// DatabaseRegistryService is the contract for a registry that hands out
// DatabaseService instances by id.
//
// NOTE(review): no implementation is visible in this file; the per-method
// notes below describe only what the signatures show.
type DatabaseRegistryService interface {
// Start takes no arguments and returns nothing.
// Presumably starts the registry; confirm against the implementation.
Start()
// CheckStatus takes no arguments and returns nothing, so any outcome is
// not reported to the caller through this method.
CheckStatus()
// GetDatabaseService returns the DatabaseService associated with id, or
// an error.
GetDatabaseService(id primitive.ObjectID) (res DatabaseService, err error)
}
27 changes: 27 additions & 0 deletions core/database/interfaces/database_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package interfaces

import (
"github.com/crawlab-team/crawlab/core/database/entity"
"github.com/crawlab-team/crawlab/core/models/models/v2"
"go.mongodb.org/mongo-driver/bson/primitive"
)

// DatabaseService is the contract for operations on a database referenced by
// an ObjectID.
//
// NOTE(review): the id parameter presumably identifies a stored database
// record; confirm against the implementations, which are not visible here.
// Method semantics beyond the signatures are defined by those implementations.
type DatabaseService interface {
// Connection and metadata.
TestConnection(id primitive.ObjectID) (err error)
GetMetadata(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error)
GetMetadataAllDb(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error)
// Database-level operations, addressed by databaseName.
CreateDatabase(id primitive.ObjectID, databaseName string) (err error)
DropDatabase(id primitive.ObjectID, databaseName string) (err error)
// Table-level operations, addressed by databaseName plus a table name or
// an entity.DatabaseTable.
GetTableMetadata(id primitive.ObjectID, databaseName, tableName string) (table *entity.DatabaseTable, err error)
CreateTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
ModifyTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error)
DropTable(id primitive.ObjectID, databaseName, tableName string) (err error)
RenameTable(id primitive.ObjectID, databaseName, oldTableName, newTableName string) (err error)
// GetColumnTypes is the only method that takes no id; it returns a list of
// type names for the given query string.
GetColumnTypes(query string) (types []string)
// Row-level operations. Rows, filters and updates are generic
// map[string]interface{} values.
// ReadRows also returns an int64 next to the rows.
// NOTE(review): presumably the total number of matching rows; confirm.
ReadRows(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}, skip, limit int) ([]map[string]interface{}, int64, error)
CreateRow(id primitive.ObjectID, databaseName, tableName string, row map[string]interface{}) error
UpdateRow(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}, update map[string]interface{}) error
DeleteRow(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}) error
// Query takes a query string for databaseName and returns its results.
Query(id primitive.ObjectID, databaseName, query string) (results *entity.DatabaseQueryResults, err error)
// GetCurrentMetric returns a DatabaseMetricV2 for the database.
GetCurrentMetric(id primitive.ObjectID) (m *models.DatabaseMetricV2, err error)
}
15 changes: 15 additions & 0 deletions core/database/registry_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package database

import (
"github.com/crawlab-team/crawlab/core/database/interfaces"
)

// serviceInstance is the package-level DatabaseRegistryService. It is declared
// without an initializer, so it is nil until something assigns it.
var serviceInstance interfaces.DatabaseRegistryService

// SetDatabaseRegistryService stores svc in the package-level serviceInstance,
// replacing any previously stored value. The assignment is not guarded by any
// lock in this function.
func SetDatabaseRegistryService(svc interfaces.DatabaseRegistryService) {
serviceInstance = svc
}

// GetDatabaseRegistryService returns the package-level serviceInstance as is.
// The result is nil if no non-nil service has been stored, so callers should
// be prepared for a nil return.
func GetDatabaseRegistryService() interfaces.DatabaseRegistryService {
return serviceInstance
}
2 changes: 1 addition & 1 deletion core/grpc/server/task_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (svr TaskServerV2) handleInsertData(msg *grpc.StreamMessage) (err error) {
if err != nil {
return err
}
var records []interface{}
var records []map[string]interface{}
for _, d := range data.Records {
res, ok := d[constants.TaskKey]
if ok {
Expand Down
4 changes: 2 additions & 2 deletions core/models/models/v2/spider_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ type SpiderV2 struct {
any `collection:"spiders"`
BaseModelV2[SpiderV2] `bson:",inline"`
Name string `json:"name" bson:"name"` // spider name
ColId primitive.ObjectID `json:"col_id" bson:"col_id"` // data collection id
ColName string `json:"col_name,omitempty" bson:"-"` // data collection name
ColId primitive.ObjectID `json:"col_id" bson:"col_id"` // data collection id (deprecated) # TODO: remove this field in the future
ColName string `json:"col_name,omitempty" bson:"col_name"` // data collection name
DataSourceId primitive.ObjectID `json:"data_source_id" bson:"data_source_id"` // data source id
DataSource *DatabaseV2 `json:"data_source,omitempty" bson:"-"` // data source
Description string `json:"description" bson:"description"` // description
Expand Down
8 changes: 1 addition & 7 deletions core/result/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.Res

var store = sync.Map{}

func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfaces.ResultService, err error) {
func GetResultService(spiderId primitive.ObjectID) (svc2 interfaces.ResultService, err error) {
// model service
modelSvc, err := service.GetService()
if err != nil {
Expand All @@ -51,12 +51,6 @@ func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfa
return nil, trace.TraceError(err)
}

// apply options
_opts := &Options{}
for _, opt := range opts {
opt(_opts)
}

// store key
storeKey := s.ColId.Hex() + ":" + s.DataSourceId.Hex()

Expand Down
Loading

0 comments on commit 4f52936

Please sign in to comment.