1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-23 17:34:34 +03:00

py:dcerpc/raw_testcase: add helper functions for ncacn_np: SMB connection support

BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
This commit is contained in:
Stefan Metzmacher 2018-11-22 18:21:03 +01:00 committed by Jeremy Allison
parent df7d478715
commit b34eb437fe

View File

@ -21,13 +21,63 @@ import socket
import samba.dcerpc.dcerpc as dcerpc
import samba.dcerpc.base
import samba.dcerpc.epmapper
import samba.dcerpc.security as security
import samba.tests
from samba import gensec
from samba.credentials import Credentials
from samba.tests import TestCase
from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
from samba.compat import text_type
from samba.ntstatus import (
NT_STATUS_CONNECTION_DISCONNECTED,
NT_STATUS_PIPE_DISCONNECTED,
NT_STATUS_IO_TIMEOUT
)
from samba import NTSTATUSError
from samba.samba3 import param as s3param
from samba.samba3 import libsmb_samba_internal
class smb_pipe_socket(object):
def __init__(self, target_hostname, pipename, creds, impersonation_level, lp):
lp3 = s3param.get_context()
lp3.load(lp.configfile)
self.smbconn = libsmb_samba_internal.Conn(target_hostname, 'IPC$',
credentials=creds, sign=True)
self.smbfid = self.smbconn.create(pipename,
DesiredAccess=0x12019f,
ShareAccess=0x7,
CreateDisposition=1,
CreateOptions=0x400040,
ImpersonationLevel=impersonation_level)
return
def close(self):
self.smbconn.close(self.smbfid)
del self.smbconn
def settimeout(self, timeo):
# The socket module we simulate there
# specifies the timeo as seconds as float.
msecs = int(timeo * 1000)
assert msecs >= 0
self.smbconn.settimeout(msecs)
return
def send(self, buf, flags=0):
return self.smbconn.write(self.smbfid, buffer=buf, offset=0, mode=8)
def recv(self, len, flags=0):
try:
return self.smbconn.read(self.smbfid, offset=0, size=len)
except NTSTATUSError as e:
if e.args[0] == NT_STATUS_CONNECTION_DISCONNECTED:
return b'\0' * 0
if e.args[0] == NT_STATUS_PIPE_DISCONNECTED:
return b'\0' * 0
if e.args[0] == NT_STATUS_IO_TIMEOUT:
raise socket.timeout(str(e))
raise e
class RawDCERPCTest(TestCase):
"""A raw DCE/RPC Test case."""
@ -40,9 +90,10 @@ class RawDCERPCTest(TestCase):
if self.do_hexdump:
sys.stderr.write("disconnect[%s]\n" % reason)
def connect(self):
def _connect_tcp(self):
tcp_port = int(self.primary_address)
try:
self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC,
socket.SOCK_STREAM, socket.SOL_TCP,
0)
self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
@ -60,7 +111,36 @@ class RawDCERPCTest(TestCase):
pass
self.max_xmit_frag = 5840
self.max_recv_frag = 5840
self.secondary_address = "%d" % self.tcp_port
if self.secondary_address is None:
self.secondary_address = self.primary_address
# compat for older tests
self.tcp_port = tcp_port
def _connect_smb(self):
a = self.primary_address.split('\\')
self.assertEquals(len(a), 3)
self.assertEquals(a[0], "")
self.assertEquals(a[1], "pipe")
pipename = a[2]
self.s = smb_pipe_socket(self.target_hostname,
pipename,
self.transport_creds,
self.transport_impersonation,
self.lp_ctx)
self.max_xmit_frag = 4280
self.max_recv_frag = 4280
if self.secondary_address is None:
self.secondary_address = self.primary_address
def connect(self):
self.assertNotConnected()
if self.primary_address.startswith("\\pipe\\"):
self._connect_smb()
else:
self._connect_tcp()
if self.secondary_address is None:
self.secondary_address = self.primary_address
return
def setUp(self):
super(RawDCERPCTest, self).setUp()
@ -73,12 +153,16 @@ class RawDCERPCTest(TestCase):
self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
if self.target_hostname is None:
self.target_hostname = self.host
self.tcp_port = 135
self.primary_address = "135"
self.secondary_address = None
self.transport_creds = self.get_anon_creds()
self.transport_impersonation = 0x2
self.settings = {}
self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
self.settings["target_hostname"] = self.target_hostname
self.s = None
self.connect()
def tearDown(self):
@ -88,7 +172,27 @@ class RawDCERPCTest(TestCase):
def noop(self):
return
def second_connection(self, tcp_port=None):
def reconnect_smb_pipe(self, primary_address, secondary_address=None,
transport_creds=None, transport_impersonation=None):
self._disconnect("reconnect_smb_pipe")
self.assertIsNotNone(primary_address)
self.primary_address = primary_address
if secondary_address is not None:
self.secondary_address = secondary_address
else:
self.secondary_address = None
if transport_creds is not None:
self.transport_creds = transport_creds
if transport_impersonation is not None:
self.transport_impersonation = transport_impersonation
self.connect()
return
def second_connection(self, primary_address=None, secondary_address=None,
transport_creds=None, transport_impersonation=None):
c = RawDCERPCTest(methodName='noop')
c.do_ndr_print = self.do_ndr_print
c.do_hexdump = self.do_hexdump
@ -96,13 +200,31 @@ class RawDCERPCTest(TestCase):
c.host = self.host
c.target_hostname = self.target_hostname
if tcp_port is not None:
c.tcp_port = tcp_port
if primary_address is not None:
c.primary_address = primary_address
if secondary_address is not None:
c.secondary_address = secondary_address
else:
c.secondary_address = None
else:
c.tcp_port = self.tcp_port
self.assertIsNone(secondary_address)
c.primary_address = self.primary_address
c.secondary_address = self.secondary_address
if transport_creds is not None:
c.transport_creds = transport_creds
else:
c.transport_creds = self.transport_creds
if transport_impersonation is not None:
c.transport_impersonation = transport_impersonation
else:
c.transport_impersonation = self.transport_impersonation
c.lp_ctx = self.lp_ctx
c.settings = self.settings
c.s = None
c.connect()
return c
@ -478,7 +600,7 @@ class RawDCERPCTest(TestCase):
lhs4.lhs_data = b""
floor4 = samba.dcerpc.epmapper.epm_floor()
floor4.lhs = lhs4
floor4.rhs.port = self.tcp_port
floor4.rhs.port = int(self.primary_address)
lhs5 = samba.dcerpc.epmapper.epm_lhs()
lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
lhs5.lhs_data = b""
@ -514,7 +636,8 @@ class RawDCERPCTest(TestCase):
# reconnect to the given port
self._disconnect("epmap_reconnect")
self.tcp_port = rep_twr.tower.floors[3].rhs.port
self.primary_address = "%d" % rep_twr.tower.floors[3].rhs.port
self.secondary_address = None
self.connect()
def send_pdu(self, req, ndr_print=None, hexdump=None):
@ -539,6 +662,9 @@ class RawDCERPCTest(TestCase):
except IOError as e:
self._disconnect("send_pdu: %s" % e)
raise
except NTSTATUSError as e:
self._disconnect("send_pdu: %s" % e)
raise
finally:
pass