From b34eb437fe2508ad445125bc7139300981c43002 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Thu, 22 Nov 2018 18:21:03 +0100 Subject: [PATCH] py:dcerpc/raw_testcase: add helper functions for ncacn_np: SMB connection support BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113 BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892 Signed-off-by: Stefan Metzmacher Reviewed-by: Jeremy Allison --- python/samba/tests/dcerpc/raw_testcase.py | 146 ++++++++++++++++++++-- 1 file changed, 136 insertions(+), 10 deletions(-) diff --git a/python/samba/tests/dcerpc/raw_testcase.py b/python/samba/tests/dcerpc/raw_testcase.py index 77722d82b93..ca1b546a399 100644 --- a/python/samba/tests/dcerpc/raw_testcase.py +++ b/python/samba/tests/dcerpc/raw_testcase.py @@ -21,13 +21,63 @@ import socket import samba.dcerpc.dcerpc as dcerpc import samba.dcerpc.base import samba.dcerpc.epmapper +import samba.dcerpc.security as security import samba.tests from samba import gensec from samba.credentials import Credentials from samba.tests import TestCase from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out from samba.compat import text_type +from samba.ntstatus import ( + NT_STATUS_CONNECTION_DISCONNECTED, + NT_STATUS_PIPE_DISCONNECTED, + NT_STATUS_IO_TIMEOUT +) +from samba import NTSTATUSError +from samba.samba3 import param as s3param +from samba.samba3 import libsmb_samba_internal +class smb_pipe_socket(object): + + def __init__(self, target_hostname, pipename, creds, impersonation_level, lp): + lp3 = s3param.get_context() + lp3.load(lp.configfile) + self.smbconn = libsmb_samba_internal.Conn(target_hostname, 'IPC$', + credentials=creds, sign=True) + self.smbfid = self.smbconn.create(pipename, + DesiredAccess=0x12019f, + ShareAccess=0x7, + CreateDisposition=1, + CreateOptions=0x400040, + ImpersonationLevel=impersonation_level) + return + + def close(self): + self.smbconn.close(self.smbfid) + del self.smbconn + + def settimeout(self, timeo): + # The socket module we simulate there + # specifies the timeo as seconds as float. + msecs = int(timeo * 1000) + assert msecs >= 0 + self.smbconn.settimeout(msecs) + return + + def send(self, buf, flags=0): + return self.smbconn.write(self.smbfid, buffer=buf, offset=0, mode=8) + + def recv(self, len, flags=0): + try: + return self.smbconn.read(self.smbfid, offset=0, size=len) + except NTSTATUSError as e: + if e.args[0] == NT_STATUS_CONNECTION_DISCONNECTED: + return b'\0' * 0 + if e.args[0] == NT_STATUS_PIPE_DISCONNECTED: + return b'\0' * 0 + if e.args[0] == NT_STATUS_IO_TIMEOUT: + raise socket.timeout(str(e)) + raise e class RawDCERPCTest(TestCase): """A raw DCE/RPC Test case.""" @@ -40,9 +90,10 @@ class RawDCERPCTest(TestCase): if self.do_hexdump: sys.stderr.write("disconnect[%s]\n" % reason) - def connect(self): + def _connect_tcp(self): + tcp_port = int(self.primary_address) try: - self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC, + self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.SOL_TCP, 0) self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2]) @@ -60,7 +111,36 @@ class RawDCERPCTest(TestCase): pass self.max_xmit_frag = 5840 self.max_recv_frag = 5840 - self.secondary_address = "%d" % self.tcp_port + if self.secondary_address is None: + self.secondary_address = self.primary_address + # compat for older tests + self.tcp_port = tcp_port + + def _connect_smb(self): + a = self.primary_address.split('\\') + self.assertEquals(len(a), 3) + self.assertEquals(a[0], "") + self.assertEquals(a[1], "pipe") + pipename = a[2] + self.s = smb_pipe_socket(self.target_hostname, + pipename, + self.transport_creds, + self.transport_impersonation, + self.lp_ctx) + self.max_xmit_frag = 4280 + self.max_recv_frag = 4280 + if self.secondary_address is None: + self.secondary_address = self.primary_address + + def connect(self): + self.assertNotConnected() + if self.primary_address.startswith("\\pipe\\"): + self._connect_smb() + else: + self._connect_tcp() + if self.secondary_address is None: + self.secondary_address = self.primary_address + return def setUp(self): super(RawDCERPCTest, self).setUp() @@ -73,12 +153,16 @@ class RawDCERPCTest(TestCase): self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True) if self.target_hostname is None: self.target_hostname = self.host - self.tcp_port = 135 + self.primary_address = "135" + self.secondary_address = None + self.transport_creds = self.get_anon_creds() + self.transport_impersonation = 0x2 self.settings = {} self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm() self.settings["target_hostname"] = self.target_hostname + self.s = None self.connect() def tearDown(self): @@ -88,7 +172,27 @@ class RawDCERPCTest(TestCase): def noop(self): return - def second_connection(self, tcp_port=None): + def reconnect_smb_pipe(self, primary_address, secondary_address=None, + transport_creds=None, transport_impersonation=None): + self._disconnect("reconnect_smb_pipe") + self.assertIsNotNone(primary_address) + self.primary_address = primary_address + if secondary_address is not None: + self.secondary_address = secondary_address + else: + self.secondary_address = None + + if transport_creds is not None: + self.transport_creds = transport_creds + + if transport_impersonation is not None: + self.transport_impersonation = transport_impersonation + + self.connect() + return + + def second_connection(self, primary_address=None, secondary_address=None, + transport_creds=None, transport_impersonation=None): c = RawDCERPCTest(methodName='noop') c.do_ndr_print = self.do_ndr_print c.do_hexdump = self.do_hexdump @@ -96,13 +200,31 @@ class RawDCERPCTest(TestCase): c.host = self.host c.target_hostname = self.target_hostname - if tcp_port is not None: - c.tcp_port = tcp_port + if primary_address is not None: + c.primary_address = primary_address + if secondary_address is not None: + c.secondary_address = secondary_address + else: + c.secondary_address = None else: - c.tcp_port = self.tcp_port + self.assertIsNone(secondary_address) + c.primary_address = self.primary_address + c.secondary_address = self.secondary_address + if transport_creds is not None: + c.transport_creds = transport_creds + else: + c.transport_creds = self.transport_creds + + if transport_impersonation is not None: + c.transport_impersonation = transport_impersonation + else: + c.transport_impersonation = self.transport_impersonation + + c.lp_ctx = self.lp_ctx c.settings = self.settings + c.s = None c.connect() return c @@ -478,7 +600,7 @@ class RawDCERPCTest(TestCase): lhs4.lhs_data = b"" floor4 = samba.dcerpc.epmapper.epm_floor() floor4.lhs = lhs4 - floor4.rhs.port = self.tcp_port + floor4.rhs.port = int(self.primary_address) lhs5 = samba.dcerpc.epmapper.epm_lhs() lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP lhs5.lhs_data = b"" @@ -514,7 +636,8 @@ class RawDCERPCTest(TestCase): # reconnect to the given port self._disconnect("epmap_reconnect") - self.tcp_port = rep_twr.tower.floors[3].rhs.port + self.primary_address = "%d" % rep_twr.tower.floors[3].rhs.port + self.secondary_address = None self.connect() def send_pdu(self, req, ndr_print=None, hexdump=None): @@ -539,6 +662,9 @@ class RawDCERPCTest(TestCase): except IOError as e: self._disconnect("send_pdu: %s" % e) raise + except NTSTATUSError as e: + self._disconnect("send_pdu: %s" % e) + raise finally: pass