Source code for padrelay.protocol.tcp

"""TCP protocol utilities for the PadRelay"""
import asyncio
import json
import struct
from ..core.logging_utils import get_logger
from .constants import MAX_MESSAGE_SIZE

logger = get_logger(__name__)

[docs] class TCPProtocolHandler: """Handle TCP communication"""
[docs] def __init__(self, reader=None, writer=None): """Initialize with optional reader/writer""" self.reader = reader self.writer = writer
[docs] async def read_message(self): """Read and decode a length-prefixed message""" try: # Read 4-byte length prefix header = await self.reader.readexactly(4) msg_length = struct.unpack('>I', header)[0] if msg_length > MAX_MESSAGE_SIZE: logger.warning(f"Message too large: {msg_length} bytes") raise ValueError("Message too large") # Read the message data data = await self.reader.readexactly(msg_length) return json.loads(data.decode('utf-8')) except asyncio.IncompleteReadError: # Connection closed logger.debug("Connection closed") return None except (json.JSONDecodeError, struct.error) as e: logger.error(f"Error decoding message: {e}") return None except Exception as e: logger.error(f"Unexpected error reading message: {e}") return None
[docs] async def send_message(self, message): """Send a length-prefixed message""" try: if isinstance(message, dict): data = json.dumps(message).encode('utf-8') elif isinstance(message, str): data = message.encode('utf-8') else: data = message self.writer.write(struct.pack('>I', len(data)) + data) return True except Exception as e: logger.error(f"Error sending message: {e}") return False
[docs] async def drain(self): """Flush the writer buffer""" try: await self.writer.drain() return True except Exception as e: logger.error(f"Error draining writer: {e}") return False
[docs] def close(self): """Close the writer connection""" if self.writer: try: self.writer.close() except Exception as e: logger.error(f"Error closing writer: {e}")
[docs] async def wait_closed(self) -> None: """Wait for the writer to close""" if self.writer: try: await self.writer.wait_closed() except Exception as e: logger.error(f"Error waiting for writer to close: {e}")