Skip to content

Commit

Permalink
add additional pike.test.compound test cases
Browse files Browse the repository at this point in the history
ensure that not decoding the "extra" byte doesn't break chained command
handling when a command in the middle of the chain returns an error.
  • Loading branch information
isi-mfurer committed Jun 6, 2018
1 parent 16fd19f commit cb3a120
Showing 1 changed file with 120 additions and 4 deletions.
124 changes: 120 additions & 4 deletions pike/test/compound.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,30 @@
#

import pike.model
import pike.ntstatus
import pike.smb2
import pike.test
import random
import array

def adopt(new_parent, obj):
"""
obj is adopted by new_parent
"""
new_parent.append(obj)
obj.parent = new_parent
if isinstance(obj, pike.smb2.Smb2):
obj.flags |= pike.smb2.SMB2_FLAGS_RELATED_OPERATIONS

class RelatedOpen(pike.model.Tree):
"""
Shim to insert the RELATED_FID into the request
"""
def __init__(self, tree=None):
self.tree_id = tree.tree_id
self.file_id = pike.smb2.RELATED_FID
self.encrypt_data = tree.encrypt_data if tree is not None else False

class CompoundTest(pike.test.PikeTest):

# Compounded create/close of the same file, with maximal access request
Expand All @@ -52,15 +71,112 @@ def test_create_close(self):
smb_req2 = chan.request(nb_req, obj=tree)
create_req = pike.smb2.CreateRequest(smb_req1)
close_req = pike.smb2.CloseRequest(smb_req2)

create_req.name = 'hello.txt'
create_req.desired_access = pike.smb2.GENERIC_READ | pike.smb2.GENERIC_WRITE
create_req.file_attributes = pike.smb2.FILE_ATTRIBUTE_NORMAL
create_req.create_disposition = pike.smb2.FILE_OPEN_IF

max_req = pike.smb2.MaximalAccessRequest(create_req)

close_req.file_id = pike.smb2.RELATED_FID
smb_req2.flags |= pike.smb2.SMB2_FLAGS_RELATED_OPERATIONS

chan.connection.transceive(nb_req)

# Compound create/query/close
def test_create_query_close(self):
chan, tree = self.tree_connect()

create_req = chan.create_request(
tree,
"create_query_close",
access=(pike.smb2.GENERIC_READ |
pike.smb2.GENERIC_WRITE |
pike.smb2.DELETE),
options=pike.smb2.FILE_DELETE_ON_CLOSE)
nb_req = create_req.parent.parent
query_req = chan.query_file_info_request(RelatedOpen(tree))
adopt(nb_req, query_req.parent)

close_req = chan.close_request(RelatedOpen(tree))
adopt(nb_req, close_req.parent)

(create_res,
query_res,
close_res) = chan.connection.transceive(nb_req)

compare_attributes = [
"creation_time",
"change_time",
"last_access_time",
"last_write_time",
"file_attributes"]
for attr in compare_attributes:
self.assertEqual(getattr(create_res[0], attr),
getattr(query_res[0][0], attr))

# Compound create/write/close & create/read/close
def test_create_write_close(self):
filename = "create_write_close"
buf = "compounded write"

chan, tree = self.tree_connect()

create_req1 = chan.create_request(
tree,
filename,
access=pike.smb2.GENERIC_WRITE)
nb_req = create_req1.parent.parent
write_req = chan.write_request(RelatedOpen(tree), 0, buf)
adopt(nb_req, write_req.parent)

close_req = chan.close_request(RelatedOpen(tree))
adopt(nb_req, close_req.parent)

(create_res1,
write_res,
close_res) = chan.connection.transceive(nb_req)

create_req2 = chan.create_request(
tree,
filename,
access=(pike.smb2.GENERIC_READ |
pike.smb2.DELETE),
options=pike.smb2.FILE_DELETE_ON_CLOSE)
nb_req = create_req2.parent.parent
read_req = chan.read_request(RelatedOpen(tree), 1024, 0)
adopt(nb_req, read_req.parent)

close_req = chan.close_request(RelatedOpen(tree))
adopt(nb_req, close_req.parent)

(create_res2,
read_res,
close_res) = chan.connection.transceive(nb_req)

self.assertEqual(buf, read_res[0].data.tostring())

# Compound create/write/close with insufficient access
def test_create_write_close_access_denied(self):
filename = "create_write_close_access_denied"

chan, tree = self.tree_connect()

create_req1 = chan.create_request(
tree,
filename,
access=(pike.smb2.GENERIC_READ |
pike.smb2.DELETE),
options=pike.smb2.FILE_DELETE_ON_CLOSE)
nb_req = create_req1.parent.parent
write_req = chan.write_request(RelatedOpen(tree), 0, "Expect fail")
adopt(nb_req, write_req.parent)

close_req = chan.close_request(RelatedOpen(tree))
adopt(nb_req, close_req.parent)

with self.assert_error(pike.ntstatus.STATUS_ACCESS_DENIED):
(create_res1,
write_res,
close_res) = chan.connection.transceive(nb_req)

0 comments on commit cb3a120

Please sign in to comment.