mirror of
				https://github.com/samba-team/samba.git
				synced 2025-11-04 00:23:49 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			302 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			302 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python
 | 
						|
 | 
						|
import sys, os, string
 | 
						|
from cmd import Cmd
 | 
						|
from optparse import OptionParser
 | 
						|
from pprint import pprint
 | 
						|
 | 
						|
import dcerpc, samr
 | 
						|
 | 
						|
def swig2dict(obj):
 | 
						|
    """Convert a swig object to a dictionary."""
 | 
						|
 | 
						|
    result = {}
 | 
						|
 | 
						|
    for attr in filter(lambda x: type(x) == str, dir(obj)):
 | 
						|
 | 
						|
        if attr[:2] == '__' and attr[-2:] == '__':
 | 
						|
            continue
 | 
						|
 | 
						|
        if attr == 'this' or attr == 'thisown':
 | 
						|
            continue
 | 
						|
        
 | 
						|
        result[attr] = getattr(obj, attr)
 | 
						|
 | 
						|
    return result
 | 
						|
 | 
						|
class rpcclient(Cmd):
 | 
						|
 | 
						|
    prompt = 'rpcclient$ '
 | 
						|
 | 
						|
    def __init__(self, binding, domain, username, password):
 | 
						|
        Cmd.__init__(self)
 | 
						|
        self.binding = binding
 | 
						|
        self.domain = domain
 | 
						|
        self.username = username
 | 
						|
        self.password = password
 | 
						|
 | 
						|
    def emptyline(self):
 | 
						|
 | 
						|
        # Default for empty line is to repeat last command - yuck
 | 
						|
 | 
						|
        pass
 | 
						|
 | 
						|
    def onecmd(self, line):
 | 
						|
 | 
						|
        # Override the onecmd() method so we can trap error returns
 | 
						|
 | 
						|
        try:
 | 
						|
            Cmd.onecmd(self, line)
 | 
						|
        except dcerpc.NTSTATUS, arg:
 | 
						|
            print 'The command returned an error: %s' % arg[1]
 | 
						|
            
 | 
						|
    # Command handlers
 | 
						|
 | 
						|
    def do_help(self, line):
 | 
						|
        """Displays on-line help for rpcclient commands."""
 | 
						|
        Cmd.do_help(self, line)
 | 
						|
 | 
						|
    def do_shell(self, line):
 | 
						|
 | 
						|
        status = os.system(line)
 | 
						|
 | 
						|
        if os.WIFEXITED(status):
 | 
						|
            if os.WEXITSTATUS(status) != 0:
 | 
						|
                print 'Command exited with code %d' % os.WEXITSTATUS(status)
 | 
						|
        else:
 | 
						|
            print 'Command exited with signal %d' % os.WTERMSIG(status)
 | 
						|
            
 | 
						|
    def do_EOF(self, line):
 | 
						|
        """Exits rpcclient."""
 | 
						|
        print
 | 
						|
        sys.exit(0)
 | 
						|
 | 
						|
    # SAMR pipe commands
 | 
						|
 | 
						|
    def do_SamrEnumDomains(self, line):
 | 
						|
        """Enumerate domain names."""
 | 
						|
        
 | 
						|
        usage = 'usage: SamrEnumDomains'
 | 
						|
 | 
						|
        if line != '':
 | 
						|
            print usage
 | 
						|
            return
 | 
						|
 | 
						|
        pipe = dcerpc.pipe_connect(
 | 
						|
            self.binding,
 | 
						|
            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION),
 | 
						|
            self.domain, self.username, self.password)
 | 
						|
 | 
						|
        connect_handle = samr.Connect(pipe)
 | 
						|
 | 
						|
        for i in connect_handle.EnumDomains():
 | 
						|
            print i
 | 
						|
 | 
						|
    def do_SamrLookupDomain(self, line):
 | 
						|
        """Return the SID for a domain."""
 | 
						|
 | 
						|
        usage = 'SamrLookupDomain DOMAIN'
 | 
						|
 | 
						|
        parser = OptionParser(usage)
 | 
						|
        options, args = parser.parse_args(string.split(line))
 | 
						|
 | 
						|
        if len(args) != 1:
 | 
						|
            print 'usage:', usage
 | 
						|
            return
 | 
						|
 | 
						|
        pipe = dcerpc.pipe_connect(
 | 
						|
            self.binding,
 | 
						|
            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION),
 | 
						|
            self.domain, self.username, self.password)
 | 
						|
 | 
						|
        connect_handle = samr.Connect(pipe)
 | 
						|
 | 
						|
        print connect_handle.LookupDomain(args[0])
 | 
						|
 | 
						|
    def do_SamrQueryDomInfo(self, line):
 | 
						|
	"""Return information about a domain designated by its SID."""
 | 
						|
 | 
						|
	usage = 'SamrQueryDomInfo DOMAIN_SID [info_level]'
 | 
						|
 | 
						|
	parser = OptionParser(usage)
 | 
						|
	options, args = parser.parse_args(string.split(line))
 | 
						|
 | 
						|
	if (len(args) == 0) or (len(args) > 2):
 | 
						|
	    print 'usage:', usage
 | 
						|
	    return
 | 
						|
 | 
						|
        pipe = dcerpc.pipe_connect(
 | 
						|
            self.binding,
 | 
						|
            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION),
 | 
						|
            self.domain, self.username, self.password)
 | 
						|
 | 
						|
        connect_handle = samr.Connect(pipe)
 | 
						|
	domain_handle = connect_handle.OpenDomain(args[0])
 | 
						|
 | 
						|
	if (len(args) == 2):
 | 
						|
	    result = domain_handle.QueryDomainInfo(int(args[1]))
 | 
						|
	else:
 | 
						|
	    result = domain_handle.QueryDomainInfo()
 | 
						|
 | 
						|
        pprint(swig2dict(result))
 | 
						|
 | 
						|
    def do_SamrQueryDomInfo2(self, line):
 | 
						|
	"""Return information about a domain designated by its SID.
 | 
						|
        (Windows 2000 and >)"""
 | 
						|
 | 
						|
	usage = 'SamrQueryDomInfo2 DOMAIN_SID [info_level] (Windows 2000 and >)'
 | 
						|
	parser = OptionParser(usage)
 | 
						|
	options, args = parser.parse_args(string.split(line))
 | 
						|
 | 
						|
	if len(args) == 0 or len(args) > 2:
 | 
						|
	    print 'usage:', usage
 | 
						|
	    return
 | 
						|
 | 
						|
        pipe = dcerpc.pipe_connect(
 | 
						|
            self.binding,
 | 
						|
            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION),
 | 
						|
            self.domain, self.username, self.password)
 | 
						|
 | 
						|
        connect_handle = samr.Connect(pipe)
 | 
						|
	domain_handle = connect_handle.OpenDomain(args[0])
 | 
						|
 | 
						|
	if (len(args) == 2):
 | 
						|
	    result = domain_handle.QueryDomainInfo2(int(args[1]))
 | 
						|
	else:
 | 
						|
	    result = domain_handle.QueryDomainInfo2()
 | 
						|
 | 
						|
        pprint(swig2dict(result))
 | 
						|
 | 
						|
    def do_SamrEnumDomainGroups(self, line):
 | 
						|
	"""Return the list of groups of a domain designated by its SID."""
 | 
						|
 | 
						|
	usage = 'SamrEnumDomainGroups DOMAIN_SID'
 | 
						|
 | 
						|
        parser = OptionParser(usage)
 | 
						|
        options, args = parser.parse_args(string.split(line))
 | 
						|
 | 
						|
        if len(args) != 1:
 | 
						|
            print 'usage:', usage
 | 
						|
            return
 | 
						|
 | 
						|
        pipe = dcerpc.pipe_connect(
 | 
						|
            self.binding,
 | 
						|
            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION),
 | 
						|
            self.domain, self.username, self.password)
 | 
						|
 | 
						|
        connect_handle = samr.Connect(pipe)	
 | 
						|
	domain_handle = connect_handle.OpenDomain(args[0])
 | 
						|
 | 
						|
	result = domain_handle.EnumDomainGroups()
 | 
						|
 | 
						|
        pprint(result)
 | 
						|
 | 
						|
    def do_SamrEnumDomainAliases(self, line):
 | 
						|
        """Return the list of aliases (local groups) of a domain designated
 | 
						|
        by its SID."""
 | 
						|
 | 
						|
	usage = 'SamrEnumDomainAliases DOMAIN_SID'
 | 
						|
 | 
						|
        parser = OptionParser(usage)
 | 
						|
        options, args = parser.parse_args(string.split(line))
 | 
						|
 | 
						|
        if len(args) != 1:
 | 
						|
            print 'usage:', usage
 | 
						|
            return
 | 
						|
 | 
						|
        pipe = dcerpc.pipe_connect(
 | 
						|
            self.binding,
 | 
						|
            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION),
 | 
						|
            self.domain, self.username, self.password)
 | 
						|
 | 
						|
        connect_handle = samr.Connect(pipe)
 | 
						|
        domain_handle = connect_handle.OpenDomain(args[0])
 | 
						|
 | 
						|
	result = domain_handle.EnumDomainAliases()
 | 
						|
 | 
						|
        pprint(result)
 | 
						|
 | 
						|
    def do_SamrEnumDomainUsers(self, line):
 | 
						|
	"""Return the list of users of a domain designated by its SID."""
 | 
						|
 | 
						|
	usage = 'SamrEnumDomainUsers DOMAIN_SID [user_account_flags]'
 | 
						|
 | 
						|
        parser = OptionParser(usage)
 | 
						|
        options, args = parser.parse_args(string.split(line))
 | 
						|
 | 
						|
	if (len(args) == 0) or (len(args) > 2):
 | 
						|
	    print 'usage:', usage
 | 
						|
	    return
 | 
						|
 | 
						|
        pipe = dcerpc.pipe_connect(
 | 
						|
            self.binding,
 | 
						|
            dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION),
 | 
						|
            self.domain, self.username, self.password)
 | 
						|
 | 
						|
        connect_handle = samr.Connect(pipe)
 | 
						|
        domain_handle = connect_handle.OpenDomain(args[0])
 | 
						|
 | 
						|
	if (len(args) == 2):
 | 
						|
	    result = domain_handle.EnumDomainUsers(int(args[1]))
 | 
						|
	else:
 | 
						|
	    result = domain_handle.EnumDomainUsers()
 | 
						|
 | 
						|
        pprint(result)
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
 | 
						|
    # Parse command line
 | 
						|
 | 
						|
    usage = 'rpcclient BINDING [options]'
 | 
						|
 | 
						|
    if len(sys.argv) == 1:
 | 
						|
        print usage
 | 
						|
        sys.exit(1)
 | 
						|
 | 
						|
    binding = sys.argv[1]
 | 
						|
    del(sys.argv[1])
 | 
						|
 | 
						|
    if string.find(binding, ':') == -1:
 | 
						|
        binding = 'ncacn_np:' + binding
 | 
						|
 | 
						|
    parser = OptionParser(usage)
 | 
						|
 | 
						|
    parser.add_option('-U', '--username', action='store', type='string',
 | 
						|
                      help='Use given credentials when connecting',
 | 
						|
                      metavar='DOMAIN\\username%password',
 | 
						|
                      dest='username')
 | 
						|
 | 
						|
    parser.add_option('-c', '--command', action='store', type='string',
 | 
						|
                      help='Execute COMMAND', dest='command')
 | 
						|
 | 
						|
    options, args = parser.parse_args()
 | 
						|
 | 
						|
    # Break --username up into domain, usernamd and password
 | 
						|
 | 
						|
    if not options.username:
 | 
						|
        options.username = '%'
 | 
						|
 | 
						|
    domain = ''
 | 
						|
    if string.find(options.username, '\\') != -1:
 | 
						|
        domain, options.username = string.split(options.username, '\\')
 | 
						|
 | 
						|
    password = ''
 | 
						|
    if string.find(options.username, '%') != -1:
 | 
						|
        options.username, password  = string.split(options.username, '%')
 | 
						|
 | 
						|
    username = options.username
 | 
						|
 | 
						|
    # Run command loop
 | 
						|
 | 
						|
    c = rpcclient(binding, domain, username, password)
 | 
						|
 | 
						|
    if options.command:
 | 
						|
        c.onecmd(options.command)
 | 
						|
        sys.exit(0)
 | 
						|
 | 
						|
    while 1:
 | 
						|
        try:
 | 
						|
            c.cmdloop()
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            print 'KeyboardInterrupt'
 |