Implementation Plan: News Synthesizer
October 28, 2025 · View on GitHub
Overview
This document provides a comprehensive implementation plan for the News Synthesizer application, following the kliewerdaniel/workflow methodology. The implementation will progress through all development phases: Requirements → Architecture → Implementation → Testing → Security → Deployment → Operations.
Development Methodology
The implementation follows a structured, iterative approach with departmental specialization:
- Requirements (Complete): Functional and non-functional specs defined
- Architecture (Complete): System design with API mappings completed
- Implementation (In Progress): Component development and integration
- Testing (Planned): Comprehensive validation strategy
- Security (Planned): OWASP practices and model isolation
- Deployment (Planned): Containerization and CI/CD
- Operations (Planned): Maintenance and monitoring procedures
Phase 1: Backend Infrastructure Setup
1.1 FastAPI Application Structure
Target Location: backend/main.py → backend/src/api/
Implement: Convert the CLI entry point (main.py) into a FastAPI application with the following structure:
# backend/src/api/app.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .routes import feeds, articles, personas, synthesis, compose, tts, chat
# core/ lives at backend/src/core (sections 1.2 and 1.3), a sibling of api/,
# so it is two package levels up from backend/src/api/app.py.
from ..core.config import settings
from ..core.database import init_database
from ..core.llm import LLMManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the database and load the LLM once, before serving requests.

    Replaces the deprecated ``@app.on_event("startup")`` hook.
    """
    init_database()
    # llm.py exposes the LLMManager singleton rather than a module-level
    # load_model() function; loading here warms the shared instance.
    LLMManager().load_model()
    yield


app = FastAPI(
    title="News Synthesizer API",
    description="LLM-powered RSS news processing and synthesis",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS for frontend integration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://127.0.0.1:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Every router shares the /api/v1 prefix used by the frontend client.
for _route_module in (feeds, articles, personas, synthesis, compose, tts, chat):
    app.include_router(_route_module.router, prefix="/api/v1")

# NOTE: there is deliberately no module-level BackgroundTasks instance.
# FastAPI injects a fresh BackgroundTasks per request (declare it as an
# endpoint parameter); tasks added to a free-standing instance never run.


@app.get("/health")
async def health_check():
    """Liveness probe."""
    return {"status": "healthy"}
1.2 Database Schema Implementation
Target Location: backend/src/core/database.py
Implement: SQLite with SQLAlchemy for local data persistence:
from datetime import datetime

from sqlalchemy import (
    JSON,
    Boolean,
    Column,
    DateTime,
    ForeignKey,
    Integer,
    String,
    Text,
    create_engine,
)
# sqlalchemy.ext.declarative.declarative_base is deprecated since SQLAlchemy 1.4.
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Feed(Base):
    """An RSS feed source."""

    __tablename__ = "feeds"
    id = Column(Integer, primary_key=True)
    url = Column(String, unique=True, nullable=False)
    name = Column(String)
    category = Column(String)
    enabled = Column(Boolean, default=True)


class Article(Base):
    """A fetched article plus its LLM-generated metadata."""

    __tablename__ = "articles"
    id = Column(Integer, primary_key=True)
    feed_id = Column(Integer, ForeignKey("feeds.id"))  # nullable: may be unknown
    title = Column(String)
    url = Column(String, unique=True)
    content = Column(Text)
    published_at = Column(DateTime)
    # "metadata" is reserved on Declarative classes (it holds the MetaData
    # collection) and defining it raises InvalidRequestError. The Python
    # attribute is therefore llm_metadata, while the SQL column keeps the
    # name "metadata". Code written against `article.metadata` must use
    # `article.llm_metadata`.
    llm_metadata = Column("metadata", JSON)  # LLM-generated metadata
    processed_at = Column(DateTime)


class SynthesisJob(Base):
    """A stored RAG synthesis and the articles it was built from."""

    __tablename__ = "synthesis_jobs"
    id = Column(Integer, primary_key=True)
    query = Column(String)
    article_ids = Column(JSON)  # List of article IDs used
    synthesis = Column(Text)  # RAG-generated content
    # Naive UTC timestamp, filled in automatically when the caller omits it.
    created_at = Column(DateTime, default=datetime.utcnow)


class AudioFile(Base):
    """A generated TTS audio file on disk."""

    __tablename__ = "audio_files"
    id = Column(Integer, primary_key=True)
    segment_id = Column(String)  # Reference to composed segment
    file_path = Column(String)
    duration = Column(Integer)  # in seconds
    voice = Column(String)


# Database initialization
DATABASE_URL = "sqlite:///news_synthesizer.db"

# check_same_thread=False: FastAPI may open a session in one thread (e.g. a
# threadpool-run dependency) and use or close it in another. SQLite's
# default thread check would reject that.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def init_database() -> None:
    """Create any missing tables (called at API startup by api/app.py)."""
    Base.metadata.create_all(bind=engine)
1.3 LLM Integration Layer
Target Location: backend/src/core/llm.py
Implement: llama.cpp wrapper with model management:
import llama_cpp
from llama_cpp import Llama
import logging
from typing import List, Dict, Any
from .config import settings
class LLMManager:
    """Process-wide wrapper around a single llama.cpp model.

    ``__new__`` always returns the same instance, so every ``LLMManager()``
    call shares one model. The model itself is loaded lazily by
    :meth:`load_model`.
    """

    _instance = None
    _model = None

    # Characters of raw article content used as RAG context when an article
    # carries no summary.
    _CONTEXT_SNIPPET_CHARS = 300

    # (prompt label, persona key) pairs rendered by compose_with_persona.
    _PERSONA_TRAITS = (
        ("Tone", "tone"),
        ("Style", "style"),
        ("Formality", "formality"),
        ("Vocabulary", "vocabulary_level"),
        ("Humor level", "humor"),
    )

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def load_model(self):
        """Load the model on first call and return it; later calls reuse it."""
        if self._model is None:
            logging.info("Loading LLM model...")
            self._model = Llama(
                model_path=settings.LLM_MODEL_PATH,
                n_ctx=settings.LLM_CONTEXT_SIZE,
                n_gpu_layers=settings.LLM_GPU_LAYERS,
                n_threads=settings.LLM_CPU_THREADS,
                verbose=False,
            )
            logging.info("LLM model loaded successfully")
        return self._model

    def generate_metadata(self, article_text: str, max_tokens: int = 400) -> Dict[str, Any]:
        """Extract summary, sentiment, keywords and topics from an article.

        Args:
            article_text: Raw article text, truncated to
                ``settings.MAX_ARTICLE_LENGTH``. ``None`` or blank text
                returns ``{}`` without calling the model.
            max_tokens: Generation budget. The requested JSON (a 2-3
                sentence summary, sentiment, ten keywords and topics) can
                easily exceed 200 tokens. Truncated JSON fails to parse and
                yields ``{}``.

        Returns:
            The parsed JSON object, or ``{}`` if the output is not a JSON object.
        """
        if not article_text or not article_text.strip():
            return {}
        # NOTE(review): article text is untrusted and goes straight into the
        # prompt; a crafted article can steer the model (prompt injection).
        prompt = (
            "Analyze this news article and provide:\n"
            "- Summary (2-3 sentences)\n"
            "- Sentiment (positive/negative/neutral with confidence score)\n"
            "- Keywords (top 10)\n"
            "- Topics (categorized subjects)\n"
            f"Article: {article_text[:settings.MAX_ARTICLE_LENGTH]}\n"
            # Naming the keys lets downstream code find "summary" reliably.
            'Output only a JSON object with the keys "summary", "sentiment", '
            '"keywords" and "topics":'
        )
        response = self.generate(prompt, max_tokens=max_tokens)
        return self._parse_json_response(response)

    def retrieve_and_synthesize(self, query: str, articles: List[Any]) -> str:
        """Answer *query* from the given articles (the generation half of RAG).

        Args:
            query: The user's question.
            articles: Dicts or attribute objects (e.g. ORM rows) with a
                ``title`` and, ideally, a summary; see :meth:`_prepare_context`.
        """
        context = self._prepare_context(articles)
        prompt = (
            "Given these article summaries and this query, synthesize the most relevant information:\n"
            f"Query: {query}\n"
            "Articles:\n"
            f"{context}\n"
            "Provide a coherent synthesis addressing the query:"
        )
        return self.generate(prompt, max_tokens=500)

    def compose_with_persona(self, content: str, persona: Dict) -> str:
        """Write a news segment from *content* in the voice of *persona*.

        ``persona["name"]`` is required (KeyError otherwise). The description
        and the traits in ``_PERSONA_TRAITS`` are optional; any that are
        absent are left out of the prompt instead of raising KeyError.
        """
        prompt_lines = [f"You are {persona['name']}."]
        description = persona.get("description")
        if description:
            prompt_lines.append(str(description))
        for label, key in self._PERSONA_TRAITS:
            value = persona.get(key)
            # Keep falsy-but-meaningful values such as humor level 0.
            if value is not None and value != "":
                prompt_lines.append(f"{label}: {value}")
        prompt_lines += [
            "Compose a news segment based on this content:",
            content or "",
            "Segment:",
        ]
        return self.generate("\n".join(prompt_lines), max_tokens=600)

    def generate(self, prompt: str, max_tokens: int = 300) -> str:
        """Run one completion with the [INST] template and return the stripped text.

        Raises:
            RuntimeError: if :meth:`load_model` has not been called yet.
        """
        if self._model is None:
            raise RuntimeError("Model not loaded")
        # NOTE(review): llama-cpp-python normally prepends BOS itself, so the
        # literal "<s>" may yield a duplicate BOS token — confirm for the model.
        full_prompt = f"<s>[INST] {prompt} [/INST]"
        response = self._model(
            full_prompt,
            max_tokens=max_tokens,
            temperature=0.7,
            top_p=0.9,
            stop=["</s>"],
        )
        return response["choices"][0]["text"].strip()

    @staticmethod
    def _article_field(article: Any, key: str) -> Any:
        """Read *key* from a dict, or as an attribute from any other object."""
        if isinstance(article, dict):
            return article.get(key)
        return getattr(article, key, None)

    def _article_summary(self, article: Any) -> str:
        """Best available one-paragraph description of *article* ("" if none)."""
        summary = self._article_field(article, "summary")
        if summary:
            return str(summary)
        # SQLAlchemy Declarative reserves "metadata", so ORM rows may expose
        # the LLM metadata as "llm_metadata"; plain dicts may use either key.
        for key in ("llm_metadata", "metadata"):
            metadata = self._article_field(article, key)
            if isinstance(metadata, dict) and metadata.get("summary"):
                return str(metadata["summary"])
        content = self._article_field(article, "content")
        if content:
            return " ".join(str(content).split())[: self._CONTEXT_SNIPPET_CHARS]
        return ""

    def _prepare_context(self, articles: List[Any]) -> str:
        """Format articles as ``- title: summary`` lines for the RAG prompt.

        Accepts dicts or attribute objects such as ORM rows. A missing summary
        falls back to metadata, then to a content snippet, and otherwise the
        line carries the title alone.
        """
        context_lines = []
        for article in articles:
            title = self._article_field(article, "title") or "Untitled"
            summary = self._article_summary(article)
            context_lines.append(f"- {title}: {summary}" if summary else f"- {title}")
        return "\n".join(context_lines)

    def _parse_json_response(self, response: str) -> Dict[str, Any]:
        """Return the first JSON object embedded in *response*, or ``{}``.

        Decoding starts at the first ``{`` and stops at the end of that
        object, so trailing chatter (even chatter containing braces) is
        ignored. Non-object JSON and malformed or truncated output yield ``{}``.
        """
        import json

        start = response.find("{")
        if start != -1:
            try:
                parsed, _ = json.JSONDecoder().raw_decode(response[start:])
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return parsed
        logging.warning("Could not parse JSON response: %s", response)
        return {}
Phase 2: API Router Implementation
2.1 Feed Management Router
Target Location: backend/src/api/routes/feeds.py
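Implement: CRUD operations for RSS feed sources (the listing below covers create, list, and connectivity testing; update and delete follow the same pattern):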
import asyncio
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
import feedparser
import yaml
from fastapi import APIRouter, Body, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session

from ..dependencies import get_db
from ..models import Feed

router = APIRouter()

# Total time allowed for downloading a feed during validation/testing.
_FETCH_TIMEOUT = aiohttp.ClientTimeout(total=10)
# Length at which sample-entry summaries are cut in /feeds/test output.
_SUMMARY_PREVIEW_CHARS = 200
# "Could not fetch or decode the URL". aiohttp.InvalidURL and
# UnicodeDecodeError are both ValueError subclasses.
_FETCH_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError, ValueError)


class FeedCreate(BaseModel):
    """Request body for POST /feeds."""

    name: str
    url: str
    category: Optional[str] = None


async def _download(url: str) -> Tuple[int, str]:
    """Fetch *url* and return ``(HTTP status, decoded body)``.

    NOTE(review): the URL is user-supplied and fetched server-side (SSRF
    risk). That is acceptable for a localhost tool; restrict it before
    exposing the API more widely.
    """
    async with aiohttp.ClientSession(timeout=_FETCH_TIMEOUT) as session:
        async with session.get(url) as response:
            return response.status, await response.text()


def _preview(text: str, limit: int = _SUMMARY_PREVIEW_CHARS) -> str:
    """Cut *text* to *limit* characters, appending "..." when shortened."""
    return text[:limit] + "..." if len(text) > limit else text


@router.get("/feeds")
async def get_feeds(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
) -> List[Dict[str, Any]]:
    """Return a page of the configured RSS feeds."""
    # Read from the same config source that add_feed writes to.
    feeds = load_feeds_config()
    return feeds[skip:skip + limit]


@router.post("/feeds")
async def add_feed(feed_data: FeedCreate):
    """Validate that ``feed_data.url`` serves a parseable feed, then persist it.

    Raises:
        HTTPException 400: URL unreachable, non-200 response, or not a feed.
        HTTPException 409: a feed with the same URL is already configured.
    """
    try:
        status, body = await _download(feed_data.url)
    except _FETCH_ERRORS as exc:
        raise HTTPException(400, f"Feed validation error: {exc}") from exc
    # These checks sit outside the try so their specific messages are not
    # re-wrapped as a generic "Feed validation error".
    if status != 200:
        raise HTTPException(400, "Feed URL not accessible")
    parsed = feedparser.parse(body)
    if not parsed.feed and not parsed.entries:
        raise HTTPException(400, "Invalid RSS feed format")

    # Save to configuration
    feeds_config = load_feeds_config()
    if any(existing.get("url") == feed_data.url for existing in feeds_config):
        raise HTTPException(409, "Feed already configured")
    new_feed = {
        "name": feed_data.name,
        "url": feed_data.url,
        "category": feed_data.category,
    }
    feeds_config.append(new_feed)
    save_feeds_config(feeds_config)
    return {"message": "Feed added successfully", "feed": new_feed}


@router.post("/feeds/test")
async def test_feed(
    url: Optional[str] = Query(None),
    body_url: Optional[str] = Body(None, embed=True, alias="url"),
):
    """Fetch a feed and return its title, description and first three entries.

    The URL may be given as the ``url`` query parameter (original API) or as
    a JSON body ``{"url": ...}`` (what the frontend client sends).
    """
    target = url or body_url
    if not target:
        raise HTTPException(422, "A feed 'url' is required")
    try:
        status, body = await _download(target)
    except _FETCH_ERRORS as exc:
        raise HTTPException(400, f"Feed test failed: {exc}") from exc
    if status != 200:
        raise HTTPException(400, f"Feed test failed: HTTP {status}")

    parsed = feedparser.parse(body)
    sample_entries = [
        {
            "title": entry.get("title", ""),
            "link": entry.get("link", ""),
            "published": entry.get("published", ""),
            "summary": _preview(entry.get("summary", "")),
        }
        for entry in parsed.entries[:3]  # Test first 3 entries
    ]
    return {
        "feed_title": parsed.feed.get("title", ""),
        "feed_description": parsed.feed.get("description", ""),
        "total_entries": len(parsed.entries),
        "sample_entries": sample_entries,
    }
2.2 Article Processing Router
Target Location: backend/src/api/routes/articles.py
Implement: Article listing, metadata retrieval, and processing:
from contextlib import contextmanager
from datetime import datetime
import logging
from typing import Any, Dict, Iterator, List, Optional

from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Query
from sqlalchemy.orm import Session

from ..dependencies import get_db
from ..models import Article
from ..services import rss_processor, metadata_generator
# core/ is backend/src/core, three package levels above this routes module.
from ...core.llm import LLMManager

router = APIRouter()

# SQLAlchemy's Declarative API reserves the attribute name "metadata", so the
# Article JSON column may be mapped as "llm_metadata"; use whichever exists.
_METADATA_ATTR = "llm_metadata" if hasattr(Article, "llm_metadata") else "metadata"


@contextmanager
def _db_session() -> Iterator[Session]:
    """Yield a session from ``get_db()`` and always finish the generator.

    ``next(get_db())`` on its own never resumes the generator, so the cleanup
    get_db runs after its ``yield`` (presumably ``db.close()``) never happens.
    """
    session_gen = get_db()
    db = next(session_gen)
    try:
        yield db
    finally:
        session_gen.close()


@router.get("/articles")
async def get_articles(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=500),
    feed_id: Optional[int] = None,
    processed: Optional[bool] = None,
) -> List[Dict[str, Any]]:
    """Return a page of articles, newest first.

    ``feed_id`` restricts results to one feed. ``processed`` selects rows
    with (True) or without (False) a ``processed_at`` timestamp.
    """
    with _db_session() as db:
        query = db.query(Article)
        if feed_id is not None:
            query = query.filter(Article.feed_id == feed_id)
        if processed is True:
            query = query.filter(Article.processed_at.isnot(None))
        elif processed is False:
            # SQLAlchemy spells IS NULL as is_(None); there is no is_none().
            query = query.filter(Article.processed_at.is_(None))
        # A stable ORDER BY keeps skip/limit pagination deterministic.
        articles = (
            query.order_by(Article.published_at.desc(), Article.id.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        return [
            {
                "id": article.id,
                "feed_id": article.feed_id,
                "title": article.title,
                "url": article.url,
                "published_at": article.published_at,
                "processed_at": article.processed_at,
                "has_metadata": bool(getattr(article, _METADATA_ATTR, None)),
            }
            for article in articles
        ]


@router.post("/articles")
async def process_articles(
    background_tasks: BackgroundTasks,
    # embed=True: the body is {"feed_ids": [...]}, which is what the frontend sends.
    feed_ids: Optional[List[int]] = Body(None, embed=True),
):
    """Queue RSS fetching and metadata generation as a background task."""
    background_tasks.add_task(process_rss_feeds_background, feed_ids)
    return {"message": "Article processing started asynchronously"}


@router.get("/articles/{article_id}/metadata")
async def get_article_metadata(article_id: int):
    """Return an article's LLM metadata, generating and storing it on first request.

    Raises:
        HTTPException 404: unknown article.
        HTTPException 422: the article has no content to analyze.
    """
    with _db_session() as db:
        article = db.query(Article).filter(Article.id == article_id).first()
        if article is None:
            raise HTTPException(404, "Article not found")
        metadata = getattr(article, _METADATA_ATTR, None)
        if not metadata:
            if not article.content:
                raise HTTPException(422, "Article has no content to analyze")
            # NOTE(review): blocking llama.cpp call on the event loop.
            metadata = LLMManager().generate_metadata(article.content)
            setattr(article, _METADATA_ATTR, metadata)
            db.commit()
        return metadata


async def process_rss_feeds_background(feed_ids: Optional[List[int]] = None):
    """Fetch new articles, generate LLM metadata, and persist them in one commit.

    Runs as a FastAPI background task, so failures are logged rather than
    raised. Articles whose URL is already stored (or repeated in the batch)
    are skipped: ``url`` is UNIQUE, and one collision would otherwise fail
    the whole commit.

    NOTE(review): generate_metadata is a blocking llama.cpp call made on the
    event loop, so the API stalls while a batch runs. Moving it off-loop
    first needs a lock around the model, which is presumably not thread-safe.
    """
    processor = rss_processor.RSSProcessor()
    llm_manager = LLMManager()
    try:
        new_articles = await processor.fetch_from_feeds(feed_ids)
        saved = 0
        with _db_session() as db:
            seen_urls = set()
            try:
                for article in new_articles:
                    url = article.url
                    if url is not None:
                        already_stored = (
                            db.query(Article.id).filter(Article.url == url).first()
                            is not None
                        )
                        if url in seen_urls or already_stored:
                            continue
                        seen_urls.add(url)
                    if article.content:
                        metadata = llm_manager.generate_metadata(article.content)
                    else:
                        metadata = getattr(article, "metadata", None)
                    db.add(
                        Article(
                            feed_id=article.feed_id,
                            title=article.title,
                            url=url,
                            content=article.content,
                            published_at=article.published_at,
                            processed_at=datetime.utcnow(),
                            **{_METADATA_ATTR: metadata},
                        )
                    )
                    saved += 1
                db.commit()
            except Exception:
                db.rollback()
                raise
        logging.info("Processed %d articles", saved)
    except Exception:
        logging.exception("RSS processing failed")
2.3 Synthesis and Composition Routers
Target Location: backend/src/api/routes/synthesis.py, backend/src/api/routes/compose.py
Implement: RAG synthesis and persona-driven composition:
# synthesis.py
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, Iterator

from fastapi import APIRouter, HTTPException
from sqlalchemy.orm import Session

from ..dependencies import get_db
from ..models import Article, SynthesisJob
# core/ is backend/src/core, three package levels above this routes module.
from ...core.llm import LLMManager

router = APIRouter()

# Cap on articles packed into one prompt, so the context fits the model window.
MAX_SYNTHESIS_ARTICLES = 50
# Characters of raw content used when an article has no LLM summary yet.
_SNIPPET_CHARS = 300
# SQLAlchemy's Declarative API reserves "metadata", so the Article JSON column
# may be mapped as "llm_metadata"; use whichever attribute exists.
_METADATA_ATTR = "llm_metadata" if hasattr(Article, "llm_metadata") else "metadata"


@contextmanager
def _db_session() -> Iterator[Session]:
    """Yield a session from ``get_db()`` and always finish the generator.

    ``next(get_db())`` alone never resumes the generator, so its cleanup
    (presumably ``db.close()``) would never run.
    """
    session_gen = get_db()
    db = next(session_gen)
    try:
        yield db
    finally:
        session_gen.close()


def _to_context_item(article: Article) -> Dict[str, Any]:
    """Convert an ORM row into the ``{"title", "summary"}`` dict LLMManager expects.

    Prefers the LLM summary stored in the article's metadata. Falls back to a
    whitespace-collapsed snippet of the raw content.
    """
    metadata = getattr(article, _METADATA_ATTR, None)
    summary = metadata.get("summary") if isinstance(metadata, dict) else None
    if not summary:
        summary = " ".join((article.content or "").split())[:_SNIPPET_CHARS]
    return {"title": article.title or "Untitled", "summary": summary}


@router.post("/synthesize")
async def synthesize_content(
    request: SynthesisRequest
) -> SynthesisResponse:
    """Synthesize an answer to ``request.query`` from stored articles and save the job.

    Uses ``request.article_ids`` when given, otherwise the newest articles, in
    both cases capped at MAX_SYNTHESIS_ARTICLES.

    Raises:
        HTTPException 400: no matching articles.
    """
    with _db_session() as db:
        articles_query = db.query(Article)
        if request.article_ids:
            articles_query = articles_query.filter(Article.id.in_(request.article_ids))
        articles = (
            articles_query.order_by(Article.published_at.desc(), Article.id.desc())
            .limit(MAX_SYNTHESIS_ARTICLES)
            .all()
        )
        if not articles:
            raise HTTPException(400, "No articles available for synthesis")

        # LLMManager._prepare_context indexes article["title"] and
        # article["summary"], so it must receive dicts, not ORM rows.
        # NOTE(review): blocking llama.cpp call on the event loop.
        synthesis = LLMManager().retrieve_and_synthesize(
            request.query, [_to_context_item(a) for a in articles]
        )

        job = SynthesisJob(
            query=request.query,
            article_ids=[a.id for a in articles],
            synthesis=synthesis,
            created_at=datetime.utcnow(),
        )
        db.add(job)
        db.commit()
        db.refresh(job)
        return {"synthesis_id": job.id, "synthesis": synthesis}
# compose.py
import logging

from fastapi import APIRouter, HTTPException

from ..dependencies import get_db
# core/ is backend/src/core, three package levels above this routes module.
from ...core.llm import LLMManager
from ..services import persona_loader

router = APIRouter()


@router.post("/compose")
async def compose_segment(
    request: CompositionRequest
) -> CompositionResponse:
    """Rewrite synthesized content as a news segment in a persona's voice.

    Raises:
        HTTPException 404: the persona is not one the loader knows about.
    """
    # Only accept names the loader itself lists. A crafted persona_name (for
    # example one containing "../") then cannot make it read arbitrary files.
    if request.persona_name not in set(persona_loader.list_personas()):
        raise HTTPException(404, f"Persona '{request.persona_name}' not found")
    try:
        persona = persona_loader.load_persona(request.persona_name)
    except FileNotFoundError:
        raise HTTPException(404, f"Persona '{request.persona_name}' not found")

    # compose_with_persona requires persona["name"]; default it to the
    # persona's listed name without mutating the loader's object.
    persona = {"name": request.persona_name, **persona}

    # NOTE(review): blocking llama.cpp call on the event loop.
    segment = LLMManager().compose_with_persona(
        content=request.synthesis_content,
        persona=persona,
    )
    return {"segment": segment, "persona_used": request.persona_name}


@router.get("/personas")
async def list_personas():
    """List the loadable personas with their description and tone.

    Unloadable persona files are skipped and logged, so one bad file does not
    hide the rest.
    """
    persona_list = []
    for persona_name in persona_loader.list_personas():
        try:
            persona_data = persona_loader.load_persona(persona_name)
            persona_list.append({
                "name": persona_name,
                "description": persona_data.get("description", ""),
                "tone": persona_data.get("tone", ""),
            })
        except Exception:
            logging.warning("Skipping invalid persona %r", persona_name, exc_info=True)
    return {"personas": persona_list}
2.4 Text-to-Speech Router
Target Location: backend/src/api/routes/tts.py
Implement: Audio generation with caching:
import asyncio
import hashlib
import io
import logging
from pathlib import Path
from typing import Any, Dict

import edge_tts
from fastapi import APIRouter, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse

# core/ is backend/src/core, three package levels above this routes module.
from ...core.config import settings
# logging, Dict/Any and get_db are used by the handlers below but were
# never imported, which caused NameErrors at import time or at runtime.
from ..dependencies import get_db
from ..models import AudioFile

router = APIRouter()
@router.post("/tts")
async def generate_tts(
    request: TTSRequest,
    background_tasks: BackgroundTasks
) -> dict:  # plain dict: Dict/Any need a typing import tts.py lacked
    """Return cached audio for (text, voice, speed), or start generating it.

    The MD5 of the three inputs serves only as a cache key. It is also the
    ``audio_id`` accepted by GET /tts/{audio_id}. Both response shapes carry
    ``audio_id``, ``status`` and ``cached``.
    """
    cache_key = f"{request.text}_{request.voice}_{request.speed}"
    segment_hash = hashlib.md5(cache_key.encode()).hexdigest()
    audio_path = Path(settings.AUDIO_CACHE_DIR) / f"{segment_hash}.mp3"

    if audio_path.exists():
        return {
            "audio_id": segment_hash,
            "status": "ready",
            "cached": True,
            "path": str(audio_path),
            "voice": request.voice,
            "speed": request.speed,
        }

    background_tasks.add_task(generate_audio_background, request, audio_path, segment_hash)
    return {
        "audio_id": segment_hash,
        "status": "generating",
        "cached": False,
        "message": "Audio generation started in background",
    }
@router.get("/tts/{audio_id}")
async def get_tts_audio(audio_id: str):
    """Serve a generated MP3 by the ``audio_id`` returned from POST /tts.

    Raises:
        HTTPException 404: malformed id, or the file does not exist yet.
    """
    import re

    from fastapi.responses import FileResponse

    # Ids are MD5 hex digests; rejecting anything else keeps the
    # user-supplied id from naming files outside the cache directory.
    if not re.fullmatch(r"[0-9a-f]{32}", audio_id):
        raise HTTPException(404, "Audio file not found or still generating")
    audio_path = Path(settings.AUDIO_CACHE_DIR) / f"{audio_id}.mp3"
    if not audio_path.exists():
        raise HTTPException(404, "Audio file not found or still generating")
    # FileResponse sends the file in fixed-size chunks with Content-Length.
    # Iterating a binary file object splits it at b"\n" bytes instead.
    return FileResponse(audio_path, media_type="audio/mpeg")
async def generate_audio_background(
request: TTSRequest,
audio_path: Path,
audio_id: str
):
"""Background audio generation"""
try:
communicate = edge_tts.Communicate(
text=request.text,
voice=request.voice,
rate=f"{(request.speed - 1) * 100:+d}%"
)
await communicate.save(str(audio_path))
logging.info(f"Audio generated: {audio_path}")
# Save to database if needed
db = next(get_db())
audio_record = AudioFile(
segment_id=audio_id,
file_path=str(audio_path),
duration=0, # Could calculate from file
voice=request.voice
)
db.add(audio_record)
db.commit()
except Exception as e:
logging.error(f"Audio generation failed: {str(e)}")
# Clean up partial file
if audio_path.exists():
audio_path.unlink()
Phase 3: Frontend Implementation
3.1 API Client Layer
Target Location: frontend/src/lib/api/client.ts
Implement: TypeScript API client with error handling:
import { z } from 'zod';

// ---- Schema definitions ---------------------------------------------------

const ArticleSchema = z.object({
  id: z.number(),
  // Article.feed_id is a nullable foreign key in the backend schema.
  feed_id: z.number().nullable(),
  title: z.string(),
  url: z.string(),
  published_at: z.string().nullable(),
  processed_at: z.string().nullable(),
  has_metadata: z.boolean(),
});

/** Article row as returned by GET /articles. */
export type Article = z.infer<typeof ArticleSchema>;

const SynthesisRequestSchema = z.object({
  query: z.string(),
  article_ids: z.array(z.number()).optional(),
});

const CompositionRequestSchema = z.object({
  synthesis_content: z.string(),
  persona_name: z.string(),
});

const TTSRequestSchema = z.object({
  text: z.string(),
  // edge-tts voice names are the "...Neural" ones; the legacy "AriaRUS"
  // name is not offered, so generation with it fails.
  voice: z.string().optional().default('en-US-AriaNeural'),
  speed: z.number().min(0.5).max(2).optional().default(1),
});

// ---- Helpers --------------------------------------------------------------

/** Error thrown for any non-2xx API response. */
export class ApiError extends Error {
  readonly status: number;
  readonly body: unknown;

  constructor(status: number, message: string, body?: unknown) {
    super(message);
    this.name = 'ApiError';
    this.status = status;
    this.body = body;
  }
}

type QueryValue = string | number | boolean | null | undefined;

/**
 * Build "?a=1&b=2" from params. Undefined and null values are skipped, so
 * they are not sent as the literal strings "undefined"/"null".
 */
function toQueryString(params?: Record<string, QueryValue>): string {
  const search = new URLSearchParams();
  for (const [key, value] of Object.entries(params ?? {})) {
    if (value !== undefined && value !== null) {
      search.append(key, String(value));
    }
  }
  const encoded = search.toString();
  return encoded ? `?${encoded}` : '';
}

// ---- API client -----------------------------------------------------------

class NewsSynthesizerAPI {
  private baseUrl: string;

  constructor(baseUrl: string = '/api/v1') {
    this.baseUrl = baseUrl;
  }

  /** fetch() wrapper that throws ApiError on non-2xx responses. */
  private async request(path: string, init?: RequestInit): Promise<Response> {
    const response = await fetch(`${this.baseUrl}${path}`, init);
    if (!response.ok) {
      let body: unknown;
      try {
        body = await response.json();
      } catch {
        body = undefined;
      }
      // FastAPI reports errors as {"detail": ...}.
      const detail = (body as { detail?: unknown } | undefined)?.detail;
      const suffix = typeof detail === 'string' ? `: ${detail}` : '';
      throw new ApiError(
        response.status,
        `${init?.method ?? 'GET'} ${path} failed with status ${response.status}${suffix}`,
        body,
      );
    }
    return response;
  }

  private async getJson(path: string): Promise<any> {
    const response = await this.request(path);
    return response.json();
  }

  private async postJson(path: string, payload: unknown): Promise<any> {
    const response = await this.request(path, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    });
    return response.json();
  }

  // Feed management
  async getFeeds(params?: { skip?: number; limit?: number }) {
    return this.getJson(`/feeds${toQueryString(params)}`);
  }

  async addFeed(feedData: { name: string; url: string; category?: string }) {
    return this.postJson('/feeds', feedData);
  }

  async testFeed(url: string) {
    return this.postJson('/feeds/test', { url });
  }

  // Article management
  async getArticles(params?: {
    skip?: number;
    limit?: number;
    feed_id?: number;
    processed?: boolean;
  }): Promise<Article[]> {
    const data = await this.getJson(`/articles${toQueryString(params)}`);
    return z.array(ArticleSchema).parse(data);
  }

  async processArticles(feed_ids?: number[]) {
    return this.postJson('/articles', { feed_ids });
  }

  async getArticleMetadata(articleId: number) {
    return this.getJson(`/articles/${articleId}/metadata`);
  }

  // Synthesis and composition (payloads are validated before sending)
  async synthesize(query: string, articleIds?: number[]) {
    const payload = SynthesisRequestSchema.parse({ query, article_ids: articleIds });
    return this.postJson('/synthesize', payload);
  }

  async compose(synthesisContent: string, personaName: string) {
    const payload = CompositionRequestSchema.parse({
      synthesis_content: synthesisContent,
      persona_name: personaName,
    });
    return this.postJson('/compose', payload);
  }

  async getPersonas() {
    return this.getJson('/personas');
  }

  // Text-to-speech
  async generateTTS(text: string, options?: { voice?: string; speed?: number }) {
    // Send the parsed value so the schema's voice/speed defaults are applied.
    const payload = TTSRequestSchema.parse({ text, ...options });
    return this.postJson('/tts', payload);
  }

  async getTTS(audioId: string): Promise<Blob> {
    // request() throws on 404 ("still generating") instead of returning the
    // JSON error body disguised as an audio Blob.
    const response = await this.request(`/tts/${encodeURIComponent(audioId)}`);
    return response.blob();
  }
}

// Export singleton instance
export const apiClient = new NewsSynthesizerAPI(
  process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000/api/v1'
);
3.2 React Components Structure
Target Location: frontend/src/components/
Implement: Reusable UI components following shadcn/ui patterns:
// components/articles/ArticleList.tsx
import { useQuery } from '@tanstack/react-query';
import { apiClient } from '@/lib/api/client';
import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card';
import { Badge } from '@/components/ui/badge';

/** Renders up to 20 articles from GET /articles as cards. */
export function ArticleList() {
  const { data: articles, isLoading, error } = useQuery({
    queryKey: ['articles'],
    queryFn: () => apiClient.getArticles({ limit: 20 }),
  });

  if (isLoading) return <div>Loading articles…</div>;
  if (error) return <div>Failed to load articles: {(error as Error).message}</div>;
  if (!articles?.length) return <div>No articles yet.</div>;

  return (
    <div className="space-y-4">
      {articles.map((article) => (
        <Card key={article.id}>
          <CardHeader>
            <CardTitle>{article.title}</CardTitle>
            {article.has_metadata && <Badge variant="secondary">Analyzed</Badge>}
          </CardHeader>
          <CardContent>
            <p className="text-sm text-muted-foreground">
              Published: {article.published_at ? new Date(article.published_at).toLocaleDateString() : 'Unknown'}
            </p>
            {/* noopener: the linked page must not get a handle to this window */}
            <a href={article.url} target="_blank" rel="noopener noreferrer">
              Read original
            </a>
          </CardContent>
        </Card>
      ))}
    </div>
  );
}

// components/synthesis/SynthesisForm.tsx
import { useState } from 'react';
import { useMutation } from '@tanstack/react-query';
import { apiClient } from '@/lib/api/client';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Textarea } from '@/components/ui/textarea';
export function SynthesisForm() { const [query, setQuery] = useState(''); const [articleIds, setArticleIds] = useState<number[]>([]);
const mutation = useMutation({ mutationFn: () => apiClient.synthesize(query, articleIds.length ? articleIds : undefined), onSuccess: (data) => { console.log('Synthesis completed:', data); } });
const handleSubmit = (e: React.FormEvent) => { e.preventDefault(); mutation.mutate(); };
return (