Skip to content
This repository has been archived by the owner on Apr 15, 2020. It is now read-only.

数据解析错误 #210

Open
zillionare opened this issue May 22, 2019 · 6 comments
Open

数据解析错误 #210

zillionare opened this issue May 22, 2019 · 6 comments

Comments

@zillionare
Copy link

我做了两个实现,一个是基于ThreadPooleExecutor的asyncio实现,最多使用了100个服务器并发连接;另一个是单进程单服务器版,我使用了最多30个进程,每个进程连接一个不同的服务器,仍然发现有数据错误。

最初发现有数据解析错误(在自己的程序里),最终发现是tdx返回的数据错误(不确定是服务器本身的错误还是tdx解析错误,但排除了自己程序错误。

如何认定数据错误:
对同一个代码,我向服务器发起两次请求。在异步版中,这两次请求分别向两个不同的服务器发出;在单进程、单线程、单服务器版本中,这两次请求是向一个服务器发出。然后我对两次返回的结果进行比较:

  1. 如果两次都返回None, 认定为正常
  2. 如果其中一次为None,认定为错误
  3. 如果两次数据不一致,认定为错误。比如请求8个30分钟线,将期转换为numpy structure array,再使用numpy.equal进行比较。只有每一个元素都相同,才认为数据获取正确。

下面是我得到的错误的结果示例:

1002722 [26.34 26.21 26.2 26.13 25.6 25.59 25.8 25.7 ] [5.09 5.08 5.07 5.07 5.03 5.01 5. 5.02]
1000155 [10.06 10.11 10.07 10.07 10.03 10.01 10. 10. ] [11.87 11.94 11.86 11.84 11.84 11.86 11.79 11.85]
2600728 [15.17 15.09 15.06 15.07 15.01 14.97 14.94 14.95] [26.34 26.21 26.2 26.13 25.6 25.59 25.8 25.7 ]
2600389 [4.22 4.2 4.18 4.19 4.13 4.13 4.12 4.13] [10.06 10.11 10.07 10.07 10.03 10.01 10. 10. ]
1002193 [10.43 10.35 10.36 10.34 10.28 10.22 10.18 10.35] [15.17 15.09 15.06 15.07 15.01 14.97 14.94 14.95]
1002858 [19.55 19.45 19.61 19.6 19.46 19.39 19.34 19.37] [10.43 10.35 10.36 10.34 10.28 10.22 10.18 10.35]
2600834 [8.96 8.93 8.94 8.95 8.89 8.89 8.89 8.99] [19.55 19.45 19.61 19.6 19.46 19.39 19.34 19.37]
2600874 [3.52 3.53 3.52 3.53 3.51 3.5 3.5 3.5 ] [8.96 8.93 8.94 8.95 8.89 8.89 8.89 8.99]
2603021 [17.94 17.86 17.74 17.71 17.81 17.74 17.65 17.74] [3.52 3.53 3.52 3.53 3.51 3.5 3.5 3.5 ]
2600758 [8.37 8.44 8.39 8.33 8.32 8.26 8.26 8.25] [17.94 17.86 17.74 17.71 17.81 17.74 17.65 17.74]
1000568 [7.95 7.94 7.9 7.92 7.85 7.84 7.82 7.84] [8.37 8.44 8.39 8.33 8.32 8.26 8.26 8.25]
1000637 [6.26 6.27 6.24 6.23 6.22 6.21 6.19 6.24] [7.95 7.94 7.9 7.92 7.85 7.84 7.82 7.84]
1002706 [6.47 6.53 6.49 6.47 6.33 6.3 6.3 6.33] [6.26 6.27 6.24 6.23 6.22 6.21 6.19 6.24]
1002139 [73.8 74.3 74.28 74.22 74.11 73.14 73.25 73.65] [6.47 6.53 6.49 6.47 6.33 6.3 6.3 6.33]
1002437 [4.96 4.94 4.93 4.97 4.92 4.88 4.84 4.86] [73.8 74.3 74.28 74.22 74.11 73.14 73.25 73.65]
1002681 [5.68 5.68 5.66 5.66 5.81 5.83 5.79 5.8 ] [4.96 4.94 4.93 4.97 4.92 4.88 4.84 4.86]
1002693 [5.86 5.89 5.83 5.82 5.79 5.79 5.77 5.79] [5.68 5.68 5.66 5.66 5.81 5.83 5.79 5.8 ]
1000631 [4.16 4.15 4.13 4.13 4.09 4.09 4.06 4.08] [5.86 5.89 5.83 5.82 5.79 5.79 5.77 5.79]
1002622 [4.54 4.53 4.52 4.53 4.48 4.49 4.47 4.49] [4.16 4.15 4.13 4.13 4.09 4.09 4.06 4.08]
1002219 [4.61 4.57 4.56 4.56 4.54 4.54 4.87 4.81] [4.54 4.53 4.52 4.53 4.48 4.49 4.47 4.49]
1002310 [3.11 3.12 3.09 3.11 3.09 3.09 3.08 3.09] [4.61 4.57 4.56 4.56 4.54 4.54 4.87 4.81]
2600729 [3.84 3.83 3.79 3.83 3.78 3.75 3.68 3.67] [3.11 3.12 3.09 3.11 3.09 3.09 3.08 3.09]
2600231 [3.74 3.72 3.72 3.72 3.73 3.72 3.71 3.84] [3.84 3.83 3.79 3.83 3.78 3.75 3.68 3.67]
1000553 [5.65 5.65 5.62 5.63 5.61 5.58 5.54 5.57] [3.74 3.72 3.72 3.72 3.73 3.72 3.71 3.84]
2601616 [30.72 30.72 30.78 30.85 30.52 30.65 30.44 30.44] [5.65 5.65 5.62 5.63 5.61 5.58 5.54 5.57]
summary: 192 of 2726 failed, server is 116.211.98.141, time cost is 341.1942048072815

数据第一列为股票代码,首位1表示深交所,2表示上证。其后为8个30分钟分时,获取时间为2019-5-22日。
summary: 在2726支股票当中,有192只出现错误,服务器是116.211.98.141,这是长江证券电信服务器。

上述测试结果是单进程、单线程、单服务器的测试结果(但同时运行了30个进程)。

通达信版本:
pip freeze |grep pytdx
pytdx==1.68

相关代码
`python
def _build_time(self, d: dict, code: str):
return arrow.Arrow(d['year'], d['month'], d['day'], d['hour'], d['minute'],
tzinfo='Asia/Chongqing').datetime

def convert_sec_quotes_to_nd_array(self, bars: List, code: str, frame_type: FrameType):
    if not bars:
        return None
    else:
        return np.array([(
            int(code), d['open'], d['close'], d['high'], d['low'], d['vol'], d['amount'],
            self._build_time(d, code),
            frame_type) for d in bars],
            dtype=self.sec_bar_dtypes)


def _get_bars(self, code: str, frame_type: FrameType, n_bars: int, is_index) -> (
        List[np.array], str, FrameType, int):
    """
    function to fetch securities (stock, bond, index and etc)
    :param is_index: 000001 could be index and code for 平安银行, so we need this field
    :param code: str, stock/index code with exchange code
    :param frame_type: str, one of TdxFetcherMT.FRAME_XX
    :param n_bars: int, how many bars should we fetch (backward from now)
    :return: List of np.array (which is more memory efficient than python List of Dict, each array contains:
            'open', 'close', 'high', 'low', 'vol', 'amount', 'frame'
            code of the bars
            FrameType of the bars
            expected bars count
    """
    if n_bars == 0:
        logger.warning("bad n_bars value for %s: %s", code, frame_type)
        return None, code, frame_type, n_bars

    try:
        market, scode = self.get_simcode_and_market(code)
    except KeyError as e:
        logger.exception(e)
        logger.warning("KeyError: %s, %s, is_index: %s", code, frame_type, is_index)
        return None, code, frame_type, n_bars

    batch_size = min(n_bars, 800)
    batches = math.ceil(n_bars / batch_size)
    batch_results = [None] * batches

    con = self.server.get("con")
    for i in range(0, batches):
        data = None
        if is_index:
            data = con.get_index_bars(frame_type, market, scode, i * batch_size, batch_size)
        else:
            data = con.get_security_bars(frame_type, market, scode, i * batch_size, batch_size)

        try:
            batch_results[i] = self.convert_sec_quotes_to_nd_array(data, code, frame_type)
        except Exception:
            # self.failures.append((ip, code, frame_type))
            logger.warning("failed to parse data:%s,%s fetched from %s. None is returned", code, frame_type,
                           self.server.get("name"))
            return None, code, frame_type, n_bars

    # merge result
    batch_results = [x for x in batch_results if x is not None]
    if not batch_results:
        # self.failures.append((ip, code, frame_type))
        logger.warning("fetching %s of %s from %s returns no data, %s expected", code, frame_type,
                       self.server.get("name"), n_bars)
        return None, code, frame_type, n_bars

    return np.concatenate(batch_results), code, frame_type, n_bars

`

@zillionare
Copy link
Author

可能跟连接通讯出现错误有关。当通讯出现错误后,该连接如果没有释放,继续在该连接上请求数据,则可能出现不期望的结果,比如复用了上一次请求的数据。

我已经将tdx的初始化参数设置为允许抛出异常,并在异常处理时释放该连接。近期测试表明错误有较大程度减少。

我在异常处理时还应该进一步改进。当前从连接池里取出连接、异常发生时释放连接时并没有加锁。这样可能导致两个不同线程的请求,请求到同一个连接上;当这个连接正在释放时,很可能另一个线程又请求到了这个连接。

待测试后进一步更新。

@yutiansut
Copy link
Collaborator

薅羊毛轻点薅啊.... 容易不定期抛出错误数据的

@yutiansut
Copy link
Collaborator

  1. 实时的服务器并不是数据都准确的 尤其体现在实时transaction数据 大量服务器错误数据
  2. 这种频繁的请求容易被封ip 注意pytdx只是个破解接口而非任意薅羊毛的接口 这种30进程薅羊毛的行为完全就是异常且不合理的
  3. 如果非要多线程 建议多服务器薅..

@zillionare
Copy link
Author

@yutiansut 谢谢,就是用了多服务器。因为担心被封IP,所以我这个改进版也一直不敢拿出来

@wellcomez
Copy link

wellcomez commented Nov 22, 2019

@yutiansut 谢谢,就是用了多服务器。因为担心被封IP,所以我这个改进版也一直不敢拿出来

阿诺你好。不是http1.2 /2.0 ,请求和应答不配对。原始代码貌似有锁,细节没看。

@abc100m
Copy link

abc100m commented Dec 20, 2019

#234
看下我这个issue,修改之后再跑一下,是否正常了?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants