Skip to content

Commit

Permalink
Use telnetlib3 to replace the now obsolete telnetlib.
Browse files Browse the repository at this point in the history
bcoconni committed Jan 11, 2025
1 parent 70505e6 commit 14e6339
Showing 4 changed files with 165 additions and 102 deletions.
1 change: 1 addition & 0 deletions python/requirements.txt
Original file line number Diff line number Diff line change
@@ -4,3 +4,4 @@ cython
pandas
scipy
setuptools>=60.0.0
telnetlib3
4 changes: 2 additions & 2 deletions src/input_output/FGInputSocket.cpp
Original file line number Diff line number Diff line change
@@ -259,15 +259,15 @@ void FGInputSocket::Read(bool Holding)
} else if (command == "help") { // HELP

socket->Reply(
" JSBSim Server commands:\n\r\n"
" JSBSim Server commands:\r\n\r\n"
" get {property name}\r\n"
" set {property name} {value}\r\n"
" hold\r\n"
" resume\r\n"
" iterate {value}\r\n"
" help\r\n"
" quit\r\n"
" info\n\r\n");
" info\r\n\r\n");

} else {
socket->Reply(string("Unknown command: ") + command + "\r\n");
2 changes: 1 addition & 1 deletion src/input_output/FGfdmSocket.cpp
Original file line number Diff line number Diff line change
@@ -269,7 +269,7 @@ string FGfdmSocket::Receive(void)
int flags = fcntl(sckt_in, F_GETFL, 0);
fcntl(sckt_in, F_SETFL, flags | O_NONBLOCK);
#endif
if (send(sckt_in, "Connected to JSBSim server\n\rJSBSim> ", 36, 0) == SOCKET_ERROR)
if (send(sckt_in, "Connected to JSBSim server\r\nJSBSim> ", 36, 0) == SOCKET_ERROR)
LogSocketError("Receive - TCP connection acknowledgement");
}
}
260 changes: 161 additions & 99 deletions tests/TestInputSocket.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
# A test case that checks that providing commands to JSBSim via an input socket
# is working.
#
# Copyright (c) 2015-2022 Bertrand Coconnier
# Copyright (c) 2015-2024 Bertrand Coconnier
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
@@ -19,74 +19,86 @@
# this program; if not, see <http://www.gnu.org/licenses/>
#

import telnetlib, socket, time
import asyncio
import xml.etree.ElementTree as et
from JSBSim_utils import JSBSimTestCase, CopyAircraftDef, RunTest

import telnetlib3
from JSBSim_utils import CopyAircraftDef, JSBSimTestCase, RunTest


class TelnetInterface:
def __init__(self, fdm, port):
self.tn = telnetlib.Telnet("localhost", port)
self.fdm = fdm
fdm.run()

def __del__(self):
if (
"tn" in self.__dict__.keys()
): # Check if the Telnet session has been succesfully open
self.tn.close()
self.fdm = None

def sendCommand(self, command):
self.tn.write("{}\n".format(command).encode())
for _ in range(50):
self.fdm.run()
self.fdm.check_incremental_hold()
return self.getOutput()

def getOutput(self):
time.sleep(1.0) # Wait for the socket to process all the data.
return self.tn.read_very_eager().decode()

def getPropertyValue(self, property):
msg = self.sendCommand(f"get {property}").split("\n")
reader = None
writer = None

async def run(self, port, shell):
self.reader, self.writer = await telnetlib3.open_connection(
"localhost", port, shell=shell
)
await self.writer.protocol.waiter_closed

async def get_output(self):
msg = await self.reader.read(1024)
lines = msg.split("\r\n")
if lines[-1] == "JSBSim> ":
return "\n".join(lines[:-1])
else:
prompt = await self.reader.read(1024)
assert prompt == "JSBSim> "
return "\n".join(lines)

async def send_command(self, command):
self.writer.write(f"{command}\n")
await self.writer.drain()
return await self.get_output()

async def get_property_value(self, property_name):
msg = (await self.send_command(f"get {property_name}")).split("\n")
return float(msg[0].split("=")[1])


class TestInputSocket(JSBSimTestCase):
def setUp(self):
JSBSimTestCase.setUp(self)
def setUp(self, *args):
super().setUp(*args)
self._fdm = self.create_fdm()
self.telnet = TelnetInterface()
self.script_path = self.sandbox.path_to_jsbsim_file("scripts", "c1722.xml")

def sanityCheck(self, tn):
# Check that the connection has been established
out = tn.getOutput()
self.assertTrue(
out.split("\n")[0] == "Connected to JSBSim server",
msg="Not connected to the JSBSim server.\nGot message '%s' instead"
% (out,),
)

# Check that "help" returns the minimum set of commands that will be
# tested
self.assertEqual(
sorted(
map(
lambda x: x.split("{")[0].strip(),
tn.sendCommand("help").split("\n")[2:-2],
)
),
["get", "help", "hold", "info", "iterate", "quit", "resume", "set"],
)
self.assertion_failed = False

def tearDown(self):
self.assertFalse(self.assertion_failed)
super().tearDown()
self._fdm = None
self.telnet = None

def assertEqual(self, first, second, msg=None):
try:
super().assertEqual(first, second, msg)
except AssertionError as e:
print(e, flush=True)
self.assertion_failed = True

def assertAlmostEqual(self, first, second, places=None, msg=None, delta=None):
try:
super().assertAlmostEqual(first, second, places, msg, delta)
except AssertionError as e:
print(e, flush=True)
self.assertion_failed = True

async def run_fdm(self):
while True:
for _ in range(50):
if not self._fdm.run():
return
self._fdm.check_incremental_hold()
await asyncio.sleep(0.1)

def test_no_input(self):
fdm = self.create_fdm()
fdm.load_script(self.script_path)
fdm.run_ic()
fdm.hold()
self._fdm.load_script(self.script_path)
self._fdm.run_ic()
self._fdm.hold()

with self.assertRaises(socket.error):
TelnetInterface(fdm, 2222)
with self.assertRaises(OSError):
asyncio.run(self.run_test(2222, self.sanity_check))

def test_input_socket(self):
# First, extract the time step from the script file
@@ -95,93 +107,143 @@ def test_input_socket(self):

# The aircraft c172x does not contain an <input> tag so we need
# to add one.
tree, aircraft_name, b = CopyAircraftDef(self.script_path, self.sandbox)
tree, aircraft_name, _ = CopyAircraftDef(self.script_path, self.sandbox)
root = tree.getroot()
input_tag = et.SubElement(root, "input")
input_tag.attrib["port"] = "1137"
tree.write(self.sandbox("aircraft", aircraft_name, aircraft_name + ".xml"))

fdm = self.create_fdm()
fdm.set_aircraft_path("aircraft")
fdm.load_script(self.script_path)
fdm.run_ic()
fdm.hold()
self._fdm.set_aircraft_path("aircraft")
self._fdm.load_script(self.script_path)
self._fdm.run_ic()
self._fdm.hold()

asyncio.run(self.run_test(1137, lambda r, w: self.shell(root, dt, r, w)))

tn = TelnetInterface(fdm, 1137)
self.sanityCheck(tn)
async def shell(self, root, dt, reader, writer):
await self.sanity_check(reader, writer)
msg = (await self.telnet.send_command("info")).split("\n")

# Check the aircraft name and its version
msg = tn.sendCommand("info").split("\n")
self.assertEqual(msg[2].split(":")[1].strip(), root.attrib["name"].strip())
self.assertEqual(msg[1].split(":")[1].strip(), root.attrib["version"].strip())

# Check that the simulation time is 0.0
self.assertEqual(float(msg[3].split(":")[1].strip()), 0.0)
self.assertEqual(fdm.get_sim_time(), 0.0)
self.assertEqual(tn.getPropertyValue("simulation/sim-time-sec"), 0.0)
self.assertEqual(self._fdm.get_sim_time(), 0.0)
self.assertEqual(
await self.telnet.get_property_value("simulation/sim-time-sec"),
0.0,
)

# Check that 'iterate' iterates the correct number of times
tn.sendCommand("iterate 19")
self.assertEqual(fdm.get_sim_time(), 19.0 * dt)
await self.telnet.send_command("iterate 19")
self.assertEqual(self._fdm.get_sim_time(), 19.0 * dt)
self.assertAlmostEqual(
tn.getPropertyValue("simulation/sim-time-sec"),
fdm.get_sim_time(),
await self.telnet.get_property_value("simulation/sim-time-sec"),
self._fdm.get_sim_time(),
delta=1e-5,
)
self.assertTrue(fdm.holding())
self.assertTrue(self._fdm.holding())

# Wait a little bit and make sure that the simulation time has not
# changed meanwhile thus confirming that the simulation is on hold.
for _ in range(40):
fdm.run()
self.assertEqual(fdm.get_sim_time(), 19.0 * dt)
await asyncio.sleep(0.5)
self.assertEqual(self._fdm.get_sim_time(), 19.0 * dt)
self.assertAlmostEqual(
tn.getPropertyValue("simulation/sim-time-sec"),
fdm.get_sim_time(),
await self.telnet.get_property_value("simulation/sim-time-sec"),
self._fdm.get_sim_time(),
delta=1e-5,
)

# Modify the tank[0] contents via the "send" command
half_contents = 0.5 * tn.getPropertyValue("propulsion/tank/contents-lbs")
tn.sendCommand("set propulsion/tank/contents-lbs " + str(half_contents))
half_contents = 0.5 * (
await self.telnet.get_property_value("propulsion/tank/contents-lbs")
)
await self.telnet.send_command(
"set propulsion/tank/contents-lbs " + str(half_contents)
)
self.assertEqual(
tn.getPropertyValue("propulsion/tank/contents-lbs"), half_contents
await self.telnet.get_property_value("propulsion/tank/contents-lbs"),
half_contents,
)

# Check the resume/hold commands
t = fdm.get_sim_time()
tn.sendCommand("resume")
self.assertNotEqual(fdm.get_sim_time(), t)
self.assertFalse(fdm.holding())
tn.sendCommand("hold")
t = fdm.get_sim_time()
self.assertTrue(fdm.holding())
t = self._fdm.get_sim_time()
await self.telnet.send_command("resume")
self.assertNotEqual(self._fdm.get_sim_time(), t)
self.assertFalse(self._fdm.holding())
await self.telnet.send_command("hold")
t = self._fdm.get_sim_time()
self.assertTrue(self._fdm.holding())
self.assertAlmostEqual(
tn.getPropertyValue("simulation/sim-time-sec"), t, delta=1e-5
await self.telnet.get_property_value("simulation/sim-time-sec"),
t,
delta=1e-5,
)

# Wait a little bit and make sure that the simulation time has not
# changed meanwhile thus confirming that the simulation is on hold.
for _ in range(40):
fdm.run()
self.assertEqual(fdm.get_sim_time(), t)
await asyncio.sleep(0.5)
self.assertEqual(self._fdm.get_sim_time(), t)
self.assertAlmostEqual(
tn.getPropertyValue("simulation/sim-time-sec"), t, delta=1e-5
await self.telnet.get_property_value("simulation/sim-time-sec"),
t,
delta=1e-5,
)

writer.close()

async def sanity_check(self, _, __):
out = await self.telnet.get_output()

self.assertTrue(
out == "Connected to JSBSim server",
msg=f"Not connected to the JSBSim server.\nGot message '{out}' instead",
)

out = await self.telnet.send_command("help")

# Check that "help" returns the minimum set of commands that will be
# tested
self.assertEqual(
sorted(
map(
lambda x: x.split("{")[0].strip(),
out.split("\n")[2:-1],
)
),
["get", "help", "hold", "info", "iterate", "quit", "resume", "set"],
)

async def run_test(self, port, shell):
telnet_task = asyncio.create_task(self.telnet.run(port, shell))
fdm_task = asyncio.create_task(self.run_fdm())

done, pending = await asyncio.wait(
[telnet_task, fdm_task], return_when=asyncio.FIRST_COMPLETED
)

# Cancel the fdm_task if it is still pending
for task in pending:
task.cancel()

# Handle exceptions if any
for task in done:
if task.exception():
raise task.exception()

def test_script_input(self):
tree = et.parse(self.script_path)
input_tag = et.SubElement(tree.getroot(), "input")
input_tag.attrib["port"] = "1138"
tree.write("c1722_1.xml")

fdm = self.create_fdm()
fdm.load_script("c1722_1.xml")
fdm.run_ic()
fdm.hold()
self._fdm.load_script("c1722_1.xml")
self._fdm.run_ic()
self._fdm.hold()

tn = TelnetInterface(fdm, 1138)
self.sanityCheck(tn)
asyncio.run(self.run_test(1138, self.sanity_check))


RunTest(TestInputSocket)

0 comments on commit 14e6339

Please sign in to comment.