From 1cba7a107f6f6e9429aa0d1c46935ac52fb68bf4 Mon Sep 17 00:00:00 2001 From: foolcage <5533061@qq.com> Date: Tue, 23 Jul 2024 23:41:22 +0800 Subject: [PATCH] vip --- api-tests/trading/get_quote_stats.http | 2 + api-tests/trading/query_kdata.http | 13 + .../query_ts.http} | 2 +- src/zvt/api/kdata.py | 22 +- src/zvt/broker/qmt/qmt_quote.py | 41 ++- src/zvt/contract/recorder.py | 3 +- src/zvt/contract/utils.py | 70 +++++ src/zvt/domain/quotes/stock/stock_quote.py | 39 ++- src/zvt/factors/factor_models.py | 21 +- src/zvt/factors/factor_service.py | 33 +-- src/zvt/fill_project.py | 5 +- src/zvt/{zhdate => misc}/__init__.py | 2 + src/zvt/{zhdate => misc}/constants.py | 2 + src/zvt/misc/misc_models.py | 15 + .../{zhdate/ztime.py => misc/misc_service.py} | 41 +-- src/zvt/{zhdate => misc}/zhdate.py | 2 +- .../qmt/quotes/qmt_kdata_recorder.py | 37 ++- src/zvt/rest/factor.py | 7 +- src/zvt/rest/misc.py | 22 ++ src/zvt/rest/trading.py | 20 ++ src/zvt/server.py | 2 + src/zvt/tag/common.py | 13 +- src/zvt/tag/dynamic_pool.py | 31 +++ src/zvt/tag/tag_models.py | 17 +- src/zvt/tag/tag_service.py | 19 +- src/zvt/trading/trading_models.py | 59 +++- src/zvt/trading/trading_service.py | 130 ++++++++- src/zvt/utils/recorder_utils.py | 4 +- src/zvt/utils/time_utils.py | 69 ----- src/zvt/utils/utils.py | 15 - src/zvt_vip/__init__.py | 2 + src/zvt_vip/dataset/__init__.py | 13 + src/zvt_vip/dataset/stock_events.py | 22 ++ src/zvt_vip/event/__init__.py | 1 + src/zvt_vip/event/ai_suggestion.py | 198 ++++++++++++++ src/zvt_vip/event/event_models.py | 37 +++ src/zvt_vip/event/event_service.py | 125 +++++++++ src/zvt_vip/fill_project.py | 7 + src/zvt_vip/recorders/__init__.py | 19 ++ src/zvt_vip/recorders/em_api.py | 77 ++++++ .../recorders/em_stock_events_recorder.py | 48 ++++ .../recorders/em_stock_news_recorder.py | 50 ++++ src/zvt_vip/recorders/jqka/__init__.py | 1 + src/zvt_vip/recorders/jqka/jqka_vip_api.py | 6 + src/zvt_vip/rest/__init__.py | 1 + src/zvt_vip/rest/event.py | 34 +++ src/zvt_vip/server.py | 48 ++++ src/zvt_vip/tag/__init__.py | 13 + src/zvt_vip/tag/tag_service.py | 197 +++++++++++++ src/zvt_vip/tag/taggers.py | 38 +++ src/zvt_vip/tasks/__init__.py | 1 + src/zvt_vip/tasks/event_runner.py | 48 ++++ src/zvt_vip/tasks/news_runner.py | 59 ++++ src/zvt_vip/tasks/stock_pool_runner.py | 258 ++++++++++++++++++ src/zvt_vip/utils/__init__.py | 1 + src/zvt_vip/utils/ai_utils.py | 5 + tests/utils/test_time_utils.py | 4 +- 57 files changed, 1863 insertions(+), 208 deletions(-) create mode 100644 api-tests/trading/get_quote_stats.http create mode 100644 api-tests/trading/query_kdata.http rename api-tests/{factor/query_kdata.http => trading/query_ts.http} (72%) create mode 100644 src/zvt/contract/utils.py rename src/zvt/{zhdate => misc}/__init__.py (96%) rename src/zvt/{zhdate => misc}/constants.py (99%) create mode 100644 src/zvt/misc/misc_models.py rename src/zvt/{zhdate/ztime.py => misc/misc_service.py} (67%) rename src/zvt/{zhdate => misc}/zhdate.py (99%) create mode 100644 src/zvt/rest/misc.py create mode 100644 src/zvt/tag/dynamic_pool.py create mode 100644 src/zvt_vip/__init__.py create mode 100644 src/zvt_vip/dataset/__init__.py create mode 100644 src/zvt_vip/dataset/stock_events.py create mode 100644 src/zvt_vip/event/__init__.py create mode 100644 src/zvt_vip/event/ai_suggestion.py create mode 100644 src/zvt_vip/event/event_models.py create mode 100644 src/zvt_vip/event/event_service.py create mode 100644 src/zvt_vip/fill_project.py create mode 100644 src/zvt_vip/recorders/__init__.py create mode 100644 src/zvt_vip/recorders/em_api.py create mode 100644 src/zvt_vip/recorders/em_stock_events_recorder.py create mode 100644 src/zvt_vip/recorders/em_stock_news_recorder.py create mode 100644 src/zvt_vip/recorders/jqka/__init__.py create mode 100644 src/zvt_vip/recorders/jqka/jqka_vip_api.py create mode 100644 src/zvt_vip/rest/__init__.py create mode 100644 src/zvt_vip/rest/event.py create mode 100644 src/zvt_vip/server.py create mode 100644 src/zvt_vip/tag/__init__.py create mode 100644 src/zvt_vip/tag/tag_service.py create mode 100644 src/zvt_vip/tag/taggers.py create mode 100644 src/zvt_vip/tasks/__init__.py create mode 100644 src/zvt_vip/tasks/event_runner.py create mode 100644 src/zvt_vip/tasks/news_runner.py create mode 100644 src/zvt_vip/tasks/stock_pool_runner.py create mode 100644 src/zvt_vip/utils/__init__.py create mode 100644 src/zvt_vip/utils/ai_utils.py diff --git a/api-tests/trading/get_quote_stats.http b/api-tests/trading/get_quote_stats.http new file mode 100644 index 00000000..abad3cbb --- /dev/null +++ b/api-tests/trading/get_quote_stats.http @@ -0,0 +1,2 @@ +GET http://127.0.0.1:8090/api/trading/get_quote_stats +accept: application/json diff --git a/api-tests/trading/query_kdata.http b/api-tests/trading/query_kdata.http new file mode 100644 index 00000000..9bf63c61 --- /dev/null +++ b/api-tests/trading/query_kdata.http @@ -0,0 +1,13 @@ +POST http://127.0.0.1:8090/api/trading/query_kdata +accept: application/json +Content-Type: application/json + + +{ + "data_provider": "em", + "entity_ids": [ + "stock_sz_002085", + "stock_sz_300133" + ], + "adjust_type": "hfq" +} diff --git a/api-tests/factor/query_kdata.http b/api-tests/trading/query_ts.http similarity index 72% rename from api-tests/factor/query_kdata.http rename to api-tests/trading/query_ts.http index bfe1359f..f91f7d6e 100644 --- a/api-tests/factor/query_kdata.http +++ b/api-tests/trading/query_ts.http @@ -1,4 +1,4 @@ -POST http://127.0.0.1:8090/api/factor/query_kdata +POST http://127.0.0.1:8090/api/trading/query_ts accept: application/json Content-Type: application/json diff --git a/src/zvt/api/kdata.py b/src/zvt/api/kdata.py index 330f2ab5..4b176eef 100644 --- a/src/zvt/api/kdata.py +++ b/src/zvt/api/kdata.py @@ -9,10 +9,17 @@ from zvt.contract.api import decode_entity_id, get_schema_by_name from zvt.domain import Index1dKdata from zvt.utils.pd_utils import pd_is_not_null -from zvt.utils.time_utils import to_time_str, TIME_FORMAT_DAY, TIME_FORMAT_ISO8601, to_pd_timestamp +from zvt.utils.time_utils import ( + to_time_str, + TIME_FORMAT_DAY, + TIME_FORMAT_ISO8601, + to_pd_timestamp, + date_time_by_interval, + current_date, +) -def get_trade_dates(start, end): +def get_trade_dates(start, end=None): df = Index1dKdata.query_data( entity_id="index_sh_000001", provider="em", @@ -25,6 +32,12 @@ def get_trade_dates(start, end): return df["timestamp"].tolist() +def get_recent_trade_dates(days_count=5): + max_start = date_time_by_interval(current_date(), -days_count - 15) + dates = get_trade_dates(start=max_start) + return dates[-days_count:] + + def get_latest_kdata_date( entity_type: str, provider: str = None, @@ -185,9 +198,14 @@ def to_sum(s): return df +if __name__ == "__main__": + print(get_recent_trade_dates()) + + # the __all__ is generated __all__ = [ "get_trade_dates", + "get_recent_trade_dates", "get_latest_kdata_date", "get_kdata_schema", "get_kdata", diff --git a/src/zvt/broker/qmt/qmt_quote.py b/src/zvt/broker/qmt/qmt_quote.py index b582b45b..3940beac 100644 --- a/src/zvt/broker/qmt/qmt_quote.py +++ b/src/zvt/broker/qmt/qmt_quote.py @@ -9,9 +9,15 @@ from zvt.contract import IntervalLevel, AdjustType from zvt.contract.api import decode_entity_id, df_to_db, get_db_session from zvt.domain import StockQuote, Stock -from zvt.domain.quotes.stock.stock_quote import Stock1mQuote +from zvt.domain.quotes.stock.stock_quote import Stock1mQuote, StockQuoteLog from zvt.utils.pd_utils import pd_is_not_null -from zvt.utils.time_utils import to_time_str, current_date, to_pd_timestamp, now_pd_timestamp, TIME_FORMAT_MINUTE +from zvt.utils.time_utils import ( + to_time_str, + current_date, + to_pd_timestamp, + now_pd_timestamp, + TIME_FORMAT_MINUTE, + date_time_by_interval) # https://dict.thinktrader.net/nativeApi/start_now.html?id=e2M5nZ @@ -128,16 +134,16 @@ def get_entity_list(): def get_kdata( - entity_id, - start_timestamp, - end_timestamp, - level=IntervalLevel.LEVEL_1DAY, - adjust_type=AdjustType.qfq, + entity_id, + start_timestamp, + end_timestamp, + level=IntervalLevel.LEVEL_1DAY, + adjust_type=AdjustType.qfq, ): code = _to_qmt_code(entity_id=entity_id) period = level.value # 保证qmt先下载数据到本地 - xtdata.download_history_data(stock_code=code, period=period, start_time="", end_time="") + xtdata.download_history_data(stock_code=code, period=period) records = xtdata.get_market_data( stock_list=[code], period=period, @@ -223,14 +229,22 @@ def on_data(datas, stock_df=entity_df): df["float_cap"] = df["float_volume"] * df["price"] df["total_cap"] = df["total_volume"] * df["price"] + # 实时行情统计,只保留最新 df_to_db(df, data_schema=StockQuote, provider="qmt", force_update=True, drop_duplicates=False) - cost_time = time.time() - start_time - logger.info(f"Quotes cost_time:{cost_time} for {len(datas.keys())} stocks") + # 1分钟分时 df["id"] = df[["entity_id", "timestamp"]].apply( lambda se: "{}_{}".format(se["entity_id"], to_time_str(se["timestamp"], TIME_FORMAT_MINUTE)), axis=1 ) df_to_db(df, data_schema=Stock1mQuote, provider="qmt", force_update=True, drop_duplicates=False) + # 历史记录 + # df["id"] = df[["entity_id", "timestamp"]].apply( + # lambda se: "{}_{}".format(se["entity_id"], to_time_str(se["timestamp"], TIME_FORMAT_MINUTE2)), axis=1 + # ) + # df_to_db(df, data_schema=StockQuoteLog, provider="qmt", force_update=True, drop_duplicates=False) + + cost_time = time.time() - start_time + logger.info(f"Quotes cost_time:{cost_time} for {len(datas.keys())} stocks") return on_data @@ -245,6 +259,9 @@ def download_capital_data(): def clear_history_quote(): session = get_db_session("qmt", data_schema=StockQuote) session.query(StockQuote).filter(StockQuote.timestamp < current_date()).delete() + start_date = date_time_by_interval(current_date(), -20) + session.query(Stock1mQuote).filter(Stock1mQuote.timestamp < start_date).delete() + session.query(StockQuoteLog).filter(StockQuoteLog.timestamp < start_date).delete() session.commit() @@ -253,7 +270,7 @@ def record_tick(): Stock.record_data(provider="em") stocks = get_qmt_stocks() print(stocks) - xtdata.subscribe_whole_quote(stocks, callback=tick_to_quote()) + sid = xtdata.subscribe_whole_quote(stocks, callback=tick_to_quote()) """阻塞线程接收行情回调""" import time @@ -267,6 +284,7 @@ def record_tick(): if current_timestamp.hour >= 15 and current_timestamp.minute >= 10: logger.info(f"record tick finished at: {current_timestamp}") break + xtdata.unsubscribe_quote(sid) if __name__ == "__main__": @@ -278,7 +296,6 @@ def record_tick(): sched.start() sched._thread.join() - # the __all__ is generated __all__ = [ "get_qmt_stocks", diff --git a/src/zvt/contract/recorder.py b/src/zvt/contract/recorder.py index 92f3e5ae..b5292fde 100644 --- a/src/zvt/contract/recorder.py +++ b/src/zvt/contract/recorder.py @@ -13,14 +13,13 @@ from zvt.contract.api import get_entities, get_data from zvt.contract.base_service import OneStateService from zvt.contract.schema import Mixin, TradableEntity +from zvt.contract.utils import is_in_same_interval, evaluate_size_from_timestamp from zvt.contract.zvt_info import RecorderState from zvt.utils.pd_utils import pd_is_not_null from zvt.utils.time_utils import ( to_pd_timestamp, TIME_FORMAT_DAY, to_time_str, - evaluate_size_from_timestamp, - is_in_same_interval, now_pd_timestamp, now_time_str, ) diff --git a/src/zvt/contract/utils.py b/src/zvt/contract/utils.py new file mode 100644 index 00000000..0f73375a --- /dev/null +++ b/src/zvt/contract/utils.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +import math + +import pandas as pd + +from zvt.contract import IntervalLevel +from zvt.utils.time_utils import to_pd_timestamp + + +def is_in_same_interval(t1: pd.Timestamp, t2: pd.Timestamp, level: IntervalLevel): + t1 = to_pd_timestamp(t1) + t2 = to_pd_timestamp(t2) + if level == IntervalLevel.LEVEL_1WEEK: + return t1.week == t2.week + if level == IntervalLevel.LEVEL_1MON: + return t1.month == t2.month + + return level.floor_timestamp(t1) == level.floor_timestamp(t2) + + +def evaluate_size_from_timestamp( + start_timestamp, level: IntervalLevel, one_day_trading_minutes, end_timestamp: pd.Timestamp = None +): + """ + given from timestamp,level,one_day_trading_minutes,this func evaluate size of kdata to current. + it maybe a little bigger than the real size for fetching all the kdata. + + :param start_timestamp: + :type start_timestamp: pd.Timestamp + :param level: + :type level: IntervalLevel + :param one_day_trading_minutes: + :type one_day_trading_minutes: int + """ + if not end_timestamp: + end_timestamp = pd.Timestamp.now() + else: + end_timestamp = to_pd_timestamp(end_timestamp) + + time_delta = end_timestamp - to_pd_timestamp(start_timestamp) + + one_day_trading_seconds = one_day_trading_minutes * 60 + + if level == IntervalLevel.LEVEL_1DAY: + return time_delta.days + 1 + + if level == IntervalLevel.LEVEL_1WEEK: + return int(math.ceil(time_delta.days / 7)) + 1 + + if level == IntervalLevel.LEVEL_1MON: + return int(math.ceil(time_delta.days / 30)) + 1 + + if time_delta.days > 0: + seconds = (time_delta.days + 1) * one_day_trading_seconds + return int(math.ceil(seconds / level.to_second())) + 1 + else: + seconds = time_delta.total_seconds() + return min(int(math.ceil(seconds / level.to_second())) + 1, one_day_trading_seconds / level.to_second() + 1) + + +def next_timestamp_on_level(current_timestamp: pd.Timestamp, level: IntervalLevel) -> pd.Timestamp: + current_timestamp = to_pd_timestamp(current_timestamp) + return current_timestamp + pd.Timedelta(seconds=level.to_second()) + + +def is_finished_kdata_timestamp(timestamp, level: IntervalLevel): + timestamp = to_pd_timestamp(timestamp) + if level.floor_timestamp(timestamp) == timestamp: + return True + return False diff --git a/src/zvt/domain/quotes/stock/stock_quote.py b/src/zvt/domain/quotes/stock/stock_quote.py index c1f81986..eee3d045 100644 --- a/src/zvt/domain/quotes/stock/stock_quote.py +++ b/src/zvt/domain/quotes/stock/stock_quote.py @@ -41,6 +41,39 @@ class StockQuote(StockQuoteBase, Mixin): total_cap = Column(Float) +class StockQuoteLog(StockQuoteBase, Mixin): + __tablename__ = "stock_quote_log" + code = Column(String(length=32)) + name = Column(String(length=32)) + + #: UNIX时间戳 + time = Column(Integer) + #: 最新价 + price = Column(Float) + # 涨跌幅 + change_pct = Column(Float) + # 成交金额 + turnover = Column(Float) + # 换手率 + turnover_rate = Column(Float) + #: 是否涨停 + is_limit_up = Column(Boolean) + #: 封涨停金额 + limit_up_amount = Column(Float) + #: 是否跌停 + is_limit_down = Column(Boolean) + #: 封跌停金额 + limit_down_amount = Column(Float) + #: 5挡卖单金额 + ask_amount = Column(Float) + #: 5挡买单金额 + bid_amount = Column(Float) + #: 流通市值 + float_cap = Column(Float) + #: 总市值 + total_cap = Column(Float) + + class Stock1mQuote(StockQuoteBase, Mixin): __tablename__ = "stock_1m_quote" code = Column(String(length=32)) @@ -60,10 +93,14 @@ class Stock1mQuote(StockQuoteBase, Mixin): turnover = Column(Float) # 换手率 turnover_rate = Column(Float) + #: 是否涨停 + is_limit_up = Column(Boolean) + #: 是否跌停 + is_limit_down = Column(Boolean) register_schema(providers=["qmt"], db_name="stock_quote", schema_base=StockQuoteBase, entity_type="stock") # the __all__ is generated -__all__ = ["StockQuote", "Stock1mQuote"] +__all__ = ["StockQuote", "StockQuoteLog", "Stock1mQuote"] diff --git a/src/zvt/factors/factor_models.py b/src/zvt/factors/factor_models.py index 3fc179c7..8eb65324 100644 --- a/src/zvt/factors/factor_models.py +++ b/src/zvt/factors/factor_models.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, Field -from zvt.contract import IntervalLevel, AdjustType +from zvt.contract import IntervalLevel from zvt.trader import TradingSignalType from zvt.utils.time_utils import date_time_by_interval, current_date @@ -17,23 +17,6 @@ class FactorRequestModel(BaseModel): level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY) -class KdataRequestModel(BaseModel): - entity_ids: List[str] - data_provider: str = Field(default="em") - start_timestamp: datetime = Field(default=date_time_by_interval(current_date(), -365)) - end_timestamp: Optional[datetime] = Field(default=None) - level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY) - adjust_type: AdjustType = Field(default=AdjustType.hfq) - - -class KdataModel(BaseModel): - entity_id: str - code: str - name: str - level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY) - datas: List - - class TradingSignalModel(BaseModel): entity_id: str happen_timestamp: datetime @@ -51,4 +34,4 @@ class FactorResultModel(BaseModel): # the __all__ is generated -__all__ = ["FactorRequestModel", "KdataRequestModel", "KdataModel", "TradingSignalModel", "FactorResultModel"] +__all__ = ["FactorRequestModel", "TradingSignalModel", "FactorResultModel"] diff --git a/src/zvt/factors/factor_service.py b/src/zvt/factors/factor_service.py index a211cdf3..01c9ca82 100644 --- a/src/zvt/factors/factor_service.py +++ b/src/zvt/factors/factor_service.py @@ -1,40 +1,11 @@ # -*- coding: utf-8 -*- import pandas as pd -from zvt.api import kdata as kdata_api from zvt.contract import zvt_context from zvt.domain import Stock -from zvt.factors.factor_models import FactorRequestModel, KdataRequestModel +from zvt.factors.factor_models import FactorRequestModel from zvt.factors.technical_factor import TechnicalFactor from zvt.trader import TradingSignalType -from zvt.utils.pd_utils import pd_is_not_null - - -def query_kdata(kdata_request_model: KdataRequestModel): - - kdata_df = kdata_api.get_kdata( - entity_ids=kdata_request_model.entity_ids, - provider=kdata_request_model.data_provider, - start_timestamp=kdata_request_model.start_timestamp, - end_timestamp=kdata_request_model.end_timestamp, - adjust_type=kdata_request_model.adjust_type, - ) - if pd_is_not_null(kdata_df): - kdata_df["timestamp"] = kdata_df["timestamp"].apply(lambda x: int(x.timestamp())) - kdata_df["data"] = kdata_df.apply( - lambda x: x[ - ["timestamp", "open", "high", "low", "close", "volume", "turnover", "change_pct", "turnover_rate"] - ].values.tolist(), - axis=1, - ) - df = kdata_df.groupby("entity_id").agg( - code=("code", "first"), - name=("name", "first"), - level=("level", "first"), - datas=("data", lambda data: list(data)), - ) - df = df.reset_index(drop=False) - return df.to_dict(orient="records") def query_factor_result(factor_request_model: FactorRequestModel): @@ -70,4 +41,4 @@ def to_trading_signal(order_type): # the __all__ is generated -__all__ = ["query_kdata", "query_factor_result"] +__all__ = ["query_factor_result"] diff --git a/src/zvt/fill_project.py b/src/zvt/fill_project.py index 8271c639..8da970ec 100644 --- a/src/zvt/fill_project.py +++ b/src/zvt/fill_project.py @@ -87,7 +87,7 @@ def gen_kdata_schemas(): if __name__ == "__main__": - # gen_exports("api") + gen_exports("api") # gen_exports("broker") # gen_exports("common") # gen_exports("contract", export_from_package=True, export_modules=["schema"]) @@ -102,6 +102,7 @@ def gen_kdata_schemas(): # gen_exports('autocode') # gen_exports("zhdate") # gen_exports("recorders", export_from_package=True) - gen_exports("tag", export_from_package=False) + # gen_exports("tag", export_from_package=False) + # gen_exports("tag", export_from_package=False) # gen_kdata_schemas() # zip_dir(ZVT_TEST_DATA_PATH, zip_file_name=DATA_SAMPLE_ZIP_PATH) diff --git a/src/zvt/zhdate/__init__.py b/src/zvt/misc/__init__.py similarity index 96% rename from src/zvt/zhdate/__init__.py rename to src/zvt/misc/__init__.py index d1a487dd..2686fff0 100644 --- a/src/zvt/zhdate/__init__.py +++ b/src/zvt/misc/__init__.py @@ -1,3 +1,5 @@ # -*- coding: utf-8 -*- + + # the __all__ is generated __all__ = [] diff --git a/src/zvt/zhdate/constants.py b/src/zvt/misc/constants.py similarity index 99% rename from src/zvt/zhdate/constants.py rename to src/zvt/misc/constants.py index 8bc0a6b3..7fc21639 100644 --- a/src/zvt/zhdate/constants.py +++ b/src/zvt/misc/constants.py @@ -422,5 +422,7 @@ """ 从1900年,至2100年每年的农历春节的公历日期 """ + + # the __all__ is generated __all__ = [] diff --git a/src/zvt/misc/misc_models.py b/src/zvt/misc/misc_models.py new file mode 100644 index 00000000..33f3de50 --- /dev/null +++ b/src/zvt/misc/misc_models.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +from datetime import datetime + +from zvt.contract.model import CustomModel + + +class TimeMessage(CustomModel): + # 时间 + timestamp: datetime + # 信息 + message: str + + +# the __all__ is generated +__all__ = ["TimeMessage"] diff --git a/src/zvt/zhdate/ztime.py b/src/zvt/misc/misc_service.py similarity index 67% rename from src/zvt/zhdate/ztime.py rename to src/zvt/misc/misc_service.py index 81fda83a..de016644 100644 --- a/src/zvt/zhdate/ztime.py +++ b/src/zvt/misc/misc_service.py @@ -1,68 +1,73 @@ # -*- coding: utf-8 -*- +from zvt.misc.zhdate import ZhDate from zvt.utils.time_utils import to_pd_timestamp, current_date, count_interval -from zvt.zhdate.zhdate import ZhDate -def holiday_distance(timestamp=None, days=15): +def holiday_distance(timestamp=None, consider_future_days=15): if not timestamp: the_date = current_date() else: the_date = to_pd_timestamp(timestamp) + # 业绩预告 month = the_date.month - infos = [f"现在是{month}月,关注:"] + infos = [f"今天是{the_date.date()}"] if month == 12: infos.append("业绩预告期,注意排雷") # 元旦 new_year = to_pd_timestamp(f"{the_date.year + 1}-01-01") distance = count_interval(the_date, new_year) - if 0 < distance < days: + if 0 < distance < consider_future_days: infos.append(f"距离元旦还有{distance}天") if month in (1, 2): # 春节 zh_date = ZhDate(lunar_year=the_date.year, lunar_month=1, lunar_day=1) spring_date = zh_date.newyear distance = count_interval(the_date, spring_date) - if 0 < distance < days: + if 0 < distance < consider_future_days: infos.append(f"距离春节还有{distance}天") # 两会 # 三月初 lianghui = to_pd_timestamp(f"{the_date.year}-03-01") distance = count_interval(the_date, lianghui) - if 0 < distance < days: + if 0 < distance < consider_future_days: infos.append(f"距离两会还有{distance}天") - # 业绩发布 + # 年报发布 if month in (3, 4): - infos.append("业绩发布期,注意排雷") + infos.append("年报发布期,注意排雷") # 五一 if month == 4: wuyi = to_pd_timestamp(f"{the_date.year}-05-01") distance = count_interval(the_date, wuyi) - if 0 < distance < days: + if 0 < distance < consider_future_days: infos.append(f"距离五一还有{distance}天") + # 业绩发布 + if month in (7, 8): + infos.append("半年报发布期,注意排雷") + if month == 9: # 国庆 shiyi = to_pd_timestamp(f"{the_date.year}-10-01") distance = count_interval(the_date, shiyi) - if 0 < distance < days: + if 0 < distance < consider_future_days: infos.append(f"距离国庆还有{distance}天") - msg = "\n".join(infos) - msg = msg + "\n" - print(msg) + msg = ",".join(infos) return msg +def get_time_message(): + return {"timestamp": current_date(), "message": holiday_distance()} + + if __name__ == "__main__": - holiday_distance() - # for month in range(1, 13): - # holiday_distance(f"2023-{month}-15") - # holiday_distance(f"2023-{month}-20") + print(get_time_message()) + # the __all__ is generated -__all__ = ["holiday_distance"] +__all__ = ["holiday_distance", "get_time_message"] diff --git a/src/zvt/zhdate/zhdate.py b/src/zvt/misc/zhdate.py similarity index 99% rename from src/zvt/zhdate/zhdate.py rename to src/zvt/misc/zhdate.py index ea0296f1..3ec4b24e 100644 --- a/src/zvt/zhdate/zhdate.py +++ b/src/zvt/misc/zhdate.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta from itertools import accumulate -from zvt.zhdate.constants import CHINESEYEARCODE, CHINESENEWYEAR +from zvt.misc.constants import CHINESEYEARCODE, CHINESENEWYEAR class ZhDate: diff --git a/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py b/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py index ff694942..fb4cb051 100644 --- a/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py +++ b/src/zvt/recorders/qmt/quotes/qmt_kdata_recorder.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import pandas as pd -from zvt.api.kdata import get_kdata_schema +from zvt.api.kdata import get_kdata_schema, get_kdata from zvt.broker.qmt import qmt_quote from zvt.contract import IntervalLevel, AdjustType from zvt.contract.api import df_to_db @@ -16,7 +16,7 @@ class BaseQmtKdataRecorder(FixedCycleDataRecorder): default_size = 50000 - entity_provider: str = "exchange" + entity_provider: str = "qmt" provider = "qmt" @@ -70,10 +70,38 @@ def __init__( ) def record(self, entity, start, end, size, timestamps): + # 判断是否需要重新计算之前保存的前复权数据 + if start and (self.adjust_type == AdjustType.qfq): + check_df = qmt_quote.get_kdata( + entity_id=entity.id, + start_timestamp=start, + end_timestamp=start, + adjust_type=self.adjust_type, + level=self.level, + ) + current_df = get_kdata( + entity_id=entity.id, + provider=self.provider, + start_timestamp=start, + end_timestamp=start, + limit=1, + level=self.level, + adjust_type=self.adjust_type, + ) + if pd_is_not_null(current_df): + old = current_df.iloc[0, :]["close"] + new = check_df["close"][0] + # 相同时间的close不同,表明前复权需要重新计算 + if round(old, 2) != round(new, 2): + # 删掉重新获取 + self.session.query(self.data_schema).filter(self.data_schema.entity_id == entity.id).delete() + start = "2005-01-01" + if not start: start = "2005-01-01" if not end: end = current_date() + df = qmt_quote.get_kdata( entity_id=entity.id, start_timestamp=start, @@ -92,6 +120,7 @@ def record(self, entity, start, end, size, timestamps): df.rename(columns={"amount": "turnover"}, inplace=True) df["change_pct"] = (df["close"] - df["preClose"]) / df["preClose"] df_to_db(df=df, data_schema=self.data_schema, provider=self.provider, force_update=self.force_update) + else: self.logger.info(f"no kdata for {entity.id}") @@ -102,8 +131,8 @@ class QMTStockKdataRecorder(BaseQmtKdataRecorder): if __name__ == "__main__": - # Stock.record_data(provider="exchange") - QMTStockKdataRecorder(entity_id="stock_sz_000338", adjust_type=AdjustType.hfq).run() + Stock.record_data(provider="qmt") + QMTStockKdataRecorder(entity_id="stock_sz_000338", adjust_type=AdjustType.qfq).run() # the __all__ is generated diff --git a/src/zvt/rest/factor.py b/src/zvt/rest/factor.py index ecd9856a..7c52d345 100644 --- a/src/zvt/rest/factor.py +++ b/src/zvt/rest/factor.py @@ -5,7 +5,7 @@ from zvt.contract import zvt_context from zvt.factors import factor_service -from zvt.factors.factor_models import FactorRequestModel, TradingSignalModel, KdataModel, KdataRequestModel +from zvt.factors.factor_models import FactorRequestModel, TradingSignalModel factor_router = APIRouter( prefix="/api/factor", @@ -22,8 +22,3 @@ def get_factors(): @factor_router.post("/query_factor_result", response_model=List[TradingSignalModel]) def query_factor_result(factor_request_model: FactorRequestModel): return factor_service.query_factor_result(factor_request_model) - - -@factor_router.post("/query_kdata", response_model=List[KdataModel]) -def query_kdata(kdata_request_model: KdataRequestModel): - return factor_service.query_kdata(kdata_request_model) diff --git a/src/zvt/rest/misc.py b/src/zvt/rest/misc.py new file mode 100644 index 00000000..a0a11120 --- /dev/null +++ b/src/zvt/rest/misc.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +from fastapi import APIRouter + +from zvt.misc import misc_service +from zvt.misc.misc_models import TimeMessage + +misc_router = APIRouter( + prefix="/api/misc", + tags=["misc"], + responses={404: {"description": "Not found"}}, +) + + +@misc_router.get( + "/time_message", + response_model=TimeMessage, +) +def get_time_message(): + """ + Get time message + """ + return misc_service.get_time_message() diff --git a/src/zvt/rest/trading.py b/src/zvt/rest/trading.py index 4c68e456..cddb55ff 100644 --- a/src/zvt/rest/trading.py +++ b/src/zvt/rest/trading.py @@ -17,6 +17,11 @@ BuildQueryStockQuoteSettingModel, QueryTagQuoteModel, TagQuoteStatsModel, + KdataModel, + KdataRequestModel, + TSModel, + TSRequestModel, + QuoteStatsModel, ) from zvt.trading.trading_schemas import QueryStockQuoteSetting @@ -27,6 +32,21 @@ ) +@trading_router.post("/query_kdata", response_model=Optional[List[KdataModel]]) +def query_kdata(kdata_request_model: KdataRequestModel): + return trading_service.query_kdata(kdata_request_model) + + +@trading_router.post("/query_ts", response_model=Optional[List[TSModel]]) +def query_kdata(ts_request_model: TSRequestModel): + return trading_service.query_ts(ts_request_model) + + +@trading_router.get("/get_quote_stats", response_model=Optional[QuoteStatsModel]) +def get_quote_stats(): + return trading_service.query_quote_stats() + + @trading_router.get("/get_query_stock_quote_setting", response_model=Optional[QueryStockQuoteSettingModel]) def get_query_stock_quote_setting(): with contract_api.DBSession(provider="zvt", data_schema=QueryStockQuoteSetting)() as session: diff --git a/src/zvt/server.py b/src/zvt/server.py index b929b25c..2a1cda6f 100644 --- a/src/zvt/server.py +++ b/src/zvt/server.py @@ -10,6 +10,7 @@ from zvt import zvt_env from zvt.rest.data import data_router from zvt.rest.factor import factor_router +from zvt.rest.misc import misc_router from zvt.rest.trading import trading_router from zvt.rest.work import work_router @@ -35,6 +36,7 @@ async def root(): app.include_router(factor_router) app.include_router(work_router) app.include_router(trading_router) +app.include_router(misc_router) add_pagination(app) diff --git a/src/zvt/tag/common.py b/src/zvt/tag/common.py index de124d87..efc17575 100644 --- a/src/zvt/tag/common.py +++ b/src/zvt/tag/common.py @@ -5,6 +5,17 @@ class StockPoolType(Enum): system = "system" custom = "custom" + dynamic = "dynamic" + + +class DynamicPoolType(Enum): + limit_up = "limit_up" + limit_down = "limit_down" + + +class InsertMode(Enum): + overwrite = "overwrite" + append = "append" class TagType(Enum): @@ -20,4 +31,4 @@ class TagStatsQueryType(Enum): # the __all__ is generated -__all__ = ["StockPoolType", "TagType", "TagStatsQueryType"] +__all__ = ["StockPoolType", "DynamicPoolType", "TagType", "TagStatsQueryType"] diff --git a/src/zvt/tag/dynamic_pool.py b/src/zvt/tag/dynamic_pool.py new file mode 100644 index 00000000..65ecbe81 --- /dev/null +++ b/src/zvt/tag/dynamic_pool.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +from zvt.domain import StockQuote +from zvt.utils.pd_utils import pd_is_not_null + + +def get_limit_up(): + df = StockQuote.query_data(filters=[StockQuote.is_limit_up], columns=[StockQuote.entity_id]) + if pd_is_not_null(df): + return df["entity_id"].to_list() + + +def get_top_50(): + df = StockQuote.query_data(columns=[StockQuote.entity_id], order=StockQuote.change_pct.desc(), limit=50) + if pd_is_not_null(df): + return df["entity_id"].to_list() + + +def get_limit_down(): + df = StockQuote.query_data(filters=[StockQuote.is_limit_down], columns=[StockQuote.entity_id]) + if pd_is_not_null(df): + return df["entity_id"].to_list() + + +if __name__ == "__main__": + print(get_limit_up()) + print(get_limit_down()) + print(get_top_50()) + + +# the __all__ is generated +__all__ = ["get_limit_up", "get_top_50", "get_limit_down"] diff --git a/src/zvt/tag/tag_models.py b/src/zvt/tag/tag_models.py index 61fb4e4a..710a1b19 100644 --- a/src/zvt/tag/tag_models.py +++ b/src/zvt/tag/tag_models.py @@ -5,7 +5,7 @@ from pydantic_core.core_schema import ValidationInfo from zvt.contract.model import MixinModel, CustomModel -from zvt.tag.common import StockPoolType, TagType, TagStatsQueryType +from zvt.tag.common import StockPoolType, TagType, TagStatsQueryType, InsertMode from zvt.tag.tag_utils import get_stock_pool_names @@ -136,14 +136,15 @@ class StockPoolsModel(MixinModel): class CreateStockPoolsModel(CustomModel): stock_pool_name: str entity_ids: List[str] + insert_model: InsertMode = Field(default=InsertMode.overwrite) - @field_validator("stock_pool_name") - @classmethod - def stock_pool_name_must_be_in(cls, v: str) -> str: - if v: - if v not in get_stock_pool_names(): - raise ValueError(f"stock_pool_name: {v} must be created at stock_pool_info at first") - return v + # @field_validator("stock_pool_name") + # @classmethod + # def stock_pool_name_must_be_in(cls, v: str) -> str: + # if v: + # if v not in get_stock_pool_names(): + # raise ValueError(f"stock_pool_name: {v} must be created at stock_pool_info at first") + # return v class QueryStockTagStatsModel(CustomModel): diff --git a/src/zvt/tag/tag_service.py b/src/zvt/tag/tag_service.py index ce61ce7c..9aa37be7 100644 --- a/src/zvt/tag/tag_service.py +++ b/src/zvt/tag/tag_service.py @@ -9,7 +9,7 @@ import zvt.contract.api as contract_api from zvt.api.selector import get_entity_ids_by_filter from zvt.domain import BlockStock, Block, Stock -from zvt.tag.common import TagType, TagStatsQueryType +from zvt.tag.common import TagType, TagStatsQueryType, StockPoolType, InsertMode from zvt.tag.tag_models import ( SetStockTagsModel, CreateStockPoolInfoModel, @@ -35,6 +35,7 @@ industry_to_main_tag, get_sub_tags, get_concept_main_tag_mapping, + get_stock_pool_names, ) from zvt.utils.time_utils import to_pd_timestamp, to_time_str, current_date, now_pd_timestamp from zvt.utils.utils import fill_dict, compare_dicts, flatten_list @@ -432,10 +433,17 @@ def build_stock_pool_info(create_stock_pool_info_model: CreateStockPoolInfoModel return stock_pool_info -def build_stock_pool(create_stock_pools_model: CreateStockPoolsModel, target_date): +def build_stock_pool(create_stock_pools_model: CreateStockPoolsModel, target_date=current_date()): with contract_api.DBSession(provider="zvt", data_schema=StockPools)() as session: + if create_stock_pools_model.stock_pool_name not in get_stock_pool_names(): + build_stock_pool_info( + CreateStockPoolInfoModel( + stock_pool_type=StockPoolType.custom, stock_pool_name=create_stock_pools_model.stock_pool_name + ) + ) + # one instance per day stock_pool_id = f"admin_{to_time_str(target_date)}_{create_stock_pools_model.stock_pool_name}" - datas = StockPools.query_data( + datas: List[StockPools] = StockPools.query_data( session=session, filters=[ StockPools.timestamp == to_pd_timestamp(target_date), @@ -445,7 +453,10 @@ def build_stock_pool(create_stock_pools_model: CreateStockPoolsModel, target_dat ) if datas: stock_pool = datas[0] - stock_pool.entity_ids = create_stock_pools_model.entity_ids + if create_stock_pools_model.insert_model == InsertMode.overwrite: + stock_pool.entity_ids = create_stock_pools_model.entity_ids + else: + stock_pool.entity_ids = list(set(stock_pool.entity_ids + create_stock_pools_model.entity_ids)) else: stock_pool = StockPools( entity_id="admin", diff --git a/src/zvt/trading/trading_models.py b/src/zvt/trading/trading_models.py index ddf5da3d..b9b928e5 100644 --- a/src/zvt/trading/trading_models.py +++ b/src/zvt/trading/trading_models.py @@ -1,17 +1,72 @@ # -*- coding: utf-8 -*- from datetime import datetime -from typing import Union, List, Optional +from typing import List, Optional +from typing import Union -from pydantic import BaseModel, field_validator, Field +from pydantic import BaseModel, Field +from pydantic import field_validator from zvt.common.query_models import TimeRange, OrderByType +from zvt.contract import IntervalLevel, AdjustType from zvt.contract.model import MixinModel, CustomModel from zvt.tag.tag_utils import get_stock_pool_names from zvt.trader import TradingSignalType from zvt.trading.common import ExecutionStatus +from zvt.utils.time_utils import date_time_by_interval, current_date from zvt.utils.time_utils import tomorrow_date, to_pd_timestamp +class KdataRequestModel(BaseModel): + entity_ids: List[str] + data_provider: str = Field(default="qmt") + start_timestamp: datetime = Field(default=date_time_by_interval(current_date(), -500)) + end_timestamp: Optional[datetime] = Field(default=None) + level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY) + adjust_type: AdjustType = Field(default=AdjustType.qfq) + + +class KdataModel(BaseModel): + entity_id: str + code: str + name: str + level: IntervalLevel = Field(default=IntervalLevel.LEVEL_1DAY) + datas: List + + +class TSRequestModel(BaseModel): + entity_ids: List[str] + data_provider: str = Field(default="qmt") + days_count: int = Field(default=5) + + +class TSModel(BaseModel): + entity_id: str + code: str + name: str + datas: List + + +class QuoteStatsModel(BaseModel): + #: UNIX时间戳 + time: int + #: 涨停数 + limit_up_count: int + #: 跌停数 + limit_down_count: int + #: 上涨数 + up_count: int + #: 下跌数 + down_count: int + #: 涨幅 + change_pct: float + #: 成交额 + turnover: float + #: 昨日成交额 + pre_turnover: Optional[float] = Field(default=None) + #: 同比 + turnover_change: Optional[float] = Field(default=None) + + class QueryStockQuoteSettingModel(CustomModel): stock_pool_name: Optional[str] = Field(default=None) main_tags: Optional[List[str]] = Field(default=None) diff --git a/src/zvt/trading/trading_service.py b/src/zvt/trading/trading_service.py index 64233d53..be4aa5c7 100644 --- a/src/zvt/trading/trading_service.py +++ b/src/zvt/trading/trading_service.py @@ -5,8 +5,10 @@ import pandas as pd from fastapi_pagination.ext.sqlalchemy import paginate +import zvt.api.kdata as kdata_api import zvt.contract.api as contract_api -from zvt.domain import Stock, StockQuote +from zvt.common.query_models import TimeUnit +from zvt.domain import Stock, StockQuote, Stock1mQuote from zvt.tag.tag_schemas import StockTags, StockPools from zvt.trading.common import ExecutionStatus from zvt.trading.trading_models import ( @@ -15,13 +17,72 @@ QueryTagQuoteModel, QueryStockQuoteModel, BuildQueryStockQuoteSettingModel, + KdataRequestModel, + TSRequestModel, ) from zvt.trading.trading_schemas import TradingPlan, QueryStockQuoteSetting -from zvt.utils.time_utils import to_time_str, to_pd_timestamp, now_pd_timestamp, date_time_by_interval, current_date +from zvt.utils.pd_utils import pd_is_not_null +from zvt.utils.time_utils import ( + to_time_str, + to_pd_timestamp, + now_pd_timestamp, + date_time_by_interval, + current_date, + date_and_time, +) logger = logging.getLogger(__name__) +def query_kdata(kdata_request_model: KdataRequestModel): + kdata_df = kdata_api.get_kdata( + entity_ids=kdata_request_model.entity_ids, + provider=kdata_request_model.data_provider, + start_timestamp=kdata_request_model.start_timestamp, + end_timestamp=kdata_request_model.end_timestamp, + adjust_type=kdata_request_model.adjust_type, + ) + if pd_is_not_null(kdata_df): + kdata_df["timestamp"] = kdata_df["timestamp"].apply(lambda x: int(x.timestamp())) + kdata_df["data"] = kdata_df.apply( + lambda x: x[ + ["timestamp", "open", "high", "low", "close", "volume", "turnover", "change_pct", "turnover_rate"] + ].values.tolist(), + axis=1, + ) + df = kdata_df.groupby("entity_id").agg( + code=("code", "first"), + name=("name", "first"), + level=("level", "first"), + datas=("data", lambda data: list(data)), + ) + df = df.reset_index(drop=False) + return df.to_dict(orient="records") + + +def query_ts(ts_request_model: TSRequestModel): + trading_dates = kdata_api.get_recent_trade_dates(days_count=ts_request_model.days_count) + ts_df = Stock1mQuote.query_data( + entity_ids=ts_request_model.entity_ids, + provider=ts_request_model.data_provider, + start_timestamp=trading_dates[0], + ) + if pd_is_not_null(ts_df): + ts_df["data"] = ts_df.apply( + lambda x: x[ + ["time", "price", "avg_price", "change_pct", "volume", "turnover", "turnover_rate"] + ].values.tolist(), + axis=1, + ) + df = ts_df.groupby("entity_id").agg( + code=("code", "first"), + name=("name", "first"), + datas=("data", lambda data: list(data)), + ) + df = df.reset_index(drop=False) + return df.to_dict(orient="records") + + def build_trading_plan(build_trading_plan_model: BuildTradingPlanModel): with contract_api.DBSession(provider="zvt", data_schema=TradingPlan)() as session: stock_id = build_trading_plan_model.stock_id @@ -109,6 +170,65 @@ def check_trading_plan(): logger.debug(f"current plans:{plans}") +def query_quote_stats(): + quote_df = StockQuote.query_data( + return_type="df", + filters=[StockQuote.change_pct >= -0.31, StockQuote.change_pct <= 0.31], + columns=["timestamp", "entity_id", "time", "change_pct", "turnover", "is_limit_up", "is_limit_down"], + ) + current_stats = cal_quote_stats(quote_df) + start_timestamp = current_stats["timestamp"] + + pre_date_df = Stock1mQuote.query_data( + filters=[Stock1mQuote.timestamp < to_time_str(start_timestamp)], + order=Stock1mQuote.timestamp.desc(), + limit=1, + columns=["timestamp"], + ) + pre_date = pre_date_df["timestamp"].tolist()[0] + + if start_timestamp.hour >= 15: + start_timestamp = date_and_time(pre_date, "15:00") + else: + start_timestamp = date_and_time(pre_date, f"{start_timestamp.hour}:{start_timestamp.minute}") + end_timestamp = date_time_by_interval(start_timestamp, 1, TimeUnit.minute) + + pre_df = Stock1mQuote.query_data( + return_type="df", + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + filters=[Stock1mQuote.change_pct >= -0.31, Stock1mQuote.change_pct <= 0.31], + columns=["timestamp", "entity_id", "time", "change_pct", "turnover", "is_limit_up", "is_limit_down"], + ) + + if pd_is_not_null(pre_df): + pre_stats = cal_quote_stats(pre_df) + current_stats["pre_turnover"] = pre_stats["turnover"] + current_stats["turnover_change"] = current_stats["turnover"] - current_stats["pre_turnover"] + return current_stats + + +def cal_quote_stats(quote_df): + quote_df["ss"] = 1 + + df = ( + quote_df.groupby("ss") + .agg( + timestamp=("timestamp", "last"), + time=("time", "last"), + up_count=("change_pct", lambda x: (x > 0).sum()), + down_count=("change_pct", lambda x: (x <= 0).sum()), + turnover=("turnover", "sum"), + change_pct=("change_pct", "mean"), + limit_up_count=("is_limit_up", "sum"), + limit_down_count=("is_limit_down", lambda x: (x == True).sum()), + ) + .reset_index(drop=True) + ) + + return df.to_dict(orient="records")[0] + + def query_tag_quotes(query_tag_quote_model: QueryTagQuoteModel): stock_pools: List[StockPools] = StockPools.query_data( filters=[StockPools.stock_pool_name == query_tag_quote_model.stock_pool_name], @@ -241,9 +361,9 @@ def build_query_stock_quote_setting(build_query_stock_quote_setting_model: Build if __name__ == "__main__": - print(query_tag_quotes(QueryTagQuoteModel(stock_pool_name="all", main_tags=["低空经济", "半导体", "化工", "消费电子"]))) - print(query_stock_quotes(QueryStockQuoteModel(stock_pool_name="all", main_tag="半导体"))) - + # print(query_tag_quotes(QueryTagQuoteModel(stock_pool_name="all", main_tags=["低空经济", "半导体", "化工", "消费电子"]))) + # print(query_stock_quotes(QueryStockQuoteModel(stock_pool_name="all", main_tag="半导体"))) + print(query_quote_stats()) # the __all__ is generated __all__ = [ "build_trading_plan", diff --git a/src/zvt/utils/recorder_utils.py b/src/zvt/utils/recorder_utils.py index a7c70577..a083580b 100644 --- a/src/zvt/utils/recorder_utils.py +++ b/src/zvt/utils/recorder_utils.py @@ -3,15 +3,15 @@ import time from typing import Type +import zvt as zvt from zvt import zvt_config -from zvt.contract import Mixin from zvt.informer import EmailInformer logger = logging.getLogger("__name__") def run_data_recorder( - domain: Type[Mixin], + domain: Type["zvt.contract.Mixin"], entity_provider=None, data_provider=None, entity_ids=None, diff --git a/src/zvt/utils/time_utils.py b/src/zvt/utils/time_utils.py index f6275d5e..f5a57ecb 100644 --- a/src/zvt/utils/time_utils.py +++ b/src/zvt/utils/time_utils.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- import calendar import datetime -import math import arrow import pandas as pd from zvt.common.query_models import TimeUnit -from zvt.contract import IntervalLevel CHINA_TZ = "Asia/Shanghai" @@ -183,69 +181,6 @@ def date_and_time(the_date, the_time): return to_pd_timestamp(time_str) -def next_timestamp_on_level(current_timestamp: pd.Timestamp, level: IntervalLevel) -> pd.Timestamp: - current_timestamp = to_pd_timestamp(current_timestamp) - return current_timestamp + pd.Timedelta(seconds=level.to_second()) - - -def evaluate_size_from_timestamp( - start_timestamp, level: IntervalLevel, one_day_trading_minutes, end_timestamp: pd.Timestamp = None -): - """ - given from timestamp,level,one_day_trading_minutes,this func evaluate size of kdata to current. - it maybe a little bigger than the real size for fetching all the kdata. - - :param start_timestamp: - :type start_timestamp: pd.Timestamp - :param level: - :type level: IntervalLevel - :param one_day_trading_minutes: - :type one_day_trading_minutes: int - """ - if not end_timestamp: - end_timestamp = pd.Timestamp.now() - else: - end_timestamp = to_pd_timestamp(end_timestamp) - - time_delta = end_timestamp - to_pd_timestamp(start_timestamp) - - one_day_trading_seconds = one_day_trading_minutes * 60 - - if level == IntervalLevel.LEVEL_1DAY: - return time_delta.days + 1 - - if level == IntervalLevel.LEVEL_1WEEK: - return int(math.ceil(time_delta.days / 7)) + 1 - - if level == IntervalLevel.LEVEL_1MON: - return int(math.ceil(time_delta.days / 30)) + 1 - - if time_delta.days > 0: - seconds = (time_delta.days + 1) * one_day_trading_seconds - return int(math.ceil(seconds / level.to_second())) + 1 - else: - seconds = time_delta.total_seconds() - return min(int(math.ceil(seconds / level.to_second())) + 1, one_day_trading_seconds / level.to_second() + 1) - - -def is_finished_kdata_timestamp(timestamp, level: IntervalLevel): - timestamp = to_pd_timestamp(timestamp) - if level.floor_timestamp(timestamp) == timestamp: - return True - return False - - -def is_in_same_interval(t1: pd.Timestamp, t2: pd.Timestamp, level: IntervalLevel): - t1 = to_pd_timestamp(t1) - t2 = to_pd_timestamp(t2) - if level == IntervalLevel.LEVEL_1WEEK: - return t1.week == t2.week - if level == IntervalLevel.LEVEL_1MON: - return t1.month == t2.month - - return level.floor_timestamp(t1) == level.floor_timestamp(t2) - - def split_time_interval(start, end, method=None, interval=30, freq="D"): start = to_pd_timestamp(start) end = to_pd_timestamp(end) @@ -308,10 +243,6 @@ def count_interval(start_date, end_date): "day_offset_today", "get_year_quarters", "date_and_time", - "next_timestamp_on_level", - "evaluate_size_from_timestamp", - "is_finished_kdata_timestamp", - "is_in_same_interval", "split_time_interval", "count_interval", ] diff --git a/src/zvt/utils/utils.py b/src/zvt/utils/utils.py index 4c3bce82..6500a0f0 100644 --- a/src/zvt/utils/utils.py +++ b/src/zvt/utils/utils.py @@ -2,13 +2,10 @@ import logging import numbers from decimal import * -from enum import Enum from urllib import parse import pandas as pd -from zvt.utils.time_utils import to_time_str - getcontext().prec = 16 logger = logging.getLogger(__name__) @@ -131,16 +128,6 @@ def read_csv(f, encoding, sep=None, na_values=None): return None -def marshal_object_for_ui(object): - if isinstance(object, Enum): - return object.value - - if isinstance(object, pd.Timestamp): - return to_time_str(object) - - return object - - def chrome_copy_header_to_dict(src): lines = src.split("\n") header = {} @@ -293,7 +280,6 @@ def fill_dict(src, dst): # the __all__ is generated __all__ = [ - "getcontext().prec", "none_values", "zero_values", "first_item_to_float", @@ -305,7 +291,6 @@ def fill_dict(src, dst): "fill_domain_from_dict", "SUPPORT_ENCODINGS", "read_csv", - "marshal_object_for_ui", "chrome_copy_header_to_dict", "to_positive_number", "multiple_number", diff --git a/src/zvt_vip/__init__.py b/src/zvt_vip/__init__.py new file mode 100644 index 00000000..29e9de99 --- /dev/null +++ b/src/zvt_vip/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +import zvt_vip.recorders as zvt_vip_recorders diff --git a/src/zvt_vip/dataset/__init__.py b/src/zvt_vip/dataset/__init__.py new file mode 100644 index 00000000..8c5e6183 --- /dev/null +++ b/src/zvt_vip/dataset/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# the __all__ is generated +__all__ = [] + +# __init__.py structure: +# common code of the package +# export interface in __all__ which contains __all__ of its sub modules + +# import all from submodule stock_events +from .stock_events import * +from .stock_events import __all__ as _stock_events_all + +__all__ += _stock_events_all diff --git a/src/zvt_vip/dataset/stock_events.py b/src/zvt_vip/dataset/stock_events.py new file mode 100644 index 00000000..4b3b99a2 --- /dev/null +++ b/src/zvt_vip/dataset/stock_events.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +from sqlalchemy import Column, String, DateTime +from sqlalchemy.orm import declarative_base + +from zvt.contract import Mixin +from zvt.contract.register import register_schema + +EventsBase = declarative_base() + + +class StockEvents(EventsBase, Mixin): + __tablename__ = "stock_events" + event_type = Column(String) + specific_event_type = Column(String) + notice_date = Column(DateTime) + level1_content = Column(String) + level2_content = Column(String) + + +register_schema(providers=["em"], db_name="stock_events", schema_base=EventsBase, entity_type="stock") +# the __all__ is generated +__all__ = ["StockEvents"] diff --git a/src/zvt_vip/event/__init__.py b/src/zvt_vip/event/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/zvt_vip/event/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/zvt_vip/event/ai_suggestion.py b/src/zvt_vip/event/ai_suggestion.py new file mode 100644 index 00000000..d3ffc850 --- /dev/null +++ b/src/zvt_vip/event/ai_suggestion.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +import json +import logging +import re +from typing import List + +import pandas as pd +from openai import OpenAI +from sqlalchemy import func, or_ + +import zvt.contract.api as contract_api +from zvt import zvt_config +from zvt.domain import StockNews, Stock +from zvt.tag.tag_utils import match_tag +from zvt.utils.time_utils import date_time_by_interval, current_date + +logger = logging.getLogger(__name__) + + +def normalize_tag_suggestions(tag_suggestions): + for direction in ["up", "down"]: + if direction in tag_suggestions: + for item in tag_suggestions[direction]: + tag_type, tag = match_tag(item["block"]) + item["tag"] = tag + item["tag_type"] = tag_type + if item["stocks"]: + stocks = Stock.query_data( + filters=[Stock.name.in_(item["stocks"])], return_type="dict", provider="em" + ) + if len(stocks) != len(item["stocks"]): + logger.warning( + f"Stocks not found in zvt:{set(item['stocks']) - set([item['name'] for item in stocks])}" + ) + item["stocks"] = [{"entity_id": item["entity_id"], "name": item["name"]} for item in stocks] + return tag_suggestions + + +def set_stock_news_tag_suggestions(stock_news, tag_suggestions, session): + if stock_news.news_analysis: + stock_news.news_analysis = dict(stock_news.news_analysis) + else: + stock_news.news_analysis = {} + + result = normalize_tag_suggestions(tag_suggestions) + logger.debug(result) + stock_news.news_analysis["tag_suggestions"] = result + session.add(stock_news) + session.commit() + + +def build_tag_suggestions(entity_id=None): + with contract_api.DBSession(provider="em", data_schema=StockNews)() as session: + start_date = date_time_by_interval(current_date(), -30) + datas: List[StockNews] = StockNews.query_data( + entity_id=entity_id, + limit=1, + order=StockNews.timestamp.desc(), + filters=[ + StockNews.timestamp >= start_date, + func.json_extract(StockNews.news_analysis, f'$."tag_suggestions"') != None, + ], + return_type="domain", + ) + if datas: + latest_data = datas[0] + else: + latest_data = None + + filters = [ + or_( + StockNews.news_title.like("%上涨%"), + StockNews.news_title.like("%拉升%"), + StockNews.news_title.like("%涨停%"), + StockNews.news_title.like("%下跌%"), + StockNews.news_title.like("%跌停%"), + ), + StockNews.timestamp >= start_date, + func.json_extract(StockNews.news_analysis, f'$."tag_suggestions"') == None, + ] + if latest_data: + filters = filters + [ + StockNews.timestamp >= latest_data.timestamp, + StockNews.news_code != latest_data.news_code, + ] + + stock_news_list: List[StockNews] = StockNews.query_data( + session=session, + order=StockNews.news_code.asc(), + return_type="domain", + filters=filters, + ) + + if not stock_news_list: + logger.info("all stock news has been analyzed") + return + + example = { + "up": [{"block": "block_a", "stocks": ["stock_a", "stock_b"]}], + "down": [{"block": "block_b", "stocks": ["stock_1", "stock_2"]}], + } + + client = OpenAI( + api_key=zvt_config["qwen_api_key"], + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", + ) + + for stock_news in stock_news_list: + # same news + if latest_data and (stock_news.news_code == latest_data.news_code): + tag_suggestions = latest_data.news_analysis.get("tag_suggestions") + if tag_suggestions: + set_stock_news_tag_suggestions(stock_news, tag_suggestions, session) + continue + + news_title = stock_news.news_title + news_content = stock_news.news_content + logger.info(news_title) + logger.info(news_content) + + completion = client.chat.completions.create( + model="qwen-max", + messages=[ + {"role": "system", "content": f"请从新闻标题和内容中识别是上涨还是下跌,提取相应的板块和个股,按照格式: {example} 输出一个 JSON 对象"}, + { + "role": "user", + "content": f"新闻标题:{news_title}, 新闻内容:{news_content}", + }, + ], + temperature=0.2, + ) + content = completion.choices[0].message.content + content = content.replace("```json", "") + content = content.replace("```", "") + content = re.sub(r"\s+", "", content) + logger.info(f"message content: {content}") + tag_suggestions = json.loads(content) + set_stock_news_tag_suggestions(stock_news, tag_suggestions, session) + + +def extract_info(tag_dict): + extracted_info = [] + for key, value in tag_dict.items(): + for item in value: + extracted_info.append({"tag": item["tag"], "stocks": [stock["name"] for stock in item["stocks"]]}) + return extracted_info + + +def build_tag_suggestions_stats(): + with contract_api.DBSession(provider="em", data_schema=StockNews)() as session: + start_date = date_time_by_interval(current_date(), -10) + stock_news_list: List[StockNews] = StockNews.query_data( + session=session, + order=StockNews.timestamp.desc(), + distinct=StockNews.news_code, + return_type="dict", + filters=[ + StockNews.timestamp >= start_date, + func.json_extract(StockNews.news_analysis, f'$."tag_suggestions"') != None, + ], + ) + datas = [] + for stock_news in stock_news_list: + tag_suggestions = stock_news["news_analysis"].get("tag_suggestions") + if tag_suggestions: + for key in ("up", "down"): + suggestions = tag_suggestions.get(key) + if suggestions: + datas = datas + [ + { + "tag": item["tag"], + "tag_type": item["tag_type"], + "entity_ids": [stock["entity_id"] for stock in item["stocks"]], + "stock_names": [stock["name"] for stock in item["stocks"]], + } + for item in suggestions + ] + df = pd.DataFrame.from_records(data=datas) + grouped_df = ( + df.groupby("tag") + .agg( + tag_count=("tag", "count"), + tag_type=("tag_type", "first"), + entity_ids=("entity_ids", "sum"), + stock_names=("stock_names", "sum"), + ) + .reset_index() + ) + grouped_df["entity_ids"] = grouped_df["entity_ids"].apply(set).apply(list) + grouped_df["stock_names"] = grouped_df["stock_names"].apply(set).apply(list) + grouped_df["entity_ids_count"] = grouped_df["entity_ids"].apply(len) + + sorted_df = grouped_df.sort_values(by=["tag_count", "entity_ids_count"], ascending=[False, False]) + return sorted_df.to_dict(orient="records") + + +if __name__ == "__main__": + build_tag_suggestions_stats() diff --git a/src/zvt_vip/event/event_models.py b/src/zvt_vip/event/event_models.py new file mode 100644 index 00000000..36902748 --- /dev/null +++ b/src/zvt_vip/event/event_models.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +from datetime import datetime +from typing import List, Optional + +from pydantic import Field + +from zvt.contract.model import CustomModel, MixinModel + + +class StockEventModel(CustomModel): + # 事件时间 + timestamp: datetime + # 事件类型 + event_type: str + # 事件内容 + event_content: Optional[str] = Field(default=None) + + +class StockNewsModel(MixinModel): + #: 标题 + news_title: str + #: 内容 + news_content: Optional[str] = Field(default=None) + #: 分析 + news_analysis: Optional[dict] = Field(default=None) + #: 用户忽略 + ignore_by_user: Optional[bool] = Field(default=False) + + +class StockEventSummaryModel(CustomModel): + good_events: List[StockEventModel] + bad_events: List[StockEventModel] + news: List[StockNewsModel] + + +class IgnoreNewsModel(CustomModel): + news_id: str diff --git a/src/zvt_vip/event/event_service.py b/src/zvt_vip/event/event_service.py new file mode 100644 index 00000000..5f9d9482 --- /dev/null +++ b/src/zvt_vip/event/event_service.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +import logging +import re +from typing import List + +from sqlalchemy import func, not_ + +import zvt.contract.api as contract_api +from zvt.domain import StockNews +from zvt.utils.time_utils import date_time_by_interval, current_date +from zvt_vip.dataset import StockEvents +from zvt_vip.event.ai_suggestion import build_tag_suggestions_stats +from zvt_vip.event.event_models import IgnoreNewsModel +from zvt_vip.recorders.em_stock_news_recorder import VIPEMStockNewsRecorder + +logger = logging.getLogger(__name__) + + +def get_stock_event(entity_id): + events_list = ["增减持计划", "增发", "配股", "业绩预告", "股票回购", "股东增减持", "诉讼仲裁", "违规处罚", "高管及关联方增减持", "限售解禁"] + recent_90_days = date_time_by_interval(current_date(), -90) + future_60_days = date_time_by_interval(current_date(), 60) + + stock_events: List[StockEvents] = StockEvents.query_data( + entity_id=entity_id, + start_timestamp=recent_90_days, + end_timestamp=future_60_days, + filters=[StockEvents.event_type.in_(events_list)], + return_type="domain", + order=StockEvents.timestamp.asc(), + ) + + bad_events = [] + good_events = [] + for event in stock_events: + is_good = True + event_content = event.level1_content + + if not event_content: + event_content = event.level2_content + elif event.level2_content and (event_content != event.level2_content): + event_content = event_content + ";" + event.level2_content + + the_event = {"timestamp": event.timestamp, "event_content": event_content} + + if (event.event_type == "增减持计划" and ("减持" in event.level1_content)) or ("减持" in event.specific_event_type): + the_event["event_type"] = "减持" + is_good = False + elif ( + (event.event_type == "增减持计划" and ("增持" in event.level1_content)) + or ("增持" in event.specific_event_type) + or event.event_type == "股票回购" + ): + the_event["event_type"] = "增持" + + elif event.event_type == "限售解禁": + the_event["event_type"] = "解禁" + is_good = False + elif event.event_type in ("增发", "配股", "违规处罚", "诉讼仲裁"): + the_event["event_type"] = event.event_type + is_good = False + elif event.event_type == "业绩预告" and bool(re.search("预增|略增|扭亏|减亏", event.level1_content)): + the_event["event_type"] = "业绩向好" + + elif event.event_type == "业绩预告" and bool(re.search("预减|略减|续亏|首亏", event.level1_content)): + the_event["event_type"] = "业绩不行" + is_good = False + else: + logger.warning(f"unknown event {event.__dict__}") + the_event["event_type"] = event.event_type + + if is_good: + good_events.append(the_event) + else: + bad_events.append(the_event) + + # news + recent_60_days = date_time_by_interval(current_date(), -60) + VIPEMStockNewsRecorder(entity_id=entity_id, force_update=False, sleeping_time=0).run() + stock_news: List[StockNews] = StockNews.query_data( + entity_id=entity_id, + start_timestamp=recent_60_days, + filters=[not_(StockNews.ignore_by_user.is_(True))], + return_type="dict", + limit=50, + order=StockNews.timestamp.desc(), + ) + + return {"good_events": good_events, "bad_events": bad_events, "news": stock_news} + + +def ignore_news(ignore_news_model: IgnoreNewsModel): + with contract_api.DBSession(provider="em", data_schema=StockNews)() as session: + datas: List[StockNews] = StockNews.query_data( + filters=[StockNews.id == ignore_news_model.news_id], limit=1, return_type="domain" + ) + if datas: + stock_news = datas[0] + stock_news.ignore_by_user = True + session.commit() + + +def get_stock_news_analysis(days_interval=10): + with contract_api.DBSession(provider="em", data_schema=StockNews)() as session: + start_date = date_time_by_interval(current_date(), -days_interval) + stock_news_list: List[StockNews] = StockNews.query_data( + session=session, + order=StockNews.timestamp.desc(), + distinct=StockNews.news_code, + limit=100, + return_type="dict", + filters=[ + StockNews.timestamp >= start_date, + func.json_extract(StockNews.news_analysis, f'$."tag_suggestions"') != None, + ], + ) + return stock_news_list + + +def get_tag_suggestions_stats(): + return build_tag_suggestions_stats() + + +if __name__ == "__main__": + print(get_stock_event(entity_id="stock_sz_000034")) diff --git a/src/zvt_vip/fill_project.py b/src/zvt_vip/fill_project.py new file mode 100644 index 00000000..76457fa7 --- /dev/null +++ b/src/zvt_vip/fill_project.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- +from zvt.autocode import gen_exports + +if __name__ == "__main__": + gen_exports(dir_path="./dataset") + gen_exports(dir_path="./recorders") + gen_exports(dir_path="./tag") diff --git a/src/zvt_vip/recorders/__init__.py b/src/zvt_vip/recorders/__init__.py new file mode 100644 index 00000000..25683d86 --- /dev/null +++ b/src/zvt_vip/recorders/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +# the __all__ is generated +__all__ = [] + +# __init__.py structure: +# common code of the package +# export interface in __all__ which contains __all__ of its sub modules + +# import all from submodule em_api +from .em_api import * +from .em_api import __all__ as _em_api_all + +__all__ += _em_api_all + +# import all from submodule em_stock_events_recorder +from .em_stock_events_recorder import * +from .em_stock_events_recorder import __all__ as _em_stock_events_recorder_all + +__all__ += _em_stock_events_recorder_all diff --git a/src/zvt_vip/recorders/em_api.py b/src/zvt_vip/recorders/em_api.py new file mode 100644 index 00000000..2c7926ed --- /dev/null +++ b/src/zvt_vip/recorders/em_api.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +import logging + +import requests + +from zvt.contract.api import decode_entity_id +from zvt.recorders.em import em_api +from zvt.utils.time_utils import to_pd_timestamp +from zvt.utils.utils import to_str, flatten_list + +logger = logging.getLogger(__name__) + + +def get_news_content(news_code, session=None): + url = f"https://wap.eastmoney.com/a/{news_code}.html" + logger.info(f"get news content from: {url}") + if session: + resp = session.get(url) + else: + resp = requests.get(url) + + if resp.status_code == 200: + from bs4 import BeautifulSoup + + soup = BeautifulSoup(resp.text, "html.parser") + meta_with_content = soup.head.find("meta", property="og:description") + news_content = meta_with_content["content"] if meta_with_content else None + return news_content + logger.error(f"request em data code: {resp.status_code}, error: {resp.text}") + + return None + + +def get_events(entity_id, session=None, fetch_count=2000): + _, _, code = decode_entity_id(entity_id) + + datas = em_api.get_em_data( + session=session, + fields=None, + request_type="RTP_F10_DETAIL", + source="SECURITIES", + params=f"{code}.{em_api.get_exchange(code)}", + fetch_all=False, + fetch_count=fetch_count, + ) + if not datas: + return None + datas = flatten_list(datas) + stock_events = [] + checking_date = None + index = 0 + for item in datas: + the_date = item["NOTICE_DATE"] + event_type = item["EVENT_TYPE"] + if checking_date == the_date: + index = index + 1 + the_id = f"{entity_id}_{the_date}_{event_type}_{index}" + else: + checking_date = the_date + index = 0 + the_id = f"{entity_id}_{the_date}_{event_type}" + event = { + "id": the_id, + "entity_id": entity_id, + "timestamp": to_pd_timestamp(the_date), + "event_type": event_type, + "specific_event_type": item["SPECIFIC_EVENTTYPE"], + "notice_date": to_pd_timestamp(the_date), + "level1_content": to_str(item["LEVEL1_CONTENT"]), + "level2_content": to_str(item["LEVEL2_CONTENT"]), + } + stock_events.append(event) + return stock_events + + +# the __all__ is generated +__all__ = ["get_events"] diff --git a/src/zvt_vip/recorders/em_stock_events_recorder.py b/src/zvt_vip/recorders/em_stock_events_recorder.py new file mode 100644 index 00000000..a9fc9a3b --- /dev/null +++ b/src/zvt_vip/recorders/em_stock_events_recorder.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +import pandas as pd +import requests + +from zvt.contract.api import df_to_db +from zvt.contract.recorder import FixedCycleDataRecorder +from zvt.domain import Stock +from zvt.utils.time_utils import to_pd_timestamp, count_interval, now_pd_timestamp +from zvt_vip.dataset.stock_events import StockEvents +from zvt_vip.recorders import em_api + + +class EMStockEventsRecorder(FixedCycleDataRecorder): + original_page_url = ( + "https://emh5.eastmoney.com/html/detail.html?fc=300684.SZ&shareFlag=1&color=w&appfenxiang=1#/gsds" + ) + url = "https://datacenter.eastmoney.com/securities/api/data/get?type=RTP_F10_DETAIL¶ms=300684.SZ&source=SECURITIES&client=APP&p=1&v=05132741154833669" + + entity_schema = Stock + data_schema = StockEvents + entity_provider = "em" + provider = "em" + + def record(self, entity, start, end, size, timestamps): + if not start: + start = to_pd_timestamp("2005-01-01") + days = count_interval(start, now_pd_timestamp()) + if days < 0: + fetch_count = 1 + elif days <= 10: + fetch_count = 3 + elif days <= 30: + fetch_count = 5 + else: + fetch_count = 2000 + + stock_events = em_api.get_events(session=self.http_session, entity_id=entity.id, fetch_count=fetch_count) + if stock_events: + df = pd.DataFrame.from_records(stock_events) + self.logger.info(df.head()) + df_to_db(df=df, data_schema=self.data_schema, provider=self.provider, force_update=self.force_update) + + +if __name__ == "__main__": + r = EMStockEventsRecorder(entity_ids=["stock_sz_000338"], sleeping_time=0) + r.run() +# the __all__ is generated +__all__ = ["EMStockEventsRecorder"] diff --git a/src/zvt_vip/recorders/em_stock_news_recorder.py b/src/zvt_vip/recorders/em_stock_news_recorder.py new file mode 100644 index 00000000..074517db --- /dev/null +++ b/src/zvt_vip/recorders/em_stock_news_recorder.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +import pandas as pd + +from zvt.contract.api import df_to_db +from zvt.domain.misc.stock_news import StockNews +from zvt.recorders import EMStockNewsRecorder +from zvt.recorders.em import em_api +from zvt.utils.time_utils import count_interval, now_pd_timestamp, recent_year_date +from zvt_vip.recorders.em_api import get_news_content + + +class VIPEMStockNewsRecorder(EMStockNewsRecorder): + def record(self, entity, start, end, size, timestamps): + from_date = recent_year_date() + if not start or (start < from_date): + start = from_date + + if count_interval(start, now_pd_timestamp()) <= 30: + ps = 30 + else: + ps = 200 + + latest_news: StockNews = self.get_latest_saved_record(entity=entity) + + news = em_api.get_news( + session=self.http_session, + entity_id=entity.id, + ps=ps, + start_timestamp=start, + latest_code=latest_news.news_code if latest_news else None, + ) + if news: + # get latest 50 news content + for item in news[:50]: + news_code = item.get("news_code") + if news_code: + item["news_content"] = get_news_content(news_code=news_code, session=self.http_session) + + df = pd.DataFrame.from_records(news) + self.logger.info(df) + df_to_db(df=df, data_schema=self.data_schema, provider=self.provider, force_update=self.force_update) + + +if __name__ == "__main__": + # df = Stock.query_data(filters=[Stock.exchange == "bj"], provider="em") + # entity_ids = df["entity_id"].tolist() + r = EMStockNewsRecorder(entity_ids=["stock_sh_600345"], sleeping_time=0) + r.run() +# the __all__ is generated +__all__ = ["EMStockNewsRecorder"] diff --git a/src/zvt_vip/recorders/jqka/__init__.py b/src/zvt_vip/recorders/jqka/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/zvt_vip/recorders/jqka/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/zvt_vip/recorders/jqka/jqka_vip_api.py b/src/zvt_vip/recorders/jqka/jqka_vip_api.py new file mode 100644 index 00000000..3affeeed --- /dev/null +++ b/src/zvt_vip/recorders/jqka/jqka_vip_api.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- +from zvt.recorders.jqka import jqka_api + + +def get_reason(): + pass diff --git a/src/zvt_vip/rest/__init__.py b/src/zvt_vip/rest/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/zvt_vip/rest/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/zvt_vip/rest/event.py b/src/zvt_vip/rest/event.py new file mode 100644 index 00000000..6585879f --- /dev/null +++ b/src/zvt_vip/rest/event.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +from typing import List + +from fastapi import APIRouter + +from zvt_vip.event import event_service +from zvt_vip.event.event_models import StockEventSummaryModel, StockNewsModel, IgnoreNewsModel + +event_router = APIRouter( + prefix="/api/event", + tags=["event"], + responses={404: {"description": "Not found"}}, +) + + +@event_router.get("/get_stock_event", response_model=StockEventSummaryModel) +def get_stock_event(entity_id: str): + return event_service.get_stock_event(entity_id) + + +@event_router.get("/get_stock_news_analysis", response_model=List[StockNewsModel]) +def get_stock_news_analysis(): + return event_service.get_stock_news_analysis() + + +@event_router.get("/get_tag_suggestions_stats") +def get_tag_suggestions_stats(): + return event_service.get_tag_suggestions_stats() + + +@event_router.post("/ignore_stock_news", response_model=str) +def ignore_stock_news(ignore_news_model: IgnoreNewsModel): + event_service.ignore_news(ignore_news_model) + return "success" diff --git a/src/zvt_vip/server.py b/src/zvt_vip/server.py new file mode 100644 index 00000000..b65247f4 --- /dev/null +++ b/src/zvt_vip/server.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +import os + +import uvicorn +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import ORJSONResponse +from fastapi_pagination import add_pagination + +from zvt import zvt_env +from zvt.rest.data import data_router +from zvt.rest.factor import factor_router +from zvt.rest.misc import misc_router +from zvt.rest.trading import trading_router +from zvt.rest.work import work_router +from zvt_vip.rest.event import event_router + +app = FastAPI(default_response_class=ORJSONResponse) + +origins = ["*"] + +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.get("/") +async def root(): + return {"message": "Hello World"} + + +app.include_router(data_router) +app.include_router(factor_router) +app.include_router(work_router) +app.include_router(trading_router) +app.include_router(misc_router) +app.include_router(event_router) + +add_pagination(app) + + +if __name__ == "__main__": + log_config = os.path.join(zvt_env["resource_path"], "log_conf.yaml") + uvicorn.run("server:app", host="0.0.0.0", reload=True, port=8090, log_config=log_config) diff --git a/src/zvt_vip/tag/__init__.py b/src/zvt_vip/tag/__init__.py new file mode 100644 index 00000000..26882b27 --- /dev/null +++ b/src/zvt_vip/tag/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# the __all__ is generated +__all__ = [] + +# __init__.py structure: +# common code of the package +# export interface in __all__ which contains __all__ of its sub modules + +# import all from submodule tag_service +from .tag_service import * +from .tag_service import __all__ as _tag_service_all + +__all__ += _tag_service_all diff --git a/src/zvt_vip/tag/tag_service.py b/src/zvt_vip/tag/tag_service.py new file mode 100644 index 00000000..4b56b4d8 --- /dev/null +++ b/src/zvt_vip/tag/tag_service.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +import logging +from typing import List + +import pandas as pd +import sqlalchemy + +import zvt.contract.api as contract_api +from zvt.api.kdata import get_trade_dates +from zvt.api.selector import get_entity_ids_by_filter +from zvt.domain import Stock +from zvt.tag.tag_models import ( + SetStockTagsModel, +) +from zvt.tag.tag_schemas import StockTags, StockSystemTags +from zvt.tag.tag_service import build_stock_tags +from zvt.tag.tag_utils import ( + get_sub_tags, + get_concept_main_tag_mapping, +) +from zvt.utils.time_utils import ( + to_time_str, + to_pd_timestamp, + date_time_by_interval, + TIME_FORMAT_DAY, + current_date, +) +from zvt_vip.dataset.stock_events import StockEvents + +logger = logging.getLogger(__name__) + + +def vip_build_default_sub_tags(entity_ids=None): + if not entity_ids: + entity_ids = get_entity_ids_by_filter( + provider="em", ignore_delist=True, ignore_st=False, ignore_new_stock=False + ) + + for entity_id in entity_ids: + logger.info(f"build sub tag for: {entity_id}") + datas = StockTags.query_data(entity_id=entity_id, limit=1, return_type="domain") + if not datas: + raise AssertionError(f"Main tag must be set at first for {entity_id}") + + current_stock_tags = datas[0] + keep_current = False + if current_stock_tags.set_by_user: + logger.info(f"keep current tags set by user for: {entity_id}") + keep_current = True + + start_timestamp = max(current_stock_tags.timestamp, to_pd_timestamp("2005-01-01")) + current_sub_tag = current_stock_tags.sub_tag + filters = [StockEvents.event_type == "新增概念", StockEvents.entity_id == entity_id] + if current_sub_tag: + logger.info(f"{entity_id} current_sub_tag: {current_sub_tag}") + current_sub_tags = current_stock_tags.sub_tags.keys() + for current_sub_tag in current_sub_tags: + filters = filters + [sqlalchemy.not_(StockEvents.level1_content.contains(current_sub_tag))] + + logger.info(f"get stock_events from start_timestamp: {start_timestamp}") + + stock_events: List[StockEvents] = StockEvents.query_data( + provider="em", + start_timestamp=start_timestamp, + filters=filters, + order=StockEvents.timestamp.asc(), + return_type="domain", + ) + if not stock_events: + logger.info(f"no stock_events for: {entity_id}") + continue + + for stock_event in stock_events: + event_timestamp = to_pd_timestamp(stock_event.timestamp) + if stock_event.level1_content: + contents = stock_event.level1_content.split(":") + if len(contents) < 2: + logger.warning(f"wrong stock_event:{stock_event.level1_content}") + else: + sub_tag = contents[1] + if sub_tag in get_sub_tags(): + if stock_event.level2_content: + sub_tag_reason = stock_event.level2_content.split(":")[1] + else: + sub_tag_reason = f"来自概念:{sub_tag}" + + main_tag = get_concept_main_tag_mapping().get(sub_tag) + main_tag_reason = sub_tag_reason + if main_tag == "其他": + main_tag = current_stock_tags.main_tag + main_tag_reason = current_stock_tags.main_tag_reason + + build_stock_tags( + set_stock_tags_model=SetStockTagsModel( + entity_id=entity_id, + main_tag=main_tag, + main_tag_reason=main_tag_reason, + sub_tag=sub_tag, + sub_tag_reason=sub_tag_reason, + active_hidden_tags=None, + ), + timestamp=event_timestamp, + set_by_user=False, + keep_current=keep_current, + ) + else: + logger.info(f"ignore {sub_tag} not in sub_tag_info yet") + + +def build_system_tags(): + entity_ids = get_entity_ids_by_filter(provider="em", ignore_delist=True, ignore_st=False, ignore_new_stock=False) + + latest = StockSystemTags.query_data(limit=1, order=StockSystemTags.timestamp.desc(), return_type="domain") + if latest: + start = date_time_by_interval(to_time_str(latest[0].timestamp, fmt=TIME_FORMAT_DAY)) + else: + start = "2018-01-01" + trade_days = get_trade_dates(start=start, end=current_date()) + + entity_df = Stock.query_data( + provider="em", entity_ids=entity_ids, columns=["entity_id", "code", "name"], index="entity_id" + ) + entity_df = entity_df[["entity_id", "code", "name"]] + + for target_date in trade_days: + logger.info(f"build_system_tags to: {target_date}") + events_list = ["增减持计划", "增发", "配股", "业绩预告", "股票回购", "股东增减持", "诉讼仲裁", "违规处罚", "高管及关联方增减持", "龙虎榜"] + recent_30_days = date_time_by_interval(target_date, -30) + future_30_days = date_time_by_interval(target_date, 30) + df1 = StockEvents.query_data( + entity_ids=entity_ids, + start_timestamp=recent_30_days, + end_timestamp=target_date, + filters=[StockEvents.event_type.in_(events_list)], + ) + + forecast_events_list = [ + "限售解禁", + ] + df2 = StockEvents.query_data( + entity_ids=entity_ids, + start_timestamp=recent_30_days, + end_timestamp=future_30_days, + filters=[StockEvents.event_type.in_(forecast_events_list)], + ) + df = pd.concat([df1, df2]) + df["recent_reduction"] = ((df["event_type"] == "增减持计划") & df["level1_content"].str.contains("减持")) | ( + df["specific_event_type"] == "股东减持" + ) + df["recent_acquisition"] = ( + ((df["event_type"] == "增减持计划") & df["level1_content"].str.contains("增持")) + | (df["specific_event_type"] == "股东增持") + | (df["event_type"] == "股票回购") + ) + + # 解禁 + df["recent_unlock"] = df["event_type"] == "限售解禁" + # 增发配股 + df["recent_additional_or_rights_issue"] = (df["event_type"] == "增发") | (df["event_type"] == "配股") + + # 违规 + df["recent_violation_alert"] = df["event_type"] == "违规处罚" + + # 业绩向好 + df["recent_positive_earnings_news"] = (df["event_type"] == "业绩预告") & ( + df["level1_content"].str.contains("预增|略增|扭亏", regex=True) + ) + # 业绩不行 + df["recent_negative_earnings_news"] = (df["event_type"] == "业绩预告") & ( + df["level1_content"].str.contains("预减|略减|续亏|首亏", regex=True) + ) + + grouped_df = df.groupby("entity_id").agg( + recent_reduction=("recent_reduction", "any"), + recent_acquisition=("recent_acquisition", "any"), + recent_unlock=("recent_unlock", "any"), + recent_additional_or_rights_issue=("recent_additional_or_rights_issue", "any"), + recent_violation_alert=("recent_violation_alert", "any"), + recent_positive_earnings_news=("recent_positive_earnings_news", "any"), + recent_negative_earnings_news=("recent_negative_earnings_news", "any"), + recent_dragon_and_tiger_count=( + "event_type", + lambda x: (x == "龙虎榜").astype(int).sum(), + ), + ) + grouped_df["timestamp"] = to_pd_timestamp(target_date) + result_df = pd.concat([grouped_df, entity_df[entity_df.index.isin(grouped_df.index)]], axis=1) + result_df["id"] = result_df["entity_id"] + "_" + to_time_str(target_date, fmt=TIME_FORMAT_DAY) + contract_api.df_to_db(result_df, data_schema=StockSystemTags, force_update=False, provider="zvt") + + +if __name__ == "__main__": + vip_build_default_sub_tags() + # build_system_tags() + +# the __all__ is generated +__all__ = ["vip_build_default_sub_tags", "build_system_tags"] diff --git a/src/zvt_vip/tag/taggers.py b/src/zvt_vip/tag/taggers.py new file mode 100644 index 00000000..2609e31d --- /dev/null +++ b/src/zvt_vip/tag/taggers.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +import logging +import re +from typing import List + +from zvt.tag.common import TagType +from zvt.tag.tag_models import BatchSetStockTagsModel +from zvt.tag.tag_service import batch_set_stock_tags +from zvt.utils.time_utils import date_time_by_interval, current_date +from zvt_vip.dataset import StockEvents + +logger = logging.getLogger(__name__) + + +def tag_financial_result(): + """ + tag:炒业绩 + + """ + events_list = ["业绩预告"] + recent_90_days = date_time_by_interval(current_date(), -60) + + stock_events: List[StockEvents] = StockEvents.query_data( + start_timestamp=recent_90_days, + filters=[StockEvents.event_type.in_(events_list)], + return_type="domain", + order=StockEvents.timestamp.asc(), + ) + + entity_ids = [event.entity_id for event in stock_events if bool(re.search("预增|略增|扭亏|减亏", event.level1_content))] + + batch_set_stock_tags( + BatchSetStockTagsModel(entity_ids=entity_ids, tag="炒业绩", tag_reason="业绩超预期", tag_type=TagType.sub_tag) + ) + + +if __name__ == "__main__": + tag_financial_result() diff --git a/src/zvt_vip/tasks/__init__.py b/src/zvt_vip/tasks/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/zvt_vip/tasks/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/zvt_vip/tasks/event_runner.py b/src/zvt_vip/tasks/event_runner.py new file mode 100644 index 00000000..24b2251c --- /dev/null +++ b/src/zvt_vip/tasks/event_runner.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +import logging + +from apscheduler.schedulers.background import BackgroundScheduler + +from zvt.api.selector import get_entity_ids_by_filter +from zvt.domain import BlockStock +from zvt.utils.recorder_utils import run_data_recorder +from zvt_vip.dataset import StockEvents + +logger = logging.getLogger(__name__) + +sched = BackgroundScheduler() + + +def record_events(): + data_provider = "em" + sleeping_time = 0 + normal_stock_ids = get_entity_ids_by_filter( + provider="em", ignore_delist=True, ignore_st=False, ignore_new_stock=False + ) + + run_data_recorder( + entity_ids=normal_stock_ids, + day_data=False, + domain=StockEvents, + data_provider=data_provider, + force_update=False, + sleeping_time=sleeping_time, + return_unfinished=True, + ) + + run_data_recorder( + entity_ids=normal_stock_ids, + day_data=True, + domain=BlockStock, + data_provider=data_provider, + force_update=False, + sleeping_time=sleeping_time, + return_unfinished=True, + ) + + +if __name__ == "__main__": + record_events() + sched.add_job(func=record_events, trigger="cron", hour=17, minute=00, day_of_week="mon-fri") + sched.start() + sched._thread.join() diff --git a/src/zvt_vip/tasks/news_runner.py b/src/zvt_vip/tasks/news_runner.py new file mode 100644 index 00000000..afe2a475 --- /dev/null +++ b/src/zvt_vip/tasks/news_runner.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +import logging +import time +from typing import List + +from apscheduler.schedulers.background import BackgroundScheduler + +from zvt.tag.common import InsertMode +from zvt.tag.dynamic_pool import get_top_50 +from zvt.tag.tag_models import CreateStockPoolsModel +from zvt.tag.tag_schemas import StockPools +from zvt.tag.tag_service import build_stock_pool +from zvt.utils.time_utils import now_pd_timestamp, current_date +from zvt_vip.recorders.em_stock_news_recorder import VIPEMStockNewsRecorder + +logger = logging.getLogger(__name__) + + +sched = BackgroundScheduler() + + +def analyze_news(): + sleeping_time = 0 + stock_pools: List[StockPools] = StockPools.query_data( + filters=[StockPools.stock_pool_name.in_(["大局"])], + order=StockPools.timestamp.desc(), + limit=2, + return_type="domain", + ) + entity_ids = list(set(stock_pools[0].entity_ids + stock_pools[1].entity_ids)) + + if entity_ids: + r = VIPEMStockNewsRecorder(entity_ids=entity_ids, force_update=False, sleeping_time=sleeping_time) + r.run() + + while True: + entity_ids = get_top_50() + + if entity_ids: + create_stock_pools_model: CreateStockPoolsModel = CreateStockPoolsModel( + stock_pool_name="今日强势", entity_ids=entity_ids, insert_mode=InsertMode.append + ) + + build_stock_pool(create_stock_pools_model, target_date=current_date()) + r = VIPEMStockNewsRecorder(entity_ids=entity_ids, force_update=False, sleeping_time=sleeping_time) + r.run() + + current_timestamp = now_pd_timestamp() + if current_timestamp.hour >= 15 and current_timestamp.minute >= 1: + logger.info(f"record stock news finished at: {current_timestamp}") + break + time.sleep(30) + + +if __name__ == "__main__": + analyze_news() + sched.add_job(func=analyze_news, trigger="cron", hour=9, minute=10, day_of_week="mon-fri") + sched.start() + sched._thread.join() diff --git a/src/zvt_vip/tasks/stock_pool_runner.py b/src/zvt_vip/tasks/stock_pool_runner.py new file mode 100644 index 00000000..4f007acb --- /dev/null +++ b/src/zvt_vip/tasks/stock_pool_runner.py @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- +import logging +from collections import Counter +from typing import List + +from apscheduler.schedulers.background import BackgroundScheduler + +from zvt import zvt_config +from zvt.api.kdata import get_latest_kdata_date +from zvt.api.selector import get_entity_ids_by_filter +from zvt.contract import AdjustType +from zvt.contract.api import get_entity_code +from zvt.domain import ( + Stock, + Stock1dHfqKdata, + Stockhk, + Stockhk1dHfqKdata, + Block, + Block1dKdata, + BlockCategory, + Index, + Index1dKdata, + LimitUpInfo, +) +from zvt.factors import compute_top_stocks +from zvt.informer import EmailInformer +from zvt.informer.inform_utils import add_to_eastmoney, clean_groups +from zvt.informer.inform_utils import inform_email +from zvt.rest.work import query_simple_stock_tags +from zvt.tag.tag_models import QuerySimpleStockTagsModel +from zvt.tag.tag_schemas import TagStats +from zvt.tag.tag_stats import build_system_stock_pools, build_stock_pool_tag_stats +from zvt.utils.recorder_utils import run_data_recorder +from zvt.utils.time_utils import current_date +from zvt.utils.time_utils import to_pd_timestamp + +logger = logging.getLogger(__name__) + +sched = BackgroundScheduler() + +email_informer = EmailInformer() + + +def report_limit_up(): + latest_data = LimitUpInfo.query_data(order=LimitUpInfo.timestamp.desc(), limit=1, return_type="domain") + timestamp = latest_data[0].timestamp + df = LimitUpInfo.query_data(start_timestamp=timestamp, end_timestamp=timestamp, columns=["code", "name", "reason"]) + df["reason"] = df["reason"].str.split("+") + print(df) + email_informer.send_message(zvt_config["email_username"], f"{timestamp} 热门报告", f"{df}") + + +def record_stock_data(data_provider="em", entity_provider="em", sleeping_time=0): + # 涨停数据 + run_data_recorder(domain=LimitUpInfo, data_provider=None, force_update=False) + report_limit_up() + + # A股指数 + run_data_recorder(domain=Index, data_provider=data_provider, force_update=False) + # A股指数行情 + run_data_recorder( + domain=Index1dKdata, + data_provider=data_provider, + entity_provider=entity_provider, + day_data=True, + sleeping_time=sleeping_time, + ) + + # 板块(概念,行业) + run_data_recorder(domain=Block, entity_provider=entity_provider, data_provider=entity_provider, force_update=False) + # 板块行情(概念,行业) + run_data_recorder( + domain=Block1dKdata, + entity_provider=entity_provider, + data_provider=entity_provider, + day_data=True, + sleeping_time=sleeping_time, + ) + + # 报告新概念和行业 + df = Block.query_data( + filters=[Block.category == BlockCategory.concept.value], + order=Block.list_date.desc(), + index="entity_id", + limit=7, + ) + + inform_email( + entity_ids=df.index.tolist(), entity_type="block", target_date=current_date(), title="report 新概念", provider="em" + ) + + # A股标的 + run_data_recorder(domain=Stock, data_provider=data_provider, force_update=False) + # A股后复权行情 + normal_stock_ids = get_entity_ids_by_filter( + provider="em", ignore_delist=True, ignore_st=False, ignore_new_stock=False + ) + + run_data_recorder( + entity_ids=normal_stock_ids, + domain=Stock1dHfqKdata, + data_provider=data_provider, + entity_provider=entity_provider, + day_data=True, + sleeping_time=sleeping_time, + return_unfinished=True, + ) + + +def record_stockhk_data(data_provider="em", entity_provider="em", sleeping_time=2): + # 港股标的 + run_data_recorder(domain=Stockhk, data_provider=data_provider, force_update=False) + # 港股后复权行情 + df = Stockhk.query_data(filters=[Stockhk.south == True], index="entity_id") + run_data_recorder( + domain=Stockhk1dHfqKdata, + entity_ids=df.index.tolist(), + data_provider=data_provider, + entity_provider=entity_provider, + day_data=True, + sleeping_time=sleeping_time, + ) + + +def report_main_line_stocks(): + target_date = get_latest_kdata_date(provider="em", entity_type="stock", adjust_type=AdjustType.hfq) + # target_date = "2024-04-03" + + tag_stats_list: List[TagStats] = TagStats.query_data( + filters=[TagStats.timestamp == to_pd_timestamp(target_date), TagStats.stock_pool_name == "main_line"], + order=TagStats.position.asc(), + return_type="domain", + ) + msg = "" + + all_tags = [item.main_tag for item in tag_stats_list] + + main_line_tags = all_tags[:3] + main_line_stocks = [] + + following_tags = all_tags[3:5] + following_stocks = [] + + other_tags = all_tags[5:] + other_stocks = [] + + for tag_stats in tag_stats_list: + if tag_stats.main_tag in main_line_tags: + main_line_stocks = main_line_stocks + tag_stats.entity_ids + elif tag_stats.main_tag in following_tags: + following_stocks = following_stocks + tag_stats.entity_ids + else: + other_stocks = other_stocks + tag_stats.entity_ids + + msg = msg + f"^^^^^^ {tag_stats.main_tag}[{tag_stats.turnover} ({tag_stats.entity_count})] ^^^^^^\n" + stocks_info = [] + simple_stock_tags = query_simple_stock_tags(QuerySimpleStockTagsModel(entity_ids=tag_stats.entity_ids)) + sub_tag_counts = Counter(tag["sub_tag"] for tag in simple_stock_tags) + sorted_simple_stock_tags = sorted(simple_stock_tags, key=lambda x: sub_tag_counts[x["sub_tag"]], reverse=True) + + for simple_stock_tag in sorted_simple_stock_tags: + stocks_info.append( + f"{simple_stock_tag['entity_id']}({simple_stock_tag['name']}) [{simple_stock_tag['sub_tag']}]" + ) + + msg = msg + "\n".join(stocks_info) + msg = msg + "\n" + + if main_line_stocks: + codes = [get_entity_code(entity_id) for entity_id in main_line_stocks] + logger.info(f"主线个股: {codes}") + add_to_eastmoney(codes=codes, entity_type="stock", group="主线", over_write=True) + + if len(following_stocks) > 0: + clean_groups(keep=["自选股", "练气", "重要板块", "主线"]) + + if following_stocks: + codes = [get_entity_code(entity_id) for entity_id in following_stocks] + logger.info(f"次主线个股: {codes}") + group = "".join(following_tags)[:6] + add_to_eastmoney(codes=codes, entity_type="stock", group=group, over_write=True) + + if other_stocks: + codes = [get_entity_code(entity_id) for entity_id in other_stocks] + logger.info(f"其他个股: {codes}") + group = "".join(other_tags)[:5] + "等" + add_to_eastmoney(codes=codes, entity_type="stock", group=group, over_write=True) + + email_informer.send_message(zvt_config["email_username"], f"{target_date} 主线报告", msg) + return main_line_tags + + +def report_vol_up_stocks(main_line_tags): + target_date = get_latest_kdata_date(provider="em", entity_type="stock", adjust_type=AdjustType.hfq) + # target_date = "2024-04-03" + + tag_stats_list: List[TagStats] = TagStats.query_data( + filters=[TagStats.timestamp == to_pd_timestamp(target_date), TagStats.stock_pool_name == "vol_up"], + order=TagStats.position.asc(), + return_type="domain", + ) + msg = "" + main_line_stocks = [] + others = [] + + for tag_stats in tag_stats_list: + if tag_stats.main_tag in main_line_tags: + main_line_stocks = main_line_stocks + tag_stats.entity_ids + else: + others = others + tag_stats.entity_ids + + msg = msg + f"^^^^^^ {tag_stats.main_tag}[{tag_stats.turnover} ({tag_stats.entity_count})] ^^^^^^\n" + stocks_info = [] + simple_stock_tags = query_simple_stock_tags(QuerySimpleStockTagsModel(entity_ids=tag_stats.entity_ids)) + sub_tag_counts = Counter(tag["sub_tag"] for tag in simple_stock_tags) + sorted_simple_stock_tags = sorted(simple_stock_tags, key=lambda x: sub_tag_counts[x["sub_tag"]], reverse=True) + + for simple_stock_tag in sorted_simple_stock_tags: + stocks_info.append( + f"{simple_stock_tag['entity_id']}({simple_stock_tag['name']}) [{simple_stock_tag['sub_tag']}]" + ) + + msg = msg + "\n".join(stocks_info) + msg = msg + "\n" + if main_line_stocks: + codes = [get_entity_code(entity_id) for entity_id in main_line_stocks] + print(codes) + add_to_eastmoney(codes=codes, entity_type="stock", group="主线", over_write=False) + + if others: + codes = [get_entity_code(entity_id) for entity_id in others] + print(codes) + add_to_eastmoney(codes=codes, entity_type="stock", group="年线股票", over_write=True) + + email_informer.send_message(zvt_config["email_username"], f"{target_date} 年线股票", msg) + + +def build_stock_pool_and_report(): + # 获取 涨停 指数 板块(概念) 个股行情数据 + record_stock_data() + # 计算短期/中期最强 放量突破年线半年线个股 + compute_top_stocks() + # 放入股票池 + build_system_stock_pools() + for stock_pool_name in ["main_line", "vol_up", "大局"]: + build_stock_pool_tag_stats(stock_pool_name=stock_pool_name, force_rebuild_latest=True) + + # report + main_line_tags = report_main_line_stocks() + logger.info(f"mainline is :{main_line_tags}") + report_vol_up_stocks(main_line_tags=main_line_tags) + + +if __name__ == "__main__": + build_stock_pool_and_report() + sched.add_job(func=build_stock_pool_and_report, trigger="cron", hour=16, minute=00, day_of_week="mon-fri") + sched.start() + sched._thread.join() diff --git a/src/zvt_vip/utils/__init__.py b/src/zvt_vip/utils/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/zvt_vip/utils/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/zvt_vip/utils/ai_utils.py b/src/zvt_vip/utils/ai_utils.py new file mode 100644 index 00000000..73540422 --- /dev/null +++ b/src/zvt_vip/utils/ai_utils.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- + + +def build_tags_from(): + pass diff --git a/tests/utils/test_time_utils.py b/tests/utils/test_time_utils.py index c4c272f7..4f6f2270 100644 --- a/tests/utils/test_time_utils.py +++ b/tests/utils/test_time_utils.py @@ -1,10 +1,8 @@ # -*- coding: utf-8 -*- from zvt.contract import IntervalLevel +from zvt.contract.utils import evaluate_size_from_timestamp, next_timestamp_on_level, is_finished_kdata_timestamp from zvt.utils.time_utils import ( - evaluate_size_from_timestamp, - next_timestamp_on_level, to_pd_timestamp, - is_finished_kdata_timestamp, split_time_interval, is_same_date, month_start_end_ranges,