diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 8f60efe6c7..11b08c934e 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -23,89 +23,91 @@ concurrency: permissions: contents: read # to fetch code (actions/checkout) -jobs: +env: + REDIS_STACK_IMAGE: redis/redis-stack-server:7.4.0-rc1 - dependency-audit: - name: Dependency audit - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: pypa/gh-action-pip-audit@v1.0.8 - with: - inputs: requirements.txt dev_requirements.txt - ignore-vulns: | - GHSA-w596-4wvx-j9j6 # subversion related git pull, dependency for pytest. There is no impact here. +jobs: + dependency-audit: + name: Dependency audit + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: pypa/gh-action-pip-audit@v1.0.8 + with: + inputs: requirements.txt dev_requirements.txt + ignore-vulns: | + GHSA-w596-4wvx-j9j6 # subversion related git pull, dependency for pytest. There is no impact here. - lint: - name: Code linters - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: 3.9 - cache: 'pip' - - name: run code linters - run: | - pip install -r dev_requirements.txt - invoke linters + lint: + name: Code linters + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.9 + cache: 'pip' + - name: run code linters + run: | + pip install -r dev_requirements.txt + invoke linters - run-tests: - runs-on: ubuntu-latest - timeout-minutes: 60 - strategy: - max-parallel: 15 - fail-fast: false - matrix: - python-version: ['3.8', '3.9', '3.10', '3.11', 'pypy-3.8', 'pypy-3.9'] - test-type: ['standalone', 'cluster'] - connection-type: ['hiredis', 'plain'] - env: - ACTIONS_ALLOW_UNSECURE_COMMANDS: true - name: Python ${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}} tests - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - cache: 'pip' - - name: run tests - run: | - pip install -U setuptools wheel - pip install -r requirements.txt - pip install -r dev_requirements.txt - if [ "${{matrix.connection-type}}" == "hiredis" ]; then - pip install hiredis - fi - invoke devenv - sleep 10 # time to settle - invoke ${{matrix.test-type}}-tests + run-tests: + runs-on: ubuntu-latest + timeout-minutes: 60 + strategy: + max-parallel: 15 + fail-fast: false + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11', 'pypy-3.8', 'pypy-3.9'] + test-type: ['standalone', 'cluster'] + connection-type: ['hiredis', 'plain'] + env: + ACTIONS_ALLOW_UNSECURE_COMMANDS: true + name: Python ${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}} tests + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: 'pip' + - name: run tests + run: | + pip install -U setuptools wheel + pip install -r requirements.txt + pip install -r dev_requirements.txt + if [ "${{matrix.connection-type}}" == "hiredis" ]; then + pip install hiredis + fi + invoke devenv + sleep 10 # time to settle + invoke ${{matrix.test-type}}-tests - - uses: actions/upload-artifact@v4 - if: success() || failure() - with: - name: pytest-results-${{matrix.test-type}}-${{matrix.connection-type}}-${{matrix.python-version}} - path: '${{matrix.test-type}}*results.xml' + - uses: actions/upload-artifact@v4 + if: success() || failure() + with: + name: pytest-results-${{matrix.test-type}}-${{matrix.connection-type}}-${{matrix.python-version}} + path: '${{matrix.test-type}}*results.xml' - - name: Upload codecov coverage - uses: codecov/codecov-action@v4 - with: - fail_ci_if_error: false + - name: Upload codecov coverage + uses: codecov/codecov-action@v4 + with: + fail_ci_if_error: false - - name: View Test Results - uses: dorny/test-reporter@v1 - if: success() || failure() - continue-on-error: true - with: - name: Test Results ${{matrix.python-version}} ${{matrix.test-type}}-${{matrix.connection-type}} - path: '*.xml' - reporter: java-junit - list-suites: all - list-tests: all - max-annotations: 10 - fail-on-error: 'false' + - name: View Test Results + uses: dorny/test-reporter@v1 + if: success() || failure() + continue-on-error: true + with: + name: Test Results ${{matrix.python-version}} ${{matrix.test-type}}-${{matrix.connection-type}} + path: '*.xml' + reporter: java-junit + list-suites: all + list-tests: all + max-annotations: 10 + fail-on-error: 'false' - resp3_tests: + resp3_tests: runs-on: ubuntu-latest strategy: fail-fast: false @@ -115,7 +117,7 @@ jobs: connection-type: ['hiredis', 'plain'] protocol: ['3'] env: - ACTIONS_ALLOW_UNSECURE_COMMANDS: true + ACTIONS_ALLOW_UNSECURE_COMMANDS: true name: RESP3 [${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}}] steps: - uses: actions/checkout@v4 @@ -136,7 +138,7 @@ jobs: invoke ${{matrix.test-type}}-tests invoke ${{matrix.test-type}}-tests --uvloop - build_and_test_package: + build_and_test_package: name: Validate building and installing the package runs-on: ubuntu-latest needs: [run-tests] @@ -153,7 +155,7 @@ jobs: run: | bash .github/workflows/install_and_test.sh ${{ matrix.extension }} - install_package_from_commit: + install_package_from_commit: name: Install package from commit hash runs-on: ubuntu-latest strategy: diff --git a/docker-compose.yml b/docker-compose.yml index 09418ed094..72c43c2252 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -105,7 +105,7 @@ services: - all redis-stack: - image: redis/redis-stack-server:edge + image: ${REDIS_STACK_IMAGE:-redis/redis-stack-server:edge} container_name: redis-stack ports: - 6479:6379 diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py index 208ddfb09f..f8dfe8b5c0 100644 --- a/redis/commands/timeseries/commands.py +++ b/redis/commands/timeseries/commands.py @@ -33,44 +33,67 @@ def create( labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ Create a new time-series. - Args: + For more information see https://redis.io/commands/ts.create/ - key: - time-series key - retention_msecs: - Maximum age for samples compared to highest reported timestamp (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - Must be a multiple of 8 in the range [128 .. 1048576]. - duplicate_policy: - Policy for handling multiple samples with identical timestamps. - Can be one of: - - 'block': an error will occur for any out of order sample. - - 'first': ignore the new value. - - 'last': override with latest value. - - 'min': only override if the value is lower than the existing value. - - 'max': only override if the value is higher than the existing value. - - 'sum': If a previous sample exists, add the new sample to it so that \ - the updated value is equal to (previous + new). If no previous sample \ - exists, set the updated value equal to the new value. - - For more information: https://redis.io/commands/ts.create/ - """ # noqa + Args: + key: + The time-series key. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + """ params = [key] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) - self._append_duplicate_policy(params, CREATE_CMD, duplicate_policy) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(CREATE_CMD, *params) @@ -81,42 +104,65 @@ def alter( labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ - Update the retention, chunk size, duplicate policy, and labels of an existing - time series. + Update an existing time series. - Args: + For more information see https://redis.io/commands/ts.alter/ - key: - time-series key - retention_msecs: - Maximum retention period, compared to maximal existing timestamp (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - Must be a multiple of 8 in the range [128 .. 1048576]. - duplicate_policy: - Policy for handling multiple samples with identical timestamps. - Can be one of: - - 'block': an error will occur for any out of order sample. - - 'first': ignore the new value. - - 'last': override with latest value. - - 'min': only override if the value is lower than the existing value. - - 'max': only override if the value is higher than the existing value. - - 'sum': If a previous sample exists, add the new sample to it so that \ - the updated value is equal to (previous + new). If no previous sample \ - exists, set the updated value equal to the new value. - - For more information: https://redis.io/commands/ts.alter/ - """ # noqa + Args: + key: + The time-series key. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. Changing this value does not affect + existing chunks. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + """ params = [key] self._append_retention(params, retention_msecs) self._append_chunk_size(params, chunk_size) - self._append_duplicate_policy(params, ALTER_CMD, duplicate_policy) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(ALTER_CMD, *params) @@ -130,60 +176,104 @@ def add( labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, + on_duplicate: Optional[str] = None, ): """ - Append (or create and append) a new sample to a time series. + Append a sample to a time series. When the specified key does not exist, a new + time series is created. - Args: + For more information see https://redis.io/commands/ts.add/ - key: - time-series key - timestamp: - Timestamp of the sample. * can be used for automatic timestamp (using the system clock). - value: - Numeric data value of the sample - retention_msecs: - Maximum retention period, compared to maximal existing timestamp (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - Must be a multiple of 8 in the range [128 .. 1048576]. - duplicate_policy: - Policy for handling multiple samples with identical timestamps. - Can be one of: - - 'block': an error will occur for any out of order sample. - - 'first': ignore the new value. - - 'last': override with latest value. - - 'min': only override if the value is lower than the existing value. - - 'max': only override if the value is higher than the existing value. - - 'sum': If a previous sample exists, add the new sample to it so that \ - the updated value is equal to (previous + new). If no previous sample \ - exists, set the updated value equal to the new value. - - For more information: https://redis.io/commands/ts.add/ - """ # noqa + Args: + key: + The time-series key. + timestamp: + Timestamp of the sample. `*` can be used for automatic timestamp (using + the system clock). + value: + Numeric data value of the sample. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + on_duplicate: + Use a specific duplicate policy for the specified timestamp. Overrides + the duplicate policy set by `duplicate_policy`. + """ params = [key, timestamp, value] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) - self._append_duplicate_policy(params, ADD_CMD, duplicate_policy) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) + self._append_on_duplicate(params, on_duplicate) return self.execute_command(ADD_CMD, *params) def madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], Number]]): """ - Append (or create and append) a new `value` to series - `key` with `timestamp`. - Expects a list of `tuples` as (`key`,`timestamp`, `value`). - Return value is an array with timestamps of insertions. + Append new samples to one or more time series. + + Each time series must already exist. + + The method expects a list of tuples. Each tuple should contain three elements: + (`key`, `timestamp`, `value`). The `value` will be appended to the time series + identified by 'key', at the given 'timestamp'. - For more information: https://redis.io/commands/ts.madd/ - """ # noqa + For more information see https://redis.io/commands/ts.madd/ + + Args: + ktv_tuples: + A list of tuples, where each tuple contains: + - `key`: The key of the time series. + - `timestamp`: The timestamp at which the value should be appended. + - `value`: The value to append to the time series. + + Returns: + A list that contains, for each sample, either the timestamp that was used, + or an error, if the sample could not be added. + """ params = [] for ktv in ktv_tuples: params.extend(ktv) @@ -199,37 +289,86 @@ def incrby( uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, + duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ - Increment (or create an time-series and increment) the latest sample's of a series. - This command can be used as a counter or gauge that automatically gets history as a time series. + Increment the latest sample's of a series. When the specified key does not + exist, a new time series is created. - Args: + This command can be used as a counter or gauge that automatically gets history + as a time series. + + For more information see https://redis.io/commands/ts.incrby/ - key: - time-series key - value: - Numeric data value of the sample - timestamp: - Timestamp of the sample. * can be used for automatic timestamp (using the system clock). - retention_msecs: - Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - - For more information: https://redis.io/commands/ts.incrby/ - """ # noqa + Args: + key: + The time-series key. + value: + Numeric value to be added (addend). + timestamp: + Timestamp of the sample. `*` can be used for automatic timestamp (using + the system clock). `timestamp` must be equal to or higher than the + maximum existing timestamp in the series. When equal, the value of the + sample with the maximum existing timestamp is increased. If it is + higher, a new sample with a timestamp set to `timestamp` is created, and + its value is set to the value of the sample with the maximum existing + timestamp plus the addend. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + + Returns: + The timestamp of the sample that was modified or added. + """ params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(INCRBY_CMD, *params) @@ -242,37 +381,86 @@ def decrby( uncompressed: Optional[bool] = False, labels: Optional[Dict[str, str]] = None, chunk_size: Optional[int] = None, + duplicate_policy: Optional[str] = None, + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, ): """ - Decrement (or create an time-series and decrement) the latest sample's of a series. - This command can be used as a counter or gauge that automatically gets history as a time series. + Decrement the latest sample's of a series. When the specified key does not + exist, a new time series is created. - Args: + This command can be used as a counter or gauge that automatically gets history + as a time series. - key: - time-series key - value: - Numeric data value of the sample - timestamp: - Timestamp of the sample. * can be used for automatic timestamp (using the system clock). - retention_msecs: - Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: - Changes data storage from compressed (by default) to uncompressed - labels: - Set of label-value pairs that represent metadata labels of the key. - chunk_size: - Memory size, in bytes, allocated for each data chunk. - - For more information: https://redis.io/commands/ts.decrby/ - """ # noqa + For more information see https://redis.io/commands/ts.decrby/ + + Args: + key: + The time-series key. + value: + Numeric value to subtract (subtrahend). + timestamp: + Timestamp of the sample. `*` can be used for automatic timestamp (using + the system clock). `timestamp` must be equal to or higher than the + maximum existing timestamp in the series. When equal, the value of the + sample with the maximum existing timestamp is decreased. If it is + higher, a new sample with a timestamp set to `timestamp` is created, and + its value is set to the value of the sample with the maximum existing + timestamp minus subtrahend. + retention_msecs: + Maximum age for samples, compared to the highest reported timestamp in + milliseconds. If `None` or `0` is passed, the series is not trimmed at + all. + uncompressed: + Changes data storage from compressed (default) to uncompressed. + labels: + A dictionary of label-value pairs that represent metadata labels of the + key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. Must be a multiple + of 8 in the range `[48..1048576]`. In earlier versions of the module the + minimum value was different. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. Can be + one of: + - 'block': An error will occur and the new value will be ignored. + - 'first': Ignore the new value. + - 'last': Override with the latest value. + - 'min': Only override if the value is lower than the existing + value. + - 'max': Only override if the value is higher than the existing + value. + - 'sum': If a previous sample exists, add the new sample to it so + that the updated value is equal to (previous + new). If no + previous sample exists, set the updated value equal to the new + value. + ignore_max_time_diff: + A non-negative integer value, in milliseconds, that sets an ignore + threshold for added timestamps. If the difference between the last + timestamp and the new timestamp is lower than this threshold, the new + entry is ignored. Only applicable if `duplicate_policy` is set to + `last`, and if `ignore_max_val_diff` is also set. Available since + RedisTimeSeries version 1.12.0. + ignore_max_val_diff: + A non-negative floating point value, that sets an ignore threshold for + added values. If the difference between the last value and the new value + is lower than this threshold, the new entry is ignored. Only applicable + if `duplicate_policy` is set to `last`, and if `ignore_max_time_diff` is + also set. Available since RedisTimeSeries version 1.12.0. + + Returns: + The timestamp of the sample that was modified or added. + """ params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) self._append_chunk_size(params, chunk_size) + self._append_duplicate_policy(params, duplicate_policy) self._append_labels(params, labels) + self._append_insertion_filters( + params, ignore_max_time_diff, ignore_max_val_diff + ) return self.execute_command(DECRBY_CMD, *params) @@ -280,17 +468,22 @@ def delete(self, key: KeyT, from_time: int, to_time: int): """ Delete all samples between two timestamps for a given time series. - Args: + The given timestamp interval is closed (inclusive), meaning that samples whose + timestamp equals `from_time` or `to_time` are also deleted. - key: - time-series key. - from_time: - Start timestamp for the range deletion. - to_time: - End timestamp for the range deletion. + For more information see https://redis.io/commands/ts.del/ - For more information: https://redis.io/commands/ts.del/ - """ # noqa + Args: + key: + The time-series key. + from_time: + Start timestamp for the range deletion. + to_time: + End timestamp for the range deletion. + + Returns: + The number of samples deleted. + """ return self.execute_command(DEL_CMD, key, from_time, to_time) def createrule( @@ -304,24 +497,23 @@ def createrule( """ Create a compaction rule from values added to `source_key` into `dest_key`. - Args: + For more information see https://redis.io/commands/ts.createrule/ - source_key: - Key name for source time series - dest_key: - Key name for destination (compacted) time series - aggregation_type: - Aggregation type: One of the following: - [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, - `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Duration of each bucket, in milliseconds - align_timestamp: - Assure that there is a bucket that starts at exactly align_timestamp and - align all other buckets accordingly. - - For more information: https://redis.io/commands/ts.createrule/ - """ # noqa + Args: + source_key: + Key name for source time series. + dest_key: + Key name for destination (compacted) time series. + aggregation_type: + Aggregation type: One of the following: + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, + `std.s`, `var.p`, `var.s`, `twa`] + bucket_size_msec: + Duration of each bucket, in milliseconds. + align_timestamp: + Assure that there is a bucket that starts at exactly align_timestamp and + align all other buckets accordingly. + """ params = [source_key, dest_key] self._append_aggregation(params, aggregation_type, bucket_size_msec) if align_timestamp is not None: @@ -331,10 +523,10 @@ def createrule( def deleterule(self, source_key: KeyT, dest_key: KeyT): """ - Delete a compaction rule from `source_key` to `dest_key`.. + Delete a compaction rule from `source_key` to `dest_key`. - For more information: https://redis.io/commands/ts.deleterule/ - """ # noqa + For more information see https://redis.io/commands/ts.deleterule/ + """ return self.execute_command(DELETERULE_CMD, source_key, dest_key) def __range_params( @@ -383,42 +575,46 @@ def range( empty: Optional[bool] = False, ): """ - Query a range in forward direction for a specific time-serie. + Query a range in forward direction for a specific time-series. - Args: + For more information see https://redis.io/commands/ts.range/ - key: - Key name for timeseries. - from_time: - Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, + can be used to express the maximum possible timestamp. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter by_min_value). - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted value of the - latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.range/ - """ # noqa + Args: + key: + Key name for timeseries. + from_time: + Start timestamp for the range query. `-` can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, `+` can be used to express the maximum + possible timestamp. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter by_min_value`). + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__range_params( key, from_time, @@ -457,40 +653,44 @@ def revrange( **Note**: This command is only available since RedisTimeSeries >= v1.4 - Args: + For more information see https://redis.io/commands/ts.revrange/ - key: - Key name for timeseries. - from_time: - Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, + can be used to express the maximum possible timestamp. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter_by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter_by_min_value). - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted value of the - latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.revrange/ - """ # noqa + Args: + key: + Key name for timeseries. + from_time: + Start timestamp for the range query. `-` can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, `+` can be used to express the maximum + possible timestamp. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter_by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter_by_min_value`). + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__range_params( key, from_time, @@ -567,49 +767,55 @@ def mrange( """ Query a range across multiple time-series by filters in forward direction. - Args: + For more information see https://redis.io/commands/ts.mrange/ - from_time: - Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, `+` can be used to express the maximum possible timestamp. - filters: - filter to match the time-series labels. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - with_labels: - Include in the reply all label-value pairs representing metadata labels of the time series. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter_by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter_by_min_value). - groupby: - Grouping by fields the results (must mention also reduce). - reduce: - Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, - `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. - select_labels: - Include in the reply only a subset of the key-value pair labels of a series. - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted - value of the latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.mrange/ - """ # noqa + Args: + from_time: + Start timestamp for the range query. `-` can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, `+` can be used to express the maximum + possible timestamp. + filters: + Filter to match the time-series labels. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + with_labels: + Include in the reply all label-value pairs representing metadata labels + of the time series. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter_by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter_by_min_value`). + groupby: + Grouping by fields the results (must mention also `reduce`). + reduce: + Applying reducer functions on each group. Can be one of [`avg` `sum`, + `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. + select_labels: + Include in the reply only a subset of the key-value pair labels of a + series. + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__mrange_params( aggregation_type, bucket_size_msec, @@ -655,49 +861,55 @@ def mrevrange( """ Query a range across multiple time-series by filters in reverse direction. - Args: + For more information see https://redis.io/commands/ts.mrevrange/ - from_time: - Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: - End timestamp for range query, + can be used to express the maximum possible timestamp. - filters: - Filter to match the time-series labels. - count: - Limits the number of returned samples. - aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, - `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] - bucket_size_msec: - Time bucket for aggregation in milliseconds. - with_labels: - Include in the reply all label-value pairs representing metadata labels of the time series. - filter_by_ts: - List of timestamps to filter the result by specific timestamps. - filter_by_min_value: - Filter result by minimum value (must mention also filter_by_max_value). - filter_by_max_value: - Filter result by maximum value (must mention also filter_by_min_value). - groupby: - Grouping by fields the results (must mention also reduce). - reduce: - Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, - `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. - select_labels: - Include in the reply only a subset of the key-value pair labels of a series. - align: - Timestamp for alignment control for aggregation. - latest: - Used when a time series is a compaction, reports the compacted - value of the latest possibly partial bucket - bucket_timestamp: - Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, - `high`, `~`, `mid`]. - empty: - Reports aggregations for empty buckets. - - For more information: https://redis.io/commands/ts.mrevrange/ - """ # noqa + Args: + from_time: + Start timestamp for the range query. '-' can be used to express the + minimum possible timestamp (0). + to_time: + End timestamp for range query, '+' can be used to express the maximum + possible timestamp. + filters: + Filter to match the time-series labels. + count: + Limits the number of returned samples. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, + `twa`]. + bucket_size_msec: + Time bucket for aggregation in milliseconds. + with_labels: + Include in the reply all label-value pairs representing metadata labels + of the time series. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + `filter_by_max_value`). + filter_by_max_value: + Filter result by maximum value (must mention also + `filter_by_min_value`). + groupby: + Grouping by fields the results (must mention also `reduce`). + reduce: + Applying reducer functions on each group. Can be one of [`avg` `sum`, + `min`, `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. + select_labels: + Include in the reply only a subset of the key-value pair labels of a + series. + align: + Timestamp for alignment control for aggregation. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, + `+`, `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + """ params = self.__mrange_params( aggregation_type, bucket_size_msec, @@ -721,13 +933,16 @@ def mrevrange( return self.execute_command(MREVRANGE_CMD, *params) def get(self, key: KeyT, latest: Optional[bool] = False): - """# noqa + """ Get the last sample of `key`. - `latest` used when a time series is a compaction, reports the compacted - value of the latest (possibly partial) bucket - For more information: https://redis.io/commands/ts.get/ - """ # noqa + For more information see https://redis.io/commands/ts.get/ + + Args: + latest: + Used when a time series is a compaction, reports the compacted value of + the latest (possibly partial) bucket. + """ params = [key] self._append_latest(params, latest) return self.execute_command(GET_CMD, *params, keys=[key]) @@ -739,24 +954,24 @@ def mget( select_labels: Optional[List[str]] = None, latest: Optional[bool] = False, ): - """# noqa + """ Get the last samples matching the specific `filter`. - Args: + For more information see https://redis.io/commands/ts.mget/ - filters: - Filter to match the time-series labels. - with_labels: - Include in the reply all label-value pairs representing metadata - labels of the time series. - select_labels: - Include in the reply only a subset of the key-value pair labels of a series. - latest: - Used when a time series is a compaction, reports the compacted - value of the latest possibly partial bucket - - For more information: https://redis.io/commands/ts.mget/ - """ # noqa + Args: + filters: + Filter to match the time-series labels. + with_labels: + Include in the reply all label-value pairs representing metadata labels + of the time series. + select_labels: + Include in the reply only a subset of the key-value pair labels o the + time series. + latest: + Used when a time series is a compaction, reports the compacted value of + the latest possibly partial bucket. + """ params = [] self._append_latest(params, latest) self._append_with_labels(params, with_labels, select_labels) @@ -765,26 +980,26 @@ def mget( return self.execute_command(MGET_CMD, *params) def info(self, key: KeyT): - """# noqa + """ Get information of `key`. - For more information: https://redis.io/commands/ts.info/ - """ # noqa + For more information see https://redis.io/commands/ts.info/ + """ return self.execute_command(INFO_CMD, key, keys=[key]) def queryindex(self, filters: List[str]): - """# noqa + """ Get all time series keys matching the `filter` list. - For more information: https://redis.io/commands/ts.queryindex/ - """ # noq + For more information see https://redis.io/commands/ts.queryindex/ + """ return self.execute_command(QUERYINDEX_CMD, *filters) @staticmethod def _append_uncompressed(params: List[str], uncompressed: Optional[bool]): """Append UNCOMPRESSED tag to params.""" if uncompressed: - params.extend(["UNCOMPRESSED"]) + params.extend(["ENCODING", "UNCOMPRESSED"]) @staticmethod def _append_with_labels( @@ -860,17 +1075,16 @@ def _append_chunk_size(params: List[str], chunk_size: Optional[int]): params.extend(["CHUNK_SIZE", chunk_size]) @staticmethod - def _append_duplicate_policy( - params: List[str], command: Optional[str], duplicate_policy: Optional[str] - ): - """Append DUPLICATE_POLICY property to params on CREATE - and ON_DUPLICATE on ADD. - """ + def _append_duplicate_policy(params: List[str], duplicate_policy: Optional[str]): + """Append DUPLICATE_POLICY property to params.""" if duplicate_policy is not None: - if command == "TS.ADD": - params.extend(["ON_DUPLICATE", duplicate_policy]) - else: - params.extend(["DUPLICATE_POLICY", duplicate_policy]) + params.extend(["DUPLICATE_POLICY", duplicate_policy]) + + @staticmethod + def _append_on_duplicate(params: List[str], on_duplicate: Optional[str]): + """Append ON_DUPLICATE property to params.""" + if on_duplicate is not None: + params.extend(["ON_DUPLICATE", on_duplicate]) @staticmethod def _append_filer_by_ts(params: List[str], ts_list: Optional[List[int]]): @@ -903,3 +1117,20 @@ def _append_empty(params: List[str], empty: Optional[bool]): """Append EMPTY property to params.""" if empty: params.append("EMPTY") + + @staticmethod + def _append_insertion_filters( + params: List[str], + ignore_max_time_diff: Optional[int] = None, + ignore_max_val_diff: Optional[Number] = None, + ): + """Append insertion filters to params.""" + if (ignore_max_time_diff is None) != (ignore_max_val_diff is None): + raise ValueError( + "Both ignore_max_time_diff and ignore_max_val_diff must be set." + ) + + if ignore_max_time_diff is not None and ignore_max_val_diff is not None: + params.extend( + ["IGNORE", str(ignore_max_time_diff), str(ignore_max_val_diff)] + ) diff --git a/tests/conftest.py b/tests/conftest.py index 9263c4353d..6df6875845 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -157,8 +157,12 @@ def pytest_sessionstart(session): session.config.REDIS_INFO = REDIS_INFO # module info + stack_url = redis_url + if stack_url == default_redis_url: + stack_url = default_redismod_url try: - REDIS_INFO["modules"] = info["modules"] + stack_info = _get_info(stack_url) + REDIS_INFO["modules"] = stack_info["modules"] except (KeyError, redis.exceptions.ConnectionError): pass diff --git a/tests/test_asyncio/test_timeseries.py b/tests/test_asyncio/test_timeseries.py index 0c78ce0941..c93af1ea5b 100644 --- a/tests/test_asyncio/test_timeseries.py +++ b/tests/test_asyncio/test_timeseries.py @@ -75,7 +75,7 @@ async def test_alter(decoded_r: redis.Redis): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -async def test_alter_diplicate_policy(decoded_r: redis.Redis): +async def test_alter_duplicate_policy(decoded_r: redis.Redis): assert await decoded_r.ts().create(1) info = await decoded_r.ts().info(1) assert_resp_response( @@ -113,42 +113,44 @@ async def test_add(decoded_r: redis.Redis): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -async def test_add_duplicate_policy(r: redis.Redis): +async def test_add_duplicate_policy(decoded_r: redis.Redis): # Test for duplicate policy BLOCK - assert 1 == await r.ts().add("time-serie-add-ooo-block", 1, 5.0) + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-block", 1, 5.0) with pytest.raises(Exception): - await r.ts().add("time-serie-add-ooo-block", 1, 5.0, duplicate_policy="block") + await decoded_r.ts().add( + "time-serie-add-ooo-block", 1, 5.0, on_duplicate="block" + ) # Test for duplicate policy LAST - assert 1 == await r.ts().add("time-serie-add-ooo-last", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-last", 1, 10.0, duplicate_policy="last" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-last", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-last", 1, 10.0, on_duplicate="last" ) - res = await r.ts().get("time-serie-add-ooo-last") + res = await decoded_r.ts().get("time-serie-add-ooo-last") assert 10.0 == res[1] # Test for duplicate policy FIRST - assert 1 == await r.ts().add("time-serie-add-ooo-first", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-first", 1, 10.0, duplicate_policy="first" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-first", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-first", 1, 10.0, on_duplicate="first" ) - res = await r.ts().get("time-serie-add-ooo-first") + res = await decoded_r.ts().get("time-serie-add-ooo-first") assert 5.0 == res[1] # Test for duplicate policy MAX - assert 1 == await r.ts().add("time-serie-add-ooo-max", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-max", 1, 10.0, duplicate_policy="max" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-max", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-max", 1, 10.0, on_duplicate="max" ) - res = await r.ts().get("time-serie-add-ooo-max") + res = await decoded_r.ts().get("time-serie-add-ooo-max") assert 10.0 == res[1] # Test for duplicate policy MIN - assert 1 == await r.ts().add("time-serie-add-ooo-min", 1, 5.0) - assert 1 == await r.ts().add( - "time-serie-add-ooo-min", 1, 10.0, duplicate_policy="min" + assert 1 == await decoded_r.ts().add("time-serie-add-ooo-min", 1, 5.0) + assert 1 == await decoded_r.ts().add( + "time-serie-add-ooo-min", 1, 10.0, on_duplicate="min" ) - res = await r.ts().get("time-serie-add-ooo-min") + res = await decoded_r.ts().get("time-serie-add-ooo-min") assert 5.0 == res[1] @@ -214,7 +216,7 @@ async def test_create_and_delete_rule(decoded_r: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_del_range(decoded_r: redis.Redis): try: await decoded_r.ts().delete("test", 0, 100) @@ -248,7 +250,7 @@ async def test_range(decoded_r: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_range_advanced(decoded_r: redis.Redis): for i in range(100): await decoded_r.ts().add(1, i, i % 7) @@ -279,7 +281,7 @@ async def test_range_advanced(decoded_r: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_rev_range(decoded_r: redis.Redis): for i in range(100): await decoded_r.ts().add(1, i, i % 7) @@ -379,7 +381,7 @@ async def test_multi_range(decoded_r: redis.Redis): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_multi_range_advanced(decoded_r: redis.Redis): await decoded_r.ts().create(1, labels={"Test": "This", "team": "ny"}) await decoded_r.ts().create( @@ -497,7 +499,7 @@ async def test_multi_range_advanced(decoded_r: redis.Redis): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") async def test_multi_reverse_range(decoded_r: redis.Redis): await decoded_r.ts().create(1, labels={"Test": "This", "team": "ny"}) await decoded_r.ts().create( @@ -752,9 +754,117 @@ async def test_query_index(decoded_r: redis.Redis): async def test_uncompressed(decoded_r: redis.Redis): await decoded_r.ts().create("compressed") await decoded_r.ts().create("uncompressed", uncompressed=True) + for i in range(1000): + await decoded_r.ts().add("compressed", i, i) + await decoded_r.ts().add("uncompressed", i, i) compressed_info = await decoded_r.ts().info("compressed") uncompressed_info = await decoded_r.ts().info("uncompressed") if is_resp2_connection(decoded_r): assert compressed_info.memory_usage != uncompressed_info.memory_usage else: assert compressed_info["memoryUsage"] != uncompressed_info["memoryUsage"] + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_create_with_insertion_filters(decoded_r: redis.Redis): + await decoded_r.ts().create( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1000 == await decoded_r.ts().add("time-series-1", 1000, 1.0) + assert 1010 == await decoded_r.ts().add("time-series-1", 1010, 11.0) + assert 1010 == await decoded_r.ts().add("time-series-1", 1013, 10.0) + assert 1020 == await decoded_r.ts().add("time-series-1", 1020, 11.5) + assert 1021 == await decoded_r.ts().add("time-series-1", 1021, 22.0) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1020, 11.5), (1021, 22.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_alter_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().add("time-series-1", 1000, 1.0) + assert 1010 == await decoded_r.ts().add("time-series-1", 1010, 11.0) + assert 1013 == await decoded_r.ts().add("time-series-1", 1013, 10.0) + + await decoded_r.ts().alter( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1013 == await decoded_r.ts().add("time-series-1", 1015, 11.5) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1013, 10.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_add_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().add( + "time-series-1", + 1000, + 1.0, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == await decoded_r.ts().add("time-series-1", 1004, 3.0) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_incrby_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().incrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == await decoded_r.ts().incrby("time-series-1", 3.0, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + assert 1000 == await decoded_r.ts().incrby("time-series-1", 10.1, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 11.1)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +async def test_decrby_with_insertion_filters(decoded_r: redis.Redis): + assert 1000 == await decoded_r.ts().decrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == await decoded_r.ts().decrby("time-series-1", 3.0, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -1.0)] + assert expected_points == data_points + + assert 1000 == await decoded_r.ts().decrby("time-series-1", 10.1, timestamp=1000) + + data_points = await decoded_r.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -11.1)] + assert expected_points == data_points diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index 5318818e79..5647bd45c6 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -84,7 +84,7 @@ def test_alter(client): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -def test_alter_diplicate_policy(client): +def test_alter_duplicate_policy(client): assert client.ts().create(1) info = client.ts().info(1) assert_resp_response( @@ -122,38 +122,32 @@ def test_add(client): @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") -def test_add_duplicate_policy(client): +def test_add_on_duplicate(client): # Test for duplicate policy BLOCK assert 1 == client.ts().add("time-serie-add-ooo-block", 1, 5.0) with pytest.raises(Exception): - client.ts().add("time-serie-add-ooo-block", 1, 5.0, duplicate_policy="block") + client.ts().add("time-serie-add-ooo-block", 1, 5.0, on_duplicate="block") # Test for duplicate policy LAST assert 1 == client.ts().add("time-serie-add-ooo-last", 1, 5.0) - assert 1 == client.ts().add( - "time-serie-add-ooo-last", 1, 10.0, duplicate_policy="last" - ) + assert 1 == client.ts().add("time-serie-add-ooo-last", 1, 10.0, on_duplicate="last") assert 10.0 == client.ts().get("time-serie-add-ooo-last")[1] # Test for duplicate policy FIRST assert 1 == client.ts().add("time-serie-add-ooo-first", 1, 5.0) assert 1 == client.ts().add( - "time-serie-add-ooo-first", 1, 10.0, duplicate_policy="first" + "time-serie-add-ooo-first", 1, 10.0, on_duplicate="first" ) assert 5.0 == client.ts().get("time-serie-add-ooo-first")[1] # Test for duplicate policy MAX assert 1 == client.ts().add("time-serie-add-ooo-max", 1, 5.0) - assert 1 == client.ts().add( - "time-serie-add-ooo-max", 1, 10.0, duplicate_policy="max" - ) + assert 1 == client.ts().add("time-serie-add-ooo-max", 1, 10.0, on_duplicate="max") assert 10.0 == client.ts().get("time-serie-add-ooo-max")[1] # Test for duplicate policy MIN assert 1 == client.ts().add("time-serie-add-ooo-min", 1, 5.0) - assert 1 == client.ts().add( - "time-serie-add-ooo-min", 1, 10.0, duplicate_policy="min" - ) + assert 1 == client.ts().add("time-serie-add-ooo-min", 1, 10.0, on_duplicate="min") assert 5.0 == client.ts().get("time-serie-add-ooo-min")[1] @@ -163,6 +157,15 @@ def test_madd(client): assert [1, 2, 3] == client.ts().madd([("a", 1, 5), ("a", 2, 10), ("a", 3, 15)]) +@pytest.mark.redismod +def test_madd_missing_timeseries(client): + response = client.ts().madd([("a", 1, 5), ("a", 2, 10)]) + assert isinstance(response, list) + assert len(response) == 2 + assert isinstance(response[0], redis.ResponseError) + assert isinstance(response[1], redis.ResponseError) + + @pytest.mark.redismod def test_incrby_decrby(client): for _ in range(100): @@ -217,12 +220,12 @@ def test_create_and_delete_rule(client): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_del_range(client): try: client.ts().delete("test", 0, 100) - except Exception as e: - assert e.__str__() != "" + except redis.ResponseError as e: + assert "key does not exist" in str(e) for i in range(100): client.ts().add(1, i, i % 7) @@ -247,7 +250,7 @@ def test_range(client): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_range_advanced(client): for i in range(100): client.ts().add(1, i, i % 7) @@ -381,7 +384,7 @@ def test_range_empty(client: redis.Redis): @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_rev_range(client): for i in range(100): client.ts().add(1, i, i % 7) @@ -578,7 +581,7 @@ def test_mrange(client): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_multi_range_advanced(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"}) @@ -722,7 +725,7 @@ def test_mrange_latest(client: redis.Redis): @pytest.mark.onlynoncluster @pytest.mark.redismod -@skip_ifmodversion_lt("99.99.99", "timeseries") +@skip_ifmodversion_lt("1.10.0", "timeseries") def test_multi_reverse_range(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"}) @@ -1010,9 +1013,157 @@ def test_pipeline(client): def test_uncompressed(client): client.ts().create("compressed") client.ts().create("uncompressed", uncompressed=True) + for i in range(1000): + client.ts().add("compressed", i, i) + client.ts().add("uncompressed", i, i) compressed_info = client.ts().info("compressed") uncompressed_info = client.ts().info("uncompressed") if is_resp2_connection(client): - assert compressed_info.memory_usage != uncompressed_info.memory_usage + assert compressed_info.memory_usage < uncompressed_info.memory_usage else: - assert compressed_info["memoryUsage"] != uncompressed_info["memoryUsage"] + assert compressed_info["memoryUsage"] < uncompressed_info["memoryUsage"] + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_create_with_insertion_filters(client): + client.ts().create( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1000 == client.ts().add("time-series-1", 1000, 1.0) + assert 1010 == client.ts().add("time-series-1", 1010, 11.0) + assert 1010 == client.ts().add("time-series-1", 1013, 10.0) + assert 1020 == client.ts().add("time-series-1", 1020, 11.5) + assert 1021 == client.ts().add("time-series-1", 1021, 22.0) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1020, 11.5), (1021, 22.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_create_with_insertion_filters_other_duplicate_policy(client): + client.ts().create( + "time-series-1", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1000 == client.ts().add("time-series-1", 1000, 1.0) + assert 1010 == client.ts().add("time-series-1", 1010, 11.0) + # Still accepted because the duplicate_policy is not `last`. + assert 1013 == client.ts().add("time-series-1", 1013, 10.0) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1013, 10)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_alter_with_insertion_filters(client): + assert 1000 == client.ts().add("time-series-1", 1000, 1.0) + assert 1010 == client.ts().add("time-series-1", 1010, 11.0) + assert 1013 == client.ts().add("time-series-1", 1013, 10.0) + + client.ts().alter( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1013 == client.ts().add("time-series-1", 1015, 11.5) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0), (1010, 11.0), (1013, 10.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_add_with_insertion_filters(client): + assert 1000 == client.ts().add( + "time-series-1", + 1000, + 1.0, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == client.ts().add("time-series-1", 1004, 3.0) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_incrby_with_insertion_filters(client): + assert 1000 == client.ts().incrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == client.ts().incrby("time-series-1", 3.0, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 1.0)] + assert expected_points == data_points + + assert 1000 == client.ts().incrby("time-series-1", 10.1, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, 11.1)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_decrby_with_insertion_filters(client): + assert 1000 == client.ts().decrby( + "time-series-1", + 1.0, + timestamp=1000, + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + + assert 1000 == client.ts().decrby("time-series-1", 3.0, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -1.0)] + assert expected_points == data_points + + assert 1000 == client.ts().decrby("time-series-1", 10.1, timestamp=1000) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1000, -11.1)] + assert expected_points == data_points + + +@skip_ifmodversion_lt("1.12.0", "timeseries") +def test_madd_with_insertion_filters(client): + client.ts().create( + "time-series-1", + duplicate_policy="last", + ignore_max_time_diff=5, + ignore_max_val_diff=10.0, + ) + assert 1010 == client.ts().add("time-series-1", 1010, 1.0) + assert [1010, 1010, 1020, 1021] == client.ts().madd( + [ + ("time-series-1", 1011, 11.0), + ("time-series-1", 1013, 10.0), + ("time-series-1", 1020, 2.0), + ("time-series-1", 1021, 22.0), + ] + ) + + data_points = client.ts().range("time-series-1", "-", "+") + expected_points = [(1010, 1.0), (1020, 2.0), (1021, 22.0)] + assert expected_points == data_points