Skip to content

Commit bf138a0

Browse files
authored
feat: schedule_job and interval (#231)
* test: for builtin functions * test: expect fail for `datetime()` * feat: add `interval()` fn(WIP) * feat: `interval()` fn in builtin(UNTEST) * refactor: move `py_vec_obj_to_array` to util.rs * style: fmt * test: simple `interval()` cases * test: `interval()` with `last()`&`first()` * doc: `ts` param of `interval()` * log: common_telemetry for logging in script crate * doc: corrsponding test fn for each .ron file * feat: change to`mpsc` for schedule_job * test: schedule_job * dep: rm rustpython dep in common-function * refactor: mv `schedule_job` into `Script` trait * test: change to use `interval` to sample datapoint * feat: add gen_none_array for generate None Array * feat: impl Missing value for `prev`&`next` * test: `sum(prev(values))` * doc: add comment for why not support Float16 in `prev()` * feat: add `interval` in py side mock module * style: cargo fmt * refactor: according to comments * refactor: extract `apply_interval_function` * style: cargo fmt * refactor: remove `schedule()` * style: cargo fmt
1 parent 4d8b76b commit bf138a0

File tree

18 files changed

+921
-146
lines changed

18 files changed

+921
-146
lines changed

Cargo.lock

Lines changed: 2 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

component/script/python/example/calc_rv.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ def as_table(kline: list):
4444
"rv_60d",
4545
"rv_90d",
4646
"rv_180d"
47-
],
48-
sql="select open_time, close from k_line")
47+
])
4948
def calc_rvs(open_time, close):
50-
from greptime import vector, log, prev, sqrt, datetime, pow, sum
49+
from greptime import vector, log, prev, sqrt, datetime, pow, sum, last
50+
import greptime as g
5151
def calc_rv(close, open_time, time, interval):
5252
mask = (open_time < time) & (open_time > time - interval)
5353
close = close[mask]
54+
open_time = open_time[mask]
55+
close = g.interval(open_time, close, datetime("10m"), lambda x:last(x))
5456

5557
avg_time_interval = (open_time[-1] - open_time[0])/(len(open_time)-1)
5658
ref = log(close/prev(close))
@@ -60,10 +62,10 @@ def calc_rv(close, open_time, time, interval):
6062
# how to get env var,
6163
# maybe through accessing scope and serde then send to remote?
6264
timepoint = open_time[-1]
63-
rv_7d = calc_rv(close, open_time, timepoint, datetime("7d"))
64-
rv_15d = calc_rv(close, open_time, timepoint, datetime("15d"))
65-
rv_30d = calc_rv(close, open_time, timepoint, datetime("30d"))
66-
rv_60d = calc_rv(close, open_time, timepoint, datetime("60d"))
67-
rv_90d = calc_rv(close, open_time, timepoint, datetime("90d"))
68-
rv_180d = calc_rv(close, open_time, timepoint, datetime("180d"))
65+
rv_7d = vector([calc_rv(close, open_time, timepoint, datetime("7d"))])
66+
rv_15d = vector([calc_rv(close, open_time, timepoint, datetime("15d"))])
67+
rv_30d = vector([calc_rv(close, open_time, timepoint, datetime("30d"))])
68+
rv_60d = vector([calc_rv(close, open_time, timepoint, datetime("60d"))])
69+
rv_90d = vector([calc_rv(close, open_time, timepoint, datetime("90d"))])
70+
rv_180d = vector([calc_rv(close, open_time, timepoint, datetime("180d"))])
6971
return rv_7d, rv_15d, rv_30d, rv_60d, rv_90d, rv_180d

component/script/python/example/kline.json

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
{
88
"symbol": "BTCUSD",
99
"period": "1",
10-
"open_time": 1581231300,
10+
"open_time": 300,
1111
"open": "10107",
1212
"high": "10109.34",
1313
"low": "10106.71",
@@ -16,7 +16,7 @@
1616
{
1717
"symbol": "BTCUSD",
1818
"period": "1",
19-
"open_time": 1581231360,
19+
"open_time": 900,
2020
"open": "10106.79",
2121
"high": "10109.27",
2222
"low": "10105.92",
@@ -25,7 +25,7 @@
2525
{
2626
"symbol": "BTCUSD",
2727
"period": "1",
28-
"open_time": 1581231420,
28+
"open_time": 1200,
2929
"open": "10106.09",
3030
"high": "10108.75",
3131
"low": "10104.66",
@@ -34,7 +34,7 @@
3434
{
3535
"symbol": "BTCUSD",
3636
"period": "1",
37-
"open_time": 1581231480,
37+
"open_time": 1800,
3838
"open": "10108.73",
3939
"high": "10109.52",
4040
"low": "10106.07",
@@ -43,7 +43,7 @@
4343
{
4444
"symbol": "BTCUSD",
4545
"period": "1",
46-
"open_time": 1581231540,
46+
"open_time": 2400,
4747
"open": "10106.38",
4848
"high": "10109.48",
4949
"low": "10104.81",
@@ -52,7 +52,7 @@
5252
{
5353
"symbol": "BTCUSD",
5454
"period": "1",
55-
"open_time": 1581231600,
55+
"open_time": 3000,
5656
"open": "10106.95",
5757
"high": "10109.48",
5858
"low": "10106.6",
@@ -61,7 +61,7 @@
6161
{
6262
"symbol": "BTCUSD",
6363
"period": "1",
64-
"open_time": 1581231660,
64+
"open_time": 3600,
6565
"open": "10107.55",
6666
"high": "10109.28",
6767
"low": "10104.68",
@@ -70,7 +70,7 @@
7070
{
7171
"symbol": "BTCUSD",
7272
"period": "1",
73-
"open_time": 1581231720,
73+
"open_time": 4200,
7474
"open": "10104.68",
7575
"high": "10109.18",
7676
"low": "10104.14",
@@ -79,7 +79,7 @@
7979
{
8080
"symbol": "BTCUSD",
8181
"period": "1",
82-
"open_time": 1581231780,
82+
"open_time": 4800,
8383
"open": "10108.8",
8484
"high": "10117.36",
8585
"low": "10108.8",
@@ -88,7 +88,7 @@
8888
{
8989
"symbol": "BTCUSD",
9090
"period": "1",
91-
"open_time": 1581231840,
91+
"open_time": 5400,
9292
"open": "10115.96",
9393
"high": "10119.19",
9494
"low": "10115.96",
@@ -97,7 +97,7 @@
9797
{
9898
"symbol": "BTCUSD",
9999
"period": "1",
100-
"open_time": 1581231900,
100+
"open_time": 6000,
101101
"open": "10117.08",
102102
"high": "10120.73",
103103
"low": "10116.96",
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from .greptime import coprocessor, copr
2-
from .greptime import vector, log, prev, sqrt, pow, datetime, sum
2+
from .greptime import vector, log, prev, next, first, last, sqrt, pow, datetime, sum, interval
33
from .mock import mock_tester
44
from .cfg import set_conn_addr, get_conn_addr

component/script/python/greptime/greptime.py

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -89,42 +89,34 @@ def datatype(self):
8989
def filter(self, lst_bool):
9090
return self[lst_bool]
9191

92+
def last(lst):
93+
return lst[-1]
94+
95+
def first(lst):
96+
return lst[0]
9297

9398
def prev(lst):
9499
ret = np.zeros(len(lst))
95100
ret[1:] = lst[0:-1]
96101
ret[0] = nan
97102
return ret
98103

104+
def next(lst):
105+
ret = np.zeros(len(lst))
106+
ret[:-1] = lst[1:]
107+
ret[-1] = nan
108+
return ret
99109

100-
def query(sql: str):
101-
pass
102-
103-
104-
def interval(arr: list, duration: int, fill, step: None | int = None, explicitOffset=False):
110+
def interval(ts: vector, arr: vector, duration: int, func):
105111
"""
106112
Note that this is a mock function with same functionailty to the actual Python Coprocessor
107113
`arr` is a vector of integral or temporal type.
108-
109-
`duration` is the length of sliding window
110-
111-
`step` being the length when sliding window take a step
112-
113-
`fill` indicate how to fill missing value:
114-
- "prev": use previous
115-
- "post": next
116-
- "linear": linear interpolation, if not possible to interpolate certain types, fallback to prev
117-
- "null": use null
118-
- "none": do not interpolate
119114
"""
120-
if step is None:
121-
step = duration
122-
123-
tot_len = int(np.ceil(len(arr) // step))
124-
slices = np.zeros((tot_len, int(duration)))
125-
for idx, start in enumerate(range(0, len(arr), step)):
126-
slices[idx] = arr[start:(start + duration)]
127-
return slices
115+
start = np.min(ts)
116+
end = np.max(ts)
117+
masks = [(ts >= i) & (ts <= (i+duration)) for i in range(start, end, duration)]
118+
lst_res = [func(arr[mask]) for mask in masks]
119+
return lst_res
128120

129121

130122
def factor(unit: str) -> int:

component/script/python/greptime/mock.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""
55
from typing import Any
66
import numpy as np
7-
from .greptime import i32,i64,f32,f64, vector, interval, query, prev, datetime, log, sum, sqrt, pow, nan, copr, coprocessor
7+
from .greptime import i32,i64,f32,f64, vector, interval, prev, datetime, log, sum, sqrt, pow, nan, copr, coprocessor
88

99
import inspect
1010
import functools

component/script/python/test.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ def get_db(req:str):
2626
return requests.get("http://{}{}".format(get_conn_addr(), req))
2727

2828
if __name__ == "__main__":
29+
with open("component/script/python/example/kline.json", "r") as kline_file:
30+
kline = json.load(kline_file)
31+
table = as_table(kline["result"])
32+
close = table["close"]
33+
open_time = table["open_time"]
34+
env = {"close":close, "open_time": open_time}
35+
36+
res = mock_tester(calc_rvs, env=env)
37+
print("Mock result:", [i[0] for i in res])
38+
exit()
2939
if len(sys.argv)!=2:
3040
raise Exception("Expect only one address as cmd's args")
3141
set_conn_addr(sys.argv[1])
@@ -42,11 +52,6 @@ def get_db(req:str):
4252
open_time = table["open_time"]
4353
init_table(close, open_time)
4454

45-
# print(repr(close), repr(open_time))
46-
# print("calc_rv:", calc_rv(close, open_time, open_time[-1]+datetime("10m"), datetime("7d")))
47-
env = {"close":close, "open_time": open_time}
48-
# print("env:", env)
49-
print("Mock result:", mock_tester(calc_rvs, env=env))
5055
real = calc_rvs()
5156
print(real)
5257
try:

src/common/function/Cargo.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@ num = "0.4"
2020
num-traits = "0.2"
2121
once_cell = "1.10"
2222
paste = "1.0"
23-
rustpython-ast = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
24-
rustpython-bytecode = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
25-
rustpython-compiler = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
26-
rustpython-compiler-core = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
27-
rustpython-parser = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
28-
rustpython-vm = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
2923
snafu = { version = "0.7", features = ["backtraces"] }
3024
statrs = "0.15"
3125

src/script/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ python = [
1515
"dep:rustpython-compiler-core",
1616
"dep:rustpython-bytecode",
1717
"dep:rustpython-ast",
18+
"dep:paste"
1819
]
1920

2021
[dependencies]
@@ -23,6 +24,7 @@ common-error = {path = "../common/error"}
2324
common-function = { path = "../common/function" }
2425
common-query = {path = "../common/query"}
2526
common-recordbatch = {path = "../common/recordbatch" }
27+
common-telemetry = { path = "../common/telemetry" }
2628
console = "0.15"
2729
datafusion = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", optional = true}
2830
datafusion-common = {git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2"}
@@ -38,8 +40,11 @@ rustpython-compiler = {git = "https://github.com/RustPython/RustPython", optiona
3840
rustpython-compiler-core = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
3941
rustpython-parser = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
4042
rustpython-vm = {git = "https://github.com/RustPython/RustPython", optional = true, rev = "02a1d1d"}
43+
paste = { version = "1.0", optional = true}
4144
snafu = {version = "0.7", features = ["backtraces"]}
4245
sql = { path = "../sql" }
46+
tokio = { version = "1.0", features = ["full"] }
47+
4348

4449
[dev-dependencies]
4550
catalog = { path = "../catalog" }

src/script/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#![feature(iterator_try_reduce)]
12
pub mod engine;
23
#[cfg(feature = "python")]
34
pub mod python;

0 commit comments

Comments
 (0)