Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/gpu decode #44

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@

Changelog for the `video-sampler`.

### 0.11.2 - 0.12.2

- added GPUVideoSampler
- added `use-gpu-decoder` option to the `video-sampler` command which allows to use a GPU decoder.
- major code refactoring
- added support for streaming and RTSP
- tests added

### 0.11.1 - 0.11.2

- added `start-time` and `end-time` options to the `video-sampler` command which allows to specify the start and end time of the video segment to be processed.
Expand Down
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Currently, it uses keyframe decoding, frame interval gating and perceptual hashi
- [Features](#features)
- [Installation and Usage](#installation-and-usage)
- [Basic usage](#basic-usage)
- [GPU support](#gpu-support)
- [Streaming and RTSP support](#streaming-and-rtsp-support)
- [Image sampling](#image-sampling)
- [YT-DLP integration plugin](#yt-dlp-integration-plugin)
Expand All @@ -39,9 +40,9 @@ Currently, it uses keyframe decoding, frame interval gating and perceptual hashi
- [Benchmarks](#benchmarks)
- [Benchmark videos](#benchmark-videos)
- [Flit commands](#flit-commands)
- [Build](#build)
- [Install](#install)
- [Publish](#publish)
- [Build](#build)
- [Install](#install)
- [Publish](#publish)
- [🛡 License](#-license)
- [📃 Citation](#-citation)

Expand Down Expand Up @@ -111,6 +112,32 @@ python3 -m video_sampler config ./configs/hash_base.yaml /my-video-folder/ ./my-

You can set the number of workers to use with the `n_workers` parameter. The default is 1.

#### GPU support

GPU support is experimental and may not work for all GPUs. Definitely, only NVIDIA GPUs are supported for now.

To use the GPU sampler, you need to install the `gpu` extra:

```bash
python3 -m pip install -U video_sampler[gpu]
```

You can also use the GPU sampler by running the following command:

```bash
python3 -m video_sampler hash ./videos/FatCat.mp4 ./output-frames/ --use-gpu-decoder
```

and any other main command also uses `--use-gpu-decoder` flag.
LemurPwned marked this conversation as resolved.
Show resolved Hide resolved

For configuration, you simply add `use_gpu_decoder: true` to the config file. See [./configs/hash_gpu.yaml](./configs/hash_gpu.yaml) for an example.

Known limitations due to PyNvVideoCodec library:

- Keyframes only mode is not supported with GPU decoder.
- Timestamps are estimated from the FPS, so they may not be 100% accurate.
- Only NVIDIA GPUs are supported.

#### Streaming and RTSP support

RTSP support is experimental and may not work for all RTSP servers, but it should work for most of them.
Expand All @@ -119,6 +146,7 @@ You can test out the RTSP support by running the following command:
```bash
python3 -m video_sampler config ./configs/hash_base.yaml rtsp://localhost:8554/some-stream ./sampled-stream/
```

[RTSP simple server](https://github.com/bhaney/rtsp-simple-server) is a good way to test RTSP streams.

Other streams (MJPEG) also work, e.g.
Expand Down
31 changes: 31 additions & 0 deletions configs/hash_gpu.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Minimum time interval between processed frames (in seconds)
min_frame_interval_sec: 3.0
# Whether to process only keyframes (it's way faster than processing all frames)
keyframes_only: true
# Read interval while processing video (in seconds) (when there's no frame yielded, when to check again)
queue_wait: 0.1
debug: false
# Whether to print stats
print_stats: false
# Buffer configuration
buffer_config:
type: hash
# the smaller the hash size, the greater chance of collision
# smaller hashsets are faster to process & reduce frames more aggressively
hash_size: 8
# size of the collision buffer. The larger the buffer, the more in time back the
# hashes are stored.
size: 15
debug: true
# Gating configuration
gate_config:
type: pass
extractor_config: {}
summary_config: {}
# Number of workers (separate processes) to process the frames. Determines level of parallelism
n_workers: 3
use_gpu_decoder: true
save_format:
encode_time_b64: false
include_filename: true
avoid_dot: true
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ dev = [
"tabulate",
"pytest"
]
gpu = [
"pycuda >= 2024.1.2",
"PyNvVideoCodec >= 1.0.2"
]

all = [
"open_clip_torch >= 2.23.0",
"torch >= 2.1.0",
Expand Down
10 changes: 9 additions & 1 deletion video_sampler/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .config import ImageSamplerConfig, SamplerConfig
from .iterators import delegate_workers
from .logging import Color, console
from .samplers import ImageSampler, SegmentSampler, VideoSampler
from .samplers import GPUVideoSampler, ImageSampler, SegmentSampler, VideoSampler
from .schemas import BufferType

app = typer.Typer(
Expand Down Expand Up @@ -60,6 +60,8 @@ def _create_from_config(
sampler_cls: VideoSampler = VideoSampler,
):
# create a test buffer
if cfg.use_gpu_decoder:
sampler_cls = GPUVideoSampler
try:
check_args_validity(cfg)
except AssertionError as e:
Expand Down Expand Up @@ -133,6 +135,7 @@ def main(
n_workers: int = typer.Option(
1, help="Number of workers to use. Default is 1. Use -1 to use all CPUs."
),
use_gpu_decoder: bool = typer.Option(False, help="Use GPU decoder."),
) -> None:
"""Default buffer is the perceptual hash buffer"""
extractor_cfg = {}
Expand Down Expand Up @@ -182,6 +185,7 @@ def main(
),
extractor_config=extractor_cfg,
n_workers=n_workers,
use_gpu_decoder=use_gpu_decoder,
)
if ytdlp:
video_path = _ytdlp_plugin(yt_extra_args, video_path, get_subs=subs_enable)
Expand Down Expand Up @@ -232,6 +236,7 @@ def buffer(
n_workers: int = typer.Option(
1, help="Number of workers to use. Default is 1. Use -1 to use all CPUs."
),
use_gpu_decoder: bool = typer.Option(False, help="Use GPU decoder."),
):
"""Buffer type can be one of entropy, gzip, hash, passthrough"""
cfg = SamplerConfig(
Expand Down Expand Up @@ -264,6 +269,7 @@ def buffer(
}
),
n_workers=n_workers,
use_gpu_decoder=use_gpu_decoder,
)
if ytdlp:
video_path = _ytdlp_plugin(yt_extra_args, video_path)
Expand Down Expand Up @@ -312,6 +318,7 @@ def clip(
n_workers: int = typer.Option(
1, help="Number of workers to use. Default is 1. Use -1 to use all CPUs."
),
use_gpu_decoder: bool = typer.Option(False, help="Use GPU decoder."),
):
"""Buffer type can be only of type hash when using CLIP gating."""
if pos_samples is not None:
Expand Down Expand Up @@ -346,6 +353,7 @@ def clip(
"batch_size": batch_size,
},
n_workers=n_workers,
use_gpu_decoder=use_gpu_decoder,
)
if ytdlp:
video_path = _ytdlp_plugin(yt_extra_args, video_path)
Expand Down
2 changes: 1 addition & 1 deletion video_sampler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SamplerConfig(BaseModel):
extractor_config: dict[str, Any] = Field(default_factory=dict)
summary_config: dict[str, Any] = Field(default_factory=dict)
n_workers: int = Field(default=1)

use_gpu_decoder: bool = Field(default=False)
save_format: SaveFormatConfig = Field(default_factory=SaveFormatConfig)

def __str__(self) -> str:
Expand Down
9 changes: 8 additions & 1 deletion video_sampler/samplers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from .base_sampler import BaseSampler
from .gpu_sampler import GPUVideoSampler
from .image_sampler import ImageSampler
from .video_sampler import SegmentSampler, VideoSampler

__all__ = ["ImageSampler", "VideoSampler", "SegmentSampler", "BaseSampler"]
__all__ = [
"ImageSampler",
"VideoSampler",
"SegmentSampler",
"BaseSampler",
"GPUVideoSampler",
]
2 changes: 1 addition & 1 deletion video_sampler/samplers/base_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, cfg: SamplerConfig):
def sample(self, _: str) -> Iterable[list[FrameObject]]:
raise NotImplementedError("sample method must be implemented")

def write_queue(self, _: str, q: Queue, subs: str = None):
def write_queue(self, _: str, q: Queue, subs: str | None = None):
raise NotImplementedError("write_queue method must be implemented")

def init_sampler(self):
Expand Down
106 changes: 106 additions & 0 deletions video_sampler/samplers/gpu_sampler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import os
from collections.abc import Iterable

import numpy as np
from PIL import Image

from ..config import SamplerConfig
from ..logging import Color, console
from ..schemas import FrameObject
from .video_sampler import VideoSampler

try:
import cv2
import pycuda.driver as cuda
import PyNvVideoCodec as nvc
except ImportError:
console.print(
"GPUVideoSampler requires pycuda and PyNvVideoCodec to be installed.",
style=f"bold {Color.red.value}",
)


class GPUVideoSampler(VideoSampler):
def __init__(self, cfg: SamplerConfig) -> None:
super().__init__(cfg)

def sample(
self, video_path: str, subs: str | None = None
) -> Iterable[list[FrameObject]]:
"""Generate sample frames from a video using a GPU decoder.

Args:
video_path (str): The path to the video file.
subs (str): Unused in video sampler

Yields:
Iterable[list[FrameObject]]: A generator that yields a list
of FrameObjects representing sampled frames.
"""
self.init_sampler()
cuda.init()
LemurPwned marked this conversation as resolved.
Show resolved Hide resolved
cudaDevice = cuda.Device(int(os.environ.get("GPU_ID", 0)))
cudaCtx = cudaDevice.retain_primary_context()
console.print(
LemurPwned marked this conversation as resolved.
Show resolved Hide resolved
f"Context created on device: {cudaDevice.name()}",
style=f"bold {Color.green.value}",
)
cudaCtx.push()
cudaStreamNvDec = cuda.Stream()
nvDmx = nvc.CreateDemuxer(filename=video_path)
nvDec = nvc.CreateDecoder(
gpuid=0,
codec=nvDmx.GetNvCodecId(),
cudacontext=cudaCtx.handle,
cudastream=cudaStreamNvDec.handle,
enableasyncallocations=False,
)
fps_est = nvDmx.FrameRate()
LemurPwned marked this conversation as resolved.
Show resolved Hide resolved
console.print(
f"GPU decoder currently produces est. timestamps based on FPS: {fps_est}",
style=f"bold {Color.yellow.value}",
)
if self.cfg.keyframes_only:
console.print(
"Keyframes only mode is not supported with GPU decoder. Argument is ignored.",
style=f"bold {Color.red.value}",
)

frame_indx = 0
# currently stuff like
# packet.pts, decodedFrame.timestamp etc. they don't work
# so we use the frame_indx to keep track of the frame number
prev_time = -10
for packet in nvDmx:
for decodedFrame in nvDec.Decode(packet):
self.stats["total"] += 1
ftime = frame_indx / fps_est
if self.cfg.start_time_s > 0 and ftime < self.cfg.start_time_s:
self.debug_print(
f"Frame time {ftime} is before start time {self.cfg.start_time_s}, skipping."
)
continue

if self.cfg.end_time_s is not None and ftime > self.cfg.end_time_s:
self.debug_print(
f"Frame time {ftime} is after end time {self.cfg.end_time_s}, stopping."
)
break
frame_indx += 1
time_diff = ftime - prev_time
if time_diff < self.cfg.min_frame_interval_sec:
continue
prev_time = ftime

cuda_ptr = decodedFrame.GetPtrToPlane(0)
numpy_array = np.ndarray(shape=(decodedFrame.shape), dtype=np.uint8)
cuda.memcpy_dtoh(numpy_array, cuda_ptr)
numpy_array = cv2.cvtColor(numpy_array, cv2.COLOR_YUV2RGB_NV12)
pil_image = Image.fromarray(numpy_array) # Convert to PIL
yield from self.process_frame(frame_indx, pil_image, ftime)
# flush buffer
yield from self.flush_buffer()
cudaCtx.pop()
console.print(
"Context removed.\nEnd of decode session", style=f"bold {Color.green.value}"
)
13 changes: 8 additions & 5 deletions video_sampler/samplers/video_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from queue import Queue

import av
import av.enum

from ..config import SamplerConfig
from ..language.keyword_capture import create_extractor, subtitle_line
Expand Down Expand Up @@ -35,7 +34,9 @@ class VideoSampler(BaseSampler):
def __init__(self, cfg: SamplerConfig) -> None:
super().__init__(cfg)

def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject]]:
def sample(
self, video_path: str, subs: str | None = None
) -> Iterable[list[FrameObject]]:
"""Generate sample frames from a video.

Args:
Expand Down Expand Up @@ -125,7 +126,7 @@ def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject
# flush buffer
yield from self.flush_buffer()

def write_queue(self, video_path: str, q: Queue, subs: str = None) -> None:
def write_queue(self, video_path: str, q: Queue, subs: str | None = None) -> None:
try:
item: tuple[FrameObject, int]
for item in self.sample(video_path=video_path, subs=subs):
Expand Down Expand Up @@ -158,7 +159,9 @@ def __init__(self, cfg: SamplerConfig) -> None:
super().__init__(cfg)
self.extractor = create_extractor(cfg.extractor_config)

def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject]]:
def sample(
self, video_path: str, subs: str | None = None
) -> Iterable[list[FrameObject]]:
"""Generate sample frames from a video.

Args:
Expand Down Expand Up @@ -226,5 +229,5 @@ def sample(self, video_path: str, subs: str = None) -> Iterable[list[FrameObject
# flush buffer
yield from self.flush_buffer()

def write_queue(self, video_path: str, q: Queue, subs: str = None):
def write_queue(self, video_path: str, q: Queue, subs: str | None = None):
super().write_queue(video_path, q, subs=subs)
Loading