non-building checkpoint 1

This commit is contained in:
2026-01-03 11:18:56 -07:00
parent 9c64cb0c2f
commit 81ea22eae9
37 changed files with 4804 additions and 275 deletions

View File

@@ -0,0 +1,416 @@
"""
Main prompt service that orchestrates prompt generation and management.
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from app.core.config import settings
from app.core.logging import setup_logging
from app.services.data_service import DataService
from app.services.ai_service import AIService
from app.models.prompt import (
PromptResponse,
PoolStatsResponse,
HistoryStatsResponse,
FeedbackWord,
FeedbackHistoryItem
)
logger = setup_logging()
class PromptService:
"""Main service for prompt generation and management."""
def __init__(self):
    """Set up service collaborators and empty data caches."""
    # Collaborating services for persistence and AI generation.
    self.data_service = DataService()
    self.ai_service = AIService()
    # Settings loaded lazily from the config file on first use.
    self.settings_config = {}
    # Lazily-populated caches; None means "not loaded yet".
    for cache_attr in (
        "_prompts_historic_cache",
        "_prompts_pool_cache",
        "_feedback_words_cache",
        "_feedback_historic_cache",
        "_prompt_template_cache",
        "_feedback_template_cache",
    ):
        setattr(self, cache_attr, None)
async def _load_settings_config(self):
"""Load settings from config file if not already loaded."""
if not self.settings_config:
self.settings_config = await self.data_service.load_settings_config()
async def _get_setting(self, key: str, default: Any) -> Any:
"""Get setting value, preferring config file over environment."""
await self._load_settings_config()
return self.settings_config.get(key, default)
# Data loading methods with caching
async def get_prompts_historic(self) -> List[Dict[str, str]]:
"""Get historic prompts with caching."""
if self._prompts_historic_cache is None:
self._prompts_historic_cache = await self.data_service.load_prompts_historic()
return self._prompts_historic_cache
async def get_prompts_pool(self) -> List[str]:
"""Get prompt pool with caching."""
if self._prompts_pool_cache is None:
self._prompts_pool_cache = await self.data_service.load_prompts_pool()
return self._prompts_pool_cache
async def get_feedback_words(self) -> List[Dict[str, Any]]:
"""Get feedback words with caching."""
if self._feedback_words_cache is None:
self._feedback_words_cache = await self.data_service.load_feedback_words()
return self._feedback_words_cache
async def get_feedback_historic(self) -> List[Dict[str, str]]:
"""Get historic feedback words with caching."""
if self._feedback_historic_cache is None:
self._feedback_historic_cache = await self.data_service.load_feedback_historic()
return self._feedback_historic_cache
async def get_prompt_template(self) -> str:
"""Get prompt template with caching."""
if self._prompt_template_cache is None:
self._prompt_template_cache = await self.data_service.load_prompt_template()
return self._prompt_template_cache
async def get_feedback_template(self) -> str:
"""Get feedback template with caching."""
if self._feedback_template_cache is None:
self._feedback_template_cache = await self.data_service.load_feedback_template()
return self._feedback_template_cache
# Core prompt operations
async def draw_prompts_from_pool(self, count: Optional[int] = None) -> List[str]:
"""
Draw prompts from the pool.
Args:
count: Number of prompts to draw
Returns:
List of drawn prompts
"""
if count is None:
count = await self._get_setting('num_prompts', settings.NUM_PROMPTS_PER_SESSION)
pool = await self.get_prompts_pool()
if len(pool) < count:
raise ValueError(
f"Pool only has {len(pool)} prompts, requested {count}. "
f"Use fill-pool endpoint to add more prompts."
)
# Draw prompts from the beginning of the pool
drawn_prompts = pool[:count]
remaining_pool = pool[count:]
# Update cache and save
self._prompts_pool_cache = remaining_pool
await self.data_service.save_prompts_pool(remaining_pool)
logger.info(f"Drew {len(drawn_prompts)} prompts from pool, {len(remaining_pool)} remaining")
return drawn_prompts
async def fill_pool_to_target(self) -> int:
    """
    Top the prompt pool up to the configured target volume.

    Returns:
        Number of prompts added (0 when already full or generation fails).
    """
    target_volume = await self._get_setting('cached_pool_volume', settings.CACHED_POOL_VOLUME)
    current_pool = await self.get_prompts_pool()
    shortfall = target_volume - len(current_pool)
    if shortfall <= 0:
        logger.info(f"Pool already at target volume: {len(current_pool)}/{target_volume}")
        return 0
    logger.info(f"Generating {shortfall} prompts to fill pool")
    # Generate exactly enough prompts to close the gap.
    new_prompts = await self.generate_prompts(
        count=shortfall,
        use_history=True,
        use_feedback=True
    )
    if not new_prompts:
        logger.error("Failed to generate prompts for pool")
        return 0
    updated_pool = current_pool + new_prompts
    self._prompts_pool_cache = updated_pool
    await self.data_service.save_prompts_pool(updated_pool)
    logger.info(f"Added {len(new_prompts)} prompts to pool, new size: {len(updated_pool)}")
    return len(new_prompts)
async def generate_prompts(
    self,
    count: Optional[int] = None,
    use_history: bool = True,
    use_feedback: bool = True
) -> List[str]:
    """
    Generate new prompts via the AI service.

    Args:
        count: Number of prompts to generate; defaults to the configured
            per-session count.
        use_history: Whether to pass historic prompts as context.
        use_feedback: Whether to pass feedback words as context.

    Returns:
        List of generated prompts.

    Raises:
        ValueError: If the prompt template is missing.
    """
    if count is None:
        count = await self._get_setting('num_prompts', settings.NUM_PROMPTS_PER_SESSION)
    min_len = await self._get_setting('min_length', settings.MIN_PROMPT_LENGTH)
    max_len = await self._get_setting('max_length', settings.MAX_PROMPT_LENGTH)
    template = await self.get_prompt_template()
    if not template:
        raise ValueError("Prompt template not found")
    # Context is optional on both axes; an empty list / None disables it.
    history = await self.get_prompts_historic() if use_history else []
    feedback = await self.get_feedback_words() if use_feedback else None
    return await self.ai_service.generate_prompts(
        prompt_template=template,
        historic_prompts=history,
        feedback_words=feedback,
        count=count,
        min_length=min_len,
        max_length=max_len
    )
async def add_prompt_to_history(self, prompt_text: str) -> str:
    """
    Add a prompt to the historic prompts cyclic buffer.

    The new prompt becomes "prompt00"; every existing entry is re-keyed
    one slot down, and entries past HISTORY_BUFFER_SIZE are dropped.

    Args:
        prompt_text: Prompt text to add.

    Returns:
        Position key of the added prompt (always "prompt00").
    """
    historic_prompts = await self.get_prompts_historic()
    # Newest prompt always occupies slot 00.
    updated_prompts = [{"prompt00": prompt_text}]
    # Re-key existing prompts one slot down, dropping overflow.
    # Fix: the original loop reused the ``prompt_text`` parameter name as
    # its loop variable, shadowing the argument; distinct names avoid that.
    for i, prompt_dict in enumerate(historic_prompts):
        if i >= settings.HISTORY_BUFFER_SIZE - 1:  # keep at most HISTORY_BUFFER_SIZE prompts
            break
        old_key = next(iter(prompt_dict))  # each entry has a single key
        updated_prompts.append({f"prompt{i+1:02d}": prompt_dict[old_key]})
    # Update cache and persist.
    self._prompts_historic_cache = updated_prompts
    await self.data_service.save_prompts_historic(updated_prompts)
    logger.info(f"Added prompt to history as prompt00, history size: {len(updated_prompts)}")
    return "prompt00"
# Statistics methods
async def get_pool_stats(self) -> PoolStatsResponse:
    """Return statistics describing the current prompt pool."""
    pool = await self.get_prompts_pool()
    size = len(pool)
    per_session = await self._get_setting('num_prompts', settings.NUM_PROMPTS_PER_SESSION)
    target = await self._get_setting('cached_pool_volume', settings.CACHED_POOL_VOLUME)
    # Guard against a zero (or misconfigured) per-session count.
    sessions = size // per_session if per_session > 0 else 0
    return PoolStatsResponse(
        total_prompts=size,
        prompts_per_session=per_session,
        target_pool_size=target,
        available_sessions=sessions,
        needs_refill=size < target
    )
async def get_history_stats(self) -> HistoryStatsResponse:
    """Return statistics about the prompt history buffer."""
    history = await self.get_prompts_historic()
    used = len(history)
    capacity = settings.HISTORY_BUFFER_SIZE
    return HistoryStatsResponse(
        total_prompts=used,
        history_capacity=capacity,
        available_slots=max(0, capacity - used),
        is_full=used >= capacity
    )
async def get_prompt_history(self, limit: Optional[int] = None) -> List[PromptResponse]:
    """
    Return the prompt history as response models.

    Args:
        limit: Maximum number of history items to return (None = all).

    Returns:
        List of historical prompts in buffer order.
    """
    history = await self.get_prompts_historic()
    if limit is not None:
        history = history[:limit]
    responses = []
    for position, entry in enumerate(history):
        key = next(iter(entry))  # each entry has a single prompt key
        responses.append(PromptResponse(
            key=key,
            text=entry[key],
            position=position
        ))
    return responses
# Feedback operations
async def generate_theme_feedback_words(self) -> List[str]:
"""Generate 6 theme feedback words using AI."""
feedback_template = await self.get_feedback_template()
if not feedback_template:
raise ValueError("Feedback template not found")
historic_prompts = await self.get_prompts_historic()
if not historic_prompts:
raise ValueError("No historic prompts available for feedback analysis")
current_feedback_words = await self.get_feedback_words()
historic_feedback_words = await self.get_feedback_historic()
theme_words = await self.ai_service.generate_theme_feedback_words(
feedback_template=feedback_template,
historic_prompts=historic_prompts,
current_feedback_words=current_feedback_words,
historic_feedback_words=historic_feedback_words
)
return theme_words
async def update_feedback_words(self, ratings: Dict[str, int]) -> List[FeedbackWord]:
"""
Update feedback words with new ratings.
Args:
ratings: Dictionary of word to rating (0-6)
Returns:
Updated feedback words
"""
if len(ratings) != 6:
raise ValueError(f"Expected 6 ratings, got {len(ratings)}")
feedback_items = []
for i, (word, rating) in enumerate(ratings.items()):
if not 0 <= rating <= 6:
raise ValueError(f"Rating for '{word}' must be between 0 and 6, got {rating}")
feedback_key = f"feedback{i:02d}"
feedback_items.append({
feedback_key: word,
"weight": rating
})
# Update cache and save
self._feedback_words_cache = feedback_items
await self.data_service.save_feedback_words(feedback_items)
# Also add to historic feedback
await self._add_feedback_words_to_history(feedback_items)
# Convert to FeedbackWord models
feedback_words = []
for item in feedback_items:
key = list(item.keys())[0]
word = item[key]
weight = item["weight"]
feedback_words.append(FeedbackWord(key=key, word=word, weight=weight))
logger.info(f"Updated feedback words with {len(feedback_words)} items")
return feedback_words
async def _add_feedback_words_to_history(self, feedback_items: List[Dict[str, Any]]) -> None:
    """Prepend the 6 current feedback words to the historic buffer.

    Older entries are re-keyed 6 slots down; entries beyond
    FEEDBACK_HISTORY_SIZE are dropped. Silently skips (with a warning)
    if the items do not yield exactly 6 words.
    """
    historic_feedback = await self.get_feedback_historic()
    # Pull just the word entries (without weights) out of the current items.
    fresh_words = []
    for index, item in enumerate(feedback_items):
        key = f"feedback{index:02d}"
        if key in item:
            fresh_words.append({key: item[key]})
    if len(fresh_words) != 6:
        logger.warning(f"Expected 6 feedback words, got {len(fresh_words)}. Not adding to history.")
        return
    # New words go first; existing entries shift down by 6 slots.
    updated = list(fresh_words)
    for index, entry in enumerate(historic_feedback):
        if index >= settings.FEEDBACK_HISTORY_SIZE - 6:  # cap total size
            break
        old_key = next(iter(entry))
        updated.append({f"feedback{index+6:02d}": entry[old_key]})
    # Update cache and persist.
    self._feedback_historic_cache = updated
    await self.data_service.save_feedback_historic(updated)
    logger.info(f"Added 6 feedback words to history, history size: {len(updated)}")
# Utility methods for API endpoints
def get_pool_size(self) -> int:
"""Get current pool size (synchronous for API endpoints)."""
if self._prompts_pool_cache is None:
raise RuntimeError("Pool cache not initialized")
return len(self._prompts_pool_cache)
def get_target_volume(self) -> int:
    """Get target pool volume (synchronous for API endpoints).

    NOTE(review): unlike the async stats methods, this reads the
    environment setting directly and ignores any config-file override —
    confirm that is intentional.
    """
    target_volume = settings.CACHED_POOL_VOLUME
    return target_volume