Skip to content

Commit

Permalink
Support local control connections again (#112)
Browse files Browse the repository at this point in the history
* Add tests for local control connections

To check for issue #111

* Handle local control connections properly again

For some reason, monetdbd does not use the block protocol for
connections over the unix domain socket with language == 'control'.

Earlier I removed the code that dealt with this because I thought
it wasn't used anymore.  Turns out it is.

* shutdown the socket after sending control command

After having sent the response to a control command, the server waits
for one full second before closing the connection.  It is actually
waiting for the client to send another command. This never happens
because because the control protocol is a one-shot protocol, but the
server does it anyway, probably because the same code is also used
for normal connections.

This means the client has to wait a full second before it knows it has
received the full response.

By shutting down the write side of the socket after sending the
command we let the server know no more commands are coming so it can
shut down the connection immediately.

This speeds up the test_control.py test significantly.

* Silence the checkers

* Pass the passphrase to TestLocalControl

* Fix permissions of monetdbd's .merovingian socket
  • Loading branch information
joerivanruth authored Sep 26, 2022
1 parent 9ccc37a commit 75a5334
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 9 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
sudo -u monetdb monetdbd set control=yes ${{ env.DBFARM }}
sudo -u monetdb monetdbd set passphrase=testdb ${{ env.DBFARM }}
sudo -u monetdb monetdbd start ${{ env.DBFARM }}
sudo -u monetdb chmod o+rwx /tmp/.s.mero*
- name: Create MonetDB test database
run: |
sudo -u monetdb monetdb create demo
Expand Down
43 changes: 38 additions & 5 deletions pymonetdb/mapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ def _challenge_response(self, challenge: str, password: str): # noqa: C901

def _getblock_and_transfer_files(self):
""" read one mapi encoded block and take care of any file transfers the server requests"""
if self.language == 'control' and not self.hostname:
# control connections do not use the blocking protocol and do not transfer files
return self._recv_to_end()

buffer = self._get_buffer()
offset = 0

Expand All @@ -405,6 +409,9 @@ def _getblock_and_transfer_files(self):

def _getblock(self) -> str:
""" read one mapi encoded block """
if self.language == 'control' and not self.hostname:
# control connections do not use the blocking protocol
return self._recv_to_end()
buf = self._get_buffer()
end = self._getblock_raw(buf, 0)
ret = str(memoryview(buf)[:end], 'utf-8')
Expand Down Expand Up @@ -450,6 +457,19 @@ def _getbytes(self, buffer: bytearray, offset: int, count: int) -> int:
offset += n
return end

def _recv_to_end(self) -> str:
"""
Read bytes from the socket until the server closes the connection
"""
parts = []
while True:
assert self.socket
received = self.socket.recv(4096)
if not received:
break
parts.append(received)
return str(b"".join(parts).strip(), 'utf-8')

def _get_buffer(self) -> bytearray:
"""Retrieve a previously stashed buffer for reuse, or create a new one"""
if self.stashed_buffer:
Expand All @@ -466,13 +486,15 @@ def _stash_buffer(self, buffer):

def _putblock(self, block):
""" wrap the line in mapi format and put it into the socket """
self._putblock_inet_raw(block.encode(), True)
data = block.encode('utf-8')
if self.language == 'control' and not self.hostname:
# control does not use the blocking protocol
return self._send_all_and_shutdown(data)
else:
self._putblock_raw(block.encode(), True)

def _putblock_raw(self, block, finish: bool):
def _putblock_raw(self, block, finish):
""" put the data into the socket """
self._putblock_inet_raw(block, finish)

def _putblock_inet_raw(self, block, finish):
pos = 0
last = 0
while not last:
Expand All @@ -485,6 +507,17 @@ def _putblock_inet_raw(self, block, finish):
self.socket.send(data)
pos += length

def _send_all_and_shutdown(self, block):
""" put the data into the socket """
pos = 0
end = len(block)
block = memoryview(block)
while pos < end:
data = block[pos:pos + 8192]
nsent = self.socket.send(data)
pos += nsent
self.socket.shutdown(socket.SHUT_WR)

def __del__(self):
if self.socket:
self.socket.close()
Expand Down
14 changes: 10 additions & 4 deletions tests/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ class TestControl(unittest.TestCase):
Where /var/lib/monetdb is the path to your dbfarm. Don't forget to restart the db after setting the credentials.
"""

def setUp(self):
def setUpControl(self):
# use tcp
self.control = Control(test_hostname, test_port, test_passphrase)
return Control(hostname=test_hostname, port=test_port, passphrase=test_passphrase)

# use socket
# self.control = Control()
def setUp(self):
self.control = self.setUpControl()

do_without_fail(lambda: self.control.stop(database_name))
do_without_fail(lambda: self.control.destroy(database_name))
Expand Down Expand Up @@ -139,3 +139,9 @@ def test_defaults(self):
@unittest.skipUnless(test_full, "full test disabled")
def test_neighbours(self):
self.control.neighbours()


class TestLocalControl(TestControl):
def setUpControl(self):
# use unix domain socket
return Control(port=test_port, passphrase=test_passphrase)

0 comments on commit 75a5334

Please sign in to comment.