Stream Sanitization Utilities (llm4free.sanitize)

June 26, 2026 · View on GitHub

Last updated: 2025-12-20 Maintained by LLM4Free

LLM4Free's llm4free.sanitize module provides a comprehensive suite of utilities for processing, transforming, and sanitizing data streams. These tools are designed for robust, flexible, and high-performance handling of text and byte streams, including real-time data, API responses, and streaming content from various sources.

Table of Contents

  1. Core Components
  2. Main Function
  3. Parameters Reference
  4. Processing Modes
  5. Internal Functions
  6. Advanced Usage Examples
  7. Error Handling
  8. Performance Considerations
  9. Troubleshooting
  10. Integration Guide

Core Components

sanitize.py

The main sanitization module that provides comprehensive stream processing capabilities.

from llm4free.sanitize import sanitize_stream, LITSTREAM, sanitize_stream_decorator, lit_streamer

# Basic usage
gen = sanitize_stream(data, intro_value="data:", to_json=True)
for item in gen:
    print(item)

Key Features:

  • Stream processing for strings, bytes, and iterables
  • JSON parsing and validation
  • Regex-based filtering and extraction
  • Marker-based content selection
  • Custom error handling and formatting
  • Support for both synchronous and asynchronous streams
  • Flexible output formatting

Main Function

sanitize_stream()

The main entry point for stream processing that handles multiple data types and processing modes.

from llm4free.sanitize import sanitize_stream

# Basic usage
gen = sanitize_stream(data, intro_value="data:", to_json=True)
for item in gen:
    print(item)

Function Signature:

def sanitize_stream(
    data: Union[str, bytes, Iterable[str], Iterable[bytes], AsyncIterable[str],
                AsyncIterable[bytes], dict, list, int, float, bool, None],
    intro_value: str = "data:",
    to_json: bool = True,
    skip_markers: Optional[List[str]] = None,
    strip_chars: Optional[str] = None,
    start_marker: Optional[str] = None,
    end_marker: Optional[str] = None,
    content_extractor: Optional[Callable[[Union[str, Dict[str, Any]]], Optional[Any]]] = None,
    yield_raw_on_error: bool = True,
    encoding: EncodingType = "utf-8",
    encoding_errors: str = "replace",
    buffer_size: int = 8192,
    line_delimiter: Optional[str] = None,
    error_handler: Optional[Callable[[Exception, str], Optional[Any]]] = None,
    skip_regexes: Optional[List[Union[str, Pattern[str]]]] = None,
    extract_regexes: Optional[List[Union[str, Pattern[str]]]] = None,
    object_mode: Literal["as_is", "json", "str"] = "json",
    raw: bool = False,
    output_formatter: Optional[Callable[[Any], Any]] = None,
) -> Union[Generator[Any, None, None], AsyncGenerator[Any, None]]

Parameters Reference

ParameterTypeDefaultDescription
dataUnion[str, bytes, Iterable, AsyncIterable, dict, list, int, float, bool, None]-Input data to process
intro_valuestr"data:"Prefix to remove from each chunk
to_jsonboolTrueParse chunks as JSON
skip_markersOptional[List[str]]NoneSkip lines containing these exact strings
strip_charsOptional[str]NoneCharacters to strip from lines
start_markerOptional[str]NoneBegin processing after this marker
end_markerOptional[str]NoneStop processing at this marker
content_extractorOptional[Callable]NoneTransform content before yielding
yield_raw_on_errorboolTrueYield raw content on JSON parse errors
encodingEncodingType"utf-8"Encoding for byte streams
encoding_errorsstr"replace"How to handle encoding errors
buffer_sizeint8192Buffer size for byte decoding
line_delimiterOptional[str]NoneCustom line delimiter
error_handlerOptional[Callable]NoneCustom error handling callback
skip_regexesOptional[List[Union[str, Pattern]]]NoneRegex patterns to skip
extract_regexesOptional[List[Union[str, Pattern]]]NoneRegex patterns for content extraction
object_modeLiteral["as_is", "json", "str"]"json"How to handle non-iterable objects
rawboolFalseYield raw API response chunks
output_formatterOptional[Callable[[Any], Any]]NoneCustom callable to format/transform each output item before yielding

Supported Encodings

The EncodingType supports a wide range of character encodings:

  • Unicode: utf-8, utf-16, utf-32
  • ASCII/Latin: ascii, latin1, iso-8859-1, iso-8859-2
  • Windows: cp1252, windows-1250, windows-1251, windows-1252
  • Asian: gbk, big5, shift_jis, euc-jp, euc-kr

Processing Modes

Synchronous Mode

Handles strings, bytes, and synchronous iterables:

# String processing
data = "data: {'result': 42}\ndata: {'result': 99}"
for item in sanitize_stream(data):
    print(item)  # {'result': 42}, {'result': 99}

# List processing
data = ['data: {"status": "ok"}', 'data: {"count": 5}']
for item in sanitize_stream(data):
    print(item)

Asynchronous Mode

Handles async iterables automatically:

import asyncio

async def async_data_source():
    yield 'data: {"message": "hello"}'
    yield 'data: {"message": "world"}'

async def process_async():
    async for item in sanitize_stream(async_data_source()):
        print(item)

asyncio.run(process_async())

Raw Mode

Returns unprocessed chunks as received:

# Raw mode preserves original formatting
for chunk in sanitize_stream(data, raw=True):
    print(repr(chunk))  # Shows exact content with whitespace

Object Mode

Handles non-iterable objects:

# JSON mode (default)
result = list(sanitize_stream({"key": "value"}))
# Returns: ['{"key": "value"}']

# String mode
result = list(sanitize_stream({"key": "value"}, object_mode="str"))
# Returns: ["{'key': 'value'}"]

# As-is mode
result = list(sanitize_stream({"key": "value"}, object_mode="as_is"))
# Returns: [{"key": "value"}]

Internal Functions

_compile_regexes(patterns)

Compiles regex patterns for efficient matching.

Parameters:

  • patterns: List of regex patterns as strings or compiled Pattern objects

Returns:

  • List of compiled Pattern objects, or None if input is None

Raises:

  • ValueError: If any pattern is invalid

_process_chunk(...)

Core chunk processing function that handles sanitization and parsing.

Key Operations:

  1. Prefix removal (intro_value)
  2. Character stripping (strip_chars)
  3. Marker-based skipping (skip_markers)
  4. Regex-based extraction (extract_regexes)
  5. Regex-based skipping (skip_regexes)
  6. JSON parsing (to_json)
  7. Error handling (error_handler)

_decode_byte_stream(byte_iterator, ...)

Synchronous byte stream decoder with flexible encoding support.

Features:

  • Real-time decoding with buffering
  • Graceful encoding error handling
  • Performance optimization with memory views
  • Support for multiple character encodings

_decode_byte_stream_async(byte_iterator, ...)

Asynchronous counterpart to the synchronous byte decoder.

_sanitize_stream_sync(...)

Synchronous stream processing engine with advanced filtering and transformation capabilities.

_sanitize_stream_async(...)

Asynchronous stream processing engine that mirrors synchronous functionality.

Advanced Usage Examples

Complex Filtering with Regex

import re
from llm4free.sanitize import sanitize_stream

data = [
    'data: {"type": "message", "content": "Hello"}',
    'data: {"type": "error", "content": "Failed"}',
    'data: {"type": "message", "content": "World"}',
]

# Skip error messages and extract only content
skip_regexes = [r'"type":\s*"error"']
extract_regexes = [r'"content":\s*"([^"]+)"']

for item in sanitize_stream(
    data,
    skip_regexes=skip_regexes,
    extract_regexes=extract_regexes,
    to_json=False  # Since we're extracting strings
):
    print(item)  # "Hello", "World"

Custom Content Transformation

def extract_important_data(content):
    """Extract only important fields from parsed JSON."""
    if isinstance(content, dict):
        return {k: v for k, v in content.items() if k.startswith('important_')}
    return content

data = [
    'data: {"important_field": "keep", "noise": "ignore"}',
    'data: {"important_data": 42, "metadata": "skip"}',
]

for item in sanitize_stream(data, content_extractor=extract_important_data):
    print(item)

Error Handling with Custom Handler

def custom_error_handler(exception, raw_chunk):
    """Custom error handling for JSON parsing failures."""
    print(f"JSON Error: {exception}")
    # Try to extract useful info even from malformed JSON
    if "partial_data" in raw_chunk:
        return {"status": "partial", "raw": raw_chunk}
    return None  # Skip this chunk

data = [
    'data: {"valid": "json"}',
    'data: {invalid json with partial_data}',
    'data: {completely broken}',
]

for item in sanitize_stream(
    data,
    error_handler=custom_error_handler,
    yield_raw_on_error=False
):
    print(item)

File Processing

# Process file-like objects
with open('data.jsonl', 'r') as f:
    for item in sanitize_stream(f, intro_value="", to_json=True):
        print(item)

# Process bytes from file
with open('data.txt', 'rb') as f:
    for item in sanitize_stream(f, encoding='utf-8'):
        print(item)

Streaming API Responses

import requests

def process_streaming_api():
    response = requests.get('https://api.example.com/stream', stream=True)

    # Process raw chunks
    for chunk in sanitize_stream(response.iter_content(chunk_size=1024), raw=True):
        print(f"Raw chunk: {chunk}")

    # Process as JSON stream
    for item in sanitize_stream(response.iter_lines(decode_unicode=True)):
        print(f"Parsed: {item}")

Output Formatting with output_formatter

The output_formatter parameter allows you to transform each output item into any desired structure before yielding. Define your own custom formatter function to structure the output however you need.

from llm4free.sanitize import sanitize_stream
import time

# Simple custom formatter - wrap content in a dict with timestamp
def simple_formatter(content):
    return {'text': content, 'timestamp': time.time()}

data = ['data: {"message": "Hello"}', 'data: {"message": "World"}']
for item in sanitize_stream(data, output_formatter=simple_formatter):
    print(item)  # {'text': {'message': 'Hello'}, 'timestamp': 1234567890.123}

# Message format with role
def message_formatter(content):
    return {'role': 'assistant', 'content': content}

for item in sanitize_stream(data, output_formatter=message_formatter):
    print(item)  # {'role': 'assistant', 'content': {'message': 'Hello'}}

# Custom OpenAI-like response format
def openai_formatter(content):
    return {
        'choices': [{'message': {'role': 'assistant', 'content': content}}]
    }

for item in sanitize_stream(data, output_formatter=openai_formatter):
    print(item['choices'][0]['message']['content'])

Decorator Usage

from llm4free.sanitize import sanitize_stream_decorator, lit_streamer

# Basic decorator
@sanitize_stream_decorator
def api_data_generator():
    yield 'data: {"result": 1}'
    yield 'data: {"result": 2}'

# Decorator with parameters
@lit_streamer(skip_markers=["[DONE]"], to_json=True)
def streaming_response():
    yield 'data: {"message": "hello"}'
    yield 'data: {"message": "world"}'
    yield '[DONE]'

# Async decorator
@sanitize_stream_decorator(to_json=True)
async def async_api_generator():
    yield 'data: {"async": "result"}'

Marker-Based Processing

# Process content between markers
data = """
START_DATA
data: {"item": 1}
data: {"item": 2}
END_DATA
other content
"""

for item in sanitize_stream(
    data,
    start_marker="START_DATA",
    end_marker="END_DATA"
):
    print(item)  # Only processes content between markers

Multi-Pattern Regex Extraction

# Extract different types of content with multiple patterns
data = [
    'log: [INFO] User login: john_doe',
    'log: [ERROR] Failed attempt from 192.168.1.1',
    'log: [INFO] User logout: jane_smith',
]

# Extract usernames from INFO logs and IPs from ERROR logs
extract_regexes = [
    r'\[INFO\] User \w+: (\w+)',  # Extract usernames
    r'\[ERROR\].*?(\d+\.\d+\.\d+\.\d+)',  # Extract IP addresses
]

for item in sanitize_stream(
    data,
    intro_value="log: ",
    extract_regexes=extract_regexes,
    to_json=False
):
    print(item)  # "john_doe", "192.168.1.1", "jane_smith"

Error Handling

Exception Types

The module handles several types of errors gracefully:

  1. JSON Parsing Errors: When to_json=True and content isn't valid JSON
  2. Encoding Errors: When byte streams can't be decoded with specified encoding
  3. Regex Errors: When invalid regex patterns are provided
  4. Type Errors: When unsupported data types are passed

Error Handler Callback

def comprehensive_error_handler(exception, raw_chunk):
    """Comprehensive error handling with logging and recovery."""
    import logging
    
    if isinstance(exception, json.JSONDecodeError):
        logging.warning(f"JSON decode error: {exception}")
        # Try to fix common JSON issues
        if raw_chunk.endswith(','):
            try:
                return json.loads(raw_chunk[:-1])
            except:
                pass
    
    # Log other errors
    logging.error(f"Processing error: {type(exception).__name__}: {exception}")
    
    # Return structured error info
    return {
        "error": True,
        "error_type": type(exception).__name__,
        "raw_content": raw_chunk,
        "timestamp": time.time()
    }

Graceful Degradation

# Handle mixed content types gracefully
mixed_data = [
    'data: {"valid": "json"}',
    'data: invalid json content',
    'data: {"another": "valid"}',
    b'data: {"bytes": "content"}',  # Mixed bytes and strings
]

for item in sanitize_stream(
    mixed_data,
    yield_raw_on_error=True,  # Keep processing on errors
    encoding_errors='ignore'  # Skip problematic bytes
):
    print(type(item), item)

Performance Considerations

Optimization Tips

  1. Buffer Size Tuning: Adjust buffer_size based on your data characteristics

    # For small, frequent chunks
    sanitize_stream(data, buffer_size=1024)
    
    # For large data streams
    sanitize_stream(data, buffer_size=32768)
    
  2. Regex Compilation: Pre-compile regex patterns for better performance

    import re
    
    # Pre-compiled patterns are more efficient
    skip_patterns = [re.compile(r'debug|trace'), re.compile(r'temporary')]
    sanitize_stream(data, skip_regexes=skip_patterns)
    
  3. Memory Management: Use generators instead of collecting all results

    # Memory efficient - processes one item at a time
    for item in sanitize_stream(large_dataset):
        process_item(item)
    
    # Memory intensive - loads everything into memory
    all_items = list(sanitize_stream(large_dataset))
    
  4. Encoding Selection: Choose appropriate encoding for your data

    # Use specific encoding if known
    sanitize_stream(data, encoding='latin1')  # Faster than utf-8 for some data
    

Troubleshooting

Common Issues

Issue: "TypeError: Input must be a string or an iterable"

Cause: Passing unsupported data type Solution:

# Wrong
sanitize_stream(None)

# Right - handle None explicitly
data = data or []
sanitize_stream(data)

Issue: "ValueError: Invalid regex pattern"

Cause: Malformed regex in skip_regexes or extract_regexes Solution:

import re

# Test regex patterns first
try:
    pattern = re.compile(r'your_pattern_here')
    sanitize_stream(data, skip_regexes=[pattern])
except re.error as e:
    print(f"Invalid regex: {e}")

Issue: Memory usage grows continuously

Cause: Accumulating results instead of processing incrementally Solution:

# Wrong - accumulates in memory
results = list(sanitize_stream(large_data))

# Right - process incrementally
for item in sanitize_stream(large_data):
    process_and_discard(item)

Issue: JSON parsing fails frequently

Cause: Malformed JSON in stream Solution:

# Use custom error handler for recovery
def json_fixer(exception, chunk):
    # Try common fixes
    fixes = [
        lambda x: x.replace("'", '"'),  # Single to double quotes
        lambda x: x + '}',              # Add missing closing brace
        lambda x: x.rstrip(',')         # Remove trailing comma
    ]
    
    for fix in fixes:
        try:
            return json.loads(fix(chunk))
        except:
            continue
    return None

sanitize_stream(data, error_handler=json_fixer)

Issue: Encoding errors with international text

Cause: Wrong encoding specified Solution:

# Try multiple encodings
encodings = ['utf-8', 'latin1', 'cp1252', 'iso-8859-1']

for encoding in encodings:
    try:
        results = list(sanitize_stream(data, encoding=encoding))
        break
    except UnicodeDecodeError:
        continue
else:
    # Fallback to error-tolerant mode
    results = list(sanitize_stream(data, encoding_errors='replace'))

Debug Mode

Enable detailed logging for troubleshooting:

import logging
logging.basicConfig(level=logging.DEBUG)

# The module will output debug information to stderr
for item in sanitize_stream(problematic_data):
    print(item)

Testing Your Configuration

def test_sanitize_config(data_sample, **kwargs):
    """Test sanitize_stream configuration with sample data."""
    try:
        results = list(sanitize_stream(data_sample, **kwargs))
        print(f"✓ Processed {len(results)} items successfully")
        print(f"Sample output: {results[:3]}")
        return True
    except Exception as e:
        print(f"✗ Configuration failed: {e}")
        return False

# Test your configuration
test_data = ['data: {"test": "value"}', 'data: {"test": 123}']
test_sanitize_config(test_data, intro_value="data:", to_json=True)

Decorator Aliases

The module provides several aliases for decorator usage:

from llm4free.sanitize import LITSTREAM, lit_streamer

# All equivalent
@sanitize_stream_decorator(to_json=True)
@lit_streamer(to_json=True)  
@LITSTREAM.__decorator__(to_json=True)
def my_generator():
    yield 'data: {"key": "value"}'

Integration Guide

With FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from llm4free.sanitize import sanitize_stream

app = FastAPI()

@app.get("/stream")
async def stream_data():
    async def generate():
        data_source = get_async_data_source()
        async for item in sanitize_stream(data_source):
            yield f"data: {json.dumps(item)}\n\n"
    
    return StreamingResponse(generate(), media_type="text/plain")

With asyncio

import asyncio
from llm4free.sanitize import sanitize_stream

async def process_multiple_streams():
    streams = [get_stream_1(), get_stream_2(), get_stream_3()]
    
    async def process_stream(stream):
        async for item in sanitize_stream(stream):
            await handle_item(item)
    
    # Process all streams concurrently
    await asyncio.gather(*[process_stream(s) for s in streams])

With requests

import requests
from llm4free.sanitize import sanitize_stream

def stream_api_data(url):
    response = requests.get(url, stream=True)
    response.raise_for_status()
    
    # Process streaming response
    for item in sanitize_stream(
        response.iter_lines(decode_unicode=True),
        intro_value="data: ",
        to_json=True
    ):
        yield item

With LLM4Free Providers

from llm4free.Provider import ChatGPT
from llm4free.sanitize import sanitize_stream

# Process streaming responses from LLM4Free providers
provider = ChatGPT()
response = provider.ask_stream("Tell me a story")

# Clean and format the streaming response
for chunk in sanitize_stream(
    response,
    intro_value="data:",
    to_json=True,
    output_formatter=lambda x: {'role': 'assistant', 'content': x}
):
    print(chunk)

With File Processing

# Process large log files efficiently
with open('large_log_file.log', 'r') as f:
    for item in sanitize_stream(
        f,
        intro_value="LOG:",
        skip_regexes=[r'DEBUG.*', r'TRACE.*'],
        extract_regexes=[r'ERROR: (.*)'],
        to_json=False
    ):
        # Process only ERROR messages
        log_error(item)

With Data Cleaning Pipelines

# Create a data cleaning pipeline
def clean_data_pipeline(raw_data):
    """Multi-stage data cleaning pipeline."""
    
    # Stage 1: Basic cleaning
    cleaned = sanitize_stream(
        raw_data,
        intro_value="data:",
        skip_markers=["[INVALID]", "[CORRUPTED]"],
        strip_chars="\t\r\n"
    )
    
    # Stage 2: JSON validation and extraction
    validated = sanitize_stream(
        cleaned,
        to_json=True,
        extract_regexes=[r'"valid_data": (.*)'],
        error_handler=custom_json_fixer
    )
    
    # Stage 3: Final formatting
    formatted = sanitize_stream(
        validated,
        output_formatter=final_formatter
    )
    
    return formatted

This documentation covers the comprehensive functionality of the llm4free.sanitize module. For the most up-to-date information, refer to the source code and inline documentation.