UnicodeEncodeError ... surrogates not allowed #18
Comments
Here is a proposal for an improvement:

import sys
import os
import io
import json
import argparse
import time
import concurrent.futures
from client import ApiClient
import ntpath
import requests

'''
This version uses the standard ProcessPoolExecutor for parallelizing the concurrent calls to the GROBID services.
Given the limits of ThreadPoolExecutor (input stored in memory, blocking Executor.map until the whole input
is acquired), it works with batches of PDF of a size indicated in the config.json file (default is 1000 entries).
We are moving from first batch to the second one only when the first is entirely processed - which means it is
slightly sub-optimal, but should scale better. Working without batch would mean acquiring a list of millions of
files in directories, which would require something scalable too (e.g. done in a separate thread), which is not
implemented for the moment.
'''


class grobid_client(ApiClient):
    """
    Command line client for the RESTful API of the GROBID service
    see config.json for the configuration of the service
    """

    def __init__(self, config_path='./config.json'):
        """
        construct me from the given config.json file
        Args:
            config_path(str): the file to use for configuration
        """
        self.config = None
        self._load_config(config_path)

    def _load_config(self, path='./config.json'):
        """
        Load the json configuration
        Args:
            path(str): the path to the json configuration file
        """
        # use a with block so the config file handle is closed
        with open(path) as config_file:
            self.config = json.load(config_file)

        # test if the server is up and running...
        the_url = 'http://' + self.config['grobid_server']
        if len(self.config['grobid_port']) > 0:
            the_url += ":" + self.config['grobid_port']
        the_url += "/api/isalive"
        r = requests.get(the_url)
        status = r.status_code

        if status != 200:
            print('GROBID server does not appear up and running ' + str(status))
        else:
            print("GROBID server is up and running")
        self.debug = self.config['debug']

    def getPDFs(self, input_path):
        '''
        get PDFs for the given input_path
        Args:
            input_path(str): the path to the input PDF files
        Returns:
            list: a list of paths to the PDF files found having a pdf extension in the directory tree specified by the input_path
        '''
        pdf_files = []
        for (dirpath, dirnames, filenames) in os.walk(input_path):
            if self.debug:
                print(dirpath, dirnames, filenames)
            for filename in filenames:
                if filename.endswith('.pdf') or filename.endswith('.PDF'):
                    if self.debug:
                        try:
                            print(filename)
                        except Exception:
                            # may happen on linux see https://stackoverflow.com/questions/27366479/python-3-os-walk-file-paths-unicodeencodeerror-utf-8-codec-cant-encode-s
                            pass
                    pdf_files.append(os.sep.join([dirpath, filename]))
        return pdf_files

    def process(self, service, input_path,
                output=None,
                n=10,
                generateIDs=False,
                consolidate_header=True,
                consolidate_citations=False,
                include_raw_citations=False,
                include_raw_affiliations=False,
                teiCoordinates=False,
                force=True):
        batch_size_pdf = self.config['batch_size']
        print("Processing files from %s" % input_path)
        pdf_files = self.getPDFs(input_path)
        total = len(pdf_files)
        print("Processing %d PDF files" % total)
        for fromIndex in range(0, total, batch_size_pdf):
            toIndex = min(fromIndex + batch_size_pdf, total)
            pdf_files_batch = pdf_files[fromIndex:toIndex]
            print("Processing %5d - %5d " % (fromIndex + 1, toIndex))
            self.process_batch(service, pdf_files_batch, output, n, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force)

    def process_batch(self, service, pdf_files, output, n, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force):
        print(len(pdf_files), "PDF files to process")
        #with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
        with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
            for pdf_file in pdf_files:
                executor.submit(self.process_pdf, service, pdf_file, output, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force)

    def process_pdf(self, service, pdf_file, output, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force):
        # check if TEI file is already produced
        # we use ntpath here to be sure it will work on Windows too
        pdf_file_name = ntpath.basename(pdf_file)
        if output is not None:
            filename = os.path.join(output, os.path.splitext(pdf_file_name)[0] + '.tei.xml')
        else:
            filename = os.path.join(ntpath.dirname(pdf_file), os.path.splitext(pdf_file_name)[0] + '.tei.xml')

        if not force and os.path.isfile(filename):
            print(filename, "already exists, skipping... (use --force to reprocess pdf input files)")
            return

        print(pdf_file)

        the_url = 'http://' + self.config['grobid_server']
        if len(self.config['grobid_port']) > 0:
            the_url += ":" + self.config['grobid_port']
        the_url += "/api/" + service

        # set the GROBID parameters
        the_data = {}
        if generateIDs:
            the_data['generateIDs'] = '1'
        if consolidate_header:
            the_data['consolidateHeader'] = '1'
        if consolidate_citations:
            the_data['consolidateCitations'] = '1'
        if include_raw_citations:
            the_data['includeRawCitations'] = '1'
        if include_raw_affiliations:
            the_data['includeRawAffiliations'] = '1'
        if teiCoordinates:
            the_data['teiCoordinates'] = self.config['coordinates']

        # open the PDF in a with block so the handle is closed once the request is done
        with open(pdf_file, 'rb') as pdf_handle:
            files = {
                'input': (
                    pdf_file,
                    pdf_handle,
                    'application/pdf',
                    {'Expires': '0'}
                )
            }
            res, status = self.post(
                url=the_url,
                files=files,
                data=the_data,
                headers={'Accept': 'text/plain'}
            )

        if status == 503:
            # server is busy: wait, then retry with the arguments in the order of the signature
            time.sleep(self.config['sleep_time'])
            return self.process_pdf(service, pdf_file, output, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force)
        elif status != 200:
            print('Processing failed with error ' + str(status))
        else:
            # writing TEI file
            try:
                with io.open(filename, 'w', encoding='utf8') as tei_file:
                    tei_file.write(res.text)
            except OSError:
                print("Writing resulting TEI XML file %s failed" % filename)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Client for GROBID services")
    parser.add_argument("service", help="one of [processFulltextDocument, processHeaderDocument, processReferences]")
    parser.add_argument("--input", default=None, help="path to the directory containing PDF to process")
    parser.add_argument("--output", default=None, help="path to the directory where to put the results (optional)")
    parser.add_argument("--config", default="./config.json", help="path to the config file, default is ./config.json")
    parser.add_argument("--n", default=10, help="concurrency for service usage")
    parser.add_argument("--generateIDs", action='store_true', help="generate random xml:id to textual XML elements of the result files")
    parser.add_argument("--consolidate_header", action='store_true', help="call GROBID with consolidation of the metadata extracted from the header")
    parser.add_argument("--consolidate_citations", action='store_true', help="call GROBID with consolidation of the extracted bibliographical references")
    parser.add_argument("--include_raw_citations", action='store_true', help="call GROBID requesting the extraction of raw citations")
    parser.add_argument("--include_raw_affiliations", action='store_true', help="call GROBID requesting the extraction of raw affiliations")
    parser.add_argument("--force", action='store_true', help="force re-processing pdf input files when tei output files already exist")
    parser.add_argument("--teiCoordinates", action='store_true', help="add the original PDF coordinates (bounding boxes) to the extracted elements")

    args = parser.parse_args()

    input_path = args.input
    config_path = args.config
    output_path = args.output

    # start from the default so that n is always defined, even if parsing fails
    n = 10
    if args.n is not None:
        try:
            n = int(args.n)
        except ValueError:
            print("Invalid concurrency parameter n:", args.n, "n = 10 will be used by default")

    # if output path does not exist, we create it
    if output_path is not None and not os.path.isdir(output_path):
        try:
            print("output directory does not exist but will be created:", output_path)
            os.makedirs(output_path)
        except OSError:
            print("Creation of the directory %s failed" % output_path)
        else:
            print("Successfully created the directory %s" % output_path)

    service = args.service
    generateIDs = args.generateIDs
    consolidate_header = args.consolidate_header
    consolidate_citations = args.consolidate_citations
    include_raw_citations = args.include_raw_citations
    include_raw_affiliations = args.include_raw_affiliations
    force = args.force
    teiCoordinates = args.teiCoordinates

    client = grobid_client(config_path=config_path)

    start_time = time.time()

    client.process(service, input_path,
                   output=output_path,
                   n=n,
                   generateIDs=generateIDs,
                   consolidate_header=consolidate_header,
                   consolidate_citations=consolidate_citations,
                   include_raw_citations=include_raw_citations,
                   include_raw_affiliations=include_raw_affiliations,
                   teiCoordinates=teiCoordinates,
                   force=force)

    runtime = round(time.time() - start_time, 3)
    print("runtime: %s seconds " % (runtime))
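
For reference, the error in the issue title can be reproduced without any files on disk. This is only a minimal sketch and not part of the client: `printable` is a hypothetical helper name, and the byte string stands in for a file name that is not valid UTF-8. It relies on standard codec error handlers only (`surrogateescape`, `backslashreplace`).

```python
def printable(name: str) -> str:
    """Return a copy of name that can always be encoded as UTF-8.

    Lone surrogates are replaced by visible backslash escapes.
    """
    return name.encode("utf-8", errors="backslashreplace").decode("utf-8")


# Simulate what os.walk returns on Linux for a file name that is not valid
# UTF-8: undecodable bytes are mapped to lone surrogates ("surrogateescape").
raw_name = b"caf\xe9.pdf"
name = raw_name.decode("utf-8", errors="surrogateescape")

try:
    # This is what print() has to do on a UTF-8 terminal.
    name.encode("utf-8")
    strict_ok = True
except UnicodeEncodeError:
    # "surrogates not allowed"
    strict_ok = False

safe = printable(name)
print(safe)

# The original bytes stay recoverable, so the file can still be opened.
recovered = name.encode("utf-8", errors="surrogateescape")
```

Compared with a bare `try`/`except` around `print`, this keeps the offending file name visible in the log instead of silently dropping it.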
Thanks a lot for the issue! I have opened PR #19 based on your proposal to catch the error when printing a file name with invalid Unicode bytes. I've added a verbose option rather than "debug", which I think describes more clearly what is done. However, I think we don't want to build a list of files before processing, because if we have millions of files (which is actually my use case!), we might blow out the memory. This is why I introduced the processing by batch while walking in the subdirectories of PDF files to be processed.
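
The batching-while-walking idea can be sketched with two generators. This is an illustration under assumptions, not the code from the PR: `iter_pdfs` and `iter_batches` are hypothetical names, and the batch size would come from `batch_size` in config.json.

```python
import os
from itertools import islice
from typing import Iterable, Iterator, List


def iter_pdfs(input_path: str) -> Iterator[str]:
    """Lazily yield the paths of PDF files below input_path."""
    for dirpath, _dirnames, filenames in os.walk(input_path):
        for filename in filenames:
            if filename.lower().endswith(".pdf"):
                yield os.path.join(dirpath, filename)


def iter_batches(paths: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Group paths into lists of at most batch_size entries.

    Only one batch is held in memory at a time.
    """
    iterator = iter(paths)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Each batch could then be handed to something like `process_batch`, so memory use is bounded by the batch size rather than by the total number of files.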
Hi @WolfgangFahl - thanks for looking into this. There are more advantages to changing the logic than just processing the files in advance. A proper progress bar will be possible, and errors like the UTF-8 file name problem can be checked in advance, so files could be excluded from processing, e.g. if they are inaccessible, too big or don't fit certain other criteria. The memory problem can be avoided by using a generator, but I also think that even with millions of files the memory footprint will not be too bad: 1 million file paths with a length of 100 bytes on average will need just 100 MB of main memory, which is not a lot for computers these days. Given that e.g. in my case processing 50,000 files takes some 10 hours, it's probably a good idea to talk about memory per process, and there the limitation is IMHO more in the time realm than in the memory realm. Tracking the process and being able to select files is IMHO crucial again in this context. Even creating a small SQLite table that tracks things might then be better than worrying about memory. The limitation would then be disk space, and having a GB of disk space available is much easier than a GB of RAM. Whether this is all worthwhile and relevant IMHO also depends on how often the tool needs to be used. In my case it was just this one error and the second run was swift.
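
A small SQLite tracking table along these lines could look like the sketch below. The table name `pdf_task`, its columns and all function names are made up for illustration. It also assumes the paths are encodable as UTF-8; file names containing lone surrogates would probably have to be stored as bytes (e.g. via `os.fsencode`) instead.

```python
import sqlite3
from typing import List, Optional, Sequence, Tuple


def open_tracker(db_path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the tracking database with one row per PDF."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pdf_task ("
        " path TEXT PRIMARY KEY,"
        " status TEXT NOT NULL DEFAULT 'pending',"
        " detail TEXT)"
    )
    return conn


def register(conn: sqlite3.Connection, paths: Sequence[str]) -> None:
    """Add new paths as pending; paths already known are left untouched."""
    conn.executemany(
        "INSERT OR IGNORE INTO pdf_task (path) VALUES (?)",
        [(p,) for p in paths],
    )
    conn.commit()


def mark(conn: sqlite3.Connection, path: str, status: str,
         detail: Optional[str] = None) -> None:
    """Record the outcome for one file, e.g. 'done' or 'failed'."""
    conn.execute(
        "UPDATE pdf_task SET status = ?, detail = ? WHERE path = ?",
        (status, detail, path),
    )
    conn.commit()


def pending(conn: sqlite3.Connection) -> List[str]:
    """Paths that still have to be processed, e.g. after a restart."""
    rows = conn.execute(
        "SELECT path FROM pdf_task WHERE status = 'pending' ORDER BY path"
    )
    return [row[0] for row in rows]


def failures(conn: sqlite3.Connection) -> List[Tuple[str, Optional[str]]]:
    """All failed paths together with the recorded error detail."""
    rows = conn.execute(
        "SELECT path, detail FROM pdf_task WHERE status = 'failed' ORDER BY path"
    )
    return [(row[0], row[1]) for row in rows]
```

With a file-backed `db_path`, a second run would only need to iterate over `pending(conn)`, and a progress bar could be driven by counting rows per status.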
Thanks for the feedback @WolfgangFahl ! The idea with this client was to provide an example of usage of the GROBID REST API for parallel processing, easy to adapt to the needs of different users (like the client in Java or JavaScript), so I tried to keep it simple. The exact workflow certainly differs between users; for instance, I have other projects where I am using LMDB to manage files and tasks at scale, combining PDF harvesting/metadata enrichment (https://github.com/kermitt2/article-dataset-builder) - so it's like the SQLite you mention, and I simply reused and adapted this client. But I agree that for a small set of files like 50K PDFs, having a progress bar for instance is more relevant than saving a bit of memory. We could enrich this client for this scenario maybe.
see also http://ceur-ws.bitplan.com/index.php/Grobid#Test
The test input is the set of PDF files of the http://ceur-ws.org/ workshop proceedings publishing site.
The site had 53133 PDF files to be processed at the time of the issue:
The service was installed using Docker; see the documentation at http://ceur-ws.bitplan.com/index.php/Grobid
The environment is Ubuntu 20 and holds a complete copy of the original CEUR-WS files which are hosted at RWTH Aachen University.
What I tried
At this point some 1/3 of all files had been processed
What I had expected
That the processing would continue and at the end there would be a list of errors that happened during processing.
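
A rough sketch of that expected behaviour, assuming nothing about the actual client: every task runs to completion, exceptions are collected per file instead of aborting or being lost, and a summary is available at the end. `run_all` and `fake_process` are hypothetical names, and a `ThreadPoolExecutor` is used here only to keep the example self-contained; the same pattern applies to `ProcessPoolExecutor` futures.

```python
import concurrent.futures
from typing import Callable, Iterable, List, Tuple


def run_all(task: Callable[[str], object], items: Iterable[str],
            max_workers: int = 4) -> Tuple[int, List[Tuple[str, str]]]:
    """Run task on every item and return (success count, list of errors)."""
    errors: List[Tuple[str, str]] = []
    done = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(task, item): item for item in items}
        for future in concurrent.futures.as_completed(futures):
            item = futures[future]
            try:
                # result() re-raises any exception raised inside the worker
                future.result()
                done += 1
            except Exception as exc:
                errors.append((item, "%s: %s" % (type(exc).__name__, exc)))
    return done, errors


def fake_process(path: str) -> str:
    """Stand-in for the real per-PDF call; fails for some inputs."""
    if "bad" in path:
        raise ValueError("cannot process " + path)
    return path


if __name__ == "__main__":
    ok, failed = run_all(fake_process, ["a.pdf", "bad1.pdf", "b.pdf"])
    print("%d processed, %d failed" % (ok, len(failed)))
    for path, reason in failed:
        print("  ", path, "->", reason)
```

The key point is calling `future.result()`: with a bare `executor.submit(...)` whose future is never inspected, exceptions raised in a worker go unnoticed.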