Skip to content

Commit

Permalink
Add Parser mode functionality (#203)
Browse files Browse the repository at this point in the history
* first pass at a parser

* minor fixes

* Some logging options

* use new options

* more logging options

* minor linting

* minor linting

* release prep
  • Loading branch information
bellrichm authored Mar 8, 2024
1 parent a5bbf25 commit 62995f7
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 17 deletions.
40 changes: 36 additions & 4 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,57 @@
"args": [
"configure",
//"driver",
//"service",
"service",

"--create-example", "mqttsubscribe.example.conf",
//"--create-example", "mqttsubscribe.example.conf",
//"--export", "${workspaceFolder}/tmp/temp.conf",
//"--enable", "true",
//"--top-level",
//"--validate",
"--validate",

// "--add-from", "${workspaceFolder}/devtools/smoketest.archive.conf",
//"--replace-with", "/home/richbell/mqttsubscribe.driver.conf",
//"--update-from", "${workspaceFolder}/devtools/smoketest.archive.conf",

//"--conf", "${workspaceFolder}/mqttsubscribe.example.conf",
"--conf", "${workspaceFolder}/mqttsubscribe.example.conf",
//"--conf", "/home/richbell/weewx-data/weewx.conf",
//"--conf", "/home/fork.weewx/weewx.debug.conf"
//"--output", "${workspaceFolder}/tmp/temp.conf",
//"--no-backup",
],
"console": "integratedTerminal"
},
{
"name": "Parse",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/bin/user/MQTTSubscribe.py",
"args": [
"parse",
//"driver",
"service",
//"--console",
"--top-level",

"--conf", "${workspaceFolder}/tmp/complex.json.conf",
"--topic", "topic1",
"--message-file", "${workspaceFolder}/tmp/complex.json",
"--log-file", "${workspaceFolder}/tmp/complex.log",
//"--log-level", "TRACE",

],
"console": "integratedTerminal"
},
{
"name": "Create Example",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/bin/user/MQTTSubscribe.py",
"args": [
"configure",
"--create-example", "mqttsubscribe.example.conf",
],
"console": "integratedTerminal"
},
{
// Requires MQTTSubscribe.py to be in 'weewx/bin/user' directory
Expand Down
120 changes: 112 additions & 8 deletions bin/user/MQTTSubscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@
from weewx.engine import StdEngine, StdService
# pylint: enable=wrong-import-position

VERSION = '3.0.0-rc06a'
VERSION = '3.0.0-rc06'
DRIVER_NAME = 'MQTTSubscribeDriver'
DRIVER_VERSION = VERSION

Expand Down Expand Up @@ -2593,6 +2593,107 @@ def _validate_topics_section(self, parent, hierarchy, section, section_configspe
self.topic_as_field_deprecated_options,
error_msgs,
warn_msgs)
class Parser():
""" Parse a MQTT message that is read from a file. """
description='''
'''

class Msg:
''' A MQTT message.'''
# pylint: disable=too-few-public-methods
def __init__(self, topic, payload, qos, retain):
self.topic = topic
self.payload = payload
self.qos = qos
self.retain = retain

@classmethod
def add_common_options(cls, parser):
''' Add the comon options to the parser.'''
parser.add_argument("--conf",
required=True,
help="The WeeWX configuration file. Typically weewx.conf.")
parser.add_argument("--topic",
required=True,
help="The topic to 'publish' the '--message-file' message.")
parser.add_argument("--message-file",
required=True,
help="The file containing the MQTT message.")
parser.add_argument("--top-level", action="store_true", dest="top_level",
help="Use the complete input configuration as the MQTTSubscribeDriver/MQTTSubscribeService configuration section.")
parser.add_argument("--console", action="store_true", dest="console",
help="Log to console in addition to syslog.")
parser.add_argument("--log-file",
help="A file to log to.")
parser.add_argument("--log-level", choices=["TRACE", "DEBUG", "INFO", "ERROR"],
help="The logging level.",
default="NOTSET")
@classmethod
def add_parsers(cls, parser): # pragma: no cover
''' Add the parsers.'''
subparser = parser.add_parser('parse',
description=cls.description,
formatter_class=argparse.RawDescriptionHelpFormatter)

parser_subparsers = subparser.add_subparsers(dest='type')

parser_service_parser = parser_subparsers.add_parser('service')
cls.add_common_options(parser_service_parser)

parser_driver_parser = parser_subparsers.add_parser('driver')
cls.add_common_options(parser_driver_parser)

return subparser

def __init__(self, parser, options):
self.topic = options.topic
self.message_file = options.message_file

config_path = os.path.abspath(options.conf)
config_input_dict = configobj.ConfigObj(config_path, encoding='utf-8', file_error=True)

if options.type == 'service':
self.section = 'MQTTSubscribeService'
elif options.type == 'driver':
self.section = 'MQTTSubscribeDriver'
else:
self.section = None

if options.top_level:
if len(config_input_dict.sections) > 1:
parser.error(f"When specifying '--top-level, only one top level section is allowed. Found {config_input_dict.sections}")
self.config_dict = weeutil.config.deep_copy(config_input_dict[config_input_dict.sections[0]])
else:
self.config_dict = weeutil.config.deep_copy(config_input_dict[self.section])

topics_dict = self.config_dict.get('topics', None)
if topics_dict is None:
raise ValueError("[[topics]] is required.")

message_callback_config = self.config_dict.get('message_callback', None)

logger = Logger('Service', level=options.log_level, filename=options.log_file, console=options.console)
self.manager = TopicManager(None, topics_dict, logger)
self.message_callback_provider = MessageCallbackProvider(message_callback_config, logger, self.manager)

def parse(self):
''' Parse it'''
payload = ''
with open(self.message_file, encoding='UTF-8') as file_object:
message = file_object.readline()
while message:
payload += message
message = file_object.readline()

payload = payload.encode("utf-8")
msg = self.Msg(self.topic, payload, 0, 0)

self.message_callback_provider.on_message_multi(None, None, msg)

queue = self.manager._get_queue(self.topic) # pylint: disable=protected-access
data_queue = self.manager.get_data(queue)
for data in data_queue:
print(data)

class Simulator():
""" Run the service or driver. """
Expand Down Expand Up @@ -2814,7 +2915,6 @@ def simulate_service_archive(self):
logger.info(packet_msg)
print(packet_msg)


i += 1

service.shutDown()
Expand Down Expand Up @@ -3099,17 +3199,21 @@ def _update_interactively(self):
def main():
""" Run it."""

parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version=f"MQTTSubscribe version is {VERSION}")
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--version', action='version', version=f"MQTTSubscribe version is {VERSION}")

subparsers = parser.add_subparsers(dest='command')
subparsers = arg_parser.add_subparsers(dest='command')

parser_subparser = Parser.add_parsers(subparsers)
simulator_subparser = Simulator.add_parsers(subparsers)
configurator_subparser = Configurator.add_parsers(subparsers)

options = parser.parse_args()
options = arg_parser.parse_args()

if options.command == 'simulate':
if options.command == 'parse':
parser = Parser(parser_subparser, options)
parser.parse()
elif options.command == 'simulate':
simulator = Simulator(simulator_subparser, options)
simulator.init_configuration(simulator_subparser)
simulator.init_weewx()
Expand All @@ -3118,6 +3222,6 @@ def main():
configurator = Configurator(configurator_subparser, options)
configurator.run()
else:
parser.print_help()
arg_parser.print_help()

main()
8 changes: 6 additions & 2 deletions changes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ V3.0.0
Enhancements:
- Configuration mode
Provide functions to export, update, and validate MQTTSubscribe's configuration.
For more information see, https://github.com/bellrichm/WeeWX-MQTTSubscribe/wiki/MQTTSubscribe-Configurator-Mode.
For more information see, https://github.com/bellrichm/WeeWX-MQTTSubscribe/wiki/Configurator-Mode.

- Simulation mode
Simulates running MQTTSubscribe as a WeeWX driver or service.
For more information see, https://github.com/bellrichm/WeeWX-MQTTSubscribe/wiki/MQTTSubscribe-Simulator-mode
For more information see, https://github.com/bellrichm/WeeWX-MQTTSubscribe/wiki/Simulator-mode

- Parser mode
Read a MQTT 'message' from a file, parse it and extract the data.
For more information see, https://github.com/bellrichm/WeeWX-MQTTSubscribe/wiki/Parser-mode

- MQTTSubscribe's configuration is now validated on startup.
- Support for paho-mqtt v2.
Expand Down
2 changes: 1 addition & 1 deletion install.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from weecfg.extension import ExtensionInstaller

VERSION = '3.0.0-rc06a'
VERSION = '3.0.0-rc06'

MQTTSUBSCRIBE_CONFIG = """
Expand Down
3 changes: 1 addition & 2 deletions mqttsubscribe.example.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#
# This is an example configuration for MQTTSubscribe
# It was created on 2024-01-25 at 10:27:26 with MQTTSubscribe version 3.0.0-rc05
# It was created on 2024-03-08 at 08:38:12 with MQTTSubscribe version 3.0.0-rc06
#

# Replace '[MQTTSubscribe]' with '[MQTTSubscribeService]' or '[MQTTSubscribeDriver]'
Expand Down Expand Up @@ -99,7 +99,6 @@
ignore = False

# Configuration information about the MQTT message format for this topic
# ToDo: create wiki entry and reference it
[[[[message]]]]
# The format of the MQTT payload.
# Currently support: individual, json, keyword.
Expand Down

0 comments on commit 62995f7

Please sign in to comment.