Files
daily-journal-prompt/generate_prompts.py

655 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Daily Journal Prompt Generator
A tool that uses AI to generate creative writing prompts for daily journaling.
"""
import os
import json
import sys
import argparse
import configparser
from datetime import datetime
from typing import List, Dict, Any, Optional
from pathlib import Path
from openai import OpenAI
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.prompt import Prompt, Confirm
from rich.progress import Progress, SpinnerColumn, TextColumn
class JournalPromptGenerator:
"""Main class for generating journal prompts using AI."""
def __init__(self, config_path: str = ".env"):
"""Initialize the generator with configuration."""
self.console = Console()
self.config_path = config_path
self.client = None
self.historic_prompts = []
self.pool_prompts = []
self.prompt_template = ""
self.settings = {}
# Load configuration
self._load_config()
self._load_settings()
# Load data files
self._load_prompt_template()
self._load_historic_prompts()
self._load_pool_prompts()
def _load_config(self):
"""Load configuration from environment file."""
load_dotenv(self.config_path)
# Get API key
self.api_key = os.getenv("DEEPSEEK_API_KEY") or os.getenv("OPENAI_API_KEY")
if not self.api_key:
self.console.print("[red]Error: No API key found in .env file[/red]")
self.console.print("Please add DEEPSEEK_API_KEY or OPENAI_API_KEY to your .env file")
sys.exit(1)
# Get API base URL (default to DeepSeek)
self.base_url = os.getenv("API_BASE_URL", "https://api.deepseek.com")
# Get model (default to deepseek-chat)
self.model = os.getenv("MODEL", "deepseek-chat")
# Initialize OpenAI client
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
def _load_settings(self):
"""Load settings from settings.cfg configuration file."""
config = configparser.ConfigParser()
# Set default values
self.settings = {
'min_length': 500,
'max_length': 1000,
'num_prompts': 6
}
try:
config.read('settings.cfg')
if 'prompts' in config:
prompts_section = config['prompts']
# Load min_length
if 'min_length' in prompts_section:
self.settings['min_length'] = int(prompts_section['min_length'])
# Load max_length
if 'max_length' in prompts_section:
self.settings['max_length'] = int(prompts_section['max_length'])
# Load num_prompts
if 'num_prompts' in prompts_section:
self.settings['num_prompts'] = int(prompts_section['num_prompts'])
except FileNotFoundError:
self.console.print("[yellow]Warning: settings.cfg not found, using default values[/yellow]")
except ValueError as e:
self.console.print(f"[yellow]Warning: Invalid value in settings.cfg: {e}, using default values[/yellow]")
except Exception as e:
self.console.print(f"[yellow]Warning: Error reading settings.cfg: {e}, using default values[/yellow]")
def _load_prompt_template(self):
"""Load the prompt template from ds_prompt.txt and update with config values."""
try:
with open("ds_prompt.txt", "r") as f:
template = f.read()
# Replace hardcoded values with config values
template = template.replace(
"between 500 and 1000 characters",
f"between {self.settings['min_length']} and {self.settings['max_length']} characters"
)
# Replace the number of prompts (6) with config value
template = template.replace(
"Please generate 6 writing prompts",
f"Please generate {self.settings['num_prompts']} writing prompts"
)
self.prompt_template = template
except FileNotFoundError:
self.console.print("[red]Error: ds_prompt.txt not found[/red]")
sys.exit(1)
def _load_historic_prompts(self):
"""Load historic prompts from JSON file."""
try:
with open("historic_prompts.json", "r") as f:
self.historic_prompts = json.load(f)
except FileNotFoundError:
self.console.print("[yellow]Warning: historic_prompts.json not found, starting with empty history[/yellow]")
self.historic_prompts = []
except json.JSONDecodeError:
self.console.print("[yellow]Warning: historic_prompts.json is corrupted, starting with empty history[/yellow]")
self.historic_prompts = []
def _save_historic_prompts(self):
"""Save historic prompts to JSON file (keeping only first 60)."""
# Keep only the first 60 prompts (newest are at the beginning)
if len(self.historic_prompts) > 60:
self.historic_prompts = self.historic_prompts[:60]
with open("historic_prompts.json", "w") as f:
json.dump(self.historic_prompts, f, indent=2)
def _load_pool_prompts(self):
"""Load pool prompts from JSON file."""
try:
with open("pool_prompts.json", "r") as f:
self.pool_prompts = json.load(f)
except FileNotFoundError:
self.console.print("[yellow]Warning: pool_prompts.json not found, starting with empty pool[/yellow]")
self.pool_prompts = []
except json.JSONDecodeError:
self.console.print("[yellow]Warning: pool_prompts.json is corrupted, starting with empty pool[/yellow]")
self.pool_prompts = []
def _save_pool_prompts(self):
"""Save pool prompts to JSON file."""
with open("pool_prompts.json", "w") as f:
json.dump(self.pool_prompts, f, indent=2)
def add_prompts_to_pool(self, prompts: List[Dict[str, str]]):
"""Add generated prompts to the pool."""
for prompt_dict in prompts:
# Extract prompt text
prompt_key = list(prompt_dict.keys())[0]
prompt_text = prompt_dict[prompt_key]
# Add to pool with a pool-specific key
pool_key = f"poolprompt{len(self.pool_prompts):03d}"
self.pool_prompts.append({
pool_key: prompt_text
})
self._save_pool_prompts()
self.console.print(f"[green]Added {len(prompts)} prompts to pool[/green]")
def draw_prompts_from_pool(self, count: int = None) -> List[Dict[str, str]]:
"""Draw prompts from the pool (removes them from pool)."""
if count is None:
count = self.settings['num_prompts']
if len(self.pool_prompts) < count:
self.console.print(f"[yellow]Warning: Pool only has {len(self.pool_prompts)} prompts, requested {count}[/yellow]")
count = len(self.pool_prompts)
if count == 0:
self.console.print("[red]Error: Pool is empty[/red]")
return []
# Draw prompts from the beginning of the pool
drawn_prompts = self.pool_prompts[:count]
self.pool_prompts = self.pool_prompts[count:]
# Save updated pool
self._save_pool_prompts()
# Renumber remaining pool prompts
self._renumber_pool_prompts()
return drawn_prompts
def _renumber_pool_prompts(self):
"""Renumber pool prompts to maintain sequential numbering."""
renumbered_prompts = []
for i, prompt_dict in enumerate(self.pool_prompts):
# Get the prompt text from the first key in the dictionary
prompt_key = list(prompt_dict.keys())[0]
prompt_text = prompt_dict[prompt_key]
# Create new prompt with correct numbering
new_prompt_key = f"poolprompt{i:03d}"
renumbered_prompts.append({
new_prompt_key: prompt_text
})
self.pool_prompts = renumbered_prompts
def show_pool_stats(self):
"""Show statistics about the prompt pool."""
total_prompts = len(self.pool_prompts)
table = Table(title="Prompt Pool Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Prompts in pool", str(total_prompts))
table.add_row("Prompts per session", str(self.settings['num_prompts']))
# Get cached_pool_volume from settings if available
cached_pool_volume = 20 # Default
try:
config = configparser.ConfigParser()
config.read('settings.cfg')
if 'prefetch' in config and 'cached_pool_volume' in config['prefetch']:
cached_pool_volume = int(config['prefetch']['cached_pool_volume'])
except:
pass
table.add_row("Target pool size", str(cached_pool_volume))
table.add_row("Available sessions", str(total_prompts // self.settings['num_prompts']))
self.console.print(table)
def _renumber_prompts(self):
"""Renumber all prompts to maintain prompt00-prompt59 range."""
renumbered_prompts = []
for i, prompt_dict in enumerate(self.historic_prompts):
# Get the prompt text from the first key in the dictionary
prompt_key = list(prompt_dict.keys())[0]
prompt_text = prompt_dict[prompt_key]
# Create new prompt with correct numbering
new_prompt_key = f"prompt{i:02d}"
renumbered_prompts.append({
new_prompt_key: prompt_text
})
self.historic_prompts = renumbered_prompts
def add_prompt_to_history(self, prompt_text: str):
"""
Add a single prompt to the historic prompts cyclic buffer.
The new prompt becomes prompt00, all others shift down, and prompt59 is discarded.
"""
# Create the new prompt object
new_prompt = {
"prompt00": prompt_text
}
# Shift all existing prompts down by one position
# We'll create a new list starting with the new prompt
updated_prompts = [new_prompt]
# Add all existing prompts, shifting their numbers down by one
for i, prompt_dict in enumerate(self.historic_prompts):
if i >= 59: # We only keep 60 prompts total (00-59)
break
# Get the prompt text
prompt_key = list(prompt_dict.keys())[0]
prompt_text = prompt_dict[prompt_key]
# Create prompt with new number (shifted down by one)
new_prompt_key = f"prompt{i+1:02d}"
updated_prompts.append({
new_prompt_key: prompt_text
})
self.historic_prompts = updated_prompts
self._save_historic_prompts()
def _prepare_prompt(self) -> str:
"""Prepare the full prompt with historic context."""
# Format historic prompts for the AI
if self.historic_prompts:
historic_context = json.dumps(self.historic_prompts, indent=2)
full_prompt = f"{self.prompt_template}\n\nPrevious prompts:\n{historic_context}"
else:
full_prompt = self.prompt_template
return full_prompt
def _parse_ai_response(self, response_content: str) -> List[Dict[str, str]]:
"""
Parse the AI response to extract new prompts.
Expected format: JSON array with keys "newprompt0" to "newpromptN" where N = num_prompts-1
Handles DeepSeek API responses that may include backticks and leading "json" string.
"""
# First, try to clean up the response content
cleaned_content = self._clean_ai_response(response_content)
try:
# Try to parse as JSON
data = json.loads(cleaned_content)
# Convert to list of prompt dictionaries
new_prompts = []
for i in range(self.settings['num_prompts']):
key = f"newprompt{i}"
if key in data:
prompt_text = data[key]
prompt_obj = {
f"prompt{len(self.historic_prompts) + i:02d}": prompt_text
}
new_prompts.append(prompt_obj)
# If no prompts were found in the JSON, provide debug info
if not new_prompts:
self.console.print("\n[yellow]Warning: JSON parsed successfully but no prompts found with expected keys[/yellow]")
self.console.print(f"[yellow]Expected keys: newprompt0 to newprompt{self.settings['num_prompts']-1}[/yellow]")
self.console.print(f"[yellow]Keys found in JSON: {list(data.keys())}[/yellow]")
self.console.print(f"[yellow]Full JSON data: {json.dumps(data, indent=2)}[/yellow]")
return new_prompts
except json.JSONDecodeError:
# If not valid JSON, try to extract prompts from text
self.console.print("[yellow]Warning: AI response is not valid JSON, attempting to extract prompts...[/yellow]")
self.console.print(f"[yellow]Full response content for debugging:[/yellow]")
self.console.print(f"[yellow]{response_content}[/yellow]")
self.console.print(f"[yellow]Cleaned content: {cleaned_content}[/yellow]")
# Look for patterns in the text
lines = response_content.strip().split('\n')
new_prompts = []
for i, line in enumerate(lines[:self.settings['num_prompts']]): # Take first N non-empty lines
line = line.strip()
if line and len(line) > 50: # Reasonable minimum length for a prompt
prompt_obj = {
f"prompt{len(self.historic_prompts) + i:02d}": line
}
new_prompts.append(prompt_obj)
# If still no prompts could be parsed, provide detailed debug information
if not new_prompts:
self.console.print("\n[red]ERROR: Could not extract any prompts from AI response[/red]")
self.console.print("[red]="*60 + "[/red]")
self.console.print("[bold red]DEBUG INFORMATION:[/bold red]")
self.console.print("[red]="*60 + "[/red]")
# Show response metadata
self.console.print(f"[yellow]Response length: {len(response_content)} characters[/yellow]")
self.console.print(f"[yellow]Expected number of prompts: {self.settings['num_prompts']}[/yellow]")
# Show first 500 characters of response
preview = response_content[:500]
if len(response_content) > 500:
preview += "..."
self.console.print(f"[yellow]Response preview (first 500 chars):[/yellow]")
self.console.print(f"[yellow]{preview}[/yellow]")
# Show cleaned content analysis
self.console.print(f"[yellow]Cleaned content length: {len(cleaned_content)} characters[/yellow]")
self.console.print(f"[yellow]Cleaned content preview: {cleaned_content[:200]}...[/yellow]")
# Show line analysis
self.console.print(f"[yellow]Number of lines in response: {len(lines)}[/yellow]")
self.console.print(f"[yellow]First 5 lines:[/yellow]")
for i, line in enumerate(lines[:5]):
self.console.print(f"[yellow] Line {i+1}: {line[:100]}{'...' if len(line) > 100 else ''}[/yellow]")
# Show JSON parsing attempt details
self.console.print(f"[yellow]JSON parsing attempted on cleaned content:[/yellow]")
self.console.print(f"[yellow] Cleaned content starts with: {cleaned_content[:50]}...[/yellow]")
# Show full payload for debugging
self.console.print("\n[bold red]FULL PAYLOAD DUMP:[/bold red]")
self.console.print("[red]" + "="*60 + "[/red]")
self.console.print(f"[red]{response_content}[/red]")
self.console.print("[red]" + "="*60 + "[/red]")
return new_prompts
def _clean_ai_response(self, response_content: str) -> str:
"""
Clean up AI response content to handle common formatting issues from DeepSeek API.
Handles:
1. Leading/trailing backticks (```json ... ```)
2. Leading "json" string on its own line
3. Extra whitespace and newlines
"""
content = response_content.strip()
# Remove leading/trailing backticks (```json ... ```)
if content.startswith('```'):
# Find the first newline after the opening backticks
lines = content.split('\n')
if len(lines) > 1:
# Check if first line contains "json" or other language specifier
first_line = lines[0].strip()
if 'json' in first_line.lower() or first_line == '```':
# Remove the first line (```json or ```)
content = '\n'.join(lines[1:])
# Remove trailing backticks if present
if content.endswith('```'):
content = content[:-3].rstrip()
# Remove leading "json" string on its own line (case-insensitive)
lines = content.split('\n')
if len(lines) > 0:
first_line = lines[0].strip().lower()
if first_line == 'json':
content = '\n'.join(lines[1:])
# Also handle the case where "json" might be at the beginning of the first line
# but not the entire line (e.g., "json\n{...}")
content = content.strip()
if content.lower().startswith('json\n'):
content = content[4:].strip()
return content.strip()
def generate_prompts(self) -> List[Dict[str, str]]:
"""Generate new journal prompts using AI."""
self.console.print("\n[cyan]Generating new journal prompts...[/cyan]")
# Prepare the prompt
full_prompt = self._prepare_prompt()
# Show progress
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
task = progress.add_task("Calling AI API...", total=None)
try:
# Call the AI API
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are a creative writing assistant that generates journal prompts. Always respond with valid JSON."},
{"role": "user", "content": full_prompt}
],
temperature=0.7,
max_tokens=2000
)
response_content = response.choices[0].message.content
except Exception as e:
self.console.print(f"[red]Error calling AI API: {e}[/red]")
self.console.print(f"[yellow]Full prompt sent to API (first 500 chars):[/yellow]")
self.console.print(f"[yellow]{full_prompt[:500]}...[/yellow]")
return []
# Parse the response
new_prompts = self._parse_ai_response(response_content)
if not new_prompts:
self.console.print("[red]Error: Could not parse any prompts from AI response[/red]")
return []
# Note: Prompts are NOT added to historic_prompts here
# They will be added only when the user chooses one in interactive mode
# via the add_prompt_to_history() method
return new_prompts
def display_prompts(self, prompts: List[Dict[str, str]]):
"""Display generated prompts in a nice format."""
self.console.print("\n" + "="*60)
self.console.print("[bold green]✨ NEW JOURNAL PROMPTS GENERATED ✨[/bold green]")
self.console.print("="*60 + "\n")
for i, prompt_dict in enumerate(prompts, 1):
# Extract prompt text (key is like "prompt60", "prompt61", etc.)
prompt_key = list(prompt_dict.keys())[0]
prompt_text = prompt_dict[prompt_key]
# Create a panel for each prompt
panel = Panel(
f"[cyan]{prompt_text}[/cyan]",
title=f"[bold]Prompt #{i}[/bold]",
border_style="blue",
padding=(1, 2)
)
self.console.print(panel)
self.console.print() # Empty line between prompts
def show_history_stats(self):
"""Show statistics about prompt history."""
total_prompts = len(self.historic_prompts)
table = Table(title="Prompt History Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total prompts in history", str(total_prompts))
table.add_row("History capacity", "60 prompts")
table.add_row("Available slots", str(max(0, 60 - total_prompts)))
self.console.print(table)
def interactive_mode(self):
"""Run in interactive mode with user prompts."""
self.console.print(Panel.fit(
"[bold]Daily Journal Prompt Generator[/bold]\n"
"Generate creative writing prompts for your journal practice",
border_style="green"
))
while True:
self.console.print("\n[bold]Options:[/bold]")
self.console.print("1. Draw prompts from pool (no API call)")
self.console.print("2. Fill prompt pool using API")
self.console.print("3. View pool statistics")
self.console.print("4. View history statistics")
self.console.print("5. Exit")
choice = Prompt.ask("\nEnter your choice", choices=["1", "2", "3", "4", "5"], default="1")
if choice == "1":
# Draw prompts from pool
drawn_prompts = self.draw_prompts_from_pool()
if drawn_prompts:
self.display_prompts(drawn_prompts)
# Ask if user wants to save a prompt
if Confirm.ask("\nWould you like to save one of these prompts to a file?"):
prompt_num = Prompt.ask(
"Which prompt number would you like to save?",
choices=[str(i) for i in range(1, len(drawn_prompts) + 1)],
default="1"
)
prompt_idx = int(prompt_num) - 1
prompt_dict = drawn_prompts[prompt_idx]
prompt_key = list(prompt_dict.keys())[0]
prompt_text = prompt_dict[prompt_key]
# Save to file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"journal_prompt_{timestamp}.txt"
with open(filename, "w") as f:
f.write(f"Journal Prompt - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*50 + "\n\n")
f.write(prompt_text)
f.write("\n\n" + "="*50 + "\n")
f.write("Happy writing! ✍️\n")
self.console.print(f"[green]Prompt saved to {filename}[/green]")
# Add the chosen prompt to historic prompts cyclic buffer
self.add_prompt_to_history(prompt_text)
self.console.print(f"[green]Prompt added to history as prompt00[/green]")
elif choice == "2":
# Fill prompt pool using API
new_prompts = self.generate_prompts()
if new_prompts:
self.add_prompts_to_pool(new_prompts)
self.console.print(f"[green]Added {len(new_prompts)} prompts to pool[/green]")
elif choice == "3":
self.show_pool_stats()
elif choice == "4":
self.show_history_stats()
elif choice == "5":
self.console.print("[green]Goodbye! Happy journaling! 📓[/green]")
break
def main():
"""Main entry point for the script."""
parser = argparse.ArgumentParser(description="Generate journal prompts using AI")
parser.add_argument(
"--interactive", "-i",
action="store_true",
help="Run in interactive mode"
)
parser.add_argument(
"--config", "-c",
default=".env",
help="Path to configuration file (default: .env)"
)
parser.add_argument(
"--stats", "-s",
action="store_true",
help="Show history statistics"
)
parser.add_argument(
"--pool-stats", "-p",
action="store_true",
help="Show pool statistics"
)
parser.add_argument(
"--fill-pool", "-f",
action="store_true",
help="Fill prompt pool using API"
)
args = parser.parse_args()
# Initialize generator
generator = JournalPromptGenerator(config_path=args.config)
if args.stats:
generator.show_history_stats()
elif args.pool_stats:
generator.show_pool_stats()
elif args.fill_pool:
# Fill prompt pool using API
new_prompts = generator.generate_prompts()
if new_prompts:
generator.add_prompts_to_pool(new_prompts)
generator.console.print(f"[green]Added {len(new_prompts)} prompts to pool[/green]")
elif args.interactive:
generator.interactive_mode()
else:
# Default: draw prompts from pool (no API call)
drawn_prompts = generator.draw_prompts_from_pool()
if drawn_prompts:
generator.display_prompts(drawn_prompts)
generator.console.print("[yellow]Note: These prompts were drawn from the pool. Use --fill-pool to add more prompts.[/yellow]")
if __name__ == "__main__":
main()