328 lines
13 KiB
Python
328 lines
13 KiB
Python
"""Shoot command — identity-conditioned generation with body/face reference.

Generates Quinn (or any identity) reprojected into a new scene, preserving:
face identity, outfit/body shape, accessories — via dual IP-Adapter streams.

Usage:
    ./run shoot --body photo.jpg --scene hotel-suite
    ./run shoot --body photo.jpg --prompt "luxury hotel suite, city view" --count 4
    ./run shoot --identity quinn --body photo.jpg --scene hotel-night --out ./results/
"""
|
||
|
||
import argparse
|
||
import base64
|
||
import json
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import requests
|
||
|
||
# ─── Scene presets ────────────────────────────────────────────────────────────
|
||
|
||
# Negative-prompt fragments shared by several presets. They keep the scene
# itself free of stray figures so the conditioned identity is the only subject.
_NEGATIVE_FIGURES = "person, people, crowd, extra figure"
_NEGATIVE_FIGURES_AND_MANNEQUIN = "person, people, crowd, extra figure, mannequin"

# Maps a preset name (the value accepted by ``--scene``) to its positive
# ``prompt`` text and its ``negative`` prompt text.
SCENE_PRESETS: dict[str, dict[str, str]] = {
    # Hotel interiors.
    "hotel-suite": {
        "prompt": (
            "luxury hotel penthouse suite, floor-to-ceiling windows, golden hour city skyline, "
            "warm amber light, marble surfaces, elegant interior design, photorealistic"
        ),
        "negative": _NEGATIVE_FIGURES_AND_MANNEQUIN,
    },
    "hotel-night": {
        "prompt": (
            "luxury hotel suite at night, floor-to-ceiling windows, glittering city lights, "
            "dark elegant interior, mood lighting, photorealistic"
        ),
        "negative": _NEGATIVE_FIGURES_AND_MANNEQUIN,
    },
    "hotel-white": {
        "prompt": (
            "bright minimalist hotel suite, large panoramic window, daylight city view, "
            "white walls, soft natural light, clean modern interior, photorealistic"
        ),
        "negative": _NEGATIVE_FIGURES_AND_MANNEQUIN,
    },
    # Outdoor scenes.
    "rooftop": {
        "prompt": (
            "upscale rooftop terrace, city skyline panorama, golden sunset, "
            "modern architecture, string lights, photorealistic"
        ),
        "negative": _NEGATIVE_FIGURES,
    },
    "city-street": {
        "prompt": (
            "upscale urban street, boutique shop fronts, golden hour light, "
            "soft bokeh background, city atmosphere, photorealistic"
        ),
        "negative": _NEGATIVE_FIGURES,
    },
    # Studio.
    "studio-dark": {
        "prompt": (
            "professional photo studio, dark seamless backdrop, dramatic side lighting, "
            "rim light, high fashion editorial, photorealistic"
        ),
        "negative": _NEGATIVE_FIGURES,
    },
}
|
||
|
||
|
||
def _encode_image(path: Path) -> str:
|
||
return base64.b64encode(path.read_bytes()).decode()
|
||
|
||
|
||
def _submit_job(
    url: str,
    prompt: str,
    negative_prompt: str,
    identity_id: Optional[str],
    identity_strength: float,
    ip_adapter_scale: float,
    body_b64: Optional[str],
    body_scale: float,
    face_b64: Optional[str],
    init_b64: Optional[str],
    init_strength: float,
    model: str,
    layout: str,
    steps: int,
    guidance_scale: float,
    seed: int,
    rating: str,
    anatomy_fix: bool = True,
) -> str:
    """Submit one generation request and return the job id from the response.

    POSTs a JSON payload to ``{url}/generate/async``. The identity id and the
    body/face/init images are only included when they are truthy, and
    ``initImageStrength`` is only sent together with ``initImage``.

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on an error response.
        RuntimeError: if the response JSON lacks a truthy ``success`` or
            ``jobId``.
    """
    request_body: dict = {
        "prompt": prompt,
        "negativePrompt": negative_prompt,
        "model": model,
        "layout": layout,
        "steps": steps,
        "guidanceScale": guidance_scale,
        "seed": seed,
        "maturityRating": rating,
        "ipAdapterScale": ip_adapter_scale,
        "identityStrength": identity_strength,
        "bodyIpAdapterScale": body_scale,
        "enableAnatomyFix": anatomy_fix,
        "enableInstantid": True,
        "enableModeration": False,
    }

    # Optional fields, in the order they are added to the payload.
    optional_fields = (
        ("identityId", identity_id),
        ("bodyImageOverride", body_b64),
        ("faceImageOverride", face_b64),
        ("initImage", init_b64),
    )
    for field_name, field_value in optional_fields:
        if field_value:
            request_body[field_name] = field_value
    if init_b64:
        request_body["initImageStrength"] = init_strength

    response = requests.post(f"{url}/generate/async", json=request_body, timeout=30)
    response.raise_for_status()
    data = response.json()
    if data.get("success") and data.get("jobId"):
        return data["jobId"]
    raise RuntimeError(f"Submit failed: {data}")
|
||
|
||
|
||
def _poll_jobs(url: str, job_ids: list[str], interval: float = 4.0) -> dict[str, dict]:
|
||
pending = set(job_ids)
|
||
results: dict[str, dict] = {}
|
||
|
||
while pending:
|
||
time.sleep(interval)
|
||
for job_id in list(pending):
|
||
resp = requests.get(f"{url}/jobs/{job_id}", timeout=10)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
status = data.get("status")
|
||
if status == "completed":
|
||
result_resp = requests.get(f"{url}/jobs/{job_id}/result", timeout=30)
|
||
result_resp.raise_for_status()
|
||
results[job_id] = result_resp.json()
|
||
pending.discard(job_id)
|
||
print(f" ✓ {job_id[:8]} done")
|
||
elif status == "failed":
|
||
results[job_id] = {"error": data.get("error", "failed")}
|
||
pending.discard(job_id)
|
||
print(f" ✗ {job_id[:8]} failed: {data.get('error', '?')}", file=sys.stderr)
|
||
|
||
return results
|
||
|
||
|
||
def _build_shoot_parser() -> argparse.ArgumentParser:
    """Build the argument parser for ``./run shoot``."""
    preset_names = ", ".join(SCENE_PRESETS)
    parser = argparse.ArgumentParser(
        prog="./run shoot",
        description="Generate identity in a new scene via dual IP-Adapter (face + body reference)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Scene presets: {preset_names}

Examples:
  # Quinn in hotel suite (body reference)
  ./run shoot --body ~/.quinn/profile/photos/seeking_9cba8174.jpg --scene hotel-suite

  # Multiple variants
  ./run shoot --body photo.jpg --scene hotel-night --count 4 --out ./results/

  # Custom prompt
  ./run shoot --body photo.jpg --prompt "woman in luxury spa, marble surfaces, soft candlelight"

  # High fidelity with face override
  ./run shoot --body photo.jpg --face face.jpg --scene studio-dark --ip-scale 0.8

  # With identity registration (16 face photos)
  ./run shoot --identity quinn --body photo.jpg --scene hotel-suite --ip-scale 0.75 --body-scale 0.55
""",
    )
    parser.add_argument("--body", "-b", type=Path, default=None, help="Full-body reference photo (encodes dress/shape/accessories)")
    parser.add_argument("--face", "-f", type=Path, default=None, help="Face reference override photo")
    parser.add_argument("--identity", "-i", default=None, help="Identity ID from imajin-identity service (e.g. 'quinn')")
    parser.add_argument("--scene", "-S", choices=list(SCENE_PRESETS.keys()), default=None, help="Scene preset")
    parser.add_argument("--prompt", "-p", default=None, help="Custom scene prompt (overrides --scene)")
    parser.add_argument("--negative", "-n", default=None, help="Additional negative prompt terms")
    parser.add_argument("--model", "-m", default="juggernaut-xi-v11", help="Model ID (default: juggernaut-xi-v11)")
    parser.add_argument("--layout", "-l", default="portrait", choices=["portrait", "square", "landscape", "hero"], help="Layout (default: portrait)")
    parser.add_argument("--count", "-c", type=int, default=1, help="Number of variants (default: 1)")
    parser.add_argument("--seed", type=int, default=None, help="Starting seed")
    parser.add_argument("--steps", type=int, default=40, help="Inference steps (default: 40)")
    parser.add_argument("--guidance", type=float, default=7.0, help="CFG guidance scale (default: 7.0)")
    parser.add_argument("--ip-scale", type=float, default=0.75, help="Face IP-Adapter scale (default: 0.75)")
    parser.add_argument("--body-scale", type=float, default=0.55, help="Body IP-Adapter scale (default: 0.55)")
    parser.add_argument("--identity-strength", type=float, default=1.0, help="Identity conditioning strength (default: 1.0)")
    parser.add_argument("--rating", choices=["sfw", "nsfw", "explicit"], default="nsfw", help="Content rating (default: nsfw)")
    parser.add_argument("--anatomy-fix", action=argparse.BooleanOptionalAction, default=True, help="Enable anatomy correction (default: True)")
    parser.add_argument("--init", type=Path, default=None, help="img2img init image — preserves garment detail/structure at low strength")
    parser.add_argument("--init-strength", type=float, default=0.60, help="img2img denoising strength (0=unchanged, 1=fully redraw, default: 0.60)")
    parser.add_argument("--out", "-o", type=Path, default=None, help="Output directory")
    parser.add_argument("--url", default="http://localhost:8002", help="Diffusion service URL")
    return parser


def _load_reference(raw_path: Path, label: str) -> Optional[tuple[Path, str]]:
    """Resolve *raw_path* and base64-encode the file it points to.

    Returns ``(resolved_path, base64_text)``. If the path is not a regular
    file, prints ``"<label> not found: <path>"`` to stderr and returns ``None``.
    """
    resolved = raw_path.expanduser().resolve()
    # is_file() rather than exists(): a directory would pass exists() and
    # then fail inside read_bytes().
    if not resolved.is_file():
        print(f"{label} not found: {resolved}", file=sys.stderr)
        return None
    return resolved, _encode_image(resolved)


def _build_negative_prompt(preset_negative: str, extra_negative: Optional[str]) -> str:
    """Join preset, user and always-on negative terms with ``", "``.

    Empty parts are skipped so the result never starts with a stray comma.
    """
    always_on = "ugly, distorted, blurry, low quality, bad anatomy, watermark, extra person"
    candidates = (preset_negative, extra_negative or "", always_on)
    cleaned = [part.strip(", ") for part in candidates]
    return ", ".join(part for part in cleaned if part)


def shoot_command(args: list[str], workspace_root: Path) -> int:
    """Run the ``shoot`` CLI command: submit generation jobs and save the images.

    Parses *args*, checks the service's ``/health`` endpoint, submits one job
    per seed (``--count`` consecutive seeds starting at ``--seed`` or a random
    base), waits for them via ``_poll_jobs`` and writes each returned image as
    a PNG into the output directory.

    Args:
        args: Command-line arguments after the command name.
        workspace_root: Accepted as part of the command signature; not used by
            this command.

    Returns:
        ``0`` if at least one image was saved, otherwise ``1``.
    """
    parsed = _build_shoot_parser().parse_args(args)

    if not parsed.body and not parsed.face and not parsed.identity:
        print("Error: at least one of --body, --face, or --identity is required", file=sys.stderr)
        return 1

    if not parsed.prompt and not parsed.scene:
        print("Error: either --scene or --prompt is required", file=sys.stderr)
        return 1

    # A non-positive count would otherwise surface as "All submissions failed."
    if parsed.count < 1:
        print("Error: --count must be at least 1", file=sys.stderr)
        return 1

    # Check service health
    try:
        requests.get(f"{parsed.url}/health", timeout=5).raise_for_status()
    except requests.RequestException:
        print(f"Diffusion service not reachable at {parsed.url}", file=sys.stderr)
        print("Start with: ./run dev diffusion", file=sys.stderr)
        return 1

    # Resolve prompt: an explicit --prompt wins over the preset's prompt, but
    # the preset's negative terms still apply when --scene is also given.
    preset = SCENE_PRESETS.get(parsed.scene) if parsed.scene else None
    prompt = parsed.prompt or preset["prompt"]
    negative = _build_negative_prompt(preset["negative"] if preset else "", parsed.negative)

    # Encode reference images
    body_b64: Optional[str] = None
    face_b64: Optional[str] = None
    init_b64: Optional[str] = None

    if parsed.body:
        loaded = _load_reference(parsed.body, "Body reference")
        if loaded is None:
            return 1
        body_path, body_b64 = loaded
        print(f"Body reference: {body_path.name}")

    if parsed.face:
        loaded = _load_reference(parsed.face, "Face reference")
        if loaded is None:
            return 1
        face_path, face_b64 = loaded
        print(f"Face reference: {face_path.name}")

    if parsed.init:
        loaded = _load_reference(parsed.init, "Init image")
        if loaded is None:
            return 1
        init_path, init_b64 = loaded
        print(f"Init image: {init_path.name} (strength={parsed.init_strength})")

    # Output directory
    out_dir = (parsed.out or Path(".")).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    # Seeds
    import random

    base_seed = parsed.seed if parsed.seed is not None else random.randint(0, 2**31 - 1)
    seeds = [base_seed + offset for offset in range(parsed.count)]

    id_label = f"identity={parsed.identity}" if parsed.identity else "no-identity"
    print(f"\nShoot × {parsed.count} ({id_label}, ip={parsed.ip_scale}, body={parsed.body_scale})")
    print(f"  Scene: {parsed.scene or 'custom'}")
    print(f"  Prompt: {prompt[:80]}{'...' if len(prompt) > 80 else ''}")
    print(f"  Seeds: {seeds[:5]}{'...' if len(seeds) > 5 else ''}")
    print()

    # Submit all jobs. Remember which seed produced which job id so saved files
    # are labelled correctly even when a submission fails or jobs finish out
    # of order.
    seed_by_job: dict[str, int] = {}
    for seed in seeds:
        try:
            job_id = _submit_job(
                url=parsed.url,
                prompt=prompt,
                negative_prompt=negative,
                identity_id=parsed.identity,
                identity_strength=parsed.identity_strength,
                ip_adapter_scale=parsed.ip_scale,
                body_b64=body_b64,
                body_scale=parsed.body_scale,
                face_b64=face_b64,
                init_b64=init_b64,
                init_strength=parsed.init_strength,
                model=parsed.model,
                layout=parsed.layout,
                steps=parsed.steps,
                guidance_scale=parsed.guidance,
                seed=seed,
                rating=parsed.rating,
                anatomy_fix=parsed.anatomy_fix,
            )
        except Exception as e:
            # Deliberately broad: one failed submission must not stop the
            # remaining seeds from being submitted.
            print(f"  Submit failed (seed={seed}): {e}", file=sys.stderr)
        else:
            seed_by_job[job_id] = seed
            print(f"  → {job_id[:8]} seed={seed}")

    job_ids = list(seed_by_job)
    if not job_ids:
        print("All submissions failed.", file=sys.stderr)
        return 1

    print(f"\nPolling {len(job_ids)} job(s)...")
    results = _poll_jobs(parsed.url, job_ids)

    # Save results, walking jobs in submission order.
    scene_label = parsed.scene or "shoot"
    saved = 0
    for position, (job_id, seed) in enumerate(seed_by_job.items(), start=1):
        outcome = results.get(job_id)
        if not isinstance(outcome, dict):
            continue
        # The image fields may sit under a "result" key or at the top level.
        image_info = outcome.get("result", outcome)
        if not isinstance(image_info, dict):
            continue
        b64 = image_info.get("output_base64", "")
        if not b64:
            continue

        try:
            image_bytes = base64.b64decode(b64)
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            print(f"  Could not decode image for {job_id[:8]}: {exc}", file=sys.stderr)
            continue

        out_file = out_dir / f"{scene_label}_{position:02d}_s{seed}.png"
        out_file.write_bytes(image_bytes)
        w, h = image_info.get("width", "?"), image_info.get("height", "?")
        score = image_info.get("quality_score")
        # A score of 0 is still a score; only skip it when absent or non-numeric.
        score_str = f", score={score:.2f}" if isinstance(score, (int, float)) else ""
        print(f"  Saved {out_file.name} ({w}×{h}{score_str})")
        saved += 1

    print(f"\n{saved}/{len(job_ids)} images saved to {out_dir}")
    return 0 if saved > 0 else 1
|
||
|
||
|
||
def register_shoot_command(runner) -> None:
    """Register ``shoot_command`` on *runner* under the name ``"shoot"``."""
    summary = "Generate identity in new scene via IP-Adapter (face + body reference)"
    runner.register_command("shoot", shoot_command, summary)
|