Skip to content

Commit

Permalink
[DOCS][+ CLEANUP]
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Oct 21, 2024
1 parent c8dff59 commit 274bf4f
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 91 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ __pycache__/
build/
develop-eggs/
dist/
agent_workspace
downloads/
eggs/
.eggs/
Expand Down
8 changes: 8 additions & 0 deletions clusterops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@
execute_on_gpu,
execute_on_multiple_gpus,
)
from clusterops.profiling_exec import (
monitor_resources,
profile_execution,
distributed_execute_on_gpus,
)

# Public API of the ``clusterops`` package: the names exported by
# ``from clusterops import *``. Keep in sync with the imports above —
# the last three come from ``clusterops.profiling_exec``.
__all__ = [
    "list_available_cpus",
    "execute_with_cpu_cores",
    "list_available_gpus",
    "execute_on_gpu",
    "execute_on_multiple_gpus",
    "monitor_resources",
    "profile_execution",
    "distributed_execute_on_gpus",
]
62 changes: 59 additions & 3 deletions clusterops/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
import psutil
import ray
from loguru import logger
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_fixed,
)

# Configurable environment variables
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
Expand All @@ -23,6 +29,11 @@
)


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def list_available_cpus() -> List[int]:
"""
Lists all available CPU cores.
Expand All @@ -44,6 +55,11 @@ def list_available_cpus() -> List[int]:
raise


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def select_best_gpu() -> Optional[int]:
"""
Selects the GPU with the most free memory.
Expand All @@ -63,6 +79,11 @@ def select_best_gpu() -> Optional[int]:
return None


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def execute_on_cpu(
cpu_id: int, func: Callable, *args: Any, **kwargs: Any
) -> Any:
Expand Down Expand Up @@ -105,6 +126,11 @@ def execute_on_cpu(
raise


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def retry_with_backoff(
func: Callable,
retries: int = RETRY_COUNT,
Expand Down Expand Up @@ -143,6 +169,11 @@ def retry_with_backoff(
time.sleep(delay * (2**attempt)) # Exponential backoff


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def execute_with_cpu_cores(
core_count: int, func: Callable, *args: Any, **kwargs: Any
) -> Any:
Expand Down Expand Up @@ -193,6 +224,11 @@ def execute_with_cpu_cores(
raise


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def list_available_gpus() -> List[str]:
"""
Lists all available GPUs.
Expand All @@ -215,6 +251,11 @@ def list_available_gpus() -> List[str]:
raise


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def execute_on_gpu(
gpu_id: int, func: Callable, *args: Any, **kwargs: Any
) -> Any:
Expand Down Expand Up @@ -257,8 +298,18 @@ def task_wrapper(*args, **kwargs):
raise


@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
)
def execute_on_multiple_gpus(
gpu_ids: List[int], func: Callable, *args: Any, **kwargs: Any
gpu_ids: List[int],
func: Callable,
all_gpus: bool = False,
timeout: float = None,
*args: Any,
**kwargs: Any,
) -> List[Any]:
"""
Executes a callable across multiple GPUs using Ray.
Expand Down Expand Up @@ -287,7 +338,12 @@ def execute_on_multiple_gpus(
)

if not ray.is_initialized():
ray.init(ignore_reinit_error=True, log_to_driver=False)
ray.init(
ignore_reinit_error=True,
log_to_driver=False,
*args,
**kwargs,
)

@ray.remote(num_gpus=1)
def task_wrapper(*args, **kwargs):
Expand All @@ -296,7 +352,7 @@ def task_wrapper(*args, **kwargs):
result_futures = [
task_wrapper.remote(*args, **kwargs) for _ in gpu_ids
]
results = ray.get(result_futures)
results = ray.get(result_futures, timeout)
logger.info(f"Execution on GPUs {gpu_ids} completed.")
return results
except Exception as e:
Expand Down
96 changes: 9 additions & 87 deletions clusterops/async_utils.py → clusterops/profiling_exec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio # For async execution
import os
import sys
import time
Expand Down Expand Up @@ -29,72 +28,6 @@
)


### 1. ASYNC TASK EXECUTION ###
async def async_execute(
func: Callable, *args: Any, **kwargs: Any
) -> Any:
"""
Asynchronously executes a callable function.
Args:
func (Callable): The function to execute asynchronously.
*args (Any): Arguments for the callable.
**kwargs (Any): Keyword arguments for the callable.
Returns:
Any: The result of the function execution.
"""
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, func, *args, **kwargs)
logger.info("Asynchronous task completed.")
return result


### 2. TASK SCHEDULING & PRIORITY MANAGEMENT ###
def execute_with_priority(
priority: str,
func: Callable,
delay: float = 0.0,
*args: Any,
**kwargs: Any,
) -> Any:
"""
Executes a callable with priority management and optional delay.
Args:
priority (str): Task priority ('high', 'low').
delay (float): Optional delay before executing the task (default: 0).
func (Callable): The function to be executed.
*args (Any): Arguments for the callable.
**kwargs (Any): Keyword arguments for the callable.
Returns:
Any: The result of the function execution.
Raises:
ValueError: If priority is not valid.
"""
try:
if priority not in ["high", "low"]:
raise ValueError("Invalid priority. Use 'high' or 'low'.")

logger.info(
f"Task scheduled with {priority} priority and {delay}s delay."
)
if delay > 0:
time.sleep(delay)

# Execute the function
result = func(*args, **kwargs)
logger.info(f"Task with {priority} priority completed.")
return result

except Exception as e:
logger.error(f"Error executing task with priority: {e}")
raise


### 3. RESOURCE MONITORING & ALERTS ###
def monitor_resources():
"""
Continuously monitors CPU and GPU resources and logs alerts when thresholds are crossed.
Expand Down Expand Up @@ -126,7 +59,6 @@ def monitor_resources():
raise


### 4. TASK PROFILING & METRICS COLLECTION ###
def profile_execution(
func: Callable, *args: Any, **kwargs: Any
) -> Any:
Expand Down Expand Up @@ -168,7 +100,6 @@ def profile_execution(
return result


### 5. DISTRIBUTED EXECUTION ACROSS NODES ###
def distributed_execute_on_gpus(
gpu_ids: List[int], func: Callable, *args: Any, **kwargs: Any
) -> List[Any]:
Expand Down Expand Up @@ -218,25 +149,16 @@ def task_wrapper(*args, **kwargs):
raise


# import asyncio


# Example function to run
def sample_task(n: int) -> int:
    """Example workload for the demos below: return the square of *n*."""
    return n ** 2


# Run task asynchronously
asyncio.run(async_execute(sample_task, 10))
# # Example function to run
# def sample_task(n: int) -> int:
# return n * n

# Schedule task with high priority and delay of 5 seconds
execute_with_priority("high", sample_task, delay=5, n=10)

# Monitor resources during execution
monitor_resources()
# # Monitor resources during execution
# monitor_resources()

# Profile task execution and collect metrics
profile_execution(sample_task, 10)
# # Profile task execution and collect metrics
# profile_execution(sample_task, 10)

# Execute distributed across multiple GPUs
distributed_execute_on_gpus([0, 1], sample_task, 10)
# # Execute distributed across multiple GPUs
# distributed_execute_on_gpus([0, 1], sample_task, 10)
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "clusterops"
version = "0.0.3"
version = "0.0.5"
description = "ClusterOps - execute callables across CPU cores and GPUs with Ray"
license = "MIT"
authors = ["Kye Gomez <[email protected]>"]
Expand All @@ -26,6 +26,8 @@ python = "^3.10"
loguru = "*"
ray = "*"
gputil = "*"
tenacity = "*"
psutil = "*"



Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
loguru
ray
GPUtil
tenacity
psutil

0 comments on commit 274bf4f

Please sign in to comment.