GitHub Profile Pic

Joshua Shoemaker

GitHub

Inspector Gadget State Machine

Where This Started

The original artifact for this submission is a Python State Machine designed for a Raspberry Pi 4 embedded system to simulate a smart thermostat as a proof of concept. This artifact shows an understanding of several concepts important to embedded programming. While creating the smart thermostat, I explored the growing field of the Internet of Things, safety and security concerns, as well as limitations of embedded smart systems. Embedded architecture particularly piqued my interest, and I’m grateful to have the opportunity to explore the concepts of Advanced RISC Machine (ARM) and System-on-Chip (SoC) architectures. I also explored the intricacies of peripheral integration through serial communication protocols, such as the Inter-Integrated Circuit (I2C), Universal Asynchronous Receiver/Transmitter (UART), and the Serial Peripheral Interface (SPI). If you would like to view both the original and enhanced State Machines you can see them in this GitHub repo here

A New Purpose, A New Domain

The enhancement for the Python State Machine drastically reconfigured the structure and purpose. I selected this artifact because I wanted to demonstrate competency with embedded systems architecture and show that the constraints of edge deployment are not a barrier to implementing meaningful security. I increased the complexity of the state machine by introducing a domain model in accordance with the Python State Machine docs (v3.0). This domain model integrated four components: a network packet retriever, a packet feature extractor, an inference model for traffic classification, and an alarm engine to forward identified traffic to a web server. This expanded the Python state machine to three states and three transitions.

The state machine itself is small on purpose. It delegates all of the real work to a separate domain model. Keeping state-transition logic decoupled from business logic is the architectural decision that makes everything else in this file manageable. The three states (`idle`, `monitor`, `hibernate`) and three transitions (`start`, `wait`, `stop`) are all that the machine class has to reason through:

class NetworkMonitorMachine(StateMachine):
	idle = State(initial=True)
	monitor = State()
	hibernate = State(final=True)

	start = idle.to(monitor)
	wait = monitor.to(idle)
	stop = idle.to(hibernate) | monitor.to(hibernate)

The four components described earlier (packet retriever, feature extractor, inference model, and alarm engine) are each their own class. They are all wired together in NetworkMonitorModel.__init__GitHub. This is the "domain model" the Python State Machine library expects, and a organized way to seperate concerns with modular design. This highlights the brilliance of Object Oriented Programming. If any one of these four responsibilities needs to change (a new capture library, a different model, a different backend), only its class changes. The state machine itself stays still. This state machine successfully demonstrates a modular design, separation of concerns, and a deliberate mapping of a problem domain onto a state-machine abstraction.

def __init__(self, interface, model_file):
	self.model_file = model_file
	self.packet_capture = PacketCapture()
	self.traffic_analyzer = TrafficAnalyzer()
	self.inference_engine = Inference_Model(self.model_file)
	self.alert_system = Alert_System()
	self.interface = interface
	self.monitoring_event = threading.Event()
	self.request_wait = threading.Event()

Designing With Constraints

Considering the Pi's limited resources, I ensured the queues handling thread-safe data exchange were capped at 2,000 each. The typicalMaximum Transmission Unit (MTU) for TCP is 1,500 bytes (Gargan, 2025). Therefore, the packet queue, which stores full packets until feature extraction, will consume approximately 3MB of data at maximum capacity. On the Raspberry Pi's 4GB of RAM, this is well within acceptable limits, along with all other functions. The second queue will consume even less volatile memory because the data stored is much smaller. These values can also be modified for a system with tighter resource requirements. This is one of the parts I'm most proud of architecturally. The number 2000 isn't a guess, it's the product of a memory calculation against the Pi's actual hardware. The bounded queue is the single line that enforces the whole budget:

def __init__(self):
	# Bounded queue prevents unbounded memory growth under high traffic
	self.packet_queue = queue.Queue(maxsize=2000)
	self.stop_capture = threading.Event()

And the matching feature queue cap in the Traffic Analyzer which feeds the inference engine:

# Bounded queue prevents memory growth if inference falls behind capture
self.feature_queue = queue.Queue(maxsize=2000)

Because these queues are bounded, a traffic burst can never eat the Pi's RAM. Packets will drop once the cap is hit, which means the rest of the system keeps working. I chose to accept that some packets may be missed during a spike. A better outcome than an out-of-memory crash that takes the whole monitor offline.

Guarding against hostile inputs

I do understand the limitations of using Python for such a network monitor due to the runtime environment overhead, the Global Interpreter Lock (GIL), and automatic memory management. However, this was a deliberate design consideration given the time frame of the project and the re-engineering required to implement a state machine in another language, such as C++. A manual memory management language like C++ also introduces several security concerns regarding buffer overflows and other memory vulnerabilities, which increase overall time investment. To guard against malformed packets, however unlikely for such an unremarkable residential network as mine, selecting only packets with IP and TCP layers reduces the attack surface during feature extraction. Additionally, null features and NaN fields are caught before data is passed to the inference model.

The first line of defense is a filter that drops anything that isn't a TCP/IP packet before it ever reaches the analyzer

def packet_callback(self, packet):
	if IP in packet and TCP in packet:
		self.packet_queue.put(packet)

To guard against flow table exhaustion, the flow statistics dictionary is capped and evicts the 100 oldest entries when the limit is reached, preventing an adversary from filling the table and operating freely in untracked flows. The NaN guard runs at the same boundary. np.std([]) on an empty packet list produces NaN, and a single NaN in the input tensor corrupts every neuron downstream. Both are addressed before any data reaches the interpreter:

# Evict the 100 oldest flows when the table exceeds 5000 entries
# to bound memory use under sustained high-traffic conditions
if len(self.flow_stats) >= 5000:
	for i in range(100):
		oldest = next(iter(self.flow_stats))
		del self.flow_stats[oldest]
# Replace any NaN values (e.g. std of an empty list) with 0.0
# so the tensor fed to the interpreter is always fully numeric
for k, v in attributes.items():
	if v != v: attributes[k] = 0.0  # NaN is the only value not equal to itself

This enhancement allowed me to investigate the intricacies of the GIL and how it affects threading in Python. I found David Beazley's slides from his presentation at PyCon in 2010 to be highly informative in this area. The Python state machine design utilizes several implementations that release the GIL, permitting other threads to operate during blocking operations. The HTTP post and get calls release the GIL during network I/O, meaning the capture and inference threads remain free to run during the full duration of a server round trip. TFLite's invoke call explicitly releases the GIL during inference, which is the most CPU-intensive operation in the pipeline. Scapy's sniff does hold the GIL during packet processing, but because the monitor and alert threads spend most of their time blocked on queue operations and network I/O, respectively, contention is minimal.

def inference_thread(feature_queue):
	while not self.stop_inference.is_set():
		try:
			data = feature_queue.get(timeout=1)
		except queue.Empty: continue

		# Slice the first 19 numeric features; the last 3 keys are
		# metadata (src, sport, dst) and must be excluded from inference
		numpy_data = np.array(list(data.values()))[:-3]
		numpy_data = self.scaler.transform(numpy_data.reshape(1, 19).astype(np.float32))
		self.interpreter.set_tensor(input_details[0]['index'], numpy_data)
		self.interpreter.invoke()
		result = self.interpreter.get_tensor(output_details[0]['index'])
		self.alert_callback(result, data)

A Secure Path to the Backend

For the secure API post method, I stored a shared key as an environment variable for separation and will ensure the connection is over HTTPS to protect the key in transit.

def __init__(self):
	self.key = os.environ['API_KEY']
	self.api_url = os.environ['SERVER_URL']
	self.alert_event = threading.Event()

Before sending a batch of alerts, the alert thread first checks the server is reachable with a heartbeat GET.

# Heartbeat GET confirms the server is reachable before posting
try:
	heartbeat = requests.get(
		url=f'{self.api_url}/heartbeat',
		headers={'Authorization': f'Bearer {self.key}'},
		timeout=5)
except requests.exceptions.Timeout:
	self.log_errors(activities=[], occasion={'heartbeat': 'Timeout'}, time=datetime.now())
	continue

If the heartbeat fails, the batch is never sent. However, it's also never lost, because the failure is logged to a local file. I implemented the same function if the POST itself fails:

# Post to database api
try:
	response = requests.post(
		url=f'{self.api_url}/alerts',
		json=alert_list,
		headers={'Authorization': f'Bearer {self.key}'},
		timeout=5)
except requests.exceptions.RequestException as e:
	# log in local storage file.
	self.log_errors(activities=alert_list, occasion={'Post': f'{e}'}, time=datetime.now())
	continue

Just Hanging Around

One detail I would like to give more attention to is the idle state behavior. A traditional state machine would either spin in idle (wasting CPU) or sleep for a fixed interval (wasting latency on a quiet network). This implementation does neither. Instead, it performs a single-packet blocking sniff. The first packet that arrives instantly wakes the machine back into monitor mode and is put into the packet queue so it isn't lost:

def on_enter_idle(self):
	sniffer = AsyncSniffer(iface=self.model.interface, count=1)
	sniffer.start()
	sniffer.join()
	for result in sniffer.results:
		self.model.add_packet(result)
	self.send('start')

The monitor state itself transitions back to idle after two consecutive empty-queue seconds. The machine smarlty transitions between idle and monitor automatically based on network activity. It's a small piece of control-flow design that I wanted to automate for efficiency.

def monitor_thread():
	self.packet_capture.start_capture(self.interface)
	self.inference_engine.start_inference(self.traffic_analyzer.feature_queue)
	self.alert_system.send_alerts(self.inference_engine.alert_queue)

	count = 0
	while not self.monitoring_event.is_set():
		try:
			packet = self.packet_capture.packet_queue.get(timeout=1)
			count = 0
			self.traffic_analyzer.analyze_packet(packet)
		except queue.Empty:
			count += 1
			if count > 1:
				self.request_wait.set()
				return
			continue

	self.packet_capture.stop()

Conclusions

This enhancement shows my ability to design and evaluate a computing solution while managing the trade-offs involved in design choices. Every deliberate decision I made about queue sizing, flow-table management, and thread coordination was a trade-off between throughput, memory, and resilience on constrained hardware. I applied algorithmic principles and industry-standard practices to a real problem. Throughout the project I maintained a security mindset that anticipates adversarial exploits. The HTTPS-plus-bearer-token path between the Pi and the server, a bounded flow table that closes a denial-of-service hole in my own monitor, and the NaN and malformed-packet guards are all the product of thinking like an attacker.

Deployment also surfaced a subtle but critical bug in the state machine class signature. The python-statemachine documentation shows a generic type parameter: NetworkMonitorMachine(StateMachine[ 'NetworkMonitorModel']) which looks reasonable on paper. At runtime, however, python-statemachine 3.x does not implement a true generic. The subscript is type-hint only. Python raised aTypeError the moment the class body was evaluated, before a single packet was ever seen. The fix was simply removing the subscript and writing class NetworkMonitorMachine(StateMachine):. It was a good reminder that type annotations and runtime behavior are not the same thing, and that reading library documentation is not a substitute for running the code.

The most significant structural lesson came from a threading mistake I made in the monitor loop. Initially I forgot to follow the controller-worker design pattern. I placed state-change logic inside monitor_thread, which called machine.send('wait') directly. Because send()is synchronous and immediately re-enters the state machine, it spawned a newmonitor_thread before the old one returned, creating an infinite threading loop that crashed with TransitionNotAllowed. I remembered to keep controlling logic separate from working logic. I then created a run() method on the state machine itself so it can drive all state transitions. I made NetworkMonitorModel a supervisor that only delegates and coordinates work to the other classes. It signals readiness via a threading.Event and returns. The state machine'srun() loop waits on that signal, then decides what to do next:

def run(self):
	try:
		while self.configuration != self.hibernate:
			self.model.request_wait.wait()
			self.model.request_wait.clear()
			self.send('wait')
	except KeyboardInterrupt:
		self.send('stop')

The biggest lesson for me was that "design" isn't something you do once at the start and walk away from. Every line in this file is a small design decision. The choice of queue size, the choice of when to drop a packet, the choice of which thread holds the GIL, the choice of what to do when the server is unreachable. All very valuable lessons when developing with embedded systems. Something I'm actually quite fond of.

Trial by Fire Updates

After letting the monitor run against a tcpreplay loop for 24 hours, it had analyzed over 12,000 flows and posted zero alerts. That result needed explaining.

The first problem was in flow tracking. The flow tracker discarded any flow that lacked a captured SYN handshake. With tcpreplay replaying a historical pcap, that's most flows. The fix assigns direction metadata to those flows instead of deleting them. The model doesn't need to know which side initiated the connection. It needs consistent feature values.

elif not stats['Destination Port']:
	# Flow seen mid-stream; assign as-is so it still reaches inference
	stats['Destination Port'] = dport
	stats['src'] = src
	stats['sport'] = sport
	stats['dst'] = dst

The second problem was quieter. When the queue is empty, the alert thread still posts an empty array to /api/alerts. insertMany([])returns 201 with no documents written. Railway showed clean 201s the entire time. The database was empty. This confirmed the real problem was upstream: flows weren't reaching inference.

A third issue was connected to the first. The FIN threshold was >= 1, extracting a flow the moment one side sent a FIN. Flows that never fully closed were going to inference with incomplete data. Raising the threshold to > 1 waits for both FINs. Flows that never close cleanly are handled by the stale sweep instead.

elif stats['FIN Flag Count'] > 1:
	# FIN observed - flow is closing; extract features now
	try:
		self.feature_queue.put_nowait(self.extract_attributes(stats))
		del self.flow_stats[flow_key]
	except queue.Full: pass

Not every flow accumulates 25 packets or reaches a clean FIN exchange. For those, I added an hourly cleanup pass to the Traffic Analyzer. Flows inactive for more than an hour get their features extracted and queued for inference. They're then removed from the flow table. The sweep uses packet.time from the pcap, not the system clock. Replayed timestamps work correctly as a result.

Every flow now has a path to inference: 25 or more packets, both FINs observed, or the stale sweep after an hour of inactivity. The monitor was architecturally sound. It just wasn't seeing most of its traffic.

elif self.timer + timedelta(hours=1) < datetime.now():
	old_flows = list()
	self.timer = datetime.now()
	for k, s in self.flow_stats.items():
		last_packet = s['last_bwd_time']
		if last_packet and packet.time - last_packet > 3600:
			self.feature_queue.put_nowait(self.extract_attributes(s))
			old_flows.append(k)
		elif packet.time - s['last_fwd_time'] > 3600:
			self.feature_queue.put_nowait(self.extract_attributes(s))
			old_flows.append(k)

	for key in old_flows:
		del self.flow_stats[key]

References

Beazley, D. (2010). Understanding the Python GIL. dabeaz.com. https://dabeaz.com/python/UnderstandingGIL.pdf

Gargan, R. (2025, January 16). What is MTU size? Effects on speed and network efficiency. Netmaker. https://www.netmaker.io/resources/mtu-size