Skip to content

Commit

Permalink
WIP feed rename changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandip117 committed Sep 12, 2024
1 parent 9aabda3 commit 69ce8b8
Show file tree
Hide file tree
Showing 8 changed files with 1,713 additions and 5 deletions.
713 changes: 713 additions & 0 deletions build/lib/control/action.py

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions build/lib/control/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
str_about = '''
A simple class that provides rudimentary filtering on some input
directory for files conforming to some pattern. The primary purpose
of this class is to provide an alternative to the built-in chris_plugin
'PathMapper' object.
'''


from argparse import ArgumentParser, Namespace
from pathlib import Path
import pfmisc
import glob
import os

class PathFilter:
'''
A simple filter class that operates on directories to catalog
some filtered subset of the input filesystem space.
'''

def __init__(self, inputdir, outputdir, *args, **kwargs):
"""Main constructor
"""

self.inputdir : Path = inputdir
self.outputdir : Path = outputdir
self.glob : str = '*'
self.l_files : list = []
self.LOG : pfmisc.debug = None
self.b_filesOnly : bool = False

for k,v in kwargs.items():
if k == 'glob' : self.glob = v
if k == 'logger' : self.LOG = v
if k == 'only_files' : self.b_filesOnly = True

self.inputdir_filter(self.inputdir)

def __iter__(self):
return PathIterator(self)

def log(self, message, **kwargs):
if self.LOG: self.LOG(message)

def inputdir_filter(self, input: Path) -> list:
'''
Filter the files in Path according to the passed options.pattern --
mostly for debugging
'''

self.LOG("Parent directory contains at root level")
l_ls = [self.LOG(f) for f in os.listdir(str(input))]
self.LOG("Filtering files in %s containing '%s'" % (str(input), self.glob))
str_glob : str = '%s/%s' % (str(self.inputdir), self.glob)
self.LOG("glob = %s" % str_glob)

self.l_files = glob.glob(str_glob)

l_glob = [self.LOG(f) for f in self.l_files]
return self.l_files

class PathIterator:
'''
An iterator over the PathFilter class
'''

def __init__(self, pathfilter):

self._pathfilter = pathfilter
self._index = 0

def __next__(self):
'''
Iterate over the PathFilter self.files list
'''
if self._index < len(self._pathfilter.l_files):
result = (self._pathfilter.l_files[self._index])
self._index += 1
return (result, self._pathfilter.outputdir)
raise StopIteration
182 changes: 182 additions & 0 deletions build/lib/control/jobber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
str_about = '''
This module provides the Jobber class -- an object designed to simplify/
abstract running CLI. The actual command to run is specified as a string,
and the Jobber class executes the command, returning to the caller a
dictionary structure containing misc info such as <stdout>, <stderr>, and
<returncode>.
'''

import subprocess
import os
os.environ['XDG_CONFIG_HOME'] = '/tmp'
import pudb
import json

class Jobber:

def __init__(self, d_args : dict):
"""Constructor for the jobber class.
Args:
d_args (dict): a dictionary of "arguments" (parameters) for the
object.
"""
self.args = d_args.copy()
if not 'verbosity' in self.args.keys(): self.args['verbosity'] = 0
if not 'noJobLogging' in self.args.keys(): self.args['noJobLogging'] = False

def dict2JSONcli(self, d_dict : dict) -> str:
"""Convert a dictionary into a CLI conformant JSON string.
An input dictionary of
{
'key1': 'value1',
'key2': 'value2'
}
is converted to a string:
"{\"key1\":\"value1\",\"key2\":\"value2\"}"
Args:
d_dict (dict): a python dictionary to convert
Returns:
str: CLI equivalent string.
"""

str_JSON = json.dumps(d_dict)
str_JSON = str_JSON.replace('"', r'\"')
return str_JSON

def dict2cli(self, d_dict : dict) -> str:
"""Convert a dictionary into a CLI conformant JSON string.
An input dictionary of
{
'key1': 'value1',
'key2': 'value2',
'key3': true,
'key4': false
}
is converted to a string:
"--key1 value1 --key2 value2 --key3"
Args:
d_dict (dict): a python dictionary to convert
Returns:
str: CLI equivalent string.
"""
str_cli : str = ""
for k,v in d_dict.items():
if type(v) == bool:
if v:
str_cli += '--%s ' % k
elif len(v):
str_cli += '--%s %s ' % (k, v)
return str_cli

def job_run(self, str_cmd):
"""
Running some CLI process via python is cumbersome. The typical/easy
path of
os.system(str_cmd)
is deprecated and prone to hidden complexity. The preferred
method is via subprocess, which has a cumbersome processing
syntax. Still, this method runs the `str_cmd` and returns the
stderr and stdout strings as well as a returncode.
Providing readtime output of both stdout and stderr seems
problematic. The approach here is to provide realtime
output on stdout and only provide stderr on process completion.
"""
d_ret : dict = {
'stdout': "",
'stderr': "",
'cmd': "",
'cwd': "",
'returncode': 0
}
str_stdoutLine : str = ""
str_stdout : str = ""

p = subprocess.Popen(
str_cmd.split(),
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
)

# Realtime output on stdout
while True:
stdout = p.stdout.readline()
if p.poll() is not None:
break
if stdout:
str_stdoutLine = stdout.decode()
if int(self.args['verbosity']):
print(str_stdoutLine, end = '')
str_stdout += str_stdoutLine
d_ret['cmd'] = str_cmd
d_ret['cwd'] = os.getcwd()
d_ret['stdout'] = str_stdout
d_ret['stderr'] = p.stderr.read().decode()
d_ret['returncode'] = p.returncode
with open('/tmp/job.json', 'w') as f:
json.dump(d_ret, f, indent=4)
if int(self.args['verbosity']) and len(d_ret['stderr']):
print('\nstderr: \n%s' % d_ret['stderr'])
return d_ret

def job_runbg(self, str_cmd : str) -> dict:
"""Run a job in the background
Args:
str_cmd (str): CLI string to run
Returns:
dict: a dictionary of exec state
"""
d_ret : dict = {
'uid' : "",
'cmd' : "",
'cwd' : ""
}
# str_stdoutLine : str = ""
# str_stdout : str = ""

p = subprocess.Popen(
str_cmd.split(),
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
)

d_ret['uid'] = str(os.getuid())
d_ret['cmd'] = str_cmd
d_ret['cwd'] = os.getcwd()
# d_ret['stdout'] = str_stdout
# d_ret['stderr'] = p.stderr.read().decode()
# d_ret['returncode'] = p.returncode
# if int(self.args['verbosity']) and len(d_ret['stderr']):
# print('\nstderr: \n%s' % d_ret['stderr'])
return d_ret

def job_stdwrite(self, d_job, str_outputDir, str_prefix = ""):
"""
Capture the d_job entries to respective files.
"""
if not self.args['noJobLogging']:
for key in d_job.keys():
with open(
'%s/%s%s' % (str_outputDir, str_prefix, key), "w"
) as f:
f.write(str(d_job[key]))
f.close()
return {
'status': True
}
Loading

0 comments on commit 69ce8b8

Please sign in to comment.