Architecture & Protocol¶
This page covers the internals of C3PO: wire protocol, cryptography, protobuf messages, device lifecycle, and core components.
Wire Protocol¶
All communication between ESP32 agents and C3PO uses a line-based text protocol over TCP.
Frame Format¶
| Field | Size | Description |
|---|---|---|
device_id |
1-32 chars | Alphanumeric + -_ |
: |
1 byte | Separator |
nonce |
12 bytes | Random per-message |
ciphertext |
variable | ChaCha20 encrypted protobuf |
tag |
16 bytes | Poly1305 authentication tag |
\n |
1 byte | Line terminator |
The entire payload (nonce + ciphertext + tag) is Base64-encoded before transmission.
Validation regex: ^[A-Za-z0-9_-]+:[A-Za-z0-9+/=]+$
Buffer Limits¶
- Max buffer: 1 MB per connection (prevents memory exhaustion)
- Line-buffered: Messages are delimited by
\n - Idle timeout: 300 seconds per socket
Connection Handshake¶
HELLO Sequence¶
sequenceDiagram
participant Agent as ESP32 Agent
participant C3PO as C3PO Server
Agent->>C3PO: HELLO:device_id\n
Note over C3PO: Lookup device_id in KeyStore
Note over C3PO: Generate 32-byte random challenge
Note over C3PO: Encrypt with device's ChaCha20 key
C3PO->>Agent: BASE64(encrypted_challenge)\n
Note over Agent: Decrypt challenge with shared key
Note over Agent: Mutual authentication established
Agent->>C3PO: device_id:BASE64(encrypted_protobuf)\n
Note over C3PO: Decrypt + verify tag
Note over C3PO: Deserialize AgentMessage
C3PO->>Agent: device_id:BASE64(encrypted_command)\n
- Agent sends
HELLO:<device_id>\n - C3PO looks up the master key in
keys.json - C3PO generates a 32-byte random challenge, encrypts it, and sends it back
- Agent decrypts — proving it holds the correct key
- Normal encrypted communication begins
Hot-Reload
If a device ID is not found in memory, the KeyStore re-reads keys.json from disk. This allows provisioning new devices without restarting C3PO.
Cryptography¶
Key Hierarchy¶
Master Key (32 bytes) ← Stored in keys.json (C3PO) and NVS (ESP32)
│
├── HKDF-SHA256
│ salt = device_id (UTF-8)
│ info = "espilon-c2-v1"
│
└── Encryption Key (32 bytes) ← Used for ChaCha20-Poly1305
Per-Message Encryption¶
| Algorithm | Parameters |
|---|---|
| Cipher | ChaCha20-Poly1305 (AEAD) |
| Key | 32 bytes (derived via HKDF) |
| Nonce | 12 bytes (random per message) |
| Tag | 16 bytes (Poly1305 authenticator) |
| Plaintext | Serialized protobuf bytes |
Encrypt (TX)¶
# CryptoContext.encrypt(data)
nonce = random(12)
cipher = ChaCha20_Poly1305.new(key=derived_key, nonce=nonce)
ciphertext, tag = cipher.encrypt_and_digest(data)
return nonce + ciphertext + tag
Decrypt (RX)¶
# CryptoContext.decrypt(data)
nonce = data[:12]
tag = data[-16:]
ciphertext = data[12:-16]
cipher = ChaCha20_Poly1305.new(key=derived_key, nonce=nonce)
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
return plaintext
If the tag verification fails, the message is silently dropped (prevents oracle attacks).
Key Derivation¶
# Per-device key derivation
from Crypto.Protocol.KDF import HKDF
from Crypto.Hash import SHA256
encryption_key = HKDF(
master=master_key, # 32 bytes from keys.json
key_len=32,
salt=device_id.encode(), # Device ID as salt
hashmod=SHA256,
context=b"espilon-c2-v1"
)
Each device has a unique encryption key derived from its master key + device ID. Even if two devices share the same master key, their encryption keys differ.
Protobuf Messages¶
C3PO uses Protocol Buffers for message serialization (nanoPB on ESP32, protobuf on Python).
Command (C3PO → Agent)¶
message Command {
string device_id = 1; // Target device
string command_name = 2; // e.g. "ping", "system_reboot"
repeated string argv = 3; // Arguments ["8.8.8.8"]
string request_id = 4; // UUID for tracking responses
}
AgentMessage (Agent → C3PO)¶
enum AgentMsgType {
AGENT_INFO = 0; // Informational message
AGENT_ERROR = 1; // Error report
AGENT_DATA = 2; // Data payload (HP|, CAN|, etc.)
AGENT_LOG = 3; // Log message
AGENT_CMD_RESULT = 4; // Command response
}
message AgentMessage {
string device_id = 1; // Source device
AgentMsgType type = 2; // Message type
string source = 3; // Module name
string request_id = 4; // Correlates to Command.request_id
bytes payload = 5; // UTF-8 encoded content
bool eof = 6; // End of response stream
}
Message Type Routing¶
| Type | Handling |
|---|---|
AGENT_CMD_RESULT |
Parse system_info responses (chip, modules). Forward to session.handle_command_response() |
AGENT_INFO |
Display in TUI/logs. Parse MLAT: prefix for positioning data |
AGENT_ERROR |
Display error. Forward to response handler |
AGENT_LOG |
Display log message |
AGENT_DATA |
Route by prefix: HP\| → honeypot store, CAN\| → CAN store, else → log |
Data Prefixes¶
| Prefix | Format | Destination |
|---|---|---|
HP\| |
HP\|session\|ts\|type\|src\|dst\|port\|payload |
HpStore (SQLite) |
CAN\| |
CAN\|timestamp_ms\|id_hex\|dlc\|data_hex |
CanStore (ring buffer) |
MLAT: |
MLAT:G;lat;lon;rssi or MLAT:L;x;y;rssi |
MlatEngine |
Request ID Routing¶
The request_id field in AgentMessage determines how responses are handled:
| Pattern | Handling |
|---|---|
probe-* |
Silent status processing (heartbeat) |
auto-status-* |
Device info update + TUI notification |
hp-* |
Forward to hp_commander.handle_response() |
| Other | Forward to session.handle_command_response() |
Device Lifecycle¶
stateDiagram-v2
[*] --> Connecting: TCP connect
Connecting --> Handshake: HELLO received
Handshake --> Connected: Key verified
Handshake --> Rejected: Unknown key
Connected --> Connected: Messages (touch last_seen)
Connected --> Probing: 240s inactivity
Probing --> Connected: Response received
Probing --> Inactive: 300s total inactivity
Inactive --> Connected: New message received
Inactive --> Disconnected: Socket error
Connected --> Disconnected: Socket error / close
Disconnected --> [*]
Status Checker Thread¶
Runs continuously in background:
| Threshold | Action |
|---|---|
240 seconds since last_seen |
Send system_info probe (request_id = probe-*) |
300 seconds since last_seen |
Set status to "Inactive" |
| On any message | device.touch() → reset last_seen, set "Connected" |
Core Components¶
Transport¶
The central communication layer. Handles all RX/TX with encryption.
RX Path (handle_incoming):
Raw TCP bytes
→ Parse "device_id:base64_payload"
→ Get CryptoContext (cached per device)
→ Base64 decode
→ ChaCha20-Poly1305 decrypt + verify
→ Protobuf deserialize (AgentMessage)
→ _dispatch() → register device + route message
TX Path (send_command):
Command object
→ Protobuf serialize
→ ChaCha20-Poly1305 encrypt
→ Base64 encode
→ sock.sendall(encoded + b"\n")
Crypto Cache: _crypto_cache[device_id] → CryptoContext — avoids re-deriving HKDF keys on every message.
KeyStore¶
Thread-safe key persistence backed by keys.json.
| Operation | Thread-Safe | Behavior |
|---|---|---|
get(device_id) |
Yes | Returns bytes. If not in memory, re-reads disk (hot-reload) |
add(device_id, key) |
Yes | Validates 32-byte key. Persists to disk |
remove(device_id) |
Yes | Removes + persists |
list_devices() |
Yes | Returns list of device IDs |
Device Registry¶
Thread-safe in-memory device store.
| Method | Description |
|---|---|
add(device) |
Register/replace device |
remove(esp_id) |
Remove + close socket |
get(esp_id) |
Lookup by ID |
get_device_by_sock(sock) |
Reverse lookup by socket |
all() |
List all devices |
touch(esp_id) |
Update last_seen timestamp |
Device Object¶
@dataclass
class Device:
id: str # Device identifier
sock: socket.socket # TCP connection
address: tuple[str, int] # (IP, port)
connected_at: float # Connection timestamp
last_seen: float # Last activity
status: str # "Connected" or "Inactive"
chip: str # ESP32 chip type (from system_info)
modules: str # "network,fakeap,recon" (from system_info)
Group Registry¶
Lightweight device grouping for batch operations.
groups.add_group("floor-2")
groups.add_device("floor-2", "abc12345")
groups.add_device("floor-2", "def67890")
# Send to all devices in group
for device_id in groups.get("floor-2"):
transport.send_command(device_id, cmd)
Session¶
Aggregates all runtime state. Passed to TUI, web server, and commander.
| Attribute | Type | Purpose |
|---|---|---|
registry |
DeviceRegistry | Connected devices |
commands |
CommandRegistry | Handler registration |
groups |
GroupRegistry | Device groups |
transport |
Transport | C2 channel |
active_commands |
dict | In-flight command tracking |
web_server |
UnifiedWebServer | Flask instance |
udp_receiver |
UDPReceiver | Camera receiver |
mlat_engine |
MlatEngine | MLAT positioning |
can_store |
CanStore | CAN frame buffer |
hp_store |
HpStore | Honeypot events (optional) |
Tunnel Server¶
Provides an encrypted SOCKS5 proxy channel through connected agents. Runs in its own asyncio event loop on a dedicated thread.
| Config Variable | Default | Description |
|---|---|---|
TUNNEL_LISTEN_PORT |
(see .env) | Port where C3PO accepts tunnel connections from agents |
TUNNEL_SOCKS_HOST |
127.0.0.1 |
Local SOCKS5 proxy bind address |
TUNNEL_SOCKS_PORT |
(see .env) | Local SOCKS5 proxy port (connect your tools here) |
Traffic flow: tool → SOCKS5 proxy → C3PO tunnel → agent → target.
CAN Store¶
Ring buffer (10,000 frames max) for CAN bus data.
Input format (from AGENT_DATA):
| Method | Returns |
|---|---|
store_frame(device_id, raw_line) |
Parse and store |
get_frames(device_id, can_id, limit, offset) |
Paginated frame list |
get_stats(device_id) |
{total_stored, total_received, unique_can_ids, can_ids} |
export_csv(device_id) |
CSV string |
Threading Model¶
Main Thread
├── TCP accept_loop
│ └── ThreadPoolExecutor (max 50)
│ ├── client_thread(agent_1) # Blocking recv loop
│ ├── client_thread(agent_2)
│ └── ...
│
├── device_status_checker # Heartbeat probe loop (10s interval)
│
├── tunnel-server (asyncio event loop) # SOCKS5 proxy + tunnel listener
│
├── TUI event loop (Textual) # OR headless sleep
│
└── Web server (Werkzeug, daemon)
├── Flask request handlers
└── SSE streaming threads
All shared state (DeviceRegistry, KeyStore, GroupRegistry, CanStore) is protected by threading.Lock().
Next
See the TUI for interactive usage, or the API Reference for programmatic access.