
need to upgrade all use of threading module to multiprocessing module #37

2bndy5 opened this issue Sep 25, 2019 · 1 comment

Comments


2bndy5 commented Sep 25, 2019

Just found out that the way I'm "joining" existing threads isn't a proper way to "kill" them. The multiprocessing module seems to be an abstraction that would aid this implementation, but I need to do more research and testing. I will start this escapade on the Drivetrain repo (see the sketch below for the difference).

  • Drivetrain
  • GPS_Serial
  • integrated Terminal
  • IMU and distance sensors

See this example of how to handle the Raspberry Pi's camera using multiprocessing.
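
For illustration, here's a minimal sketch (hypothetical, not from any of the repos above) of why join() alone can't kill a thread, while a multiprocessing.Process can be terminated and then joined:

import multiprocessing
import threading
import time

def busy_loop():
    # Simulates a worker that never exits on its own
    while True:
        time.sleep(0.1)

if __name__ == '__main__':
    thread = threading.Thread(target=busy_loop, daemon=True)
    thread.start()
    # join() only waits; there is no thread.terminate(), so a join with a
    # timeout just gives up and leaves the thread running
    thread.join(timeout=1)

    process = multiprocessing.Process(target=busy_loop)
    process.start()
    process.terminate()  # forcibly stops the worker process
    process.join()       # then reaps it so no zombie is left behind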

@2bndy5 2bndy5 self-assigned this Sep 25, 2019
@2bndy5 2bndy5 added bug Something isn't working chore Maintenance-related requests enhancement New feature or request help wanted Extra attention is needed labels Sep 25, 2019
@tejashah88
Member

Here's some example code for using the multiprocessing module. It's for downloading a bunch of files in parallel.

import errno
import multiprocessing
import os
import signal
import time
import urllib.request

import numpy as np

def ensure_dir_exists(filename):
    dirname = os.path.dirname(filename)
    # Guard against filenames with no directory component
    if dirname and not os.path.exists(dirname):
        try:
            os.makedirs(dirname)
        except OSError as exc: # Guard against race condition
            if exc.errno != errno.EEXIST:
                raise

def download_file(url, filename):
    # Create the containing directory if it doesn't exist
    ensure_dir_exists(filename)

    # Don't download the file if it already exists!
    if not os.path.exists(filename):
        urllib.request.urlretrieve(url, filename)

# Retrieves the contents of a file either locally or remotely
def get_contents_from_source(url, filename):
    download_file(url, filename)

    # Read the file and clean each of the lines for whitespacing
    with open(filename, 'r') as file:
        raw_contents = np.char.strip(file.readlines())

    return raw_contents

# Workers ignore Ctrl+C so that only the parent process handles it; this
# must be a module-level function (not a lambda) so it can be pickled when
# the 'spawn' start method is used
def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

class ParallelFileDownloader:
    def __init__(self, num_processes=multiprocessing.cpu_count()):
        self.num_processes = num_processes
        self.pool = multiprocessing.Pool(
            processes=self.num_processes,
            initializer=init_worker
        )

    def download_files(self, files, downloader_fn):
        # Generate a list of 2-tuple params for the process workers to parse
        task_params = list(enumerate(files))
        num_tasks = len(task_params)

        print('\nRetrieving {0} files with {1} processors...'.format(num_tasks, self.num_processes))

        tasks_remaining = len(task_params)
        tasks_in_progress = -1

        try:
            result = self.pool.map_async(downloader_fn, task_params, chunksize=1)

            # Track how many tasks are in progress and remaining; note that
            # _number_left is an internal attribute of AsyncResult, not part
            # of the public API
            while not result.ready():
                tasks_remaining = result._number_left
                tasks_in_progress = min(self.num_processes, tasks_remaining)
                time.sleep(0.1)

            tasks_remaining = result._number_left
            tasks_in_progress = min(self.num_processes, tasks_remaining)
        except KeyboardInterrupt:
            # If the user wants to cancel, gracefully shut down
            self.pool.terminate()

            # Join all the processes to properly clean up
            self.pool.join()

            # Re-raise to acknowledge that the user cancelled the operation
            raise
        except ValueError:
            print('Pool is already closed! Cancelling current job...')

        # Print enough newlines to move past any per-task progress output
        print('\n' * (num_tasks - tasks_remaining + tasks_in_progress))

    # Closes the pool to prevent more tasks from being submitted and joins the existing workers
    def cleanup(self):
        self.pool.close()
        self.pool.join()
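
Here's a minimal usage sketch (hypothetical URLs and paths). The worker function has to live at module level so it can be pickled, and it receives the (index, file) tuples that download_files builds with enumerate:

# Hypothetical worker: unpacks the (index, (url, filename)) tuple built by
# download_files and delegates to the helpers defined above
def fetch_task(task):
    index, (url, filename) = task
    get_contents_from_source(url, filename)

if __name__ == '__main__':
    files = [
        ('https://example.com/a.txt', 'data/a.txt'),
        ('https://example.com/b.txt', 'data/b.txt'),
    ]
    downloader = ParallelFileDownloader()
    try:
        downloader.download_files(files, fetch_task)
    finally:
        downloader.cleanup()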

@2bndy5 2bndy5 added the master wizard This opportunity is meant for experts label Sep 26, 2019
@tejashah88 tejashah88 added this to the Late November 2019 Sprint milestone Oct 6, 2019