Skip to content

Commit

Permalink
1)Calculate 1m quote from tick
Browse files Browse the repository at this point in the history
2)Add quote stats and ts api
3)Looser constraints on tag
  • Loading branch information
foolcage committed Jul 31, 2024
1 parent db69c41 commit c5276c9
Show file tree
Hide file tree
Showing 31 changed files with 551 additions and 203 deletions.
2 changes: 2 additions & 0 deletions api-tests/trading/get_quote_stats.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
GET http://127.0.0.1:8090/api/trading/get_quote_stats
accept: application/json
13 changes: 13 additions & 0 deletions api-tests/trading/query_kdata.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
POST http://127.0.0.1:8090/api/trading/query_kdata
accept: application/json
Content-Type: application/json


{
"data_provider": "em",
"entity_ids": [
"stock_sz_002085",
"stock_sz_300133"
],
"adjust_type": "hfq"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
POST http://127.0.0.1:8090/api/factor/query_kdata
POST http://127.0.0.1:8090/api/trading/query_ts
accept: application/json
Content-Type: application/json

Expand Down
22 changes: 20 additions & 2 deletions src/zvt/api/kdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@
from zvt.contract.api import decode_entity_id, get_schema_by_name
from zvt.domain import Index1dKdata
from zvt.utils.pd_utils import pd_is_not_null
from zvt.utils.time_utils import to_time_str, TIME_FORMAT_DAY, TIME_FORMAT_ISO8601, to_pd_timestamp
from zvt.utils.time_utils import (
to_time_str,
TIME_FORMAT_DAY,
TIME_FORMAT_ISO8601,
to_pd_timestamp,
date_time_by_interval,
current_date,
)


def get_trade_dates(start, end):
def get_trade_dates(start, end=None):
df = Index1dKdata.query_data(
entity_id="index_sh_000001",
provider="em",
Expand All @@ -25,6 +32,12 @@ def get_trade_dates(start, end):
return df["timestamp"].tolist()


def get_recent_trade_dates(days_count=5):
max_start = date_time_by_interval(current_date(), -days_count - 15)
dates = get_trade_dates(start=max_start)
return dates[-days_count:]


def get_latest_kdata_date(
entity_type: str,
provider: str = None,
Expand Down Expand Up @@ -185,9 +198,14 @@ def to_sum(s):
return df


if __name__ == "__main__":
print(get_recent_trade_dates())


# the __all__ is generated
__all__ = [
"get_trade_dates",
"get_recent_trade_dates",
"get_latest_kdata_date",
"get_kdata_schema",
"get_kdata",
Expand Down
32 changes: 25 additions & 7 deletions src/zvt/broker/qmt/qmt_quote.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
from zvt.contract import IntervalLevel, AdjustType
from zvt.contract.api import decode_entity_id, df_to_db, get_db_session
from zvt.domain import StockQuote, Stock
from zvt.domain.quotes.stock.stock_quote import Stock1mQuote
from zvt.domain.quotes.stock.stock_quote import Stock1mQuote, StockQuoteLog
from zvt.utils.pd_utils import pd_is_not_null
from zvt.utils.time_utils import to_time_str, current_date, to_pd_timestamp, now_pd_timestamp, TIME_FORMAT_MINUTE
from zvt.utils.time_utils import (
to_time_str,
current_date,
to_pd_timestamp,
now_pd_timestamp,
TIME_FORMAT_MINUTE,
date_time_by_interval,
)

# https://dict.thinktrader.net/nativeApi/start_now.html?id=e2M5nZ

Expand Down Expand Up @@ -137,7 +144,7 @@ def get_kdata(
code = _to_qmt_code(entity_id=entity_id)
period = level.value
# 保证qmt先下载数据到本地
xtdata.download_history_data(stock_code=code, period=period, start_time="", end_time="")
xtdata.download_history_data(stock_code=code, period=period)
records = xtdata.get_market_data(
stock_list=[code],
period=period,
Expand Down Expand Up @@ -223,14 +230,22 @@ def on_data(datas, stock_df=entity_df):
df["float_cap"] = df["float_volume"] * df["price"]
df["total_cap"] = df["total_volume"] * df["price"]

# 实时行情统计,只保留最新
df_to_db(df, data_schema=StockQuote, provider="qmt", force_update=True, drop_duplicates=False)
cost_time = time.time() - start_time
logger.info(f"Quotes cost_time:{cost_time} for {len(datas.keys())} stocks")

# 1分钟分时
df["id"] = df[["entity_id", "timestamp"]].apply(
lambda se: "{}_{}".format(se["entity_id"], to_time_str(se["timestamp"], TIME_FORMAT_MINUTE)), axis=1
)
df_to_db(df, data_schema=Stock1mQuote, provider="qmt", force_update=True, drop_duplicates=False)
# 历史记录
# df["id"] = df[["entity_id", "timestamp"]].apply(
# lambda se: "{}_{}".format(se["entity_id"], to_time_str(se["timestamp"], TIME_FORMAT_MINUTE2)), axis=1
# )
# df_to_db(df, data_schema=StockQuoteLog, provider="qmt", force_update=True, drop_duplicates=False)

cost_time = time.time() - start_time
logger.info(f"Quotes cost_time:{cost_time} for {len(datas.keys())} stocks")

return on_data

Expand All @@ -245,6 +260,9 @@ def download_capital_data():
def clear_history_quote():
session = get_db_session("qmt", data_schema=StockQuote)
session.query(StockQuote).filter(StockQuote.timestamp < current_date()).delete()
start_date = date_time_by_interval(current_date(), -20)
session.query(Stock1mQuote).filter(Stock1mQuote.timestamp < start_date).delete()
session.query(StockQuoteLog).filter(StockQuoteLog.timestamp < start_date).delete()
session.commit()


Expand All @@ -253,7 +271,7 @@ def record_tick():
Stock.record_data(provider="em")
stocks = get_qmt_stocks()
print(stocks)
xtdata.subscribe_whole_quote(stocks, callback=tick_to_quote())
sid = xtdata.subscribe_whole_quote(stocks, callback=tick_to_quote())

"""阻塞线程接收行情回调"""
import time
Expand All @@ -267,6 +285,7 @@ def record_tick():
if current_timestamp.hour >= 15 and current_timestamp.minute >= 10:
logger.info(f"record tick finished at: {current_timestamp}")
break
xtdata.unsubscribe_quote(sid)


if __name__ == "__main__":
Expand All @@ -278,7 +297,6 @@ def record_tick():
sched.start()
sched._thread.join()


# the __all__ is generated
__all__ = [
"get_qmt_stocks",
Expand Down
3 changes: 1 addition & 2 deletions src/zvt/contract/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
from zvt.contract.api import get_entities, get_data
from zvt.contract.base_service import OneStateService
from zvt.contract.schema import Mixin, TradableEntity
from zvt.contract.utils import is_in_same_interval, evaluate_size_from_timestamp
from zvt.contract.zvt_info import RecorderState
from zvt.utils.pd_utils import pd_is_not_null
from zvt.utils.time_utils import (
to_pd_timestamp,
TIME_FORMAT_DAY,
to_time_str,
evaluate_size_from_timestamp,
is_in_same_interval,
now_pd_timestamp,
now_time_str,
)
Expand Down
70 changes: 70 additions & 0 deletions src/zvt/contract/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
import math

import pandas as pd

from zvt.contract import IntervalLevel
from zvt.utils.time_utils import to_pd_timestamp


def is_in_same_interval(t1: pd.Timestamp, t2: pd.Timestamp, level: IntervalLevel):
t1 = to_pd_timestamp(t1)
t2 = to_pd_timestamp(t2)
if level == IntervalLevel.LEVEL_1WEEK:
return t1.week == t2.week
if level == IntervalLevel.LEVEL_1MON:
return t1.month == t2.month

return level.floor_timestamp(t1) == level.floor_timestamp(t2)


def evaluate_size_from_timestamp(
start_timestamp, level: IntervalLevel, one_day_trading_minutes, end_timestamp: pd.Timestamp = None
):
"""
given from timestamp,level,one_day_trading_minutes,this func evaluate size of kdata to current.
it maybe a little bigger than the real size for fetching all the kdata.
:param start_timestamp:
:type start_timestamp: pd.Timestamp
:param level:
:type level: IntervalLevel
:param one_day_trading_minutes:
:type one_day_trading_minutes: int
"""
if not end_timestamp:
end_timestamp = pd.Timestamp.now()
else:
end_timestamp = to_pd_timestamp(end_timestamp)

time_delta = end_timestamp - to_pd_timestamp(start_timestamp)

one_day_trading_seconds = one_day_trading_minutes * 60

if level == IntervalLevel.LEVEL_1DAY:
return time_delta.days + 1

if level == IntervalLevel.LEVEL_1WEEK:
return int(math.ceil(time_delta.days / 7)) + 1

if level == IntervalLevel.LEVEL_1MON:
return int(math.ceil(time_delta.days / 30)) + 1

if time_delta.days > 0:
seconds = (time_delta.days + 1) * one_day_trading_seconds
return int(math.ceil(seconds / level.to_second())) + 1
else:
seconds = time_delta.total_seconds()
return min(int(math.ceil(seconds / level.to_second())) + 1, one_day_trading_seconds / level.to_second() + 1)


def next_timestamp_on_level(current_timestamp: pd.Timestamp, level: IntervalLevel) -> pd.Timestamp:
current_timestamp = to_pd_timestamp(current_timestamp)
return current_timestamp + pd.Timedelta(seconds=level.to_second())


def is_finished_kdata_timestamp(timestamp, level: IntervalLevel):
timestamp = to_pd_timestamp(timestamp)
if level.floor_timestamp(timestamp) == timestamp:
return True
return False
39 changes: 38 additions & 1 deletion src/zvt/domain/quotes/stock/stock_quote.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,39 @@ class StockQuote(StockQuoteBase, Mixin):
total_cap = Column(Float)


class StockQuoteLog(StockQuoteBase, Mixin):
__tablename__ = "stock_quote_log"
code = Column(String(length=32))
name = Column(String(length=32))

#: UNIX时间戳
time = Column(Integer)
#: 最新价
price = Column(Float)
# 涨跌幅
change_pct = Column(Float)
# 成交金额
turnover = Column(Float)
# 换手率
turnover_rate = Column(Float)
#: 是否涨停
is_limit_up = Column(Boolean)
#: 封涨停金额
limit_up_amount = Column(Float)
#: 是否跌停
is_limit_down = Column(Boolean)
#: 封跌停金额
limit_down_amount = Column(Float)
#: 5挡卖单金额
ask_amount = Column(Float)
#: 5挡买单金额
bid_amount = Column(Float)
#: 流通市值
float_cap = Column(Float)
#: 总市值
total_cap = Column(Float)


class Stock1mQuote(StockQuoteBase, Mixin):
__tablename__ = "stock_1m_quote"
code = Column(String(length=32))
Expand All @@ -60,10 +93,14 @@ class Stock1mQuote(StockQuoteBase, Mixin):
turnover = Column(Float)
# 换手率
turnover_rate = Column(Float)
#: 是否涨停
is_limit_up = Column(Boolean)
#: 是否跌停
is_limit_down = Column(Boolean)


register_schema(providers=["qmt"], db_name="stock_quote", schema_base=StockQuoteBase, entity_type="stock")


# the __all__ is generated
__all__ = ["StockQuote", "Stock1mQuote"]
__all__ = ["StockQuote", "StockQuoteLog", "Stock1mQuote"]
21 changes: 2 additions & 19 deletions src/zvt/factors/factor_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pydantic import BaseModel, Field

from zvt.contract import IntervalLevel, AdjustType
from zvt.contract import IntervalLevel
from zvt.trader import TradingSignalType
from zvt.utils.time_utils import date_time_by_interval, current_date

Expand All @@ -17,23 +17,6 @@ class FactorRequestModel(BaseModel):
level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY)


class KdataRequestModel(BaseModel):
entity_ids: List[str]
data_provider: str = Field(default="em")
start_timestamp: datetime = Field(default=date_time_by_interval(current_date(), -365))
end_timestamp: Optional[datetime] = Field(default=None)
level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY)
adjust_type: AdjustType = Field(default=AdjustType.hfq)


class KdataModel(BaseModel):
entity_id: str
code: str
name: str
level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY)
datas: List


class TradingSignalModel(BaseModel):
entity_id: str
happen_timestamp: datetime
Expand All @@ -51,4 +34,4 @@ class FactorResultModel(BaseModel):


# the __all__ is generated
__all__ = ["FactorRequestModel", "KdataRequestModel", "KdataModel", "TradingSignalModel", "FactorResultModel"]
__all__ = ["FactorRequestModel", "TradingSignalModel", "FactorResultModel"]
33 changes: 2 additions & 31 deletions src/zvt/factors/factor_service.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,11 @@
# -*- coding: utf-8 -*-
import pandas as pd

from zvt.api import kdata as kdata_api
from zvt.contract import zvt_context
from zvt.domain import Stock
from zvt.factors.factor_models import FactorRequestModel, KdataRequestModel
from zvt.factors.factor_models import FactorRequestModel
from zvt.factors.technical_factor import TechnicalFactor
from zvt.trader import TradingSignalType
from zvt.utils.pd_utils import pd_is_not_null


def query_kdata(kdata_request_model: KdataRequestModel):

kdata_df = kdata_api.get_kdata(
entity_ids=kdata_request_model.entity_ids,
provider=kdata_request_model.data_provider,
start_timestamp=kdata_request_model.start_timestamp,
end_timestamp=kdata_request_model.end_timestamp,
adjust_type=kdata_request_model.adjust_type,
)
if pd_is_not_null(kdata_df):
kdata_df["timestamp"] = kdata_df["timestamp"].apply(lambda x: int(x.timestamp()))
kdata_df["data"] = kdata_df.apply(
lambda x: x[
["timestamp", "open", "high", "low", "close", "volume", "turnover", "change_pct", "turnover_rate"]
].values.tolist(),
axis=1,
)
df = kdata_df.groupby("entity_id").agg(
code=("code", "first"),
name=("name", "first"),
level=("level", "first"),
datas=("data", lambda data: list(data)),
)
df = df.reset_index(drop=False)
return df.to_dict(orient="records")


def query_factor_result(factor_request_model: FactorRequestModel):
Expand Down Expand Up @@ -70,4 +41,4 @@ def to_trading_signal(order_type):


# the __all__ is generated
__all__ = ["query_kdata", "query_factor_result"]
__all__ = ["query_factor_result"]
Loading

0 comments on commit c5276c9

Please sign in to comment.