Skip to content

Commit

Permalink
Merge pull request #304 from willcl-ark/cache-warnet
Browse files Browse the repository at this point in the history
Cache warnets on the server
  • Loading branch information
pinheadmz authored Mar 7, 2024
2 parents 942026b + 10e3e12 commit ee9b41c
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions src/warnet/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(self, backend):
self.setup_global_exception_handler()
self.setup_logging()
self.setup_rpc()
self.warnets: dict = dict()
self.logger.info("Started server")

# register a well known /-/healthy endpoint for liveness tests
Expand Down Expand Up @@ -179,13 +180,27 @@ def setup_rpc(self):
# Logs
self.jsonrpc.register(self.logs_grep)

def get_warnet(self, network: str) -> Warnet:
"""
Will get a warnet from the cache if it exists.
Otherwise it will create the network using from_network() and save it
to the cache before returning it.
"""
if network in self.warnets:
return self.warnets[network]
wn = Warnet.from_network(network, self.backend)
if isinstance(wn, Warnet):
self.warnets[network] = wn
return wn
raise ServerError(f"Could not find warnet {network}")

def tank_bcli(
self, node: int, method: str, params: list[str] | None = None, network: str = "warnet"
) -> str:
"""
Call bitcoin-cli on <node> <method> <params> in [network]
"""
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
try:
return wn.container_interface.get_bitcoin_cli(wn.tanks[node], method, params)
except Exception as e:
Expand All @@ -197,7 +212,7 @@ def tank_lncli(self, node: int, command: list[str], network: str = "warnet") ->
"""
Call lightning cli on <node> <command> in [network]
"""
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
try:
return wn.container_interface.ln_cli(wn.tanks[node], command)
except Exception as e:
Expand All @@ -221,7 +236,7 @@ def tank_messages(self, network: str, node_a: int, node_b: int) -> str:
"""
Fetch messages sent between <node_a> and <node_b>.
"""
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
try:
messages = [
msg
Expand Down Expand Up @@ -269,7 +284,7 @@ def network_export(self, network: str) -> str:
Export all data for sim-ln to subdirectory
"""
try:
wn = Warnet.from_network(network)
wn = self.get_warnet(network)
subdir = os.path.join(wn.config_dir, "simln")
os.makedirs(subdir, exist_ok=True)
wn.export(subdir)
Expand Down Expand Up @@ -424,23 +439,21 @@ def scenarios_list_running(self) -> list[dict]:
return running

def network_up(self, network: str = "warnet") -> str:
def thread_start(wn):

def thread_start(server: Server, network):
try:
# wn.container_interface.up()
# Update warnet from docker here to get ip addresses
wn = Warnet.from_network(network, self.backend)
wn = server.get_warnet(network)
wn.apply_network_conditions()
wn.wait_for_health()
self.logger.info(
server.logger.info(
f"Successfully resumed warnet named '{network}' from config dir {wn.config_dir}"
)
except Exception as e:
trace = traceback.format_exc()
self.logger.error(f"Unhandled exception bringing network up: {e}\n{trace}")
server.logger.error(f"Unhandled exception bringing network up: {e}\n{trace}")

try:
wn = Warnet.from_network(network, self.backend)
t = threading.Thread(target=lambda: thread_start(wn))
t = threading.Thread(target=lambda: thread_start(self, network))
t.daemon = True
t.start()
return "Resuming warnet..."
Expand All @@ -456,11 +469,12 @@ def network_from_file(
Run a warnet with topology loaded from a <graph_file>
"""

def thread_start(wn, lock: threading.Lock):
with lock:
def thread_start(server: Server, network):
with server.image_build_lock:
try:
wn = server.get_warnet(network)
wn.generate_deployment()
# wn.write_fork_observer_config()
wn.write_fork_observer_config()
wn.warnet_build()
wn.warnet_up()
wn.wait_for_health()
Expand All @@ -480,11 +494,13 @@ def thread_start(wn, lock: threading.Lock):
raise ServerError(message=message, code=CONFIG_DIR_ALREADY_EXISTS)

try:
wn = Warnet.from_graph_file(graph_file, config_dir, network, self.backend)
t = threading.Thread(target=lambda: thread_start(wn, self.image_build_lock))
self.warnets[network] = Warnet.from_graph_file(
graph_file, config_dir, network, self.backend
)
t = threading.Thread(target=lambda: thread_start(self, network))
t.daemon = True
t.start()
return wn._warnet_dict_representation()
return self.warnets[network]._warnet_dict_representation()
except Exception as e:
msg = f"Error bring up warnet: {e}"
self.logger.error(msg)
Expand Down Expand Up @@ -528,7 +544,7 @@ def network_down(self, network: str = "warnet") -> str:
"""
Stop all containers in <network>.
"""
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
try:
wn.warnet_down()
return "Stopping warnet"
Expand All @@ -541,15 +557,15 @@ def network_info(self, network: str = "warnet") -> dict:
"""
Get info about a warnet network named <network>
"""
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
return wn._warnet_dict_representation()

def network_status(self, network: str = "warnet") -> list[dict]:
"""
Get running status of a warnet network named <network>
"""
try:
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
stats = []
for tank in wn.tanks:
status = {"tank_index": tank.index, "bitcoin_status": tank.status.name.lower()}
Expand All @@ -569,7 +585,7 @@ def network_connected(self, network: str = "warnet") -> bool:
Indicate whether all of the graph edges are connected in <network>
"""
try:
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
return wn.network_connected()
except Exception as e:
self.logger.error(f"{e}")
Expand All @@ -585,7 +601,7 @@ def generate_deployment(self, graph_file: str, network: str = "warnet") -> str:
message = f"Config dir {config_dir} already exists, not overwriting existing warnet without --force"
self.logger.error(message)
raise ServerError(message=message, code=CONFIG_DIR_ALREADY_EXISTS)
wn = Warnet.from_graph_file(graph_file, config_dir, network, self.backend)
wn = self.get_warnet(network)
wn.generate_deployment()
if not wn.deployment_file or not wn.deployment_file.is_file():
raise ServerError(f"No deployment file found at {wn.deployment_file}")
Expand All @@ -610,7 +626,7 @@ def logs_grep(self, pattern: str, network: str = "warnet") -> str:
Grep the logs from the fluentd container for a regex pattern
"""
try:
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
return wn.container_interface.logs_grep(pattern, network)
except Exception as e:
msg = f"Error grepping logs using pattern {pattern}: {e}"
Expand All @@ -622,7 +638,7 @@ def exec_run(self, index: int, service_type: int, cmd: str, network: str = "warn
Execute an arbitrary command in an arbitrary container,
identified by tank index and ServiceType
"""
wn = Warnet.from_network(network, self.backend)
wn = self.get_warnet(network)
return wn.container_interface.exec_run(index, ServiceType(service_type), cmd)


Expand Down

0 comments on commit ee9b41c

Please sign in to comment.