# stable-diffusion-webui-forge/modules/progress.py
# 2025-09-22 12:20:25 -07:00
#
# 201 lines
# 9.2 KiB
# Python

# --- START OF FILE progress.py (MODIFIED ORIGINAL) ---
from __future__ import annotations # Keep if your Python version needs it
import base64
import io
import time
import gradio as gr
from pydantic import BaseModel, Field
from modules.shared import opts # Assuming this import is correct for your env
import modules.shared as shared # Assuming this import is correct
from collections import OrderedDict
import string
import random
from typing import List, Optional # Optional is good practice, or use | None
# Id of the task being worked on right now; None while nothing is running.
current_task: Optional[str] = None

# Queued task ids mapped to the time.time() value stored by add_task_to_queue.
pending_tasks: OrderedDict = OrderedDict()

# Ids of finished tasks, oldest first; finish_task trims one entry once the
# list grows past 16.
finished_tasks: List[str] = []

# (task id, result) pairs saved by record_results, oldest first. One entry is
# dropped whenever the list grows past recorded_results_limit.
recorded_results_limit = 2
recorded_results: List[tuple] = []
def start_task(id_task):
    """Make ``id_task`` the current task and remove it from the pending queue.

    The task does not have to be queued: a missing queue entry is ignored.
    """
    global current_task

    current_task = id_task
    # pop() with a default never raises for ids that were not queued.
    pending_tasks.pop(id_task, None)
def finish_task(id_task):
    """Record ``id_task`` as finished.

    Clears ``current_task`` when it is the task being finished, then appends
    the id to ``finished_tasks``, discarding the oldest entry once the list
    holds more than 16 ids.
    """
    global current_task

    if id_task == current_task:
        current_task = None

    finished_tasks.append(id_task)

    history_is_full = len(finished_tasks) > 16
    if history_is_full:
        # Drop the oldest finished id.
        del finished_tasks[0]
def create_task_id(task_type):
    """Build a task id of the form ``task(<task_type>-<suffix>)``.

    ``<suffix>`` is 7 characters drawn at random (with replacement) from the
    uppercase ASCII letters and the digits.
    """
    alphabet = string.ascii_uppercase + string.digits
    suffix_chars = random.choices(alphabet, k=7)
    suffix = "".join(suffix_chars)
    return f"task({task_type}-{suffix})"
def record_results(id_task, res):
    """Remember ``res`` as the result of ``id_task``.

    The pair is appended to ``recorded_results``; when that pushes the list
    past ``recorded_results_limit`` entries, the oldest pair is discarded.
    """
    entry = (id_task, res)
    recorded_results.append(entry)

    over_limit = len(recorded_results) > recorded_results_limit
    if over_limit:
        del recorded_results[0]
def add_task_to_queue(id_job):
    """Register ``id_job`` as pending, stamped with the current time."""
    queued_at = time.time()
    pending_tasks[id_job] = queued_at
# Response body built by get_pending_tasks: how many tasks are waiting and
# which ids they have.
class PendingTasksResponse(BaseModel):
    size: int = Field(
        title="Pending task size",
    )
    tasks: List[str] = Field(
        title="Pending task ids",
    )
# Request body read by progressapi.
class ProgressRequest(BaseModel):
    # The default is None, so the annotation has to admit None as well. This
    # matches the "| None" convention used by ProgressResponse in this module.
    id_task: str | None = Field(
        default=None,
        title="Task ID",
        description="id of the task to get progress for",
    )
    # progressapi compares this with shared.state.id_live_preview and only
    # encodes a new preview image when the two differ.
    id_live_preview: int = Field(
        default=-1,
        title="Live preview image ID",
        description="id of last received last preview image",
    )
    # When False, progressapi skips the live preview entirely.
    live_preview: bool = Field(
        default=True,
        title="Include live preview",
        description="boolean flag indicating whether to include the live preview image",
    )
# Response body returned by progressapi.
class ProgressResponse(BaseModel):
    # Status flags: these have no default and must always be supplied.
    active: bool = Field(title="Whether the task is being worked on right now")
    queued: bool = Field(title="Whether the task is in queue")
    completed: bool = Field(title="Whether the task has already finished")

    # The remaining fields may legitimately be None, so each one is declared
    # as "<type> | None" with a None default.
    progress: float | None = Field(
        default=None,
        title="Progress",
        description="The progress with a range of 0 to 1",
    )
    eta: float | None = Field(
        default=None,
        title="ETA in secs",
    )
    live_preview: str | None = Field(
        default=None,
        title="Live preview image",
        description="Current live preview; a data: uri",
    )
    id_live_preview: int | None = Field(
        default=None,
        title="Live preview image ID",
        description="Send this together with next request to prevent receiving same image",
    )
    textinfo: str | None = Field(
        default=None,
        title="Info text",
        description="Info text used by WebUI.",
    )
def setup_progress_api(app):
    """Register the pending-tasks and progress routes on ``app``.

    Returns whatever ``app.add_api_route`` returns for the progress route.
    """
    app.add_api_route(
        "/internal/pending-tasks",
        get_pending_tasks,
        methods=["GET"],
    )

    # The progress route declares its response model explicitly.
    progress_route = app.add_api_route(
        "/internal/progress",
        progressapi,
        methods=["POST"],
        response_model=ProgressResponse,
    )
    return progress_route
def get_pending_tasks():
    """Return the ids of all pending tasks together with their count."""
    # Copy the ids first so size and tasks describe the same list.
    queued_ids = list(pending_tasks)
    return PendingTasksResponse(size=len(queued_ids), tasks=queued_ids)
def progressapi(req: ProgressRequest):
    """Report the state of the task named in ``req``.

    For a task that is not currently running, the response carries only the
    status flags, the current live preview id (or -1 when it is falsy) and a
    status text: the queue position, "Completed" or "Status unknown".

    For the running task, the response additionally carries the progress
    (0 to 1), an ETA in seconds (None while it cannot be estimated), the
    state's info text and, if requested and enabled, a new live preview image
    encoded as a ``data:`` URI.

    Args:
        req: The parsed request; see ``ProgressRequest``.

    Returns:
        A ``ProgressResponse``.
    """
    id_task = req.id_task

    # A request without a task id must not match the idle state, in which
    # current_task is None as well.
    active = id_task is not None and id_task == current_task
    queued = id_task in pending_tasks
    completed = id_task in finished_tasks

    if not active:
        return ProgressResponse(
            active=active,
            queued=queued,
            completed=completed,
            progress=None,
            eta=None,
            live_preview=None,
            id_live_preview=shared.state.id_live_preview or -1,
            textinfo=_inactive_textinfo(id_task, queued, completed),
        )

    progress = _job_progress(shared.state)
    eta = _estimate_eta(shared.state.time_start, progress)

    # Unless a new image is produced below, echo the id the client sent.
    live_preview = None
    id_live_preview = req.id_live_preview

    if opts is not None and opts.live_previews_enable and req.live_preview:
        shared.state.set_current_image()
        # Only encode an image the client has not received yet.
        if shared.state.id_live_preview != req.id_live_preview:
            image = shared.state.current_image
            if image is not None:
                image_format = getattr(opts, "live_previews_image_format", "png")
                live_preview = _encode_live_preview(image, image_format)
                id_live_preview = shared.state.id_live_preview

    return ProgressResponse(
        active=active,
        queued=queued,
        completed=completed,
        progress=progress,
        eta=eta,
        live_preview=live_preview,
        id_live_preview=id_live_preview,
        textinfo=shared.state.textinfo,  # may be None
    )


def _inactive_textinfo(id_task, queued, completed):
    """Return the status text for a task that is not currently running."""
    if queued:
        # Sort a snapshot of the (task, timestamp) pairs rather than looking
        # each task up in the live dict from inside the sort key: a task
        # removed while sorting would otherwise raise KeyError.
        by_queue_time = sorted(pending_tasks.items(), key=lambda item: item[1])
        ordered_ids = [task for task, _ in by_queue_time]
        try:
            position = ordered_ids.index(id_task)
        except ValueError:
            # The task left the queue after the membership test.
            return "In queue :>"
        return "In queue: {}/{}".format(position + 1, len(ordered_ids))

    if completed:
        return "Completed"
    return "Status unknown"


def _job_progress(state):
    """Return overall progress in [.., 1.0] from job and sampling-step counters.

    Finished jobs contribute ``job_no / job_count``; the job in flight adds
    its share ``(1 / job_count) * (sampling_step / sampling_steps)``. Counters
    that are None or non-positive totals contribute nothing.
    """
    job_count, job_no = state.job_count, state.job_no
    steps_total, steps_done = state.sampling_steps, state.sampling_step

    progress = 0.0
    if job_count is not None and job_count > 0:
        if job_no is not None:
            progress += job_no / job_count
        if steps_total is not None and steps_done is not None and steps_total > 0:
            progress += (1 / job_count) * (steps_done / steps_total)
    return min(progress, 1.0)


def _estimate_eta(time_start, progress):
    """Return the estimated remaining seconds, or None if it cannot be estimated."""
    if time_start is None or not progress > 0:
        return None
    elapsed = time.time() - time_start
    # Extrapolate the total duration linearly from the progress made so far.
    return elapsed / progress - elapsed


def _encode_live_preview(image, image_format):
    """Encode ``image`` in ``image_format`` and return it as a ``data:`` URI."""
    save_kwargs = {}
    if image_format == "png":
        # Small images (longest side <= 256) are saved with optimize=True,
        # larger ones with the cheap compress_level=1 -- presumably because
        # optimizing large PNGs is slow; TODO confirm.
        if max(image.size) <= 256:
            save_kwargs = {"optimize": True}
        else:
            save_kwargs = {"optimize": False, "compress_level": 1}

    buffered = io.BytesIO()
    image.save(buffered, format=image_format, **save_kwargs)
    encoded = base64.b64encode(buffered.getvalue()).decode("ascii")
    return f"data:image/{image_format};base64,{encoded}"
def restore_progress(id_task):
    """Wait for ``id_task`` to leave the running/pending state and return its result.

    Polls every 0.1 s while the task is the current task or still queued.
    Afterwards the first entry recorded for the task is looked up; if it
    exists and is not None it is returned as-is. Otherwise a 4-tuple of three
    ``gr.update()`` placeholders and an explanatory message is returned.
    """
    while True:
        still_busy = id_task == current_task or id_task in pending_tasks
        if not still_busy:
            break
        time.sleep(0.1)

    # Only the first recorded entry for this task counts, even if it is None.
    for entry in recorded_results:
        if id_task == entry[0]:
            stored = entry[1]
            if stored is not None:
                return stored
            break

    message = f"Couldn't restore progress for {id_task}: results either have been discarded or never were obtained"
    return gr.update(), gr.update(), gr.update(), message