- Date Created: 2025-12-09
- Last Modified: 2025-12-09
- Status: DESIGN PHASE
Progress Log: EventQueue Design - Unified Event Buffering Architecture
Task Description
Design and plan the implementation of EventQueue, a detector-triggered event buffering system that complements CommandQueue for the unified device response protocol (ENABLE_DEVICE_RESPONSE=1).
Context:
- Phase 1-3: Device response protocol foundation (device_response_t, Command class, event_response module)
- Phase 4: EventQueue system for detector-triggered events (this task)
Goal: Create an EventQueue class that:
- Uses device_response_t as the unified event message type (not event_t)
- Demonstrates architectural symmetry with CommandQueue (dual queuing system)
- Enables buffering of high-frequency detection events (10-50 Hz typical)
- Maintains clear separation between command path and event path
- Integrates seamlessly with ENABLE_DEVICE_RESPONSE=1 builds
Outcome
Architecture Analysis: CommandQueue vs EventQueue
CommandQueue (User-Triggered):
- Input: command_t (parsed text command)
- Trigger: Serial input available (low frequency, ~1 Hz)
- Queue: FreeRTOS static, COMMAND_QUEUE_SIZE=10 items
- Processing: receive() → parse → queue → execute() (dispatch to handler)
- Output: JSON response via Serial (variable size, 100-500 bytes)
- Stream Control: Via text commands (SET_STREAM)
- Purpose: User-initiated configuration and queries
EventQueue (Detector-Triggered):
- Input: device_response_t (detection event envelope)
- Trigger: cosmic_read().detected (high frequency, 10-50 Hz possible)
- Queue: FreeRTOS static, EVENT_QUEUE_SIZE=200 items
- Processing: enqueue() (from detection loop) → flush() (drains queue)
- Output: JSON Lines via Serial (fixed ~150-200 bytes per event)
- Stream Control: Via Command.get_stream() (same as CommandQueue)
- Purpose: Detector-triggered event buffering and output
Key Architectural Decisions
1. device_response_t as Unified Type
Why NOT use event_t?
- event_t is a legacy data structure (stream_data.h)
- Not compatible with unified device response schema
- Mixing event_t and device_response_t violates protocol consistency
- ENABLE_DEVICE_RESPONSE=1 implies all output is device_response_t
Why use device_response_t?
- ✅ Single type for all messages (responses AND events)
- ✅ JSON schema validation (device-response.json)
- ✅ Type safety: compiler ensures correct structure
- ✅ Future multi-output support (Serial, WiFi, LittleFS) uses same type
- ✅ Consistent with Command class integration
Flow:
cosmic_read().detected
↓
event_t {hit1, hit2, hit3, adc, ...} [local detection data]
↓
event_to_device_response(event_t*) [conversion function]
↓
device_response_t {type, status, sent_at, hit1, hit2, hit3, adc, ...}
↓
EventQueue::enqueue(device_response_t*)
↓
EventQueue::flush() → Serial.println(JSON)
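The conversion step in the flow above can be sketched as follows. This is a minimal host-side sketch, not the project's implementation: the struct layouts, the `ResponseType` and `ResponseStatus` enums, and the extra `now_ms` parameter are all assumptions made so the example compiles on its own. Only the field names (type, status, sent_at, hit1, hit2, hit3, adc) come from the flow diagram.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, trimmed-down stand-ins for the project's real types.
struct event_t {
    uint16_t hit1, hit2, hit3;  // per-channel detection counts
    int16_t  adc;               // raw ADC reading
};

enum class ResponseType : uint8_t { Response, Event };   // assumed values
enum class ResponseStatus : uint8_t { Ok, Error };       // assumed values

struct device_response_t {
    ResponseType   type;
    ResponseStatus status;
    uint32_t       sent_at;     // assumed unit: milliseconds since boot
    uint16_t       hit1, hit2, hit3;
    int16_t        adc;
};

// Wrap local detection data in the unified envelope.
// The now_ms parameter is an assumption; the real function may read a clock itself.
inline device_response_t event_to_device_response(const event_t *event, uint32_t now_ms) {
    device_response_t resp{};
    resp.type    = ResponseType::Event;
    resp.status  = ResponseStatus::Ok;
    resp.sent_at = now_ms;
    resp.hit1    = event->hit1;
    resp.hit2    = event->hit2;
    resp.hit3    = event->hit3;
    resp.adc     = event->adc;
    return resp;
}
```

Returning the envelope by value keeps the caller free to copy it straight into a queue slot, which matches the copy semantics of a FreeRTOS queue.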
2. Queue Capacity Sizing
CommandQueue: 10 items
- Low frequency: 0.1-1 command per second
- Large messages: 100-500 bytes per response
- Use case: Occasional user queries, rare configuration changes
EventQueue: 200 items
- High frequency: 10-50 events per second possible
- Compact messages: ~150-200 bytes per event (JSON Lines)
- Use case: Continuous cosmic ray detection burst handling
- Memory budget: 200 × 150 bytes ≈ 30 KB (acceptable, ESP32 has 320KB RAM)
Rationale for 200:
- If detection rate is 50 Hz and flush() is called every 10 ms, only about 0.5 events arrive per flush interval, so the queue rarely holds more than 1 item
- If commands cause brief Serial delays (50 ms), queue absorbs 2-3 events
- If complete event backlog occurs (e.g., wireless buffering), 200 provides 4 seconds at 50 Hz
- Conservative safety margin without excessive memory waste
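The sizing arithmetic above can be checked at compile time. The sketch below uses only the figures quoted in this section (200 items, ~150 bytes per event, 50 Hz peak rate). The constant and function names are hypothetical and chosen for this example.

```cpp
#include <cassert>
#include <cstddef>

constexpr std::size_t kEventQueueSize = 200;  // EVENT_QUEUE_SIZE from the text
constexpr std::size_t kBytesPerEvent  = 150;  // lower end of the ~150-200 byte estimate
constexpr std::size_t kPeakRateHz     = 50;   // upper end of the 10-50 Hz range

// Total memory budget for a full queue.
constexpr std::size_t queue_bytes() { return kEventQueueSize * kBytesPerEvent; }

// How long a complete output stall can last before events are dropped.
constexpr std::size_t backlog_seconds() { return kEventQueueSize / kPeakRateHz; }

// Events arriving at the peak rate during a stall of stall_ms milliseconds (rounded up).
constexpr std::size_t events_during(std::size_t stall_ms) {
    return (kPeakRateHz * stall_ms + 999) / 1000;
}

static_assert(queue_bytes() == 30000, "200 x 150 bytes is about 30 KB");
static_assert(backlog_seconds() == 4, "200 items cover 4 s at 50 Hz");
static_assert(events_during(50) == 3, "a 50 ms Serial delay costs 2-3 events");
static_assert(events_during(10) == 1, "a 10 ms flush interval costs at most 1 event");
```

Keeping these checks next to the EVENT_QUEUE_SIZE definition would make a later change to the capacity or the event size fail the build if the budget no longer holds.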
3. Operational Symmetry with CommandQueue
// CommandQueue (User-Triggered)
void loop() {
    CommandQueue::receive();  // Serial → queue
    CommandQueue::execute();  // queue → dispatch → handler → response
}

// EventQueue (Detector-Triggered)
void loop() {
    cosmic_detection_t det = cosmic_read();
    if (det.detected) {
        // Gather local detection data; collect_sensors() is the step named in the
        // processing-flow section, and its exact signature is not fixed here
        event_t event = collect_sensors();
        device_response_t resp = event_to_device_response(&event);
        EventQueue::enqueue(&resp);  // Detection → queue
    }
    EventQueue::flush();  // queue → Serial output
}
Both follow an identical pattern:
- Collect input (command / event)
- Queue for deferred processing (non-blocking)
- Drain queue and process (execute / flush)
Benefits:
- Developers understand both systems through a single mental model
- Code review is consistent (same patterns, same conventions)
- Future multi-threaded scenarios have a proven synchronization model
- Easy to add statistics/monitoring (both use FreeRTOS queue API)
4. FreeRTOS Queue Choice
Why FreeRTOS?
CommandQueue already uses xQueueCreateStatic() (FreeRTOS static queue):
- Thread-safe without application-level locking (the kernel serializes queue access in short critical sections)
- Non-blocking enqueue/dequeue operations when called with a zero tick timeout
- No heap allocation (static arrays, deterministic memory)
- Native ESP-IDF integration (no third-party dependencies)
- Proven reliability (battle-tested in RTOS kernel)
EventQueue will use an identical approach:
// In event_queue.cpp
static StaticQueue_t g_event_queue_static;  // queue control block
static uint8_t g_event_queue_buffer[EVENT_QUEUE_SIZE * sizeof(device_response_t)];  // item storage
static QueueHandle_t g_event_queue_handle;

void EventQueue::init() {
    g_event_queue_handle = xQueueCreateStatic(
        EVENT_QUEUE_SIZE,           // capacity in items
        sizeof(device_response_t),  // bytes per item
        g_event_queue_buffer,
        &g_event_queue_static
    );
}
Stream Control Integration
Unified Stream Flag:
- Both CommandQueue and EventQueue respect the same stream_enabled flag
- Flag source: Command::getInstance().get_stream() (ENABLE_DEVICE_RESPONSE=1)
- Fallback: config_get_stream_enabled() (ENABLE_DEVICE_RESPONSE=0, legacy)
EventQueue Implementation:
bool EventQueue::flush() {
    if (!get_stream_enabled()) {
        // Stream disabled: clear the queue without producing output
        clear();
        return false;
    }
    // Stream enabled: dequeue and send events
    bool sent_any = false;
    device_response_t response;
    // Zero timeout: the loop ends as soon as the queue is empty
    while (xQueueReceive(g_event_queue_handle, &response, 0) == pdTRUE) {
        send_device_response(&response);
        sent_any = true;
    }
    return sent_any;
}
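The gating behaviour of flush() can be exercised off-target with a small model. The sketch below is an assumption-laden stand-in: `EventQueueModel`, `Response`, and the `out` sink are hypothetical, and a `std::deque` replaces the FreeRTOS queue so the logic runs on a host machine. It only illustrates the two rules stated above: drop on full at enqueue, and discard without output when the stream is disabled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct Response { int id; };  // hypothetical stand-in for device_response_t

class EventQueueModel {
public:
    explicit EventQueueModel(std::size_t capacity) : capacity_(capacity) {}

    // Drop-on-full, mirroring a zero-timeout send on a full queue.
    bool enqueue(const Response &r) {
        if (items_.size() >= capacity_) return false;
        items_.push_back(r);
        return true;
    }

    // Drains the queue into `out` when streaming is enabled;
    // otherwise discards pending items. Returns true if anything was sent.
    bool flush(bool stream_enabled, std::vector<Response> &out) {
        if (!stream_enabled) {
            items_.clear();
            return false;
        }
        bool sent_any = false;
        while (!items_.empty()) {
            out.push_back(items_.front());
            items_.pop_front();
            sent_any = true;
        }
        return sent_any;
    }

    std::size_t pending() const { return items_.size(); }

private:
    std::size_t capacity_;
    std::deque<Response> items_;
};
```

A model like this is not thread-safe and is no substitute for the FreeRTOS queue on the device. It is only useful for unit-testing the flush policy without hardware.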
Design Comparison Matrix
| Aspect | CommandQueue | EventQueue | Relation |
|---|---|---|---|
| Input Type | command_t | device_response_t | Different; no mixing |
| Queue Container | FreeRTOS static (10) | FreeRTOS static (200) | Identical pattern, different size |
| Trigger | Serial.available() | cosmic_read().detected | Different events, same buffering |
| Main Operations | receive, execute | enqueue, flush | Symmetrical naming |
| Output Format | JSON (variable) | JSON Lines (fixed) | Both JSON, different cardinality |
| Stream Control | SET_STREAM command | Command.get_stream() | Unified flag source |
| Overflow Handling | Drop command | Drop event | Same policy (safety) |
| Thread Safety | FreeRTOS queue | FreeRTOS queue | Identical mechanism |
Learnings
Design Principles Applied
- Single Responsibility: EventQueue handles ONLY buffering/dispatching, not serialization
  - Serialization delegated to send_device_response() in event_response.cpp
  - Decoupled: queue implementation independent of JSON format
- Unified Protocol: device_response_t is the single message type
  - No legacy event_t in queue (breaks protocol consistency)
  - All output passes through same JSON serialization path
  - Future multi-output (WiFi, LittleFS) uses identical structures
- Architectural Symmetry: CommandQueue and EventQueue are dual systems
  - Same FreeRTOS queue primitives
  - Parallel processing patterns (receive+execute vs enqueue+flush)
  - Both feed into Serial output via unified protocol
  - Developers understand one → understand both
- High-Frequency Buffering: Queue size (200) matches detector reality
  - Handles burst detection (50 Hz × 4 seconds = 200 events max)
  - Accommodates Serial delays (temporary backpressure)
  - Conservative memory budget (~30 KB out of 320 KB RAM)
3-Layer Architecture: Symmetry Between CommandQueue and EventQueue
Structure of Layers 1-3:
Layer 1: Data structures
command_t (command_queue.h)        event_t (event.h)
↓                                  ↓
Layer 2: Queuing
CommandQueue                       EventQueue
↓                                  ↓
Layer 3: Protocol conversion
(JSON generation)                  event_to_device_response()
↓                                  ↓
device_response_t (shared type)    device_response_t (shared type)
↓                                  ↓
CommandQueue::execute()            send_device_response()
↓                                  ↓
Serial JSON response               Serial JSON Lines output
Full symmetry between CommandQueue and EventQueue:
| Aspect | CommandQueue | EventQueue |
|---|---|---|
| Input data type | command_t | event_t |
| Definition file | command_queue.h | event.h (new) |
| Queue capacity | 10 items | 200 items |
| Trigger | Serial.available() | cosmic_read().detected |
| Queue class | CommandQueue | EventQueue |
| Main operations | receive(), execute() | enqueue(), flush() |
| Conversion function | (generates JSON directly) | event_to_device_response() |
| Output type | device_response_t | device_response_t |
| Sending | Inside execute() | send_device_response() |
| Memory | ~200 bytes (command_t × 10) | ~30 KB (event_t × 200) |
event_t Design (New Optimized Version)
Strategy: Do not reuse the event_t from stream_data.h; define a new structure optimized specifically for EventQueue
File: include/event_queue.h (new; contains event_t and the EventQueue class)
Key design principles:
- Independence: fully independent of the event_t in stream_data.h
  - Unrelated to the legacy path (ENABLE_DEVICE_RESPONSE=0)
  - Multiple event structures can coexist (e.g., the event_queue event_t vs. a file-storage event_t)
- Optimization: size and layout optimized for FreeRTOS queuing
  - Uses int16_t (smaller than int)
  - Field layout arranged for cache efficiency
  - 200 items × ~100-150 bytes ≈ 20-30 KB (acceptable on ESP32 with 320KB RAM)
- Extensibility: optional fields controlled by ENABLE_* flags
  - ENABLE_HITTYPE: hit_type bitmask
  - ENABLE_ADCMV: ADC millivolt conversion
  - ENABLE_BME280: Environmental sensor data
  - ENABLE_TIMESTAMP: Timing data (uptime_ms, timedelta_us)
  - ENABLE_RTC: Unix timestamp
  - ENABLE_GNSS: GNSS positioning
- Type safety: event_t is dedicated to EventQueue/event_response
  - The compiler enforces the correct structure
  - Prevents mixing with the legacy event_t
Structure definition:
// include/event.h
#ifndef EVENT_H
#define EVENT_H

#include <stdint.h>
#include "config.h"

#if ENABLE_DEVICE_RESPONSE

/**
 * @brief Detection event data optimized for EventQueue buffering
 *
 * Lightweight structure designed for high-frequency queue operations.
 * Independent from legacy stream_data.h::event_t (legacy path).
 *
 * Core fields (always present, 8 bytes):
 * - hit1, hit2, hit3: Detection counts (0-100) per channel
 * - adc: ADC reading (12-bit, 0-4095)
 *
 * Optional fields (ENABLE_* flags):
 * - hit_type: Detection pattern bitmask (ENABLE_HITTYPE)
 * - adc_mv: ADC millivolt conversion (ENABLE_ADCMV)
 * - temperature, pressure, humidity: Environmental (ENABLE_BME280)
 * - uptime_ms, timedelta_us: Timing (ENABLE_TIMESTAMP)
 * - unix_timestamp: Absolute time (ENABLE_RTC)
 * - gnss_*: GNSS positioning (ENABLE_GNSS)
 */
typedef struct {
    // Core fields (always present)
    uint16_t hit1;  /**< Detection count channel 1 (0-100) */
    uint16_t hit2;  /**< Detection count channel 2 (0-100) */
    uint16_t hit3;  /**< Detection count channel 3 (0-100) */
    int16_t adc;    /**< ADC reading (0-4095), optimized to int16_t */

    // Optional fields controlled by ENABLE_* flags
#if ENABLE_HITTYPE
    uint8_t hit_type;  /**< Detection pattern bitmask (bit0=ch1, bit1=ch2, bit2=ch3) */
#endif
#if ENABLE_ADCMV
    uint16_t adc_mv;  /**< Converted ADC value in millivolts (0-3300 on 3.3V ESP32) */
#endif
#if ENABLE_BME280
    float temperature;  /**< Temperature in degrees Celsius */
    float pressure;     /**< Pressure in Pascals (Pa) */
    float humidity;     /**< Relative humidity percentage (0-100) */
#endif
#if ENABLE_TIMESTAMP
    uint32_t uptime_ms;     /**< Board uptime since boot (milliseconds) */
    uint64_t timedelta_us;  /**< Interval between consecutive events (microseconds) */
#endif
#if ENABLE_RTC
    uint32_t unix_timestamp;  /**< Absolute time of detection (unix seconds since 1970) */
#endif
#if ENABLE_GNSS
    double gnss_latitude;      /**< Latitude in decimal degrees (-90 to +90) */
    double gnss_longitude;     /**< Longitude in decimal degrees (-180 to +180) */
    float gnss_altitude;       /**< Altitude above sea level (meters) */
    uint8_t gnss_satellites;   /**< Number of satellites in use (0-24) */
    uint8_t gnss_fix_quality;  /**< Fix quality indicator (0-8 per NMEA GGA spec) */
    float gnss_hdop;           /**< Horizontal dilution of precision (unitless) */
    bool gnss_fix_valid;       /**< True if current fix is valid; false if stale/no-fix */
#endif
} event_t;

#endif // ENABLE_DEVICE_RESPONSE
#endif // EVENT_H
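Because the structure changes size with every ENABLE_* flag, it may help to pin the expected layout with static_assert. The sketch below is only an illustration under assumptions: `core_event_t`, `core_event_hittype_t`, and `queue_buffer_bytes` are hypothetical names, and the sizes hold on ABIs where uint16_t is 2-byte aligned (true for common GCC and Clang targets).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical copy of the always-present core fields of event_t.
struct core_event_t {
    uint16_t hit1;
    uint16_t hit2;
    uint16_t hit3;
    int16_t  adc;
};

// Same fields plus the optional ENABLE_HITTYPE byte.
struct core_event_hittype_t {
    uint16_t hit1;
    uint16_t hit2;
    uint16_t hit3;
    int16_t  adc;
    uint8_t  hit_type;
};

static_assert(sizeof(core_event_t) == 8,
              "core fields should pack into 8 bytes");
static_assert(sizeof(core_event_hittype_t) == 10,
              "one byte of tail padding is expected after hit_type");

// Bytes needed for a static queue buffer holding `capacity` items of type T.
template <typename T>
constexpr std::size_t queue_buffer_bytes(std::size_t capacity) {
    return capacity * sizeof(T);
}

static_assert(queue_buffer_bytes<core_event_t>(200) == 1600,
              "200 core-only events need 1600 bytes");
```

The second assertion shows how a single uint8_t costs two bytes per item once padding is counted, which is worth knowing before the wider optional fields are enabled.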
Processing Flow: Comparison
CommandQueue path (user request):
// Step 1: Data collection
Serial input available
↓
// Step 2: Queuing
CommandQueue::receive()
  → parse(line) → command_t
  → CommandQueue::enqueue(command_t)
↓
// Step 3: Processing and conversion
CommandQueue::execute()
  → dispatch(command_t) → handler
  → generate JSON response
  → (device_response_t is created inside the executor)
↓
// Step 4: Output
Serial.println(JSON)
EventQueue path (detector trigger):
// Step 1: Data collection
cosmic_read().detected
↓
collect_sensors() → event_t
↓
// Step 2: Queuing
EventQueue::enqueue(event_t)
↓
// Step 3: Processing and conversion
EventQueue::flush()
  → event_to_device_response(event_t)
  → device_response_t
↓
// Step 4: Output
send_device_response(device_response_t)
  → Serial.println(JSON)
Common points:
1. Collect input (command / event)
2. Queue for deferred processing (non-blocking)
3. Convert to device_response_t
4. Output JSON to Serial
File Structure (Including New Files)
include/
├── command_queue.h (existing: command_t definition, CommandQueue class)
├── command.h (existing: Command singleton)
├── event_queue.h (new: event_t definition, EventQueue class definition)
├── event_response.h (existing: event_response functions)
├── device_response.h (existing: device_response_t, builder)
└── stream_data.h (existing: legacy event_t)
src/
├── command_queue.cpp (existing)
├── command.cpp (existing)
├── event_queue.cpp (new: EventQueue implementation)
├── event_response.cpp (updated: adds event_to_device_response)
└── device_response.cpp (existing)
Why NOT Use event_t in Queue?
Rejected: reuse the event_t from stream_data.h
- Legacy structure designed for stream_formatter (SSV/TSV/CSV/JSONL)
- Optional fields are interleaved via #if ENABLE_* (complex memory layout)
- Introduces a dependency on stream_formatter.h
- Gets in the way of evolving independently of the legacy path (ENABLE_DEVICE_RESPONSE=0)
Chosen: the new event_t in event.h (optimized specifically for EventQueue)
- ✅ Direct EventQueue design (optimized for FreeRTOS queues)
- ✅ Fully independent of stream_data.h (unrelated to the legacy path)
- ✅ Simple file name (more general-purpose)
- ✅ Zero conversion overhead at enqueue (data is queued as-is)
- ✅ Schema-validatable at every stage (at conversion to device_response_t)
- ✅ Scales to multi-output scenarios (future support for WiFi, LittleFS, etc.)
- ✅ Type safety (the compiler enforces the correct structure)
Integration with event_response.h
Current (Phase 3):
// event_response.cpp
void send_event(const event_t *data) {
    if (!get_stream_enabled()) return;
    send_event_as_device_response(data);
}

void send_event_as_device_response(const event_t *data) {
    // Inline JSON serialization
}
Future (Phase 4 with EventQueue):
// event_response.h - NEW function
inline device_response_t event_to_device_response(const event_t *event) {
    device_response_t resp = device_response_ok(DEVICE_TYPE_EVENT);
    // Copy fields: hit1, hit2, hit3, adc, temp_c, etc.
    return resp;
}

// main.cpp - Updated detection loop
if (detection.detected) {
    device_response_t response = event_to_device_response(&event);
    EventQueue::enqueue(&response);
}

// event_response.cpp - Renamed function
void send_device_response(const device_response_t *response) {
    // Serialize device_response_t (unified, not event_t-specific)
}
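To make the "one JSON line per event" output concrete, here is a minimal host-side sketch of the serialization step. It deliberately uses `snprintf` as a stand-in for ArduinoJson, so it is not the planned implementation. The `format_device_response` helper, the trimmed-down struct, and the `"event"` / `"ok"` string values are assumptions; only the field names come from this document.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>

// Hypothetical, trimmed-down stand-in for the real device_response_t.
struct device_response_t {
    const char *type;    // assumed string form, e.g. "event"
    const char *status;  // assumed string form, e.g. "ok"
    uint32_t    sent_at;
    uint16_t    hit1, hit2, hit3;
    int16_t     adc;
};

// Formats one JSON line (without the trailing newline) into buf.
// Returns the number of characters written, or -1 if buf is too small.
inline int format_device_response(const device_response_t *r, char *buf, std::size_t len) {
    int n = std::snprintf(
        buf, len,
        "{\"type\":\"%s\",\"status\":\"%s\",\"sent_at\":%lu,"
        "\"hit1\":%u,\"hit2\":%u,\"hit3\":%u,\"adc\":%d}",
        r->type, r->status, static_cast<unsigned long>(r->sent_at),
        static_cast<unsigned>(r->hit1), static_cast<unsigned>(r->hit2),
        static_cast<unsigned>(r->hit3), static_cast<int>(r->adc));
    if (n < 0 || static_cast<std::size_t>(n) >= len) return -1;
    return n;
}
```

A fixed-size buffer with an explicit "does not fit" result keeps the per-event cost predictable, which is one way to address the stack-usage concern raised for per-event JSON documents. The string fields are not escaped here, so this only works for values known to be plain ASCII without quotes.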
Potential Issues and Mitigations
- Issue: High-frequency queue fills faster than Serial output
  - Symptom: Dropped events (enqueue returns false)
  - Mitigation: Monitor queue depth via get_queued_count()
  - Debug: Add statistics command to report overflow events
  - Future: Implement buffered Serial or alternative output (WiFi)
- Issue: device_response_t is larger than event_t (additional envelope fields)
  - Analysis: device_response_t = ~280 bytes, event_t = ~100 bytes
  - Impact: Queue memory increases roughly 3× (about 56 KB versus 20 KB for 200 items), the cost of the unified type
  - Trade-off: Unified protocol worth the extra memory (DX improvement)
  - Optimization: Consider struct packing if memory becomes critical
- Issue: Serialization of device_response_t to JSON per-event
  - Current: send_device_response() creates JsonDocument per event
  - Memory: ~2 KB stack per event (temporary, freed after send)
  - Risk: Stack overflow if JsonDocument + detection variables overlap
  - Mitigation: Use JSONL format (no pretty-printing) to minimize document size
- Issue: Command class integration (ENABLE_DEVICE_RESPONSE=1 only)
  - Current: EventQueue uses Command::getInstance().get_stream()
  - Risk: Linker error if ENABLE_DEVICE_RESPONSE=0
  - Mitigation: EventQueue is ENABLE_DEVICE_RESPONSE=1 only (guarded by #if)
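The overflow-monitoring mitigation from the first issue can be prototyped with a small counting ring buffer. This is a sketch under assumptions, not the FreeRTOS-backed implementation: `CountingQueue`, `get_dropped_count()`, and `get_peak_depth()` are hypothetical names, while `get_queued_count()` and `get_capacity()` follow the names used in this document. The class is single-threaded and intended for host-side experiments only.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

template <typename T, std::size_t N>
class CountingQueue {
public:
    // Drop-on-full policy: a rejected item increments the overflow counter.
    bool enqueue(const T &item) {
        if (count_ == N) {
            ++dropped_;
            return false;
        }
        buf_[(head_ + count_) % N] = item;
        ++count_;
        if (count_ > peak_) peak_ = count_;
        return true;
    }

    // Removes the oldest item; returns false when the queue is empty.
    bool dequeue(T &out) {
        if (count_ == 0) return false;
        out = buf_[head_];
        head_ = (head_ + 1) % N;
        --count_;
        return true;
    }

    std::size_t get_queued_count() const { return count_; }
    std::size_t get_capacity() const { return N; }
    uint32_t get_dropped_count() const { return dropped_; }  // hypothetical statistic
    std::size_t get_peak_depth() const { return peak_; }     // hypothetical statistic

private:
    std::array<T, N> buf_{};
    std::size_t head_ = 0;
    std::size_t count_ = 0;
    std::size_t peak_ = 0;
    uint32_t dropped_ = 0;
};
```

Reporting the dropped count and peak depth through a statistics command would show whether 200 items is generous or marginal for a given detector rate.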
Implementation Checklist (Phase 4, v1.14.0 planned)
Design & Architecture ✅ COMPLETED (this document)
- Define EventQueue interface (enqueue, flush, has_pending, etc.)
- Justify device_response_t choice over event_t
- Document symmetry with CommandQueue
- Analyze FreeRTOS queue sizing (200 items)
- Outline integration with event_response.h
File Structure (PLANNED)
- Create include/event_queue.h (static class definition)
- Create src/event_queue.cpp (FreeRTOS implementation)
- Update include/event_response.h (add event_to_device_response inline)
- Update src/event_response.cpp (rename, unify serialization)
Integration (PLANNED)
- Update main.cpp detection loop:
  - Add device_response_t creation
  - Call EventQueue::enqueue()
  - Replace send_event() calls with EventQueue::flush()
- Verify build:
  - No linker errors
  - No circular dependencies
  - ENABLE_DEVICE_RESPONSE=1 only
Testing (PLANNED)
- Unit test: EventQueue::enqueue/flush basic operations
- Unit test: Queue overflow (enqueue returns false when full)
- Integration test: Detection loop → EventQueue → Serial output
- Verification: JSON output matches device-response.json schema
- Regression: ENABLE_DEVICE_RESPONSE=0 still works (uses legacy path)
Documentation (PLANNED)
- Update REFACTORING_ROADMAP.md Phase 4 status
- Document EventQueue API in docs/api.md
- Update CLAUDE.md with EventQueue patterns
Next Steps
Immediate (v1.14.0 implementation)
- Implement include/event_queue.h
  - Static class with FreeRTOS queue interface
  - Match CommandQueue design patterns for consistency
  - Include comprehensive Doxygen comments
- Implement src/event_queue.cpp
  - FreeRTOS queue initialization and operations
  - Overflow handling (safety drop on full)
  - Statistics tracking (get_queued_count, get_capacity)
- Update include/event_response.h
  - Add inline event_to_device_response() conversion function
  - Populate all fields from event_t to device_response_t
  - Handle ENABLE_* flags (hittype, adcmv, bme280, timestamp, rtc, gnss)
- Update src/event_response.cpp
  - Rename send_event_as_device_response() → send_device_response()
  - Accept device_response_t instead of event_t
  - Use ArduinoJson for JSONL serialization
- Update src/main.cpp
  - Add EventQueue::init() to setup()
  - Modify detection loop to use EventQueue
  - Replace send_event() calls with EventQueue::flush()
Short-term (v1.14.0+)
- Unit test suite for EventQueue operations
- Integration tests with cosmic detector simulation
- JSON schema validation of flushed events
- Performance profiling (queue depth monitoring)
Medium-term (v1.15.0+)
- Multi-output support: EventQueue → WiFi endpoint
- Event filtering/batching (e.g., "send every 10 events" mode)
- Persistent event logging (SD card, LittleFS)
- Event statistics dashboard (peak rate, total count, overflow incidents)
Long-term (v2.0.0+)
- Alternative serialization formats (MessagePack, Protobuf)
- Multi-queue support (separate critical/non-critical events)
- Time-series database integration
- Real-time event streaming via WebSocket
References
Related Design Documents
- REFACTORING_ROADMAP.md: Phase 1-4 planned roadmap
- docs/progress/entries/2025-12-09-event-response-phase3-design.md: Phase 3 event output unification
- include/command_queue.h: CommandQueue reference implementation (L1-435)
- include/device_response.h: Unified device response protocol (L1-150)
- include/command.h: Command singleton for unified config (L1-460)
Implementation References
- FreeRTOS Documentation: Queue API (xQueueCreateStatic, xQueueSend, xQueueReceive)
- ESP-IDF: FreeRTOS integration and queue patterns
- ArduinoJson: JSON serialization for device_response_t
Architecture Patterns
- Single Responsibility Principle: Queue handles buffering only, serialization separate
- Static Class Pattern: CommandQueue and EventQueue use identical C++ static approach
- Unified Protocol: All messages are device_response_t (command responses, detection events)
- Non-blocking Operations: enqueue/flush designed for main loop without blocking