Lucene search
K

Modbus Client Utility

This module allows reading and writing data to a PLC using the Modbus protocol. It supports various function codes such as reading bits from several coils and discrete inputs, as well as writing to coils and registers.

Code
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

# Auxiliary module that speaks Modbus/TCP to a PLC: it can read coils,
# discrete inputs, holding and input registers, write single or multiple
# coils/registers, and query the device identification objects.
class MetasploitModule < Msf::Auxiliary
  include Msf::Exploit::Remote::Tcp

  # Exception codes defined by the Modbus application protocol, mapped to the
  # message printed when a device answers with an exception response.
  EXCEPTION_MESSAGES = {
    1 => 'Error : ILLEGAL FUNCTION',
    2 => 'Error : ILLEGAL DATA ADDRESS',
    3 => 'Error : ILLEGAL DATA VALUE',
    4 => 'Error : SLAVE DEVICE FAILURE',
    5 => 'Error : ACKNOWLEDGE',
    6 => 'Error : SLAVE DEVICE BUSY',
    8 => 'Error : MEMORY PARITY ERROR',
    10 => 'Error : GATEWAY PATH UNAVAILABLE',
    11 => 'Error : GATEWAY TARGET DEVICE FAILED TO RESPOND'
  }.freeze

  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'Modbus Client Utility',
        'Description' => %q{
          This module allows reading and writing data to a PLC using the Modbus protocol.
          This module is based on the 'modiconstop.rb' Basecamp module from DigitalBond,
          as well as the mbtget perl script.
        },
        'Author' => [
          'EsMnemon <esm[at]mnemonic.no>', # original write-only module
          'Arnaud SOULLIE  <arnaud.soullie[at]solucom.fr>', # code that allows read/write
          'Alexandrine TORRENTS <alexandrine.torrents[at]eurecom.fr>', # code that allows reading/writing at multiple consecutive addresses
          'Mathieu CHEVALIER <mathieu.chevalier[at]eurecom.fr>',
          'AZSG <[email protected]>' # updated read actions to include function codes 2 and 4 and renamed actions to align with modbus standard 1.1b3
        ],
        'License' => MSF_LICENSE,
        'Actions' => [
          ['READ_COILS', { 'Description' => 'Read bits from several coils' } ], # Function Code 1 Read Coils
          ['READ_DISCRETE_INPUTS', { 'Description' => 'Read bits from several DISCRETE INPUTS' } ], # Function Code 2 Read Discrete Inputs
          ['READ_HOLDING_REGISTERS', { 'Description' => 'Read words from several HOLDING registers' } ], # Function Code 3 Read Holding Registers
          ['READ_INPUT_REGISTERS', { 'Description' => 'Read words from several INPUT registers' } ], # Function Code 4 Read Input Registers
          ['WRITE_COIL', { 'Description' => 'Write one bit to a coil' } ],
          ['WRITE_REGISTER', { 'Description' => 'Write one word to a register' } ],
          ['WRITE_COILS', { 'Description' => 'Write bits to several coils' } ],
          ['WRITE_REGISTERS', { 'Description' => 'Write words to several registers' } ],
          ['READ_ID', { 'Description' => 'Read device id' } ]
        ],
        'DefaultAction' => 'READ_HOLDING_REGISTERS'
      )
    )

    register_options(
      [
        Opt::RPORT(502),
        OptInt.new('DATA_ADDRESS', [true, 'Modbus data address']),
        OptInt.new('NUMBER', [false, 'Number of coils/registers to read (READ_COILS, READ_DISCRETE_INPUTS, READ_HOLDING_REGISTERS, READ_INPUT_REGISTERS modes only)', 1]),
        OptInt.new('DATA', [false, 'Data to write (WRITE_COIL and WRITE_REGISTER modes only)']),
        OptString.new('DATA_COILS', [false, 'Data in binary to write (WRITE_COILS mode only) e.g. 0110']),
        OptString.new('DATA_REGISTERS', [false, 'Words to write to each register separated with a comma (WRITE_REGISTERS mode only) e.g. 1,2,3,4']),
        OptInt.new('UNIT_NUMBER', [false, 'Modbus unit number', 1]),
        OptBool.new('HEXDUMP', [false, 'Print hex dump of response', false]),
      ]
    )
  end

  # Sends one frame and reads the reply, advancing the transaction counter so
  # that consecutive requests carry distinct transaction identifiers.
  #
  # @param payload [String] complete frame as produced by #make_payload
  # @return [String, nil] raw response bytes; callers treat nil as "no answer"
  def send_frame(payload)
    sock.put(payload)
    @modbus_counter += 1
    rsp = sock.get_once(-1, sock.def_read_timeout)
    dump_response(rsp)
    rsp
  end

  # Prints the response as a hex string when the HEXDUMP option is enabled.
  #
  # @param response [String, nil] raw response bytes
  def dump_response(response)
    # Nothing to dump when the device did not answer; without this guard a
    # missing reply raised NoMethodError before the caller could report it.
    return if response.nil?

    print_good('response: ' + response.unpack1('H*')) if datastore['HEXDUMP']
  end

  # Prefixes a request with the Modbus/TCP (MBAP) header.
  #
  # @param payload [String] unit identifier followed by the PDU
  # @return [String] frame ready to be sent on the socket
  def make_payload(payload)
    # transaction id, protocol id (0 = Modbus), number of bytes that follow
    [@modbus_counter, 0, payload.bytesize].pack('nnn') + payload
  end

  # Builds a read request (function codes 1-4) for NUMBER items starting at
  # DATA_ADDRESS, using the currently selected @function_code.
  #
  # @return [String] complete frame
  def make_read_payload
    make_payload(
      [datastore['UNIT_NUMBER'], @function_code, datastore['DATA_ADDRESS'], datastore['NUMBER']].pack('CCnn')
    )
  end

  # Builds a "write single coil" request.
  #
  # @param data [Integer] high byte of the coil value (255 for ON, 0 for OFF);
  #   the low byte is always zero
  # @return [String] complete frame
  def make_write_coil_payload(data)
    make_payload(
      [datastore['UNIT_NUMBER'], @function_code, datastore['DATA_ADDRESS'], data, 0].pack('CCnCC')
    )
  end

  # Builds a "write multiple coils" request.
  #
  # @param data [Array<String>] bit strings of up to 8 characters ('0'/'1'),
  #   one per output byte, first coil first
  # @param byte [Integer] number of data bytes to emit
  # @return [String] complete frame
  def make_write_coils_payload(data, byte)
    header = [
      datastore['UNIT_NUMBER'],
      @function_code,
      datastore['DATA_ADDRESS'],
      datastore['DATA_COILS'].size, # bit count
      byte # byte count
    ].pack('CCnnC')
    # 'b*' packs least-significant bit first, so the first character of each
    # chunk lands in bit 0 of its byte.
    packed_bits = data.first(byte).map { |bits| [bits].pack('b*') }.join

    make_payload(header + packed_bits)
  end

  # Builds a "write single register" request.
  #
  # @param data [Integer] 16-bit word to write
  # @return [String] complete frame
  def make_write_register_payload(data)
    make_payload(
      [datastore['UNIT_NUMBER'], @function_code, datastore['DATA_ADDRESS'], data].pack('CCnn')
    )
  end

  # Builds a "write multiple registers" request.
  #
  # @param data [Array<Integer>] 16-bit words to write
  # @param size [Integer] number of words to take from +data+
  # @return [String] complete frame
  def make_write_registers_payload(data, size)
    header = [
      datastore['UNIT_NUMBER'],
      @function_code,
      datastore['DATA_ADDRESS'],
      size, # word count
      2 * size # byte count
    ].pack('CCnnC')

    make_payload(header + data.first(size).pack('n*'))
  end

  # Builds a "read device identification" request.
  #
  # @return [String] complete frame
  def make_read_id_payload
    # 0x0E: MEI type "Read Device Identification"
    # 0x01: read device id code (basic identification)
    # 0x00: object id to start from
    make_payload([datastore['UNIT_NUMBER'], @function_code, 0x0E, 0x01, 0x00].pack('CCCCC'))
  end

  # Reports the exception carried by an exception response.
  #
  # @param response [String] raw response whose function byte has bit 0x80 set
  # @return [nil]
  def handle_error(response)
    # The exception code directly follows the function byte (offset 7).
    print_error(EXCEPTION_MESSAGES.fetch(response.getbyte(8), 'Unknown error'))
    nil
  end

  # Tells whether +count+ consecutive items starting at DATA_ADDRESS stay within
  # the 0..65535 address space (the last item may sit exactly on 65535).
  #
  # @param count [Integer] number of consecutive addresses accessed
  # @return [Boolean]
  def address_range_valid?(count)
    datastore['DATA_ADDRESS'] + count <= 65536
  end

  # Classifies a response to the request identified by @function_code and
  # yields only when the device acknowledged it.
  #
  # @param response [String, nil] raw response bytes
  # @param label [String] human readable request name used in messages
  # @yield called when the function byte echoes @function_code
  def process_response(response, label)
    if response.nil?
      print_error("No answer for the #{label}")
      return
    end

    function_byte = response.getbyte(7)
    if function_byte == (0x80 | @function_code)
      handle_error(response)
    elsif function_byte == @function_code
      yield
    else
      print_error('Unknown answer')
    end
  end

  # Extracts +count+ bit values from a read coils / discrete inputs response.
  #
  # @param response [String] raw response bytes
  # @param count [Integer] number of bits requested
  # @return [Array<Integer>, nil] bits in address order, nil if the response is
  #   too short to hold them
  def decode_bits(response, count)
    byte_count = (count + 7) / 8
    data = response.byteslice(9, byte_count)
    return nil if data.nil? || data.bytesize < byte_count

    data.unpack1('b*').chars.first(count).map(&:to_i)
  end

  # Extracts +count+ 16-bit words from a read registers response.
  #
  # @param response [String] raw response bytes
  # @param count [Integer] number of words requested
  # @return [Array<Integer>, nil] words in address order, nil if the response
  #   is too short to hold them
  def decode_words(response, count)
    data = response.byteslice(9, 2 * count)
    return nil if data.nil? || data.bytesize < 2 * count

    data.unpack('n*')
  end

  # Shared implementation of the four read actions.
  #
  # @param function_code [Integer] Modbus function code to send
  # @param label [String] request name used in status and error messages
  # @param range_name [String] name used in the address range error
  # @param value_name [String] name used when printing the values
  # @yieldparam response [String] raw response bytes
  # @yieldparam count [Integer] number of items requested
  # @yieldreturn [Array<Integer>, nil] decoded values, nil when truncated
  def read_values(function_code:, label:, range_name:, value_name:, &decoder)
    count = datastore['NUMBER'].to_i
    if count < 1
      print_error('NUMBER must be at least 1')
      return
    end
    unless address_range_valid?(count)
      print_error("#{range_name} addresses go from 0 to 65535. You cannot go beyond.")
      return
    end

    @function_code = function_code
    print_status("Sending #{label}...")
    response = send_frame(make_read_payload)
    process_response(response, label) do
      values = decoder.call(response, count)
      if values.nil?
        print_error("Response is too short for the #{label}")
      else
        print_good("#{count} #{value_name} values from address #{datastore['DATA_ADDRESS']} : ")
        print_good(values.to_s)
      end
    end
  end

  # Shared implementation of the send/acknowledge part of the write actions.
  #
  # @param label [String] request name used in status and error messages
  # @param payload [String] complete frame to send
  # @param success_message [String] message printed when the device echoes
  #   the function code
  def perform_write(label, payload, success_message)
    print_status("Sending #{label}...")
    response = send_frame(payload)
    process_response(response, label) { print_good(success_message) }
  end

  # READ_COILS action (function code 1).
  def read_coils
    read_values(function_code: 0x1, label: 'READ COILS', range_name: 'Coils',
                value_name: 'coil', &method(:decode_bits))
  end

  # READ_DISCRETE_INPUTS action (function code 2).
  def read_discrete_inputs
    read_values(function_code: 0x2, label: 'READ DISCRETE INPUTS', range_name: 'DISCRETE INPUT',
                value_name: 'DISCRETE INPUT', &method(:decode_bits))
  end

  # READ_HOLDING_REGISTERS action (function code 3).
  def read_holding_registers
    read_values(function_code: 3, label: 'READ HOLDING REGISTERS', range_name: 'Holding Registers',
                value_name: 'register', &method(:decode_words))
  end

  # READ_INPUT_REGISTERS action (function code 4).
  def read_input_registers
    read_values(function_code: 4, label: 'READ INPUT REGISTERS', range_name: 'Input Registers',
                value_name: 'register', &method(:decode_words))
  end

  # WRITE_COIL action (function code 5). DATA must be 0 or 1.
  def write_coil
    @function_code = 5
    data = case datastore['DATA']
           when 0 then 0
           when 1 then 255
           end
    if data.nil?
      print_error('Data value must be 0 or 1 in WRITE_COIL mode')
      return
    end

    perform_write(
      'WRITE COIL',
      make_write_coil_payload(data),
      "Value #{datastore['DATA']} successfully written at coil address #{datastore['DATA_ADDRESS']}"
    )
  end

  # WRITE_COILS action (function code 15). DATA_COILS is a string of 0s and 1s,
  # first coil first.
  def write_coils
    @function_code = 15
    bits = datastore['DATA_COILS']
    if bits.size > 65535
      print_error('DATA_COILS size must be between 0 and 65535')
      return
    end
    unless bits.match?(/\A[01]*\z/)
      print_error('DATA_COILS value must only contain 0s and 1s without space')
      return
    end
    if bits.empty?
      print_error('DATA_COILS must contain at least one bit')
      return
    end
    unless address_range_valid?(bits.size)
      print_error('Coils addresses go from 0 to 65535. You cannot go beyond.')
      return
    end

    # One chunk of up to 8 bits per output byte.
    chunks = bits.chars.each_slice(8).map(&:join)
    perform_write(
      'WRITE COILS',
      make_write_coils_payload(chunks, chunks.size),
      "Values #{datastore['DATA_COILS']} successfully written from coil address #{datastore['DATA_ADDRESS']}"
    )
  end

  # WRITE_REGISTER action (function code 6). DATA must be within 0..65535.
  def write_register
    @function_code = 6
    value = datastore['DATA']
    # DATA is optional, so it may be unset here.
    if value.nil? || !value.between?(0, 65535)
      print_error('Data to write must be an integer between 0 and 65535 in WRITE_REGISTER mode')
      return
    end

    perform_write(
      'WRITE REGISTER',
      make_write_register_payload(value),
      "Value #{value} successfully written at registry address #{datastore['DATA_ADDRESS']}"
    )
  end

  # WRITE_REGISTERS action (function code 16). DATA_REGISTERS is a comma
  # separated list of words, each within 0..65535.
  def write_registers
    @function_code = 16
    raw = datastore['DATA_REGISTERS']

    # Leftmost offence wins: either a character that is neither a digit nor a
    # comma, or two commas in a row.
    offence = raw[/[^0-9,]|,,/]
    if offence == ',,'
      print_error('DATA_REGISTERS cannot contain two consecutive commas')
      return
    elsif offence
      print_error('DATA_REGISTERS value must only contain numbers and commas without space')
      return
    end

    entries = raw.split(',')
    # An empty list would send a zero-length write; a leading comma would
    # silently turn into a word of value 0.
    if entries.empty? || entries.any?(&:empty?)
      print_error('DATA_REGISTERS must contain at least one word and cannot start with a comma')
      return
    end
    unless address_range_valid?(entries.size)
      print_error('Registers addresses go from 0 to 65535. You cannot go beyond.')
      return
    end

    words = entries.map(&:to_i)
    unless words.all? { |word| word.between?(0, 65535) }
      print_error('Each word to write must be an integer between 0 and 65535 in WRITE_REGISTERS mode')
      return
    end

    perform_write(
      'WRITE REGISTERS',
      make_write_registers_payload(words, words.size),
      "Values #{datastore['DATA_REGISTERS']} successfully written from registry address #{datastore['DATA_ADDRESS']}"
    )
  end

  # Prints every identification object contained in a READ ID response.
  #
  # The object count is read from offset 13 and the first object starts at
  # offset 14; each object is an id byte, a length byte and the value bytes.
  #
  # @param response [String] raw response of at least 16 bytes
  def print_device_objects(response)
    position = 14
    response.getbyte(13).times do
      obj_id = response.getbyte(position)
      obj_len = response.getbyte(position + 1)
      # Validate the object against the response size before reading it.
      if obj_len.nil? || position + 2 + obj_len > response.bytesize
        print_error('Out of bounds reference occurred whilst processing READ ID operation! Check sender data!')
        break
      end

      value = response.byteslice(position + 2, obj_len)
      print_good("Object ID #{obj_id}: #{value}")
      position += obj_len + 2
    end
  end

  # READ_ID action (function code 0x2b): queries and prints the device
  # identification objects.
  def read_id
    @function_code = 0x2b
    print_status('Sending READ ID...')
    response = send_frame(make_read_id_payload)
    if response.nil?
      print_error('No answer for READ ID')
    elsif response.bytesize < 9
      print_error('response is not modbus conform')
    elsif response.getbyte(7) == (0x80 | @function_code)
      handle_error(response)
    elsif response.bytesize < 16
      print_error('response is too short for READ ID')
    elsif response.getbyte(7) == @function_code
      print_device_objects(response)
    else
      print_error('Unknown answer')
    end
  end

  # Module entry point: connects, dispatches on the selected action and always
  # closes the connection afterwards.
  def run
    @modbus_counter = 0x0000 # used for modbus frames
    connect
    begin
      case action.name
      when 'READ_COILS'
        read_coils
      when 'READ_DISCRETE_INPUTS'
        read_discrete_inputs
      when 'READ_HOLDING_REGISTERS'
        read_holding_registers
      when 'READ_INPUT_REGISTERS'
        read_input_registers
      when 'WRITE_COIL'
        write_coil
      when 'WRITE_REGISTER'
        write_register
      when 'WRITE_COILS'
        if datastore['DATA_COILS'].nil?
          print_error('The following option is needed in WRITE_COILS mode: DATA_COILS.')
        else
          write_coils
        end
      when 'WRITE_REGISTERS'
        if datastore['DATA_REGISTERS'].nil?
          print_error('The following option is needed in WRITE_REGISTERS mode: DATA_REGISTERS.')
        else
          write_registers
        end
      when 'READ_ID'
        read_id
      else
        print_error('Invalid ACTION')
      end
    ensure
      # Runs on every exit path, including validation failures and exceptions.
      disconnect
    end
  end
end

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation

07 Jan 2024 20:02Current
7.2High risk
Vulners AI Score7.2
49