Only one task executed per subprocess #2

Open
autibequi opened this issue Sep 16, 2022 · 6 comments

Comments

@autibequi

autibequi commented Sep 16, 2022

Hi there,

I'm trying to use this package, but for some weird reason each subprocess only executes once and the rest of the code hangs.

I'm using Python 3.9; here is a snippet of the code:

from lambda_multiprocessing import Pool

# create page data for parallel execution
pages_data = []
for current_page in page_indices:
    pages_data.append((
        reader,
        vertical,
        title,
        teacher,
        current_page,
        pages_total
    ))

# Create each final page in parallel and append it to the final PDF
with Pool() as executor:
    rendered_pages = executor.starmap(generators.CreatePageWithWatermark, pages_data)

    for page in rendered_pages:
        result_pdf_final.add_page(page)

Each generators.CreatePageWithWatermark execution prints the current page, and only one page is being processed by each subprocess. If I have fewer pages than the total number of cores, the code runs just fine.

Just for fun I set 1000 subprocesses; my PC froze, but all pages were rendered.

The same behavior happens on AWS Lambda.

Thanks! That's a great little package.

Edit: now, out of the blue, it's working on my PC but not on AWS Lambda, where only 6 executions are happening. Very weird.

@autibequi
Author

I managed to get this working.

For some reason, when using fewer than 12 subprocesses it simply gets stuck until the Lambda execution times out.

I forced it to 24 and it worked, but it consumed way too much RAM. I'm not sure whether this happens because of my workload or because of how the child processes are being distributed.

Something I noticed is that, running locally, my CPU usage graph looks like a square wave.

I'm not sure why this happens; I'll take a look this weekend.

@mdavis-xyz
Owner

What does generators.CreatePageWithWatermark do?
Please provide a minimal working example so that I can reproduce the behavior.

The queuing is currently a bit dumb. It assigns all tasks up front, assuming that each task takes approximately the same amount of time. Does that assumption hold for your use case?
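
To illustrate what "assigns all tasks up front" can mean for uneven workloads, here's a hypothetical sketch; the split_upfront helper is purely illustrative and is not this library's actual code:

# Hypothetical sketch of up-front task assignment (not this library's implementation).
# Tasks are divided among workers before any of them run, so if one chunk happens to
# contain the slow tasks, that worker becomes the bottleneck while the others sit idle.

def split_upfront(tasks, num_workers):
    """Round-robin tasks across workers before execution starts; no rebalancing."""
    chunks = [[] for _ in range(num_workers)]
    for i, task in enumerate(tasks):
        chunks[i % num_workers].append(task)
    return chunks

# e.g. 10 task durations (seconds) split across 4 workers: the worker that receives
# the 30-second task finishes long after the others.
durations = [1, 1, 1, 1, 1, 1, 1, 1, 1, 30]
chunks = split_upfront(durations, 4)
print([sum(chunk) for chunk in chunks])  # per-worker total runtime under this split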

@autibequi
Author

I'm working with a lot of buffers to avoid saving the PDF pages to disk (though that may turn out to be the solution for this scenario). The CreatePageWithWatermark() function mostly reads a page from the provided buffer, generates a page, and merges both into a new buffer.
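
For illustration only, here's a minimal sketch of that kind of buffer-based merging, assuming pypdf; the function below is made up for this comment and is not the real generators.CreatePageWithWatermark:

from io import BytesIO
from pypdf import PdfReader, PdfWriter

def create_page_with_watermark(source_pdf: bytes, watermark_pdf: bytes, page_index: int) -> BytesIO:
    # Illustrative only: read one page from an in-memory PDF, stamp a watermark
    # page on top of it, and return the merged single-page PDF as a new buffer.
    page = PdfReader(BytesIO(source_pdf)).pages[page_index]
    watermark = PdfReader(BytesIO(watermark_pdf)).pages[0]
    page.merge_page(watermark)  # overlay the watermark onto the page

    writer = PdfWriter()
    writer.add_page(page)
    out = BytesIO()
    writer.write(out)
    out.seek(0)
    return out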

It took a while, but I managed to recreate the issue with the code below:

from lambda_multiprocessing import Pool as LambdaPool
import timeit
import time
import requests

def CreatePageWithWatermark(title, author, page, buffer):
    time.sleep(5)
    print(len(buffer))
    return buffer


def PDFMergeWorker(event, context):
    start_time = timeit.default_timer()

    url='https://pdfs.semanticscholar.org/c029/baf196f33050ceea9ecbf90f054fd5654277.pdf'
    r = requests.get(url, stream=True)

    pages_total = 100

    pages_data = [(
            'test',
            'test',
            current_page,
            r.content,
        ) for current_page in range(0,100)]
    
    with LambdaPool() as executor:
        rendered_pages = executor.starmap(CreatePageWithWatermark, pages_data)


    # Speedometer
    stop_time = timeit.default_timer()
    delta_time = stop_time - start_time
    print('Time: ', delta_time)  
    print('Time per Page: ',  delta_time / pages_total)  

    return 'Done'

The behavior is the same. On my machine there are only 16 print() calls (I have 16 threads), and on AWS Lambda only 6 print() calls are made. (I set 10 GB of memory for this Lambda.)

If you remove the sleep() call, the code runs fine.
If you keep the sleep() and remove the PDF stream argument, the code also runs fine.

Only with both does the code crash.

@mdavis-xyz
Owner

mdavis-xyz commented Sep 19, 2022

Hmm, interesting.

I have previously seen an issue where print statements inside a for loop containing a sleep, in one process, get buffered. So if you're printing thousands of lines, the stdout buffer fills up and gets flushed; but if the volume is low, output stays in the buffer longer than you'd expect.

I would expect process termination to flush the stdout buffer, but I don't know what special handling Lambda applies to stdout.

If you swap print for sys.stdout.write, do you get the same behaviour?
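
For example, here's a minimal sketch that flushes after every write; flush=True and sys.stdout.flush() are standard Python, used here only to rule out buffering:

import sys
import time

def CreatePageWithWatermark(title, author, page, buffer):
    time.sleep(5)
    # print(..., flush=True) would flush immediately; the two lines below do the
    # same thing with the lower-level API, bypassing any print-specific behaviour.
    sys.stdout.write(f"{len(buffer)}\n")
    sys.stdout.flush()
    return buffer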

Also, in your MWE you're doing only one requests.get() call, and then attempting to operate on the .content of the response 100 times concurrently. Are you sure that whatever requests returns is itself concurrency safe? I don't know what len() does to a buffer object. Does it consume it?
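
A quick way to check that last point (an illustrative sketch; example.com stands in for the real URL):

import requests

r = requests.get('https://example.com/')
print(type(r.content))   # bytes: an immutable byte string, not a file-like stream
before = len(r.content)
after = len(r.content)   # len() on bytes is just a size lookup; it consumes nothing
assert before == after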

(I will play around to debug this weekend.)

@mdavis-xyz
Owner

What is this script supposed to do? That URL ends with .pdf, but it's not a PDF.

This is what it redirects to:

[Screenshot from 2022-09-20 20-39-40: the page the URL redirects to]

Are you trying to download a PDF and process each page of the PDF in parallel?
Or are you trying to stream the download of the PDF and process each page while downloading the next one?

@mdavis-xyz
Owner

Have you looked at this again?

I'm not able to reproduce the bug because the example is not a minimal working example.

Most importantly:

  • You described the failure mode as "hanging" and "crashes". Those are two different things (opposites, really). Hanging is when the script appears to be doing nothing: it has not exited, but it is not printing anything either. Whereas I think of a "crash" as an uncaught exception being thrown (in which case you should paste in the exception), or a lower-level fault (like a segfault), in which case you should also paste in the text output. So I don't know what outcome I'm trying to reproduce. In particular, for "hanging", how long did you wait, relative to how long it takes multiprocessing.Pool to run the script? It could be just that stdout is buffered until the end of the script, but the code is otherwise running as expected, so you just need to wait longer. Try calling sys.stdout.flush() immediately after the print, before the sleep, and after .map(). (You'll need import sys.)
  • pages_data is a list of tuples of ints, strings and a byte array. It contains nothing exotic, or specific to PDFs or the requests library, really. Can you just hard-code pages_data and reproduce the issue?
  • Can you confirm that you ran this code on your laptop, using the standard multiprocessing.Pool() function instead of this library, and you got a different result?

But also:

  • You mention "remove the PDF stream argument", but it's not clear which argument that is. Do you mean stream=True for the requests.get() call, which is not related to the PDF format or any PDF library?
  • Your functions and variables are named after PDFs, but you're not doing anything PDF-specific. If the issue is related to streaming content with the requests library, can you download non-PDF content? (Yes, I suspect whatever issue you're seeing is related to the concurrency safety of the return value of requests.get() with stream=True.)
  • For a machine with N CPUs, this example will take 100*5/N seconds. If you drop the number of 'pages' to 10, or 2, do you still get the same issue? If you drop the sleep time to 1s, do you still get the same issue? If you can do both then we can reproduce the issue in 2 seconds instead of 80, which makes it easier to resolve.
  • You define pages_total, but then on the next line you use 100 as a literal. So why define pages_total? If you don't need it to reproduce the issue, then delete it, check you still get the issue, and post the script without it.
  • Why do you specify stream=True? The very first thing you do after calling .get(stream=True) is to access .content, which will call .read() on the stream. So why bother calling stream=True? Since you're accessing .content repeatedly you're reading from the stream for the first entry of pages_data, and then accessing a cache of it on the next.
  • Why did you name your argument buffer? It's a bytes array, not a file-like object. (Note that in general I would expect that buffers can't be safely passed between threads unless the relevant documentation explicitly says it can.)
  • Why does CreatePageWithWatermark return something? Why is the result from .map() saved to a variable? Yes, for your real usage that is probably required, but for providing a minimal reproducible example you should delete everything that isn't required to reproduce the bug.
  • Since you are using stream=True, have you tried using with requests.get(stream=True) as r (see the docs)? There's a short sketch of that form after this list.
  • You imported the timeit module, but you just use it to return the current time at two points. Why not just use time.time()? You're printing out time values. What times are you expecting? What times are you observing?
  • That specific URL returns a body of length 0, and an HTTP 301 redirect status. The requests library follows the redirect (I think). Can you reproduce this issue with a URL that isn't redirected? In fact that URL redirects in my browser to this URL, which actually returns an HTTP 202 status. That's very strange. Can you reproduce this error with a normal URL? (e.g. a URL to a JPEG of a cat?)
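
On the stream=True point above, here's a short sketch of the context-manager form from the requests documentation (the URL here is a placeholder):

import requests

url = 'https://example.com/'  # placeholder URL
# stream=True defers downloading the body; accessing .content reads and caches it,
# and the with block guarantees the underlying connection is released afterwards.
with requests.get(url, stream=True) as r:
    body = r.content

print(len(body))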

Here's an MWE (except that it doesn't reproduce the issue). It could be reduced even further, depending on what the issue is.

import multiprocessing
import time
import sys

from lambda_multiprocessing import Pool as LambdaPool
import requests

def CreatePageWithWatermark(body: bytes, url):
    time.sleep(1)
    ret = (len(body), url)
    print(f"Returning ({type(ret[0])}, {type(ret[1])})")
    return ret

def PDFMergeWorker(event={}, context=None):

    url='https://example.com/'
    r = requests.get(url)

    pages_data = [(
            r.content,
            url
        ) for _ in range(0,2)]
    
    with LambdaPool() as executor:
        rendered_pages = executor.starmap(CreatePageWithWatermark, pages_data)

    expected_length = len(requests.get(url).content)
    expected = [(expected_length, url) for _ in pages_data]
    assert(rendered_pages == expected)

if __name__ == '__main__':
    PDFMergeWorker()
