Skip to content

Commit

Permalink
update oracle stores
Browse files (browse the repository at this point in the history)
  • Loading branch information
BurdenBear committed Nov 13, 2018
1 parent c32423a commit 713c7f1
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 76 deletions.
36 changes: 8 additions & 28 deletions client/src/routes/Task/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,10 @@ export default class TaskTableList extends PureComponent {
{
title: '类型',
dataIndex: 'type',
filters: [
{
text: taskTypeText[0],
value: 0,
},
{
text: taskTypeText[1],
value: 1,
},
{
text: taskTypeText[2],
value: 2,
},
],
filters: Object.keys(taskTypeText).map(i => ({
text: taskTypeText[i],
value: i,
})),
render: item => (
<div style={{ color: taskTypeColor[item] }}>{taskTypeText[item] || 'UNKNOWN'}</div>
),
Expand Down Expand Up @@ -188,20 +178,10 @@ export default class TaskTableList extends PureComponent {
{
title: '状态',
dataIndex: 'status',
filters: [
{
text: statusText[0],
value: 0,
},
{
text: statusText[1],
value: 1,
},
{
text: statusText[2],
value: 2,
},
],
filters: Object.keys(statusText).map(i => ({
text: statusText[i],
value: i,
})),
render(item, record) {
return (
<div>
Expand Down
2 changes: 1 addition & 1 deletion server/calculators/celery.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (calculator *CeleryCalculator) Process(id string, factor models.Factor, fac
if err != nil {
return err
}
data := map[string][]interface{}{"args": []interface{}{factor.ID, factorValueStr, string(processType)}}
data := map[string][]interface{}{"args": []interface{}{factor.Name, factorValueStr, string(processType)}}
jsonData, err := json.Marshal(data)
_, err = calculator.queueProcess.Publish(id, jsonData)
return err
Expand Down
6 changes: 4 additions & 2 deletions server/examples/oracleStore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"log"

"fxdayu.com/dyupdater/server/task"

"fxdayu.com/dyupdater/server/stores"

"github.com/spf13/viper"
Expand All @@ -18,7 +20,7 @@ func main() {
store := new(stores.OracleStore)
store.Init(config)
factor := models.Factor{ID: "test", Formula: ""}
lost, err := store.Check(factor, []int{20160101})
lost, err := store.Check(factor, task.ProcessTypeNone, []int{20160101})
if err != nil {
log.Panic(err)
} else {
Expand All @@ -31,6 +33,6 @@ func main() {
for _, s := range symbols {
factorValues.Values[s] = make([]float64, len(lost))
}
count, err := store.Update(factor, factorValues, false)
count, err := store.Update(factor, task.ProcessTypeNone, factorValues, false)
fmt.Println(count)
}
2 changes: 2 additions & 0 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func init() {
f1.StringVarP(&host, "host", "H", "127.0.0.1", "Dashboard's host.")
f1.IntVarP(&port, "port", "p", 19328, "Dashboard's port.")
f1.BoolVarP(&initCheck, "check", "C", false, "Whether to do a initial checking after run.")
f1.IntVarP(&startTime, "start", "s", 0, "The check's start date in format \"20060102\", default will be cal-start-date in configation.")
f1.IntVarP(&endTime, "end", "e", 0, "The check's end date in format \"20060102\", 0 means up to today.")
f1.SortFlags = false
f2 = flag.NewFlagSet("dyupdater check", flag.ContinueOnError)
f2.StringVarP(&logPath, "logfile", "l", "./dyupdater.log", "Set the logfile path.")
Expand Down
3 changes: 2 additions & 1 deletion server/models/factor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
)

type Factor struct {
ID string `json:"name"`
ID string
Name string `json:"name"`
Formula string `json:"formula,omitempty"`
Archive string `json:"archive,omitempty"`
}
Expand Down
6 changes: 3 additions & 3 deletions server/services/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (service *FactorServices) GetScheduler() schedulers.TaskScheduler {
func (service *FactorServices) mapFactor(store string, factor models.Factor) (newFactor models.Factor) {
newFactor = factor
if service.mapper != nil {
newFactor.ID = service.mapper.Map(store, newFactor.ID)
newFactor.Name = service.mapper.Map(store, newFactor.Name)
}
return
}
Expand All @@ -93,7 +93,7 @@ func (service *FactorServices) Check(factor models.Factor, dateRange models.Date
}

func (service *FactorServices) CheckWithLock(factor models.Factor, dateRange models.DateRange) *task.TaskFuture {
if val, ok := service.count.Load(factor.ID); ok {
if val, ok := service.count.Load(factor.Name); ok {
count := val.(int)
if count > 0 {
return nil
Expand All @@ -108,7 +108,7 @@ func (service *FactorServices) CheckWithLock(factor models.Factor, dateRange mod
ti := task.NewCheckTaskInput(stores, factor, dateRange, task.ProcessTypeNone)
tf := service.scheduler.Publish(nil, ti)
if tf != nil {
service.count.Store(factor.ID, 1)
service.count.Store(factor.Name, 1)
}
return tf
}
Expand Down
6 changes: 3 additions & 3 deletions server/services/handler_cal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (handler *calTaskHandler) Handle(tf *task.TaskFuture) error {
if !ok {
panic(fmt.Errorf("store not found: %s", syncFrom))
}
log.Infof("(Task %s) { %s } [ %d , %d ] Fetch begin.", tf.ID, data.Factor.ID, data.DateRange.Start, data.DateRange.End)
log.Infof("(Task %s) { %s } [ %d , %d ] Fetch begin.", tf.ID, data.GetFactorID(), data.GetStartTime(), data.GetEndTime())
if values, err := store.Fetch(service.mapFactor(syncFrom, data.Factor), data.DateRange); err != nil {
return err
} else {
Expand All @@ -45,7 +45,7 @@ func (handler *calTaskHandler) Handle(tf *task.TaskFuture) error {
if !ok {
return fmt.Errorf("calculator not found: %s", data.Calculator)
}
log.Infof("(Task %s) { %s } [ %d , %d ] Cal begin.", tf.ID, data.Factor.ID, data.DateRange.Start, data.DateRange.End)
log.Infof("(Task %s) { %s } [ %d , %d ] Cal begin.", tf.ID, data.GetFactorID(), data.GetStartTime(), data.GetEndTime())
err := calculator.Cal(tf.ID, data.Factor, data.DateRange)
if err != nil {
return err
Expand Down Expand Up @@ -95,7 +95,7 @@ func (handler *calTaskHandler) OnFailed(tf task.TaskFuture, err error) {
if !ok {
return
}
log.Errorf("(Task %s) { %s } [ %d , %d ] Cal failed: %s", tf.ID, data.Factor.ID, data.DateRange.Start, data.DateRange.End, err)
log.Errorf("(Task %s) { %s } [ %d , %d ] Cal failed: %s", tf.ID, data.GetFactorID(), data.GetStartTime(), data.GetEndTime(), err)
}

func (handler *calTaskHandler) GetTaskType() task.TaskType {
Expand Down
34 changes: 25 additions & 9 deletions server/services/handler_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (handler *checkTaskHandler) OnSuccess(tf task.TaskFuture, r task.TaskResult
log.Infof("(Task %s) { %s } [ %d , %d ] Check finish, missing data in (%d ... %d).", r.ID, data.GetFactorID(), data.GetStartTime(), data.GetEndTime(), first, last)
maxCalDuration := utils.GetGlobalConfig().GetMaxCalDuration()
minCalDuration := utils.GetGlobalConfig().GetMinCalDuration()
calStartDate := utils.GetGlobalConfig().GetCalStartDate()
if len(result.Datetimes) > 0 {
var ranges []models.DateRange
var current *models.DateRange
Expand All @@ -95,6 +96,9 @@ func (handler *checkTaskHandler) OnSuccess(tf task.TaskFuture, r task.TaskResult
if int(e.Sub(s).Seconds()) <= maxCalDuration {
continue
} else {
if calStartDate >= current.Start {
current.Start = calStartDate
}
ranges = append(ranges, *current)
current = nil
}
Expand All @@ -107,6 +111,9 @@ func (handler *checkTaskHandler) OnSuccess(tf task.TaskFuture, r task.TaskResult
newS := e.Add(-time.Duration(minCalDuration) * time.Second)
current.Start, _ = utils.Datetoi(newS)
}
if calStartDate >= current.Start {
current.Start = calStartDate
}
ranges = append(ranges, *current)
}
if data.ProcessType == task.ProcessTypeNone {
Expand All @@ -119,20 +126,29 @@ func (handler *checkTaskHandler) OnSuccess(tf task.TaskFuture, r task.TaskResult
} else {
// process
syncFrom := utils.GetGlobalConfig().GetSyncFrom()
processFrom := utils.GetGlobalConfig().GetProcessFrom()
var name string
if syncFrom != "" {
name = syncFrom
} else {
name = data.Stores[0]
} else if processFrom != "" {
name = processFrom
}
store, _ := service.stores[name]
index := service.indexer.GetIndex(data.DateRange)
factorValue, err := store.Fetch(service.mapFactor(name, data.Factor), models.DateRange{Start: index[0], End: index[len(index)-1]})
if err != nil {
return err
for _, dateRange := range ranges {
var factorValue models.FactorValue
if name != "" {
var err error
store, _ := service.stores[name]
index := service.indexer.GetIndex(dateRange)
factorValue, err = store.Fetch(service.mapFactor(name, data.Factor), models.DateRange{Start: index[0], End: index[len(index)-1]})
if err != nil {
return err
}
} else {
factorValue.Datetime = []int{dateRange.Start, dateRange.End}
}
ti := task.NewProcessTaskInput(calculators.DefaultCalculator, data.Factor, factorValue, data.ProcessType)
service.scheduler.Publish(&tf, ti)
}
ti := task.NewProcessTaskInput(calculators.DefaultCalculator, data.Factor, factorValue, data.ProcessType)
service.scheduler.Publish(&tf, ti)
}
}
} else {
Expand Down
1 change: 1 addition & 0 deletions server/sources/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (source *FileSystemSource) Fetch() []models.Factor {
return ret
}
factor.Archive = base64.StdEncoding.EncodeToString(fileContent)
factor.Name = factorName
factor.ID = factorName
factors = append(factors, factor)
return ret
Expand Down
3 changes: 2 additions & 1 deletion server/sources/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (source *MysqlSource) Fetch() []models.Factor {

for rows.Next() {
factor := models.Factor{}
err2 := rows.Scan(&factor.ID, &factor.Formula)
err2 := rows.Scan(&factor.Name, &factor.Formula)
factor.ID = factor.Name
// Exit if we get an error
if err2 != nil {
log.Panic(err2)
Expand Down
10 changes: 5 additions & 5 deletions server/stores/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ func (s *CSVStore) Init(config *viper.Viper) {
}

func (s *CSVStore) getCSVPath(factor models.Factor) string {
p := path.Join(s.config.Path, factor.ID+".csv")
p := path.Join(s.config.Path, factor.Name+".csv")
return p
}

func (s *CSVStore) getLock(factor models.Factor) *sync.RWMutex {
lock, ok := s.locks[factor.ID]
lock, ok := s.locks[factor.Name]
if !ok {
lock = &sync.RWMutex{}
s.locks[factor.ID] = lock
s.locks[factor.Name] = lock
}
return lock
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func (s *CSVStore) Update(factor models.Factor, factorValue models.FactorValue,
}
}
if len(newRecords) == 0 {
log.Debugf("CSV data of %s no need to be update.", factor.ID)
log.Debugf("CSV data of %s no need to be update.", factor.Name)
return 0, nil
}
start := newRecords[0].TDate
Expand All @@ -212,7 +212,7 @@ func (s *CSVStore) Update(factor models.Factor, factorValue models.FactorValue,
s1 := records[:startIndex]
s2 := newRecords
s3 := records[endIndex:]
log.Debugf("Update CSV data of %s in [%d, %d]", factor.ID, newRecords[0].TDate, newRecords[len(newRecords)-1].TDate)
log.Debugf("Update CSV data of %s in [%d, %d]", factor.Name, newRecords[0].TDate, newRecords[len(newRecords)-1].TDate)
err := s.writeFactor(factor, s1, s2, s3)
if err != nil {
return 0, err
Expand Down
6 changes: 3 additions & 3 deletions server/stores/hdf5.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ func (s *HDF5Store) handleFetchResult() {

func (s *HDF5Store) getFactorIndentity(factor models.Factor, processType task.FactorProcessType) string {
if processType == task.ProcessTypeNone {
return factor.ID
return factor.Name
}
return factor.ID + "__" + string(processType)
return factor.Name + "__" + string(processType)
}

func (s *HDF5Store) Check(factor models.Factor, processType task.FactorProcessType, index []int) ([]int, error) {
Expand Down Expand Up @@ -276,7 +276,7 @@ func (s *HDF5Store) Update(factor models.Factor, processType task.FactorProcessT

func (s *HDF5Store) Fetch(factor models.Factor, dateRange models.DateRange) (models.FactorValue, error) {
data := map[string][]interface{}{
"args": []interface{}{factor.ID, dateRange.Start, dateRange.End},
"args": []interface{}{factor.Name, dateRange.Start, dateRange.End},
}
jsonData, err := json.Marshal(data)
s.taskFetchCount++
Expand Down
10 changes: 5 additions & 5 deletions server/stores/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func (store *MongoStore) Init(config *viper.Viper) {

func (store *MongoStore) getFactorIndentity(factor models.Factor, processType task.FactorProcessType) string {
if processType == task.ProcessTypeNone {
return factor.ID
return factor.Name
}
return factor.ID + "__" + string(processType)
return factor.Name + "__" + string(processType)
}

func (store *MongoStore) getFactorDateSet(factor models.Factor, processType task.FactorProcessType) (mapset.Set, error) {
Expand Down Expand Up @@ -162,7 +162,7 @@ func (store *MongoStore) Check(factor models.Factor, processType task.FactorProc
func (store *MongoStore) Fetch(factor models.Factor, dateRange models.DateRange) (models.FactorValue, error) {
conn := store.session.Clone()
defer conn.Close()
col := conn.DB(store.config.Db).C(factor.ID)
col := conn.DB(store.config.Db).C(factor.Name)
start, err := utils.ItoDate(dateRange.Start)
end, err := utils.ItoDate(dateRange.End)
filter := bson.M{"datetime": bson.M{"$gte": start, "$lte": end}}
Expand All @@ -184,7 +184,7 @@ func (store *MongoStore) Fetch(factor models.Factor, dateRange models.DateRange)
}
dtTime, ok := dt.(time.Time)
if !ok {
return models.FactorValue{}, fmt.Errorf("unvalid factor values data in %s: %v", factor.ID, dt)
return models.FactorValue{}, fmt.Errorf("unvalid factor values data in %s: %v", factor.Name, dt)
}
dtPoint, _ := utils.Datetoi(dtTime)
datetime = append(datetime, dtPoint)
Expand All @@ -200,7 +200,7 @@ func (store *MongoStore) Fetch(factor models.Factor, dateRange models.DateRange)
}
valuePoint, ok := v.(float64)
if !ok {
return models.FactorValue{}, fmt.Errorf("unvalid factor values data in %s: %v", factor.ID, v)
return models.FactorValue{}, fmt.Errorf("unvalid factor values data in %s: %v", factor.Name, v)
}
values[k] = append(vSlice, valuePoint)
}
Expand Down
Loading

0 comments on commit 713c7f1

Please sign in to comment.