Skip to content

Commit

Permalink
add fund stock
Browse files Browse the repository at this point in the history
  • Loading branch information
52lu committed Sep 6, 2021
1 parent d57e104 commit 839cb65
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 4 deletions.
4 changes: 2 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mysql:
skipInitializeWithVersion: false # 根据当前 MySQL 版本自动配置
autoMigrate: true # 开启时,每次服务启动都会根据实体创建/更新表结构
slowSql: 1ms # 慢sql时间。单位毫秒
logLevel: error # error、info、warn
logLevel: info # error、info、warn
ignoreRecordNotFoundError: true # 是否忽略ErrRecordNotFound(未查到记录错误)
gorm: # gorm配置项disableForeignKeyConstraintWhenMigrating
skipDefaultTx: false # 是否跳过默认事务
Expand Down Expand Up @@ -61,4 +61,4 @@ elastic:
logPre: ES-
cron:
# 是否开启
enable: true
enable: false
122 changes: 122 additions & 0 deletions crontab/fund_stock_cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Package crontab: 基金股票持仓
package crontab

import (
"52lu/fund-analye-system/global"
"52lu/fund-analye-system/model/dao"
"52lu/fund-analye-system/model/entity"
"52lu/fund-analye-system/service/crawl/fund"
"fmt"
"math"
"sync"
"time"
)

type FundStockCron struct {
}

// 声明并发等待组
var wg sync.WaitGroup

// 每次任务抓取总数量
var perTaskTotal = 50

// 记录每次任务对应的基金code
var fundCodeChannel = make(chan []string, perTaskTotal)

// 定时任务启动入口
func (c FundStockCron) Run() {
btime := time.Now().UnixMilli()
fmt.Println("基金持仓-股票定时任务准备执行....")
pageNum := 10
totalPage := int(math.Ceil(float64(perTaskTotal) / float64(pageNum)))
// 开启协程分组抓取
// 创建数据通道
var dataChan = make(chan [][]entity.FundStock, perTaskTotal/pageNum)
runWithGoroutine(dataChan, totalPage, pageNum)
// 读取通道,数据入库
saveToDb(dataChan)
fmt.Printf("基金持仓股票-定时任务执行完成,耗时:%vms\n", time.Now().UnixMilli()-btime)
}

// 开启协程分组抓取
func runWithGoroutine(dataChan chan [][]entity.FundStock, totalPage, pageNum int) {
// 开启协程抓取
wg.Add(totalPage)
for i := 1; i <= totalPage; i++ {
page := i
go func() {
// 获取对应页数的code
fundStocks, err := dao.FindNoSyncFundStockByPage(page, pageNum)
if err == nil {
var fundStockList [][]entity.FundStock
var fundCodes []string
for _, val := range fundStocks {
rows := &fund.StockPercentageRowsCrawl{}
rows.CrawlHtml(val.Code)
fundCodes = append(fundCodes,val.Code)
if len(rows.Rows) > 0 {
convertEntity := rows.ConvertEntity()
fundStockList = append(fundStockList, convertEntity)
}
}
// 数据存入通道
dataChan <- fundStockList
fundCodeChannel <- fundCodes
}
// 并发等待组减一
wg.Done()
}()
}
wg.Wait()
close(dataChan)
close(fundCodeChannel)
}

// 保存入库
func saveToDb(dataChan chan [][]entity.FundStock) {
// 声明基金持仓股票实体列表
fundStockRows := []entity.FundStock{}
// 声明股票实体列表
stockRows := []entity.Stock{}
// 声明股票实体列表
checkExistKey := make(map[string]struct{}, perTaskTotal)
// 遍历
for fundStockGroup := range dataChan {
for _, fundStockList := range fundStockGroup {
for _, fundStock := range fundStockList {
stockCode := fundStock.StockCode
fundStockRows = append(fundStockRows, fundStock)
// 判断是否已经存在
if _, ok := checkExistKey[stockCode]; !ok {
stockRows = append(stockRows, entity.Stock{
Code: fundStock.StockCode,
Name: fundStock.StockName,
})
checkExistKey[stockCode] = struct{}{}
}
}
}
}
var codeList []string
for val := range fundCodeChannel {
for _, c := range val {
codeList = append(codeList,c)
}
}

// 入库
if save := global.GvaMysqlClient.Create(fundStockRows); save.Error != nil {
global.GvaLogger.Sugar().Errorf("基金持仓入库失败:%s", save.Error)
}
// 批量更新
if len(codeList) > 0 {
if up := global.GvaMysqlClient.Model(&entity.FundBasis{}).Where("`code` in ?", codeList).
Update("sync_stock", 1); up.Error != nil {
global.GvaLogger.Sugar().Errorf("信息更新失败:%s", up.Error)
}
}
if save := global.GvaMysqlClient.Create(stockRows); save.Error != nil {
global.GvaLogger.Sugar().Errorf("股票信息入库失败:%s", save.Error)
}
}
28 changes: 28 additions & 0 deletions model/dao/fund_basic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Package dao: 基金基本信息
package dao

import (
"52lu/fund-analye-system/global"
"52lu/fund-analye-system/model/entity"
"gorm.io/gorm"
)

// 统计没有获取持仓股票的基金数量
func CountNoSyncFundStock() int64 {
var num int64
global.GvaMysqlClient.Model(&entity.FundBasis{}).Where("sync_stock = ?",0).Count(&num)
return num
}


// 分页获取没有持仓股票的基金code
func FindNoSyncFundStockByPage(page, pageNum int) ([]entity.FundBasis, error) {
limit := (page - 1) * pageNum
fbs := []entity.FundBasis{}
find := global.GvaMysqlClient.Select("`code`").Where("sync_stock = ?", 0).
Limit(pageNum).Offset(limit).Find(&fbs)
if find.Error != nil && find.Error == gorm.ErrRecordNotFound {
return fbs, nil
}
return fbs, find.Error
}
6 changes: 4 additions & 2 deletions model/entity/fund.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ type FundBasis struct {
CustodyFeeRate float64 `json:"custodyFeeRate" gorm:"type:decimal(4,2);comment:托管费率(百分比)"`
SaleFeeRate float64 `json:"saleFeeRate" gorm:"type:decimal(4,2);comment:销售服务费率(百分比)"`
Benchmark string `json:"benchmark" gorm:"type:varchar(255);comment:业绩比较基准"`
SyncStock int `json:"syncTop" gorm:"type:tinyint(4);default:0;comment:是否同步股票持仓, 1:是 0:否"`
}

// FundStock 基金股票持仓
type FundStock struct {
global.BaseModel
FundCode string `json:"fundCode" gorm:"type:varchar(10);not null; default:'';index;comment:基金code"`
StockCode string `json:"stockCode" gorm:"type:varchar(10);index;comment:股票code"`
StockName string `json:"stockName" gorm:"type:varchar(10);index;comment:股票名称"`
Percentage float64 `json:"percentage" gorm:"type:decimal(4,2);comment:持仓占比(百分比)"`
Quantity float64 `json:"quantity" gorm:"type:decimal(5,2);comment:持股数(万股)"`
Amount float64 `json:"amount" gorm:"type:decimal(5,2);comment:持股市值(万元)"`
Quantity float64 `json:"quantity" gorm:"type:decimal(10,2);comment:持股数(万股)"`
Amount float64 `json:"amount" gorm:"type:decimal(10,2);comment:持股市值(万元)"`
CutOffDate string `json:"cutOffDate" gorm:"type:char(10);comment:截止时间"`
}

Expand Down
107 changes: 107 additions & 0 deletions service/crawl/fund/stock_crawl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Package fund: 基金股票持仓
package fund

import (
"52lu/fund-analye-system/global"
"52lu/fund-analye-system/model/entity"
"52lu/fund-analye-system/service/crawl"
"bytes"
"fmt"
"github.com/PuerkitoBio/goquery"
"github.com/gocolly/colly/v2"
"go.uber.org/zap"
"regexp"
"strconv"
"strings"
"time"
)

type StockPercentageRow struct {
StockCode string `selector:"td:nth-of-type(2)"`
StockName string `selector:"td:nth-of-type(3)"`
Percentage string `selector:"td:nth-of-type(7)"`
Quantity string `selector:"td:nth-of-type(8)"`
Amount string `selector:"td:nth-of-type(9)"`
}

type StockPercentageRowsCrawl struct {
Rows []StockPercentageRow `selector:"tr"`
FundCode string
CutOffDate string
}


// 爬取信息
func (c *StockPercentageRowsCrawl) CrawlHtml(fundCode string) {
collector := colly.NewCollector(colly.UserAgent(crawl.UserAgent),colly.Async(true))
// 开启限速
err := collector.Limit(&colly.LimitRule{
DomainGlob: "*fundf10.eastmoney.*",
Delay: 500 * time.Millisecond,
RandomDelay: 500 * time.Millisecond,
Parallelism: 20,
})
collector.OnRequest(func(request *colly.Request) {
fmt.Println("url:",request.URL)
})
// 处理返回的数据
collector.OnResponse(func(response *colly.Response) {
// 替换字符串
compile := regexp.MustCompile(`var apidata=\{ content:"(.*)",arryear:`)
matchResult := compile.FindAllStringSubmatch(string(response.Body), -1)
if len(matchResult) == 0 {
return
}
htmlString := matchResult[0][1]
htmlString = strings.ReplaceAll(htmlString, "%", "")
htmlString = strings.ReplaceAll(htmlString, ",", "")
doc, err := goquery.NewDocumentFromReader(bytes.NewBuffer([]byte(htmlString)))
if err != nil {
return
}
docSelection := doc.Find("div[class='box']").First()
e := &colly.HTMLElement{
DOM: docSelection.Find("table"),
}
err = e.Unmarshal(c)
if err != nil {
global.GvaLogger.Error("爬虫解析失败",zap.String("error",err.Error()))
return
}
// 过滤header
if len(c.Rows) > 0 && c.Rows[0].StockCode == ""{
c.Rows = c.Rows[1:]
}

// 获取持仓季度时间信息
c.CutOffDate = docSelection.Find("h4 label").Eq(1).Find("font").Text()
// 补充额外信息
c.FundCode = fundCode
})
err = collector.Visit(fmt.Sprintf("https://fundf10.eastmoney.com/FundArchivesDatas.aspx?type=jjcc&code=%s&topline=30", fundCode))
if err != nil {
global.GvaLogger.Sugar().Errorf("CrawlHtml error:%s", err)
}
collector.Wait()
}

// 数据清洗
func (c StockPercentageRowsCrawl) ConvertEntity() []entity.FundStock {
var fundStocks []entity.FundStock
if len(c.Rows) < 1 {
return []entity.FundStock{}
}
for _, row := range c.Rows {
item := entity.FundStock{
FundCode: c.FundCode,
StockCode: row.StockCode,
StockName: row.StockName,
CutOffDate: c.CutOffDate,
}
item.Percentage, _ = strconv.ParseFloat(row.Percentage, 64)
item.Quantity, _ = strconv.ParseFloat(row.Quantity, 64)
item.Amount, _ = strconv.ParseFloat(row.Amount, 64)
fundStocks = append(fundStocks, item)
}
return fundStocks
}

0 comments on commit 839cb65

Please sign in to comment.