  • Date Created: 2025-12-09
  • Last Modified: 2025-12-09
  • Status: DESIGN PHASE

Progress Log: EventQueue Design - Unified Event Buffering Architecture

Task Description

Design and plan the implementation of EventQueue, a detector-triggered event buffering system that complements CommandQueue for the unified device response protocol (ENABLE_DEVICE_RESPONSE=1).

Context:

  • Phase 1-3: Device response protocol foundation (device_response_t, Command class, event_response module)
  • Phase 4: EventQueue system for detector-triggered events (this task)

Goal: Create an EventQueue class that:

  1. Uses device_response_t as the unified event message type (not event_t)
  2. Demonstrates architectural symmetry with CommandQueue (dual queuing system)
  3. Enables buffering of high-frequency detection events (10-50 Hz typical)
  4. Maintains clear separation between command path and event path
  5. Integrates seamlessly with ENABLE_DEVICE_RESPONSE=1 builds

Outcome

Architecture Analysis: CommandQueue vs EventQueue

CommandQueue (User-Triggered):

  • Input: command_t (parsed text command)
  • Trigger: Serial input available (low frequency, ~1 Hz)
  • Queue: FreeRTOS static, COMMAND_QUEUE_SIZE=10 items
  • Processing: receive() → parse → queue → execute() (dispatch to handler)
  • Output: JSON response via Serial (variable size, 100-500 bytes)
  • Stream Control: Via text commands (SET_STREAM)
  • Purpose: User-initiated configuration and queries

EventQueue (Detector-Triggered):

  • Input: device_response_t (detection event envelope)
  • Trigger: cosmic_read().detected (high frequency, 10-50 Hz possible)
  • Queue: FreeRTOS static, EVENT_QUEUE_SIZE=200 items
  • Processing: enqueue() (from detection loop) → flush() (drains queue)
  • Output: JSON Lines via Serial (fixed ~150-200 bytes per event)
  • Stream Control: Via Command.get_stream() (same as CommandQueue)
  • Purpose: Detector-triggered event buffering and output

Key Architectural Decisions

1. device_response_t as Unified Type

Why NOT use event_t?

  • event_t is a legacy data structure (stream_data.h)
  • Not compatible with unified device response schema
  • Mixing event_t and device_response_t violates protocol consistency
  • ENABLE_DEVICE_RESPONSE=1 implies all output is device_response_t

Why use device_response_t?

  • ✅ Single type for all messages (responses AND events)
  • ✅ JSON schema validation (device-response.json)
  • ✅ Type safety: compiler ensures correct structure
  • ✅ Future multi-output support (Serial, WiFi, LittleFS) uses same type
  • ✅ Consistent with Command class integration

Flow:

cosmic_read().detected
    ↓
event_t {hit1, hit2, hit3, adc, ...} [local detection data]
    ↓
event_to_device_response(event_t*) [conversion function]
    ↓
device_response_t {type, status, sent_at, hit1, hit2, hit3, adc, ...}
    ↓
EventQueue::enqueue(device_response_t*)
    ↓
EventQueue::flush() → Serial.println(JSON)

2. Queue Capacity Sizing

CommandQueue: 10 items

  • Low frequency: 0.1-1 command per second
  • Large messages: 100-500 bytes per response
  • Use case: Occasional user queries, rare configuration changes

EventQueue: 200 items

  • High frequency: 10-50 events per second possible
  • Compact messages: ~150-200 bytes per event (JSON Lines)
  • Use case: Continuous cosmic ray detection burst handling
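  • Memory budget: 200 × 150 bytes ≈ 30 KB if a queued item is ~150 bytes; with the ~280-byte device_response_t estimated under Potential Issues it is 200 × 280 ≈ 56 KB (still acceptable, ESP32 has 320KB RAM)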

Rationale for 200:

  • If detection rate is 50 Hz and flush() is called every 10 ms, at most one event arrives between flushes (50 Hz × 0.01 s = 0.5 events on average)
  • If commands cause brief Serial delays (50 ms), queue absorbs 2-3 events
  • If complete event backlog occurs (e.g., wireless buffering), 200 provides 4 seconds at 50 Hz
  • Conservative safety margin without excessive memory waste
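
The sizing argument above is plain rate × time arithmetic, so it can be checked at compile time. The following is a minimal sketch; the helper names `events_during` and `headroom_ms` are hypothetical and not part of the firmware.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helpers for illustration; not part of the firmware.

// Number of events expected to arrive during a stall of `stall_ms`
// milliseconds at `rate_hz` events per second (rounded up).
constexpr uint32_t events_during(uint32_t rate_hz, uint32_t stall_ms) {
  return (rate_hz * stall_ms + 999) / 1000;
}

// How long (in milliseconds) a queue of `capacity` items can absorb events
// at `rate_hz` before overflowing, assuming nothing is drained meanwhile.
// `rate_hz` must be non-zero.
constexpr uint32_t headroom_ms(uint32_t capacity, uint32_t rate_hz) {
  return capacity * 1000 / rate_hz;
}

static_assert(events_during(50, 10) == 1, "10 ms flush interval at 50 Hz");
static_assert(events_during(50, 50) == 3, "50 ms Serial stall at 50 Hz");
static_assert(headroom_ms(200, 50) == 4000, "200 items last 4 s at 50 Hz");
```

If the expected peak rate or the worst-case stall changes, re-evaluating these two expressions gives a quick first estimate of whether EVENT_QUEUE_SIZE still has enough margin.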

3. Operational Symmetry with CommandQueue

// CommandQueue (User-Triggered)
void loop() {
  CommandQueue::receive();   // Serial → queue
  CommandQueue::execute();   // queue → dispatch → handler → response
}

// EventQueue (Detector-Triggered)
void loop() {
  cosmic_detection_t det = cosmic_read();
  if (det.detected) {
    event_t event = collect_sensors();  // gather detection data (see Processing Flow)
    device_response_t resp = event_to_device_response(&event);
    EventQueue::enqueue(&resp);  // Detection → queue
  }
  EventQueue::flush();  // queue → Serial output
}

Both follow an identical pattern:

  1. Collect input (command / event)
  2. Queue for deferred processing (non-blocking)
  3. Drain queue and process (execute / flush)

Benefits:

  • Developers understand both systems through a single mental model
  • Code review is consistent (same patterns, same conventions)
  • Future multi-threaded scenarios have a proven synchronization model
  • Easy to add statistics/monitoring (both use FreeRTOS queue API)

4. FreeRTOS Queue Choice

Why FreeRTOS?

CommandQueue already uses xQueueCreateStatic() (FreeRTOS static queue):

  • Thread-safe: the kernel guards queue access with short critical sections, so callers need no locks of their own
  • Non-blocking enqueue/dequeue operations when called with a zero timeout
  • No heap allocation (static arrays, deterministic memory)
  • Native ESP-IDF integration (no third-party dependencies)
  • Proven reliability (battle-tested in RTOS kernel)

EventQueue will use the identical approach:

// In event_queue.cpp
static StaticQueue_t g_event_queue_static;
static uint8_t g_event_queue_buffer[EVENT_QUEUE_SIZE * sizeof(device_response_t)];
static QueueHandle_t g_event_queue_handle;

void EventQueue::init() {
  g_event_queue_handle = xQueueCreateStatic(
    EVENT_QUEUE_SIZE,
    sizeof(device_response_t),
    g_event_queue_buffer,
    &g_event_queue_static
  );
}

Stream Control Integration

Unified Stream Flag:

  • Both CommandQueue and EventQueue respect the same stream_enabled flag
  • Flag source: Command::getInstance().get_stream() (ENABLE_DEVICE_RESPONSE=1)
  • Fallback: config_get_stream_enabled() (ENABLE_DEVICE_RESPONSE=0, legacy)

EventQueue Implementation:

bool EventQueue::flush() {
  if (!get_stream_enabled()) {
    // Stream disabled: Clear queue but don't output
    clear();
    return false;
  }

  // Stream enabled: Dequeue and send events
  bool sent_any = false;
  device_response_t response;
  while (xQueueReceive(g_event_queue_handle, &response, 0)) {
    send_device_response(&response);
    sent_any = true;
  }
  return sent_any;
}
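
The semantics of enqueue() and flush() described above (fixed capacity, drop on full, discard when the stream is disabled) can be exercised on a host machine without FreeRTOS. The following is a sketch under that assumption: `EventQueueModel` and `Response` are hypothetical stand-ins, and the ring buffer only models the behavior, it is not the FreeRTOS queue API.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for device_response_t; only one field is modelled.
struct Response {
  int adc;
};

// Host-side model of the EventQueue semantics (NOT the FreeRTOS API):
// fixed capacity, drop on full, flush drains everything in FIFO order.
template <std::size_t Capacity>
class EventQueueModel {
 public:
  // Returns false (and drops the item) when the queue is full.
  bool enqueue(const Response& r) {
    if (count_ == Capacity) return false;
    items_[(head_ + count_) % Capacity] = r;
    ++count_;
    return true;
  }

  // Drains the queue. When streaming is disabled the items are discarded;
  // otherwise they are appended to `out` in arrival order.
  // Returns true if at least one item was sent.
  bool flush(bool stream_enabled, std::vector<Response>& out) {
    if (!stream_enabled) {
      head_ = 0;
      count_ = 0;
      return false;
    }
    bool sent_any = false;
    while (count_ > 0) {
      out.push_back(items_[head_]);
      head_ = (head_ + 1) % Capacity;
      --count_;
      sent_any = true;
    }
    return sent_any;
  }

  std::size_t size() const { return count_; }

 private:
  std::array<Response, Capacity> items_{};
  std::size_t head_ = 0;   // index of the oldest item
  std::size_t count_ = 0;  // number of items currently queued
};
```

A model like this is mainly useful for unit tests of the calling code; the real implementation would keep using xQueueSend() and xQueueReceive() with a zero timeout.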

Design Comparison Matrix

| Aspect | CommandQueue | EventQueue | Relation |
| --- | --- | --- | --- |
| Input Type | command_t | device_response_t | Different; no mixing |
| Queue Container | FreeRTOS static (10) | FreeRTOS static (200) | Identical pattern, different size |
| Trigger | Serial.available() | cosmic_read().detected | Different events, same buffering |
| Main Operations | receive, execute | enqueue, flush | Symmetrical naming |
| Output Format | JSON (variable) | JSON Lines (fixed) | Both JSON, different cardinality |
| Stream Control | SET_STREAM command | Command.get_stream() | Unified flag source |
| Overflow Handling | Drop command | Drop event | Same policy (safety) |
| Thread Safety | FreeRTOS queue | FreeRTOS queue | Identical mechanism |

Learnings

Design Principles Applied

  1. Single Responsibility: EventQueue handles ONLY buffering/dispatching, not serialization
     • Serialization delegated to send_device_response() in event_response.cpp
     • Decoupled: queue implementation independent of JSON format

  2. Unified Protocol: device_response_t is the single message type
     • No legacy event_t in queue (breaks protocol consistency)
     • All output passes through same JSON serialization path
     • Future multi-output (WiFi, LittleFS) uses identical structures

  3. Architectural Symmetry: CommandQueue and EventQueue are dual systems
     • Same FreeRTOS queue primitives
     • Parallel processing patterns (receive+execute vs enqueue+flush)
     • Both feed into Serial output via unified protocol
     • Developers understand one → understand both

  4. High-Frequency Buffering: Queue size (200) matches detector reality
     • Handles burst detection (50 Hz × 4 seconds = 200 events max)
     • Accommodates Serial delays (temporary backpressure)
     • Conservative memory budget (~30 KB out of 320 KB RAM)

3-Layer Architecture: Symmetry Between CommandQueue and EventQueue

Structure of Layers 1-3:

Layer 1: Data structures
  command_t (command_queue.h)       event_t (event.h)
  ↓                                 ↓
Layer 2: Queuing
  CommandQueue                      EventQueue
  ↓                                 ↓
Layer 3: Protocol conversion
  (JSON generation)                 event_to_device_response()
  ↓                                 ↓
  device_response_t (shared type)   device_response_t (shared type)
  ↓                                 ↓
  CommandQueue::execute()           send_device_response()
  ↓                                 ↓
  Serial JSON response              Serial JSON Lines output

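Full symmetry between CommandQueue and EventQueue:

| Aspect | CommandQueue | EventQueue |
| --- | --- | --- |
| Input data type | command_t | event_t |
| Definition file | command_queue.h | event.h (new) |
| Queue capacity | 10 items | 200 items |
| Trigger | Serial.available() | cosmic_read().detected |
| Queue class | CommandQueue | EventQueue |
| Main operations | receive(), execute() | enqueue(), flush() |
| Conversion function | (direct JSON generation) | event_to_device_response() |
| Output type | device_response_t | device_response_t |
| Sending | Inside execute() | send_device_response() |
| Memory | ~200 bytes (command_t × 10) | ~30 KB (event_t × 200) |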

event_t Design (New Optimized Version)

Strategy: Do not reuse event_t from stream_data.h; define a new struct optimized specifically for EventQueue.

File: include/event_queue.h (new; contains event_t and the EventQueue class)

Key design principles:

  1. Independence: fully independent of event_t in stream_data.h
     • Unrelated to the legacy path (ENABLE_DEVICE_RESPONSE=0)
     • Multiple event structs can be supported (e.g., the event_queue event_t vs. a file-storage event_t)

  2. Optimization: size and layout optimized for FreeRTOS queuing
     • Use int16_t (smaller than int)
     • Arrange fields for cache efficiency
     • 200 items × ~100-150 bytes ≈ 20-30 KB (acceptable on ESP32 with 320KB RAM)

  3. Extensibility: optional fields controlled by ENABLE_* flags
     • ENABLE_HITTYPE: hit_type bitmask
     • ENABLE_ADCMV: ADC millivolt conversion
     • ENABLE_BME280: Environmental sensor data
     • ENABLE_TIMESTAMP: Timing data (uptime_ms, timedelta_us)
     • ENABLE_RTC: Unix timestamp
     • ENABLE_GNSS: GNSS positioning

  4. Type safety: event_t is dedicated to EventQueue/event_response
     • The compiler enforces the correct struct
     • Prevents mixing with the legacy event_t

Struct definition:

// include/event.h
#ifndef EVENT_H
#define EVENT_H

#include <stdbool.h>  // bool (gnss_fix_valid) when compiled as C
#include <stdint.h>
#include "config.h"

#if ENABLE_DEVICE_RESPONSE

/**
 * @brief Detection event data optimized for EventQueue buffering
 *
 * Lightweight structure designed for high-frequency queue operations.
 * Independent from legacy stream_data.h::event_t (legacy path).
 *
 * Core fields (always present, 8 bytes):
 * - hit1, hit2, hit3: Detection counts (0-100) per channel
 * - adc: ADC reading (12-bit, 0-4095)
 *
 * Optional fields (ENABLE_* flags):
 * - hit_type: Detection pattern bitmask (ENABLE_HITTYPE)
 * - adc_mv: ADC millivolt conversion (ENABLE_ADCMV)
 * - temperature, pressure, humidity: Environmental (ENABLE_BME280)
 * - uptime_ms, timedelta_us: Timing (ENABLE_TIMESTAMP)
 * - unix_timestamp: Absolute time (ENABLE_RTC)
 * - gnss_*: GNSS positioning (ENABLE_GNSS)
 */
typedef struct {
  // Core fields (always present)
  uint16_t hit1;   /**< Detection count channel 1 (0-100) */
  uint16_t hit2;   /**< Detection count channel 2 (0-100) */
  uint16_t hit3;   /**< Detection count channel 3 (0-100) */
  int16_t adc;     /**< ADC reading (0-4095), optimized to int16_t */

  // Optional fields controlled by ENABLE_* flags
#if ENABLE_HITTYPE
  uint8_t hit_type;  /**< Detection pattern bitmask (bit0=ch1, bit1=ch2, bit2=ch3) */
#endif

#if ENABLE_ADCMV
  uint16_t adc_mv;  /**< Converted ADC value in millivolts (0-3300 on 3.3V ESP32) */
#endif

#if ENABLE_BME280
  float temperature;  /**< Temperature in degrees Celsius */
  float pressure;     /**< Pressure in Pascals (Pa) */
  float humidity;     /**< Relative humidity percentage (0-100) */
#endif

#if ENABLE_TIMESTAMP
  uint32_t uptime_ms;     /**< Board uptime since boot (milliseconds) */
  uint64_t timedelta_us;  /**< Interval between consecutive events (microseconds) */
#endif

#if ENABLE_RTC
  uint32_t unix_timestamp;  /**< Absolute time of detection (unix seconds since 1970) */
#endif

#if ENABLE_GNSS
  double gnss_latitude;       /**< Latitude in decimal degrees (-90 to +90) */
  double gnss_longitude;      /**< Longitude in decimal degrees (-180 to +180) */
  float gnss_altitude;        /**< Altitude above sea level (meters) */
  uint8_t gnss_satellites;    /**< Number of satellites in use (0-24) */
  uint8_t gnss_fix_quality;   /**< Fix quality indicator (0-8 per NMEA GGA spec) */
  float gnss_hdop;            /**< Horizontal dilution of precision (unitless) */
  bool gnss_fix_valid;        /**< True if current fix is valid; false if stale/no-fix */
#endif
} event_t;

#endif  // ENABLE_DEVICE_RESPONSE
#endif  // EVENT_H
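
The "8 bytes" figure for the core fields, and the queue buffer size that follows from it, can be verified at compile time. This is a sketch under the assumption that only the four always-present fields are enabled; `EventCore` and `queue_buffer_bytes` are hypothetical names, not part of the firmware.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Stand-in mirroring only the four always-present fields of event_t.
// The name EventCore is hypothetical; optional ENABLE_* fields are omitted.
struct EventCore {
  uint16_t hit1;
  uint16_t hit2;
  uint16_t hit3;
  int16_t adc;
};

// Four 2-byte fields with 2-byte alignment leave no room for padding.
static_assert(sizeof(EventCore) == 8, "core fields should occupy 8 bytes");

// Bytes a static queue buffer needs for `capacity` items of type T.
template <typename T>
constexpr std::size_t queue_buffer_bytes(std::size_t capacity) {
  return capacity * sizeof(T);
}

static_assert(queue_buffer_bytes<EventCore>(200) == 1600,
              "core-only events need 1600 bytes for 200 slots");
```

Each enabled optional field adds its own size plus any alignment padding, so a similar static_assert on the real event_t with an upper bound could catch unexpected growth when ENABLE_* flags change.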

Processing Flow: Comparison

CommandQueue path (user request):

// Step 1: Data collection
Serial input available
  ↓

// Step 2: Queuing
CommandQueue::receive()
  → parse(line) → command_t
  → CommandQueue::enqueue(command_t)
  ↓

// Step 3: Processing and conversion
CommandQueue::execute()
  → dispatch(command_t) → handler
  → generate JSON response
  → (device_response_t is created inside the executor)
  ↓

// Step 4: Output
Serial.println(JSON)

EventQueue path (detector-triggered):

// Step 1: Data collection
cosmic_read().detected
  ↓
collect_sensors() → event_t
  ↓

// Step 2: Queuing
EventQueue::enqueue(event_t)
  ↓

// Step 3: Processing and conversion
EventQueue::flush()
  → event_to_device_response(event_t)
  → device_response_t
  ↓

// Step 4: Output
send_device_response(device_response_t)
  → Serial.println(JSON)

Common points:

  1. Collect input (command / event)
  2. Queue for deferred processing (non-blocking)
  3. Convert to device_response_t
  4. Output JSON to Serial

File Layout (Including New Files)

include/
  ├── command_queue.h           (existing: command_t definition, CommandQueue class)
  ├── command.h                 (existing: Command singleton)
  ├── event_queue.h             (new: event_t definition, EventQueue class definition)
  ├── event_response.h          (existing: event_response functions)
  ├── device_response.h         (existing: device_response_t, builder)
  └── stream_data.h             (existing: legacy event_t)

src/
  ├── command_queue.cpp         (existing)
  ├── command.cpp               (existing)
  ├── event_queue.cpp           (new: EventQueue implementation)
  ├── event_response.cpp        (updated: adds event_to_device_response)
  └── device_response.cpp       (existing)

Why NOT Use event_t in Queue?

Rejected: reusing event_t from stream_data.h

  • Legacy structure designed for stream_formatter (SSV/TSV/CSV/JSONL)
  • Optional fields are interleaved via #if ENABLE_* (complex memory layout)
  • Creates a dependency on stream_formatter.h
  • Gets in the way of evolving independently of the ENABLE_DEVICE_RESPONSE=0 (legacy) path

Chosen: a new event_t in event.h (optimized specifically for EventQueue)

  • ✅ Direct EventQueue design (optimized for the FreeRTOS queue)
  • ✅ Fully independent of stream_data.h (unrelated to the legacy path)
  • ✅ Simple file name (more general-purpose)
  • ✅ Zero conversion overhead at enqueue (data is queued as-is)
  • ✅ Schema-validatable at every stage (at conversion to device_response_t)
  • ✅ Scales to multi-output scenarios (future WiFi, LittleFS support, etc.)
  • ✅ Type safety (the compiler enforces the correct struct)

Integration with event_response.h

Current (Phase 3):

// event_response.cpp
void send_event(const event_t *data) {
  if (!get_stream_enabled()) return;
  send_event_as_device_response(data);
}

void send_event_as_device_response(const event_t *data) {
  // Inline JSON serialization
}

Future (Phase 4 with EventQueue):

// event_response.h - NEW function
inline device_response_t event_to_device_response(const event_t *event) {
  device_response_t resp = device_response_ok(DEVICE_TYPE_EVENT);
  // Copy fields: hit1, hit2, hit3, adc, temp_c, etc.
  return resp;
}

// main.cpp - Updated detection loop
if (detection.detected) {
  device_response_t response = event_to_device_response(&event);
  EventQueue::enqueue(&response);
}

// event_response.cpp - Renamed function
void send_device_response(const device_response_t *response) {
  // Serialize device_response_t (unified, not event_t-specific)
}
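
The field copy inside event_to_device_response() is only outlined above. The following host-compilable sketch shows one way it could look; the trimmed-down types `EventSketch`, `DeviceResponseSketch`, and `ResponseType` are hypothetical stand-ins, since the real device_response_t and event_t carry more fields than shown here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, trimmed-down stand-ins for the real types. The actual
// device_response_t and event_t have more fields than shown here.
struct EventSketch {
  uint16_t hit1, hit2, hit3;
  int16_t adc;
};

enum class ResponseType : uint8_t { Response, Event };

struct DeviceResponseSketch {
  ResponseType type;  // envelope: kind of message
  bool ok;            // envelope: status flag
  uint16_t hit1, hit2, hit3;
  int16_t adc;
};

// Wraps raw detection data in the response envelope, copying each field.
inline DeviceResponseSketch to_device_response(const EventSketch& e) {
  DeviceResponseSketch r{};  // zero-initialize so unset fields are defined
  r.type = ResponseType::Event;
  r.ok = true;
  r.hit1 = e.hit1;
  r.hit2 = e.hit2;
  r.hit3 = e.hit3;
  r.adc = e.adc;
  return r;
}
```

In the real function, each optional field would presumably be copied inside the matching #if ENABLE_* guard so that both structs stay in step with the build flags.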

Potential Issues and Mitigations

  1. Issue: High-frequency queue fills faster than Serial output
     • Symptom: Dropped events (enqueue returns false)
     • Mitigation: Monitor queue depth via get_queued_count()
     • Debug: Add statistics command to report overflow events
     • Future: Implement buffered Serial or alternative output (WiFi)

  2. Issue: device_response_t is larger than event_t (additional envelope fields)
     • Analysis: device_response_t = ~280 bytes, event_t = ~100 bytes
     • Impact: Queue memory increases roughly 3× (200 × ~280 bytes ≈ 56 KB instead of ≈ 20 KB)
     • Trade-off: Unified protocol worth the extra memory (DX improvement)
     • Optimization: Consider struct packing if memory becomes critical

  3. Issue: Serialization of device_response_t to JSON per-event
     • Current: send_device_response() creates JsonDocument per event
     • Memory: ~2 KB stack per event (temporary, freed after send)
     • Risk: Stack overflow if JsonDocument + detection variables overlap
     • Mitigation: Use JSONL format (no pretty-printing) to minimize document size

  4. Issue: Command class integration (ENABLE_DEVICE_RESPONSE=1 only)
     • Current: EventQueue uses Command::getInstance().get_stream()
     • Risk: Link error if ENABLE_DEVICE_RESPONSE=0
     • Mitigation: EventQueue is ENABLE_DEVICE_RESPONSE=1 only (guarded by #if)
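
The monitoring mitigation for dropped events could be backed by a few counters updated on every enqueue attempt. This is a minimal sketch; `QueueStats` and its members are hypothetical names, and the depth value is assumed to come from something like get_queued_count().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical counters for illustration; the names are not from the firmware.
struct QueueStats {
  uint32_t enqueued = 0;       // events accepted by the queue
  uint32_t dropped = 0;        // events rejected because the queue was full
  std::size_t peak_depth = 0;  // highest queue depth observed

  // Call after every enqueue attempt with its result and the queue depth
  // reported afterwards (e.g. by get_queued_count()).
  void record(bool accepted, std::size_t depth_after) {
    if (accepted) {
      ++enqueued;
    } else {
      ++dropped;
    }
    if (depth_after > peak_depth) peak_depth = depth_after;
  }

  // Fraction of events lost, in the range 0.0 to 1.0.
  double drop_ratio() const {
    const uint32_t total = enqueued + dropped;
    return total == 0 ? 0.0 : static_cast<double>(dropped) / total;
  }
};
```

A statistics command could then report these three values, which would show whether overflow is a rare burst effect or a sustained output bottleneck.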

Implementation Checklist (Phase 4, v1.14.0 planned)

Design & Architecture ✅ COMPLETED (this document)

  • Define EventQueue interface (enqueue, flush, has_pending, etc.)
  • Justify device_response_t choice over event_t
  • Document symmetry with CommandQueue
  • Analyze FreeRTOS queue sizing (200 items)
  • Outline integration with event_response.h

File Structure (PLANNED)

  • Create include/event_queue.h (static class definition)
  • Create src/event_queue.cpp (FreeRTOS implementation)
  • Update include/event_response.h (add event_to_device_response inline)
  • Update src/event_response.cpp (rename, unify serialization)

Integration (PLANNED)

  • Update main.cpp detection loop:
     • Add device_response_t creation
     • Call EventQueue::enqueue()
     • Replace send_event() calls with EventQueue::flush()
  • Verify build:
     • No linker errors
     • No circular dependencies
     • ENABLE_DEVICE_RESPONSE=1 only

Testing (PLANNED)

  • Unit test: EventQueue::enqueue/flush basic operations
  • Unit test: Queue overflow (enqueue returns false when full)
  • Integration test: Detection loop → EventQueue → Serial output
  • Verification: JSON output matches device-response.json schema
  • Regression: ENABLE_DEVICE_RESPONSE=0 still works (uses legacy path)

Documentation (PLANNED)

  • Update REFACTORING_ROADMAP.md Phase 4 status
  • Document EventQueue API in docs/api.md
  • Update CLAUDE.md with EventQueue patterns

Next Steps

Immediate (v1.14.0 implementation)

  1. Implement include/event_queue.h
     • Static class with FreeRTOS queue interface
     • Match CommandQueue design patterns for consistency
     • Include comprehensive Doxygen-style doc comments

  2. Implement src/event_queue.cpp
     • FreeRTOS queue initialization and operations
     • Overflow handling (safety drop on full)
     • Statistics tracking (get_queued_count, get_capacity)

  3. Update include/event_response.h
     • Add inline event_to_device_response() conversion function
     • Populate all fields from event_t to device_response_t
     • Handle ENABLE_* flags (hittype, adcmv, bme280, timestamp, rtc, gnss)

  4. Update src/event_response.cpp
     • Rename send_event_as_device_response() → send_device_response()
     • Accept device_response_t instead of event_t
     • Use ArduinoJson for JSONL serialization

  5. Update src/main.cpp
     • Add EventQueue::init() to setup()
     • Modify detection loop to use EventQueue
     • Replace send_event() calls with EventQueue::flush()

Short-term (v1.14.0+)

  • Unit test suite for EventQueue operations
  • Integration tests with cosmic detector simulation
  • JSON schema validation of flushed events
  • Performance profiling (queue depth monitoring)

Medium-term (v1.15.0+)

  • Multi-output support: EventQueue → WiFi endpoint
  • Event filtering/batching (e.g., "send every 10 events" mode)
  • Persistent event logging (SD card, LittleFS)
  • Event statistics dashboard (peak rate, total count, overflow incidents)

Long-term (v2.0.0+)

  • Alternative serialization formats (MessagePack, Protobuf)
  • Multi-queue support (separate critical/non-critical events)
  • Time-series database integration
  • Real-time event streaming via WebSocket

References

  • REFACTORING_ROADMAP.md: Phase 1-4 planned roadmap
  • docs/progress/entries/2025-12-09-event-response-phase3-design.md: Phase 3 event output unification
  • include/command_queue.h: CommandQueue reference implementation (L1-435)
  • include/device_response.h: Unified device response protocol (L1-150)
  • include/command.h: Command singleton for unified config (L1-460)

Implementation References

  • FreeRTOS Documentation: Queue API (xQueueCreateStatic, xQueueSend, xQueueReceive)
  • ESP-IDF: FreeRTOS integration and queue patterns
  • ArduinoJson: JSON serialization for device_response_t

Architecture Patterns

  • Single Responsibility Principle: Queue handles buffering only, serialization separate
  • Static Class Pattern: CommandQueue and EventQueue use identical C++ static approach
  • Unified Protocol: All messages are device_response_t (command responses, detection events)
  • Non-blocking Operations: enqueue/flush designed for main loop without blocking