Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Fix new pylint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Code0x58 committed May 4, 2020
1 parent 5eecc70 commit 1ace938
Show file tree
Hide file tree
Showing 88 changed files with 313 additions and 400 deletions.
4 changes: 2 additions & 2 deletions heron/common/src/python/pex_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def _get_deps_list(abs_path_to_pex):
Note that dependencies are located under `.deps` directory
"""
pex = zipfile.ZipFile(abs_path_to_pex, mode='r')
deps = list(set([re.match(egg_regex, i).group(1) for i in pex.namelist()
if re.match(egg_regex, i) is not None]))
deps = list({re.match(egg_regex, i).group(1) for i in pex.namelist()
if re.match(egg_regex, i) is not None})
return deps

def load_pex(path_to_pex, include_deps=True):
Expand Down
5 changes: 2 additions & 3 deletions heron/common/src/python/utils/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def configure(level=logging.INFO, logfile=None):
# otherwise, use StreamHandler to output to stream (stdout, stderr...)
else:
log_format = "[%(asctime)s] %(log_color)s[%(levelname)s]%(reset)s: %(message)s"
# pylint: disable=redefined-variable-type
formatter = colorlog.ColoredFormatter(fmt=log_format, datefmt=date_format)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
Expand All @@ -85,9 +84,9 @@ def init_rotating_logger(level, logfile, max_files, max_bytes):
root_logger.addHandler(handler)

for handler in root_logger.handlers:
root_logger.debug("Associated handlers - " + str(handler))
root_logger.debug("Associated handlers - %s", str(handler))
if isinstance(handler, logging.StreamHandler):
root_logger.debug("Removing StreamHandler: " + str(handler))
root_logger.debug("Removing StreamHandler: %s", str(handler))
root_logger.handlers.remove(handler)

def set_logging_level(cl_args):
Expand Down
4 changes: 1 addition & 3 deletions heron/common/src/python/utils/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def async_stream_process_stderr(process, handler):
"""
return _async_stream_process_output(process, stream_process_stderr, handler)

class StringBuilder(object):
class StringBuilder:
def __init__(self):
self.end = False
self.strs = []
Expand All @@ -96,8 +96,6 @@ def result(self):
while True:
if self.end:
return ''.join(self.strs)
else:
continue

def async_stdout_builder(proc):
""" Save stdout into string builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

'''sample.py: sample module as testdata for pex_loader unittest'''

class SampleClass(object):
class SampleClass:
"""Sample class"""
name = "sample class"
age = 100
6 changes: 3 additions & 3 deletions heron/executor/src/python/heron_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def stdout_log_fn(cmd):
# Log the messages to stdout and strip off the newline because Log.info adds one automatically
return lambda line: Log.info("%s stdout: %s", cmd, line.rstrip('\n'))

class Command(object):
class Command:
"""
Command to run as a separate process using subprocess.POpen
:param cmd: command to run (as a list)
Expand Down Expand Up @@ -179,7 +179,7 @@ def __str__(self):
def __eq__(self, other):
return self.cmd == other.cmd

class ProcessInfo(object):
class ProcessInfo:
def __init__(self, process, name, command, attempts=1):
"""
Container for info related to a running process
Expand All @@ -200,7 +200,7 @@ def increment_attempts(self):
return self

# pylint: disable=too-many-instance-attributes,too-many-statements
class HeronExecutor(object):
class HeronExecutor:
""" Heron executor is a class that is responsible for running each of the process on a given
container. Based on the container id and the instance distribution, it determines if the container
is a master node or a worker node and it starts processes accordingly."""
Expand Down
2 changes: 1 addition & 1 deletion heron/executor/tests/python/heron_executor_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CommandEncoder(json.JSONEncoder):
def default(self, o):
return o.cmd

class MockPOpen(object):
class MockPOpen:
"""fake subprocess.Popen object that we can use to mock processes and pids"""
next_pid = 0

Expand Down
4 changes: 2 additions & 2 deletions heron/instance/src/python/basics/base_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from heronpy.api.state.stateful_component import StatefulComponent

# pylint: disable=too-many-instance-attributes
class BaseInstance(object):
class BaseInstance:
"""The base class for heron bolt/spout instance
Implements the following functionality:
Expand Down Expand Up @@ -83,7 +83,7 @@ def log(self, message, level=None):
if level is None:
_log_level = logging.INFO
else:
if level == "trace" or level == "debug":
if level in ("trace", "debug"):
_log_level = logging.DEBUG
elif level == "info":
_log_level = logging.INFO
Expand Down
7 changes: 4 additions & 3 deletions heron/instance/src/python/basics/bolt_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ def emit(self, tup, stream=Stream.DEFAULT_STREAM_ID,
# Set the anchors for a tuple
if anchors is not None:
merged_roots = set()
for tup in [t for t in anchors if isinstance(t, HeronTuple) and t.roots is not None]:
merged_roots.update(tup.roots)
for tuple_ in [t for t in anchors if isinstance(t, HeronTuple) and t.roots is not None]:
merged_roots.update(tuple_.roots)
for rt in merged_roots:
to_add = data_tuple.roots.add()
to_add.CopyFrom(rt)
Expand All @@ -154,6 +154,7 @@ def emit(self, tup, stream=Stream.DEFAULT_STREAM_ID,
if direct_task is not None:
sent_task_ids.append(direct_task)
return sent_task_ids
return None

def process_incoming_tuples(self):
"""Should be called when tuple was buffered into in_stream
Expand Down Expand Up @@ -184,7 +185,7 @@ def _read_tuples_and_execute(self):
if isinstance(tuples, tuple_pb2.HeronTupleSet):
if tuples.HasField("control"):
raise RuntimeError("Bolt cannot get acks/fails from other components")
elif tuples.HasField("data"):
if tuples.HasField("data"):
stream = tuples.data.stream

for data_tuple in tuples.data.tuples:
Expand Down
10 changes: 5 additions & 5 deletions heron/instance/src/python/basics/spout_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def emit(self, tup, tup_id=None, stream=Stream.DEFAULT_STREAM_ID,
if direct_task is not None:
sent_task_ids.append(direct_task)
return sent_task_ids
return None

# pylint: disable=no-self-use
def process_incoming_tuples(self):
Expand All @@ -191,7 +192,7 @@ def _read_tuples(self):
if isinstance(tuples, tuple_pb2.HeronTupleSet):
if tuples.HasField("data"):
raise RuntimeError("Spout cannot get incoming data tuples from other components")
elif tuples.HasField("control"):
if tuples.HasField("control"):
for ack_tuple in tuples.control.acks:
self._handle_ack_tuple(ack_tuple, True)
for fail_tuple in tuples.control.fails:
Expand Down Expand Up @@ -291,13 +292,12 @@ def _is_continue_to_work(self):

if not self.acking_enabled and self.output_helper.is_out_queue_available():
return True
elif self.acking_enabled and self.output_helper.is_out_queue_available() and \
if self.acking_enabled and self.output_helper.is_out_queue_available() and \
len(self.in_flight_tuples) < max_spout_pending:
return True
elif self.acking_enabled and not self.in_stream.is_empty():
if self.acking_enabled and not self.in_stream.is_empty():
return True
else:
return False
return False

def _look_for_timeouts(self):
spout_config = self.pplan_helper.context.get_cluster_config()
Expand Down
5 changes: 2 additions & 3 deletions heron/instance/src/python/instance/st_heron_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@
import resource
import signal
import traceback
from heron.common.src.python.utils import log
import yaml

from heron.common.src.python.utils import log
from heron.proto import physical_plan_pb2, tuple_pb2, ckptmgr_pb2, common_pb2

from heron.instance.src.python.utils.misc import HeronCommunicator
from heron.instance.src.python.utils.misc import SerializerHelper
from heron.instance.src.python.utils.misc import PhysicalPlanHelper
Expand All @@ -52,7 +51,7 @@ def set_resource_limit(max_ram):
resource.setrlimit(resource.RLIMIT_RSS, (max_ram, max_ram))

# pylint: disable=too-many-instance-attributes
class SingleThreadHeronInstance(object):
class SingleThreadHeronInstance:
"""SingleThreadHeronInstance is an implementation of Heron Instance in python"""
STREAM_MGR_HOST = "127.0.0.1"
METRICS_MGR_HOST = "127.0.0.1"
Expand Down
12 changes: 5 additions & 7 deletions heron/instance/src/python/network/event_looper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from heron.common.src.python.utils.log import Log

class EventLooper(object):
class EventLooper:
"""EventLooper is a Python implementation of WakeableLooper.java
EventLooper is a class for scheduling recurring tasks that could:
Expand Down Expand Up @@ -85,15 +85,13 @@ def on_exit(self):
@abstractmethod
def do_wait(self):
"""Blocking operation, should be implemented by a subclass"""
pass

@abstractmethod
def wake_up(self):
"""Wakes up do_wait() operation, should be implemented by a subclass
Note that this method should be implemented in a thread-safe way.
"""
pass

def add_wakeup_task(self, task):
"""Add a wakeup task
Expand Down Expand Up @@ -135,9 +133,8 @@ def _get_next_timeout_interval(self):
"""
if len(self.timer_tasks) == 0:
return sys.maxsize
else:
next_timeout_interval = self.timer_tasks[0][0] - time.time()
return next_timeout_interval
next_timeout_interval = self.timer_tasks[0][0] - time.time()
return next_timeout_interval

def _execute_wakeup_tasks(self):
"""Executes wakeup tasks, should only be called from loop()"""
Expand All @@ -149,6 +146,7 @@ def _execute_wakeup_tasks(self):
def _trigger_timers(self):
"""Triggers expired timers"""
current = time.time()
while len(self.timer_tasks) > 0 and (self.timer_tasks[0][0] - current <= 0):
# pylint: disable=chained-comparison
while self.timer_tasks and (self.timer_tasks[0][0] - current <= 0):
task = heappop(self.timer_tasks)[1]
task()
3 changes: 1 addition & 2 deletions heron/instance/src/python/network/gateway_looper.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ def poll(self, timeout=0.0):
Log.debug("Trivial error: " + str(err))
if err.args[0] != errno.EINTR:
raise
else:
return
return
Log.debug("Selected [r]: " + str(readable_lst) +
" [w]: " + str(writable_lst) + " [e]: " + str(error_lst))

Expand Down
5 changes: 1 addition & 4 deletions heron/instance/src/python/network/heron_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def handle_write(self):
write_batch_time_sec = self.socket_options.nw_write_batch_time_ms * constants.MS_TO_SEC
write_batch_size_bytes = self.socket_options.nw_write_batch_size_bytes

# pylint: disable=chained-comparison
while (time.time() - start_cycle_time - write_batch_time_sec) < 0 and \
bytes_written < write_batch_size_bytes and len(self.out_buffer) > 0:
outgoing_pkt = self.out_buffer[0]
Expand Down Expand Up @@ -320,23 +321,20 @@ def on_connect(self, status):
Should be implemented by a subclass.
"""
pass

@abstractmethod
def on_response(self, status, context, response):
"""Called when the client receives a response
Should be implemented by a subclass.
"""
pass

@abstractmethod
def on_incoming_message(self, message):
"""Called when the client receives a message
Should be implemented by a subclass.
"""
pass

@abstractmethod
def on_error(self):
Expand All @@ -345,4 +343,3 @@ def on_error(self):
Note that this method is not called when a connection is not yet established.
In such a case, ``on_connect()`` with status == StatusCode.CONNECT_ERROR is called.
"""
pass
13 changes: 6 additions & 7 deletions heron/instance/src/python/network/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from heron.common.src.python.utils.log import Log

class HeronProtocol(object):
class HeronProtocol:
"""Heron's application level network protocol"""
INT_PACK_FMT = ">I"
HEADER_SIZE = 4
Expand Down Expand Up @@ -84,7 +84,7 @@ def decode_packet(packet):

return typename, reqid, serialized_msg

class OutgoingPacket(object):
class OutgoingPacket:
"""Wrapper class for outgoing packet"""
def __init__(self, raw_data):
self.raw = str(raw_data)
Expand Down Expand Up @@ -137,7 +137,7 @@ def send(self, dispatcher):
sent = dispatcher.send(self.to_send)
self.to_send = self.to_send[sent:]

class IncomingPacket(object):
class IncomingPacket:
"""Helper class for incoming packet"""
def __init__(self):
"""Initializes IncomingPacket object"""
Expand Down Expand Up @@ -218,7 +218,7 @@ def __str__(self):
(str(self.id), self.is_header_read, self.is_complete)


class REQID(object):
class REQID:
"""Helper class for REQID"""
REQID_SIZE = 32

Expand Down Expand Up @@ -259,10 +259,9 @@ def __hash__(self):
def __str__(self):
if self.is_zero():
return "ZERO"
else:
return ''.join([str(i) for i in list(self.bytes)])
return ''.join([str(i) for i in list(self.bytes)])

class StatusCode(object):
class StatusCode:
"""StatusCode for Response"""
OK = 0
WRITE_ERROR = 1
Expand Down
6 changes: 3 additions & 3 deletions heron/instance/src/python/utils/metrics/metrics_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from heronpy.api.metrics import (CountMetric, MultiCountMetric, MeanReducedMetric,
ReducedMetric, MultiMeanReducedMetric, MultiReducedMetric)

class BaseMetricsHelper(object):
class BaseMetricsHelper:
"""Helper class for metrics management
It registers metrics to the metrics collector and provides methods for
Expand Down Expand Up @@ -325,7 +325,7 @@ def failed_tuple(self, stream_id, source_component, latency_in_ns):
self.update_count(self.FAIL_COUNT, key=global_stream_id)
self.update_reduced_metric(self.FAIL_LATENCY, latency_in_ns, global_stream_id)

class MetricsCollector(object):
class MetricsCollector:
"""Helper class for pushing metrics to Out-Metrics queue"""
def __init__(self, looper, out_metrics):
self.looper = looper
Expand Down Expand Up @@ -378,7 +378,7 @@ def _gather_one_metric(self, name, message):

if metric_value is None:
return
elif isinstance(metric_value, dict):
if isinstance(metric_value, dict):
for key, value in list(metric_value.items()):
if key is not None and value is not None:
self._add_data_to_message(message, name + "/" + str(key), value)
Expand Down
2 changes: 1 addition & 1 deletion heron/instance/src/python/utils/misc/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from heron.common.src.python.utils.log import Log

class HeronCommunicator(object):
class HeronCommunicator:
"""HeronCommunicator: a wrapper class for non-blocking queue in Heron.
Note that this class does not yet implement the dynamic tuning of expected available capacity,
Expand Down
Loading

0 comments on commit 1ace938

Please sign in to comment.