Skip to content

Commit

Permalink
Pending tasks timeout (#2332)
Browse files Browse the repository at this point in the history
  • Loading branch information
cccs-mog authored Sep 29, 2024
1 parent a02f6fd commit 3003c89
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 3 deletions.
5 changes: 5 additions & 0 deletions conf/default/cuckoo.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ scaling_semaphore = off
# A configurable wait time between updating the limit value of the scaling bounded semaphore
scaling_semaphore_update_timer = 10

# Specify a timeout for tasks, useful if you are bound to timely reports awaited by users
task_timeout = off
task_pending_timeout = 0
task_timeout_scan_interval = 30

# Enable creation of memory dump of the analysis machine before shutting
# down. Even if turned off, this functionality can also be enabled at
# submission. Currently available for: VirtualBox and libvirt modules (KVM).
Expand Down
21 changes: 18 additions & 3 deletions lib/cuckoo/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@
tasks_tags = Table(
"tasks_tags",
Base.metadata,
Column("task_id", Integer, ForeignKey("tasks.id")),
Column("tag_id", Integer, ForeignKey("tags.id")),
Column("task_id", Integer, ForeignKey("tasks.id", ondelete='cascade')),
Column("tag_id", Integer, ForeignKey("tags.id", ondelete='cascade')),
)


Expand Down Expand Up @@ -268,7 +268,7 @@ class Guest(Base):
manager = Column(String(255), nullable=False)
started_on = Column(DateTime(timezone=False), default=datetime.now, nullable=False)
shutdown_on = Column(DateTime(timezone=False), nullable=True)
task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False, unique=True)
task_id = Column(Integer, ForeignKey("tasks.id", ondelete='cascade'), nullable=False, unique=True)

def __repr__(self):
return f"<Guest({self.id}, '{self.name}')>"
Expand Down Expand Up @@ -2078,6 +2078,21 @@ def list_tasks(

return tasks

def check_tasks_timeout(self, timeout):
"""Find tasks which were added_on more than timeout ago and clean
"""
tasks: List[Task] = []
ids_to_delete = []
if timeout == 0:
return
search = self.session.query(Task).filter(Task.status == TASK_PENDING).order_by(Task.added_on.desc())
tasks = search.all()
for task in tasks:
if task.added_on + timedelta(seconds = timeout) < datetime.now():
ids_to_delete.append(task.id)
if len(ids_to_delete) > 0:
self.session.query(Task).filter(Task.id.in_(ids_to_delete)).delete(synchronize_session=False)

def minmax_tasks(self):
"""Find tasks minimum and maximum
@return: unix timestamps of minimum and maximum
Expand Down
8 changes: 8 additions & 0 deletions lib/cuckoo/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def __init__(self, maxcount=0):
self.analysis_threads: List[AnalysisManager] = []
self.analyzing_categories, categories_need_VM = load_categories()
self.machinery_manager = MachineryManager() if categories_need_VM else None
if self.cfg.cuckoo.get("task_timeout", False):
self.next_timeout_time = time.time() + self.cfg.cuckoo.get("task_timeout_scan_interval", 30)
log.info("Creating scheduler with max_analysis_count=%s", self.max_analysis_count or "unlimited")

@property
Expand Down Expand Up @@ -98,6 +100,12 @@ def do_main_loop_work(self, error_queue: queue.Queue) -> SchedulerCycleDelay:
if self.is_short_on_disk_space():
return SchedulerCycleDelay.LOW_DISK_SPACE

if self.cfg.cuckoo.get("task_timeout", False):
if self.next_timeout_time < time.time():
self.next_timeout_time = time.time() + self.cfg.cuckoo.get("task_timeout_scan_interval", 30)
with self.db.session.begin():
self.db.check_tasks_timeout(self.cfg.cuckoo.get("task_pending_timeout", 0))

analysis_manager: Optional[AnalysisManager] = None
with self.db.session.begin():
max_machines_reached = False
Expand Down
3 changes: 3 additions & 0 deletions tests/test_analysis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ def test_init(self, task: Task):
"sanitize_to_len": 24,
"scaling_semaphore": False,
"scaling_semaphore_update_timer": 10,
"task_pending_timeout": 0,
"task_timeout": False,
"task_timeout_scan_interval": 30,
"freespace_processing": 15000,
"periodic_log": False,
"fail_unserviceable": True,
Expand Down

0 comments on commit 3003c89

Please sign in to comment.