Axum Server Architecture
Axum Server Architecture
Section titled “Axum Server Architecture”This document covers the Axum-based game server architecture, the MSCP (Message Service Communication Protocol) pattern, caching strategies, and future shared worker integration for RentEarth.
Architecture Overview
Section titled “Architecture Overview”┌─────────────────────────────────────────────────────────────────────────────┐│ AXUM SERVER (Rust) │├─────────────────────────────────────────────────────────────────────────────┤│ ││ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────┐ ││ │ HTTP Routes │ │ WebSocket │ │ Future: gRPC │ ││ │ (Askama HTML) │ │ (Protobuf) │ │ TCP, etc. │ ││ └────────┬────────┘ └────────┬────────┘ └───────────┬─────────────┘ ││ │ │ │ ││ └──────────────────────┼─────────────────────────┘ ││ │ ││ ┌──────▼──────┐ ││ │ GameHandle │ ← Single interface to world ││ │ (mpsc tx) │ ││ └──────┬──────┘ ││ │ GameCommand ││ ┌──────▼──────┐ ││ │ World │ ← Single source of truth ││ │ Runtime │ ││ └──────┬──────┘ ││ │ ││ ┌─────────────────────────────┼─────────────────────────────────┐ ││ │ │ │ ││ ▼ ▼ ▼ ││ ┌──────────────┐ ┌────────────────────┐ ┌──────────────────────┐ ││ │ EntityState │ │ EnvironmentManager │ │ PhysicsWorker │ ││ │ Manager │ │ (Objects/Trees) │ │ (Rapier3D) │ ││ └──────────────┘ └────────────────────┘ └──────────────────────┘ ││ │└─────────────────────────────────────────────────────────────────────────────┘ │ │ WebSocket (10 Hz snapshots) ▼┌─────────────────────────────────────────────────────────────────────────────┐│ CLIENTS │├─────────────────────────────────────────────────────────────────────────────┤│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────────────┐ ││ │ Unity │ │ Browser │ │ Future: Shared Worker │ ││ │ (Game) │ │ (Astro/JS) │ │ (Protobuf Processing) │ ││ └─────────────┘ └─────────────┘ └─────────────────────────────────┘ │└─────────────────────────────────────────────────────────────────────────────┘Current Implementation Status
Section titled “Current Implementation Status”| Component | Status | Description |
|---|---|---|
| MSCP Message Pipe | ✅ Complete | GameCommand/GameEvent channels |
| HTTP + Askama | ✅ Complete | Server-rendered HTML templates |
| WebSocket (Protobuf) | ✅ Complete | Binary game protocol |
| Debug Object Query | ✅ Complete | MSCP-based object inspection |
| JWT Authentication | ✅ Complete | Supabase JWT validation |
| Connection State | ✅ Complete | State machine for connections |
| LRU Cache | 🔲 Planned | Cache ServerMessage responses |
| Shared Worker | 🔲 Planned | Browser-side protobuf processing |
| gRPC Transport | 🔲 Planned | Alternative transport layer |
MSCP: Message Service Communication Protocol
Section titled “MSCP: Message Service Communication Protocol”The MSCP pattern separates transport concerns from game logic. All game state mutations flow through the world runtime via typed commands.
Command Channel Architecture
Section titled “Command Channel Architecture”// GameHandle - the ONLY interface to world state from transportspub struct GameHandle { cmd_tx: mpsc::Sender<GameCommand>, // Commands TO world (4096 capacity) event_tx: broadcast::Sender<GameEvent>, // Events FROM world (1024 capacity) world_seed: u64, // Cached immutable data}
// Example: Send command and await responselet (reply_tx, reply_rx) = oneshot::channel();game_handle.send_command(GameCommand::DebugObjectQuery { object_id, reply: reply_tx,}).await?;let result = reply_rx.await?;GameCommand Variants
Section titled “GameCommand Variants”pub enum GameCommand { // Connection lifecycle PlayerConnect { player_id, username, spawn_position, reply }, PlayerDisconnect { player_id }, ClientReady { player_id, reply },
// High-frequency input (no reply needed) PlayerInput { player_id, input },
// Actions with responses HarvestAction { player_id, action, reply }, InventoryAction { player_id, action, reply }, HealthAction { player_id, action, reply }, LeaveGame { player_id, reply }, GetGameState { player_id, reply },
// Debug/Admin DebugObjectQuery { object_id, reply }, Admin(AdminCommand),
// Physics queries PhysicsRaycast { origin, direction, max_distance, reply }, PhysicsTerrainHeight { x, z, reply }, PhysicsOverlapSphere { center, radius, reply },}Request-Response Pattern
Section titled “Request-Response Pattern”Transport Layer World Runtime │ │ │ GameCommand::DebugObjectQuery │ │ ─────────────────────────────► │ │ (mpsc channel) │ │ │ │ ┌───────┴───────┐ │ │ Query Manager │ │ └───────┬───────┘ │ │ │ DebugObjectQueryResult │ │ ◄───────────────────────────── │ │ (oneshot channel) │ │ │ ▼ ▼Dual Response Pattern: HTTP vs WebSocket
Section titled “Dual Response Pattern: HTTP vs WebSocket”The same MSCP command can serve both HTTP (HTML) and WebSocket (binary) clients:
HTTP Route (Askama HTML)
Section titled “HTTP Route (Askama HTML)”// GET /debug/object/{id} → HTML pageasync fn debug_object_by_id( State((_, _, game_handle, _, _)): State<AppState>, Path(id): Path<String>,) -> impl IntoResponse { let object_id = id.parse::<u64>()?;
// Send MSCP command let (reply_tx, reply_rx) = oneshot::channel(); game_handle.send_command(GameCommand::DebugObjectQuery { object_id, reply: reply_tx, }).await?;
let result = reply_rx.await?;
// Extract data and render Askama template if let Some(Payload::DebugObjectInfo(info)) = result.response.payload { let template = DebugObjectTemplate { object_id: info.object_id, asset_name: info.asset_name, // ... populate template fields }; TemplateResponse(template).into_response() }}WebSocket Handler (Protobuf Binary)
Section titled “WebSocket Handler (Protobuf Binary)”// WebSocket binary message → Protobuf responseSome(client_message::Payload::DebugObjectQuery(query)) => { let (reply_tx, reply_rx) = oneshot::channel(); game_handle.send_command(GameCommand::DebugObjectQuery { object_id: query.object_id, reply: reply_tx, }).await?;
let result = reply_rx.await?;
// Send pre-encoded protobuf directly sender.send(Message::Binary(result.response.encode_to_vec().into())).await?;}Planned: LRU Cache for ServerMessage
Section titled “Planned: LRU Cache for ServerMessage”Cache frequently-requested responses at the ServerMessage level for both transports.
Architecture
Section titled “Architecture”┌─────────────────────────────────────────────────────────────────┐│ World Runtime │├─────────────────────────────────────────────────────────────────┤│ ││ ┌─────────────────┐ ┌─────────────────────────────────────┐ ││ │ EnvironmentMgr │───►│ LRU Cache<u64, CachedResponse> │ ││ └─────────────────┘ │ │ ││ │ Key: object_id │ ││ │ Value: { │ ││ │ response: ServerMessage, │ ││ │ encoded: Vec<u8>, // Pre-encoded│ ││ │ version: u64, // Invalidate │ ││ │ expires_at: Instant │ ││ │ } │ ││ └─────────────────────────────────────┘ ││ │└─────────────────────────────────────────────────────────────────┘Implementation Plan
Section titled “Implementation Plan”/// Cached response with pre-encoded protobufpub struct CachedResponse { pub response: ServerMessage, pub encoded: Vec<u8>, // Pre-encoded for WebSocket pub version: u64, // Incremented on state change pub created_at: Instant, pub ttl: Duration,}
/// LRU cache for object queriespub struct ObjectQueryCache { cache: lru::LruCache<u64, CachedResponse>, max_size: usize, default_ttl: Duration,}
impl ObjectQueryCache { /// Get cached response or compute new one pub fn get_or_insert<F>( &mut self, object_id: u64, current_version: u64, compute: F, ) -> &CachedResponse where F: FnOnce() -> ServerMessage, { // Check cache validity if let Some(cached) = self.cache.get(&object_id) { if cached.version == current_version && !cached.is_expired() { return cached; } }
// Compute and cache let response = compute(); let encoded = response.encode_to_vec(); self.cache.put(object_id, CachedResponse { response, encoded, version: current_version, created_at: Instant::now(), ttl: self.default_ttl, });
self.cache.get(&object_id).unwrap() }
/// Invalidate cache entry when object state changes pub fn invalidate(&mut self, object_id: u64) { self.cache.pop(&object_id); }}Cache Invalidation Triggers
Section titled “Cache Invalidation Triggers”// Invalidate when object is harvestedfn handle_harvest_complete(object_id: u64, cache: &mut ObjectQueryCache) { cache.invalidate(object_id);}
// Invalidate when object respawnsfn handle_object_respawn(object_id: u64, cache: &mut ObjectQueryCache) { cache.invalidate(object_id);}
// Version-based invalidation (alternative)impl EnvironmentManager { pub fn get_object_version(&self, object_id: u64) -> u64 { // Hash of (is_harvested, harvested_at, resource_amount) // Changes when object state changes }}Benefits
Section titled “Benefits”- Reduced CPU - Skip repeated serialization for popular objects
- Lower Latency - Pre-encoded bytes ready for WebSocket
- Memory Bounded - LRU eviction prevents unbounded growth
- Consistent - Same data served to HTTP and WebSocket clients
Phase 2: Shared Worker Protobuf Upgrade
Section titled “Phase 2: Shared Worker Protobuf Upgrade”The SharedWorker infrastructure already exists and handles WebSocket connections. The next step is upgrading it to handle protobuf binary messages for the game protocol.
Current Implementation
Section titled “Current Implementation”The existing SharedWorker is located at:
- SharedWorker:
src/workers/supabase.shared.ts - Realtime Worker:
src/components/realtime/Realtime.worker.ts - Worker URLs:
src/lib/gateway/workers.ts
Current Architecture (JSON-based)
Section titled “Current Architecture (JSON-based)”┌─────────────────────────────────────────────────────────────────┐│ BROWSER │├─────────────────────────────────────────────────────────────────┤│ ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ ││ │ Tab 1 │ │ Tab 2 │ │ Tab 3 │ ││ │ (Astro) │ │ (Astro) │ │ (Debug Panel) │ ││ └──────┬──────┘ └──────┬──────┘ └──────┬──────────────┘ ││ │ │ │ ││ └──────────────────┼──────────────────┘ ││ │ MessagePort ││ ┌──────▼──────┐ ││ │ Shared │ ← src/workers/ ││ │ Worker │ supabase.shared.ts ││ │ │ ││ │ ✅ WebSocket│ ││ │ mgmt │ ││ │ ✅ Auth │ ││ │ tokens │ ││ │ ✅ Heartbeat│ ││ │ ❌ Protobuf │ ← Upgrade needed ││ │ (JSON) │ ││ └──────┬──────┘ ││ │ │└────────────────────────────┼────────────────────────────────────┘ │ WebSocket (JSON currently) ▼ ┌──────────────┐ │ Axum Server │ └──────────────┘Current WebSocket Handler (supabase.shared.ts:208-236)
Section titled “Current WebSocket Handler (supabase.shared.ts:208-236)”// Current: JSON message handlingws.onmessage = (event) => { try { const message = JSON.parse(event.data); // ← JSON parsing console.log('[SharedWorker] WebSocket message:', message);
// Handle pong response if (message.type === 'pong') { wsLastPongTime = Date.now(); return; }
// Broadcast message to all connected tabs for (const p of ports) { p.postMessage({ type: 'ws.message', data: message }); } } catch (error) { console.error('[SharedWorker] Failed to parse WebSocket message:', error); }};Upgrade Path: Add Protobuf Support
Section titled “Upgrade Path: Add Protobuf Support”Step 1: Add protobuf-ts dependency
Section titled “Step 1: Add protobuf-ts dependency”pnpm add @protobuf-ts/runtimepnpm add -D @protobuf-ts/pluginStep 2: Generate TypeScript from proto
Section titled “Step 2: Generate TypeScript from proto”# Add to package.json scripts"proto:gen": "protoc --ts_out=src/lib/proto --proto_path=../../proto ../../proto/rentearth/snapshot.proto"Step 3: Update SharedWorker for binary messages
Section titled “Step 3: Update SharedWorker for binary messages”// src/workers/supabase.shared.ts - Updated onmessage handler
import { ServerMessage, ClientMessage } from '../lib/proto/snapshot';
// Add binary message type to Req uniontype Req = | { id: string; type: 'ws.sendBinary'; payload: { message: Partial<ClientMessage> } } // ... existing types ...
// Update WebSocket connectionasync function connectWebSocket(wsUrl?: string, forceReconnect = false) { // ... existing setup code ...
ws = new WebSocket(authenticatedUrl); ws.binaryType = 'arraybuffer'; // ← Enable binary mode
ws.onmessage = (event) => { // Handle both binary (protobuf) and text (JSON) messages if (event.data instanceof ArrayBuffer) { handleBinaryMessage(event.data); } else { handleJsonMessage(event.data); } };}
function handleBinaryMessage(data: ArrayBuffer) { try { const bytes = new Uint8Array(data); const message = ServerMessage.fromBinary(bytes);
// Determine message type from oneof payload const messageType = getServerMessageType(message);
// Broadcast to all connected tabs with type info for (const p of ports) { p.postMessage({ type: 'ws.binary', messageType, payload: message, raw: bytes, // Include raw bytes for forwarding }); }
// Also broadcast via BroadcastChannel for worker pool comm.broadcast({ type: 'ws.binary', data: { messageType, payload: message } });
} catch (error) { console.error('[SharedWorker] Failed to decode protobuf:', error); }}
function getServerMessageType(message: ServerMessage): string { if (message.payload.oneofKind === 'worldSnapshot') return 'worldSnapshot'; if (message.payload.oneofKind === 'debugObjectInfo') return 'debugObjectInfo'; if (message.payload.oneofKind === 'environmentBatch') return 'environmentBatch'; if (message.payload.oneofKind === 'connectResponse') return 'connectResponse'; if (message.payload.oneofKind === 'harvestUpdate') return 'harvestUpdate'; if (message.payload.oneofKind === 'inventoryUpdate') return 'inventoryUpdate'; if (message.payload.oneofKind === 'playerJoined') return 'playerJoined'; if (message.payload.oneofKind === 'playerLeft') return 'playerLeft'; if (message.payload.oneofKind === 'pong') return 'pong'; if (message.payload.oneofKind === 'error') return 'error'; return 'unknown';}
// Add binary send handlercase 'ws.sendBinary': { if (!ws || ws.readyState !== WebSocket.OPEN) { throw new Error('WebSocket not connected'); }
const clientMessage = ClientMessage.create(m.payload.message); const bytes = ClientMessage.toBinary(clientMessage);
ws.send(bytes); reply(port, { id: m.id, ok: true }); break;}Step 4: Add subscription filtering
Section titled “Step 4: Add subscription filtering”// Track per-port message subscriptionsinterface PortState { port: MessagePort; subscriptions: Set<string>; // Message types this port wants}
const portStates = new Map<MessagePort, PortState>();
// Add subscription message typetype Req = | { id: string; type: 'ws.subscribe'; payload: { messageTypes: string[] } } | { id: string; type: 'ws.unsubscribe'; payload: { messageTypes: string[] } } // ... existing types ...
// Handle subscriptionscase 'ws.subscribe': { const state = portStates.get(port); if (state) { for (const type of m.payload.messageTypes) { state.subscriptions.add(type); } } reply(port, { id: m.id, ok: true }); break;}
// Filter broadcasts by subscriptionfunction handleBinaryMessage(data: ArrayBuffer) { const message = ServerMessage.fromBinary(new Uint8Array(data)); const messageType = getServerMessageType(message);
for (const [port, state] of portStates) { // Send if subscribed to '*' (all) or specific type if (state.subscriptions.has('*') || state.subscriptions.has(messageType)) { port.postMessage({ type: 'ws.binary', messageType, payload: message, }); } }}Client Usage (Updated)
Section titled “Client Usage (Updated)”// src/lib/useGameSocket.tsimport { useEffect, useCallback, useState } from 'react';import { SupaShared } from './supabase-shared';
export function useGameSocket() { const [connected, setConnected] = useState(false); const supaShared = new SupaShared();
useEffect(() => { // Initialize and connect supaShared.init(SUPABASE_URL, SUPABASE_ANON_KEY);
// Subscribe to game messages supaShared.subscribe(['worldSnapshot', 'debugObjectInfo'], (msg) => { console.log('Game message:', msg.messageType, msg.payload); });
return () => supaShared.unsubscribe(); }, []);
const sendDebugQuery = useCallback((objectId: bigint) => { supaShared.sendBinary({ payload: { oneofKind: 'debugObjectQuery', debugObjectQuery: { objectId } } }); }, []);
return { connected, sendDebugQuery };}Migration Checklist
Section titled “Migration Checklist”| Task | File | Status |
|---|---|---|
| Add protobuf-ts dependency | package.json | 🔲 |
| Generate TS from proto | src/lib/proto/ | 🔲 |
Add binaryType = 'arraybuffer' | supabase.shared.ts:182 | 🔲 |
| Add binary message handler | supabase.shared.ts | 🔲 |
Add ws.sendBinary command | supabase.shared.ts | 🔲 |
| Add subscription filtering | supabase.shared.ts | 🔲 |
| Update SupaShared client class | supabase-shared.ts | 🔲 |
| Add useGameSocket hook | src/lib/useGameSocket.ts | 🔲 |
| Update Realtime.worker.ts | Realtime.worker.ts | 🔲 |
Benefits of Protobuf in SharedWorker
Section titled “Benefits of Protobuf in SharedWorker”- Single decode - Protobuf decoded once in worker, not per-tab
- Type safety - Generated TypeScript types match server exactly
- Smaller payloads - Binary protobuf vs JSON text
- Structured routing - Route by
oneofKindmessage type - Raw forwarding - Can forward
Uint8Arraywithout re-encoding
Client Usage (Astro/React)
Section titled “Client Usage (Astro/React)”// useGameWorker.tsimport { useEffect, useRef, useCallback } from 'react';
export function useGameWorker() { const workerRef = useRef<SharedWorker | null>(null); const portRef = useRef<MessagePort | null>(null);
useEffect(() => { // Connect to shared worker // Note: Using variable to avoid Vite worker transformation in MDX const workerUrl = '/workers/game-worker.js'; // vite-ignore workerRef.current = new SharedWorker(workerUrl); portRef.current = workerRef.current.port; portRef.current.start();
return () => { portRef.current?.close(); }; }, []);
const send = useCallback((payload: any) => { portRef.current?.postMessage({ type: 'send', payload }); }, []);
const subscribe = useCallback(( messageType: string, handler: (msg: any) => void ) => { portRef.current?.postMessage({ type: 'subscribe', payload: { messageType } });
const listener = (e: MessageEvent) => { if (e.data.type === messageType) { handler(e.data.payload); } };
portRef.current?.addEventListener('message', listener); return () => { portRef.current?.removeEventListener('message', listener); portRef.current?.postMessage({ type: 'unsubscribe', payload: { messageType } }); }; }, []);
return { send, subscribe };}
// Usage in componentfunction DebugPanel({ objectId }: { objectId: number }) { const { send, subscribe } = useGameWorker(); const [objectInfo, setObjectInfo] = useState<DebugObjectInfo | null>(null);
useEffect(() => { // Subscribe to debug object responses return subscribe('debugObjectInfo', (msg) => { if (msg.objectId === objectId) { setObjectInfo(msg); } }); }, [objectId, subscribe]);
const queryObject = () => { send({ debugObjectQuery: { objectId } }); };
return ( <div> <button onClick={queryObject}>Query Object</button> {objectInfo && <ObjectDetails info={objectInfo} />} </div> );}Benefits of SharedWorker
Section titled “Benefits of SharedWorker”- Single WebSocket - All tabs share one connection
- Off-main-thread - Protobuf processing doesn’t block UI
- Message Routing - Efficient pub/sub to interested tabs
- Connection Resilience - Worker maintains connection across tab refreshes
- Memory Efficiency - Single message buffer shared across tabs
Protobuf Message Definitions
Section titled “Protobuf Message Definitions”Debug Object Query (MSCP)
Section titled “Debug Object Query (MSCP)”// proto/rentearth/snapshot.proto
// Client request to query object details by IDmessage DebugObjectQuery { uint64 object_id = 1;}
// Server response with full object detailsmessage DebugObjectInfo { uint64 object_id = 1; string asset_name = 2; int32 object_type = 3; // EnvironmentObjectType enum Vec3 position = 4; Quat rotation = 5; Vec3 scale = 6; int32 resource_type = 7; // ResourceType enum uint32 resource_amount = 8; float harvest_time = 9; bool is_harvested = 10; int64 harvested_at_ms = 11; // Unix timestamp when harvested float respawn_time_seconds = 12; bool found = 13; // False if object not found string error = 14; // Error message if not found}
// Added to ClientMessage oneofmessage ClientMessage { oneof payload { // ... existing fields ... DebugObjectQuery debug_object_query = 12; }}
// Added to ServerMessage oneofmessage ServerMessage { oneof payload { // ... existing fields ... DebugObjectInfo debug_object_info = 15; }}Transport Layer Abstraction
Section titled “Transport Layer Abstraction”The MSCP pattern enables easy addition of new transports:
// Future: gRPC transportpub mod grpc { use crate::game::GameHandle;
pub async fn serve(game_handle: GameHandle) -> Result<()> { // Same GameCommand interface, different wire protocol let service = GameService { game_handle }; tonic::transport::Server::builder() .add_service(GameServiceServer::new(service)) .serve(addr) .await }}
// Future: TCP transport (for dedicated game clients)pub mod tcp { pub async fn serve(game_handle: GameHandle) -> Result<()> { // Raw TCP with length-prefixed protobuf let listener = TcpListener::bind(addr).await?; while let Ok((stream, _)) = listener.accept().await { let handle = game_handle.clone(); tokio::spawn(handle_tcp_client(stream, handle)); } }}Future Enhancements
Section titled “Future Enhancements”Phase 1 - Caching (v1.1)
Section titled “Phase 1 - Caching (v1.1)”- LRU Cache for DebugObjectInfo - Cache hot object queries
- Pre-encoded protobuf cache - Skip repeated serialization
- Version-based invalidation - Invalidate on state change
- TTL expiration - Time-based cache eviction
Phase 2 - Shared Worker Protobuf (v1.2)
Section titled “Phase 2 - Shared Worker Protobuf (v1.2)”Existing infrastructure (already implemented):
- SharedWorker -
src/workers/supabase.shared.ts - WebSocket management - Connection, reconnect, heartbeat
- Auth token handling - JWT refresh, session sync
- Multi-tab broadcast - Fan-out to all connected ports
- BroadcastChannel - Worker pool communication
Upgrade needed (protobuf support):
- Add protobuf-ts - Generate TS types from
snapshot.proto - Binary message handler -
ws.binaryType = 'arraybuffer' - Protobuf decode -
ServerMessage.fromBinary() - Protobuf encode -
ClientMessage.toBinary() - Message type routing - Route by
oneofKind - Subscription filtering - Per-port message subscriptions
- useGameSocket hook - React hook for game protocol
Phase 3 - gRPC Transport (v1.3)
Section titled “Phase 3 - gRPC Transport (v1.3)”gRPC transport for server-to-server communication, admin tools, and high-performance clients.
Existing infrastructure:
- Tonic dependencies -
Cargo.tomllines 67-69 (tonic, tonic-health, tonic-reflection) - Proto definitions -
proto/rentearth/snapshot.proto(shared with WebSocket) - GameHandle pattern - Same MSCP interface works for all transports
- Connection state -
src/transports/connection.rssupports gRPC
Placeholder in main.rs:114-116:
// Future transports:// let grpc = tokio::spawn(transports::grpc::serve(game_handle.clone()));// let tcp = tokio::spawn(transports::tcp::serve(game_handle.clone()));Implementation tasks:
- Create gRPC service proto -
proto/rentearth/game_service.proto - Generate tonic server -
build.rswith tonic-build - Implement GameService -
src/transports/grpc.rs - Add gRPC interceptor - JWT validation for metadata
- Enable reflection - For grpcurl/grpcui debugging
- Health checks - tonic-health for load balancers
Use cases:
- Admin CLI tools (kick, ban, teleport)
- Server-to-server (matchmaking, analytics)
- Unity dedicated server mode
- Load testing with ghz
Phase 3 Details: gRPC Service Design
Section titled “Phase 3 Details: gRPC Service Design”Service Proto Definition
Section titled “Service Proto Definition”// proto/rentearth/game_service.protosyntax = "proto3";package rentearth;
import "rentearth/snapshot.proto";
// Game service for admin and server-to-server communicationservice GameService { // Unary RPCs - request/response rpc GetServerStatus(GetServerStatusRequest) returns (ServerStatus); rpc GetPlayerInfo(GetPlayerInfoRequest) returns (PlayerInfo); rpc DebugObjectQuery(DebugObjectQuery) returns (DebugObjectInfo);
// Admin commands rpc KickPlayer(KickPlayerRequest) returns (KickPlayerResponse); rpc TeleportPlayer(TeleportPlayerRequest) returns (TeleportPlayerResponse); rpc SpawnNpc(SpawnNpcRequest) returns (SpawnNpcResponse);
// Server streaming - server pushes updates to client rpc StreamWorldSnapshots(StreamRequest) returns (stream WorldSnapshot); rpc StreamPlayerEvents(StreamRequest) returns (stream PlayerEvent);
// Bidirectional streaming - full duplex game protocol rpc GameSession(stream ClientMessage) returns (stream ServerMessage);}
// Request/Response messagesmessage GetServerStatusRequest {}
message ServerStatus { uint32 player_count = 1; uint32 npc_count = 2; uint64 world_seed = 3; uint64 uptime_seconds = 4; float tick_rate = 5; repeated string active_transports = 6;}
message GetPlayerInfoRequest { string player_id = 1; // UUID string}
message PlayerInfo { string player_id = 1; string username = 2; Vec3 position = 3; float health = 4; uint32 connection_flags = 5; int64 connected_at_ms = 6;}
message KickPlayerRequest { string player_id = 1; string reason = 2;}
message KickPlayerResponse { bool success = 1; string message = 2;}
message TeleportPlayerRequest { string player_id = 1; Vec3 position = 2;}
message TeleportPlayerResponse { bool success = 1; Vec3 actual_position = 2;}
message SpawnNpcRequest { string npc_type = 1; Vec3 position = 2; repeated Vec3 patrol_route = 3;}
message SpawnNpcResponse { bool success = 1; string npc_id = 2;}
message StreamRequest { string filter = 1; // Optional filter expression}
message PlayerEvent { oneof event { PlayerJoined joined = 1; PlayerLeft left = 2; PlayerHealthChanged health_changed = 3; }}gRPC Server Implementation
Section titled “gRPC Server Implementation”// src/transports/grpc.rs
use tonic::{Request, Response, Status, Streaming};use tokio_stream::wrappers::ReceiverStream;use crate::game::{GameHandle, GameCommand};use crate::proto::game_service::game_service_server::{GameService, GameServiceServer};use crate::proto::game_service::*;use crate::proto::snapshot::*;
pub struct GameServiceImpl { game_handle: GameHandle,}
impl GameServiceImpl { pub fn new(game_handle: GameHandle) -> Self { Self { game_handle } }}
#[tonic::async_trait]impl GameService for GameServiceImpl { /// Get current server status async fn get_server_status( &self, _request: Request<GetServerStatusRequest>, ) -> Result<Response<ServerStatus>, Status> { // Query world runtime for stats let (reply_tx, reply_rx) = oneshot::channel(); self.game_handle .send_command(GameCommand::Admin(AdminCommand::GetStats { reply: reply_tx })) .await .map_err(|e| Status::internal(e.to_string()))?;
let stats = reply_rx.await .map_err(|_| Status::internal("World runtime channel closed"))?;
Ok(Response::new(ServerStatus { player_count: stats.player_count, npc_count: stats.npc_count, world_seed: self.game_handle.world_seed(), uptime_seconds: stats.uptime_seconds, tick_rate: 10.0, active_transports: vec!["https".into(), "grpc".into()], })) }
/// Debug object query - reuses existing MSCP command async fn debug_object_query( &self, request: Request<DebugObjectQuery>, ) -> Result<Response<DebugObjectInfo>, Status> { let query = request.into_inner();
let (reply_tx, reply_rx) = oneshot::channel(); self.game_handle .send_command(GameCommand::DebugObjectQuery { object_id: query.object_id, reply: reply_tx, }) .await .map_err(|e| Status::internal(e.to_string()))?;
let result = reply_rx.await .map_err(|_| Status::internal("World runtime channel closed"))?;
// Extract DebugObjectInfo from ServerMessage if let Some(server_message::Payload::DebugObjectInfo(info)) = result.response.payload { Ok(Response::new(info)) } else { Err(Status::internal("Unexpected response type")) } }
/// Bidirectional streaming - full game protocol type GameSessionStream = ReceiverStream<Result<ServerMessage, Status>>;
async fn game_session( &self, request: Request<Streaming<ClientMessage>>, ) -> Result<Response<Self::GameSessionStream>, Status> { let mut stream = request.into_inner(); let (tx, rx) = tokio::sync::mpsc::channel(128); let game_handle = self.game_handle.clone();
// Spawn task to handle incoming messages tokio::spawn(async move { while let Some(result) = stream.message().await.transpose() { match result { Ok(client_msg) => { // Process client message similar to WebSocket handler // Route through GameCommand pattern handle_client_message(&game_handle, client_msg, &tx).await; } Err(e) => { tracing::error!(error = %e, "gRPC stream error"); break; } } } });
Ok(Response::new(ReceiverStream::new(rx))) }
// ... implement other methods similarly}
/// JWT authentication interceptorfn auth_interceptor(req: Request<()>) -> Result<Request<()>, Status> { let metadata = req.metadata();
// Check for authorization header let token = metadata .get("authorization") .and_then(|v| v.to_str().ok()) .and_then(|s| s.strip_prefix("Bearer ")) .ok_or_else(|| Status::unauthenticated("Missing authorization token"))?;
// Validate JWT (reuse existing jwt_cache logic) // For now, just check token exists if token.is_empty() { return Err(Status::unauthenticated("Invalid token")); }
Ok(req)}
/// Start gRPC serverpub async fn serve(game_handle: GameHandle) -> Result<(), Box<dyn std::error::Error>> { let addr = "[::]:50051".parse()?;
let game_service = GameServiceImpl::new(game_handle);
// Create reflection service for debugging let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(crate::proto::FILE_DESCRIPTOR_SET) .build()?;
// Create health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); health_reporter .set_serving::<GameServiceServer<GameServiceImpl>>() .await;
tracing::info!("gRPC server listening on {}", addr);
tonic::transport::Server::builder() .add_service(health_service) .add_service(reflection_service) .add_service(GameServiceServer::with_interceptor( game_service, auth_interceptor, )) .serve(addr) .await?;
Ok(())}Build Configuration
Section titled “Build Configuration”// build.rsfn main() -> Result<(), Box<dyn std::error::Error>> { // Compile snapshot.proto (existing) prost_build::compile_protos(&["proto/rentearth/snapshot.proto"], &["proto/"])?;
// Compile game_service.proto with tonic tonic_build::configure() .build_server(true) .build_client(true) // For testing .file_descriptor_set_path("src/proto/descriptor.bin") .compile( &["proto/rentearth/game_service.proto"], &["proto/"], )?;
Ok(())}Usage Examples
Section titled “Usage Examples”# Check server healthgrpcurl -plaintext localhost:50051 grpc.health.v1.Health/Check
# Get server statusgrpcurl -plaintext localhost:50051 rentearth.GameService/GetServerStatus
# Query object (with reflection)grpcurl -plaintext -d '{"object_id": 14638407266815390206}' \ localhost:50051 rentearth.GameService/DebugObjectQuery
# Admin: Kick playergrpcurl -plaintext \ -H "authorization: Bearer <admin_jwt>" \ -d '{"player_id": "uuid-here", "reason": "AFK"}' \ localhost:50051 rentearth.GameService/KickPlayerPhase 3 Implementation Checklist
Section titled “Phase 3 Implementation Checklist”| Task | File | Status |
|---|---|---|
| Create game_service.proto | proto/rentearth/game_service.proto | 🔲 |
| Update build.rs for tonic | build.rs | 🔲 |
| Implement GameServiceImpl | src/transports/grpc.rs | 🔲 |
| Add auth interceptor | src/transports/grpc.rs | 🔲 |
| Enable reflection | src/transports/grpc.rs | 🔲 |
| Add health checks | src/transports/grpc.rs | 🔲 |
| Update main.rs | src/main.rs:115 | 🔲 |
| Add transports/mod.rs export | src/transports/mod.rs | 🔲 |
| Test with grpcurl | - | 🔲 |
| Document admin commands | axum.mdx | 🔲 |
Other Transports (Future)
Section titled “Other Transports (Future)”- TCP transport - For dedicated game clients (raw protobuf over TCP)
- UDP transport - For latency-critical game state (unreliable snapshots)
Phase 4 - Performance (v2.0)
Section titled “Phase 4 - Performance (v2.0)”- Delta compression - Only send changed fields
- Message batching - Combine multiple queries into one frame
- Priority queuing - Critical messages bypass queue
- Backpressure handling - Graceful degradation under load