(no commit message)

2025-10-19 14:16:34 -05:00
parent d33cef379c
commit c1847586be
13 changed files with 884 additions and 1 deletion

plugins/llm_dspy.py Normal file

@@ -0,0 +1,173 @@
import os, json, hashlib, pickle
import dspy
from typing import List, Dict
from contracts.assumption_v1 import AssumptionV1
from contracts.job_v1 import JobV1
from contracts.scorecard_v1 import ScorecardV1, Criterion
from contracts.innovation_layer_v1 import InnovationLayerV1

TEMPERATURE = float(os.getenv("JTBD_LLM_TEMPERATURE", "0.2"))
SEED = int(os.getenv("JTBD_LLM_SEED", "42"))
USE_DOUBLE_JUDGE = os.getenv("JTBD_DOUBLE_JUDGE", "1") == "1"  # default ON

def _uid(s: str) -> str:
    """Short stable id: first 10 hex chars of the SHA-1 of the text."""
    return hashlib.sha1(s.encode()).hexdigest()[:10]
def configure_lm():
    """Configure the global DSPy LM. Set JTBD_DSPY_MODEL to choose the provider/model."""
    model = os.getenv("JTBD_DSPY_MODEL", "gpt-4o-mini")
    if "claude" in model.lower():
        try:
            lm = dspy.Anthropic(model=model, max_tokens=4000, temperature=TEMPERATURE)
        except Exception:
            # Fall back to the generic LM wrapper
            lm = dspy.LM(model=model, max_tokens=4000, temperature=TEMPERATURE)
    else:
        # Try the OpenAI client first
        try:
            lm = dspy.OpenAI(model=model, max_tokens=4000, temperature=TEMPERATURE, seed=SEED)
        except Exception:
            # Fall back to the generic LM wrapper
            lm = dspy.LM(model=model, max_tokens=4000, temperature=TEMPERATURE)
    dspy.configure(lm=lm)
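# Example: JTBD_DSPY_MODEL=claude-3-haiku-20240307 takes the Anthropic branch above;
# any model name not containing "claude" goes through the OpenAI/generic path.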
# ---------------- Signatures ----------------
class DeconstructSig(dspy.Signature):
    """Extract assumptions and classify levels.
    Return JSON list of objects: [{text, level(1..3), confidence, evidence:[]}]"""
    idea: str = dspy.InputField()
    hunches: List[str] = dspy.InputField()
    assumptions_json: str = dspy.OutputField()

class JobsSig(dspy.Signature):
    """Generate 5 distinct JTBD statements with Four Forces (push/pull/anxiety/inertia) each.
    Return JSON list: [{statement, forces:{push:[], pull:[], anxiety:[], inertia:[]}}]"""
    context: str = dspy.InputField()
    constraints: str = dspy.InputField()
    jobs_json: str = dspy.OutputField()

class MoatSig(dspy.Signature):
    """Apply Doblin/10-types + timing/ops/customer/value triggers to strengthen the concept.
    Return JSON list: [{type, trigger, effect}]"""
    concept: str = dspy.InputField()
    triggers: str = dspy.InputField()
    layers_json: str = dspy.OutputField()

class JudgeScoreSig(dspy.Signature):
    """Score a business idea on exactly these 5 criteria (0-10 scale) with rationales.
    Return JSON: {"criteria":[
      {"name":"Underserved Opportunity","score":7.0,"rationale":"Clear need exists..."},
      {"name":"Strategic Impact","score":6.0,"rationale":"..."},
      {"name":"Market Scale","score":8.0,"rationale":"..."},
      {"name":"Solution Differentiability","score":5.0,"rationale":"..."},
      {"name":"Business Model Innovation","score":7.0,"rationale":"..."}],
     "total":6.6}"""
    summary: str = dspy.InputField()
    scorecard_json: str = dspy.OutputField()
# ---------------- Modules ----------------
class Deconstruct(dspy.Module):
    def __init__(self):
        super().__init__()
        self.p = dspy.Predict(DeconstructSig)

    def forward(self, idea: str, hunches: List[str]):
        out = self.p(idea=idea, hunches=hunches)
        data = json.loads(out.assumptions_json)
        # Post-process: clamp level/confidence into range and apply defaults
        items = []
        for obj in data[:8]:
            text = obj.get("text", "").strip()
            if not text:
                continue
            level = int(obj.get("level", 2))
            level = 1 if level < 1 else 3 if level > 3 else level
            conf = float(obj.get("confidence", 0.6))
            conf = max(0.0, min(1.0, conf))
            items.append(AssumptionV1(
                assumption_id=f"assump:{_uid(text)}", text=text, level=level, confidence=conf,
                evidence=[e for e in obj.get("evidence", []) if isinstance(e, str)]
            ))
        return items
class Jobs(dspy.Module):
    def __init__(self):
        super().__init__()
        self.p = dspy.Predict(JobsSig)

    def forward(self, context: Dict[str, str], constraints: List[str]):
        out = self.p(context=json.dumps(context), constraints=json.dumps(constraints))
        arr = json.loads(out.jobs_json)
        jobs = []
        seen = set()
        for obj in arr[:12]:
            stmt = obj.get("statement", "").strip()
            if not stmt or stmt in seen:
                continue
            seen.add(stmt)
            forces = obj.get("forces", {}) or {}
            for k in ["push", "pull", "anxiety", "inertia"]:
                forces.setdefault(k, [])
            jobs.append(JobV1(job_id=f"job:{_uid(stmt)}", statement=stmt, forces=forces))
            if len(jobs) >= 5:
                break
        return jobs
class Moat(dspy.Module):
    def __init__(self):
        super().__init__()
        self.p = dspy.Predict(MoatSig)

    def forward(self, concept: str, triggers: str):
        out = self.p(concept=concept, triggers=triggers)
        arr = json.loads(out.layers_json)
        layers = []
        for obj in arr[:6]:
            t = str(obj.get("type", "")).strip()
            tr = str(obj.get("trigger", "")).strip()
            ef = str(obj.get("effect", "")).strip()
            if not t or not tr or not ef:
                continue
            layers.append(InnovationLayerV1(layer_id=f"layer:{_uid(t + tr + ef)}", type=t, trigger=tr, effect=ef))
        return layers
CRITERIA = ["Underserved Opportunity", "Strategic Impact", "Market Scale",
            "Solution Differentiability", "Business Model Innovation"]

# Optionally load a pre-compiled (optimized) judge predictor from disk
JUDGE_COMPILED_PATH = os.getenv("JTBD_JUDGE_COMPILED")
_compiled_judge = None
if JUDGE_COMPILED_PATH and os.path.exists(JUDGE_COMPILED_PATH):
    try:
        with open(JUDGE_COMPILED_PATH, "rb") as f:
            _compiled_judge = pickle.load(f)
    except Exception:
        _compiled_judge = None
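# A compiled judge can be produced offline and pickled beforehand; a minimal
# sketch, assuming a labeled trainset and a scoring metric (names illustrative):
#   from dspy.teleprompt import BootstrapFewShot
#   compiled = BootstrapFewShot(metric=score_metric).compile(
#       dspy.Predict(JudgeScoreSig), trainset=trainset)
#   with open("judge_compiled.pkl", "wb") as f:
#       pickle.dump(compiled, f)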
class JudgeScore(dspy.Module):
    def __init__(self):
        super().__init__()
        self.p = _compiled_judge or dspy.Predict(JudgeScoreSig)

    def forward(self, summary: str):
        out = self.p(summary=summary)
        try:
            data = json.loads(out.scorecard_json)
        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")
            print(f"Raw output: {out.scorecard_json}")
            # Fall back to default scores if JSON parsing fails
            data = {"criteria": [], "total": 5.0}
        crits = []
        for item in data.get("criteria", []):
            name = item.get("name")
            if name not in CRITERIA:
                continue
            score = float(item.get("score", 5.0))
            score = max(0.0, min(10.0, score))
            rationale = item.get("rationale", "")
            crits.append(Criterion(name=name, score=score, rationale=rationale))
        # Fill any missing criteria to maintain the schema shape
        present = {c.name for c in crits}
        for name in CRITERIA:
            if name not in present:
                crits.append(Criterion(name=name, score=5.0, rationale="defaulted"))
        total = round(sum(c.score for c in crits) / len(crits), 2)
        return ScorecardV1(target_id="target:final", criteria=crits, total=total)
# --------------- Double-judge arbitration (optional) ---------------
def judge_with_arbitration(summary: str) -> ScorecardV1:
    if not USE_DOUBLE_JUDGE:
        return JudgeScore()(summary=summary)
    j1 = JudgeScore()(summary=summary)
    j2 = JudgeScore()(summary=summary)
    # Simple tie-breaker: take the criterion-wise average if the two scores
    # differ by <= 1.5, else take the lower of the two.
    merged = []
    for name in CRITERIA:
        c1 = next(c for c in j1.criteria if c.name == name)
        c2 = next(c for c in j2.criteria if c.name == name)
        diff = abs(c1.score - c2.score)
        score = (c1.score + c2.score) / 2.0 if diff <= 1.5 else min(c1.score, c2.score)
        rationale = f"arb: {c1.rationale} | {c2.rationale}"
        merged.append(Criterion(name=name, score=round(score, 1), rationale=rationale))
    total = round(sum(c.score for c in merged) / len(merged), 2)
    return ScorecardV1(target_id="target:final", criteria=merged, total=total)
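# --------------- Usage sketch ---------------
# A minimal end-to-end run; illustrative only. Assumes the contracts package is
# importable and an API key for the configured provider is set in the
# environment (e.g. OPENAI_API_KEY). All example strings are hypothetical.
if __name__ == "__main__":
    configure_lm()
    idea = "AI copilot for field technicians"
    assumptions = Deconstruct()(idea=idea, hunches=["techs lose time searching manuals"])
    jobs = Jobs()(context={"segment": "field service"}, constraints=["mobile-first"])
    layers = Moat()(concept=idea, triggers="timing, customer, value")
    card = judge_with_arbitration(summary=idea)
    print(f"{len(assumptions)} assumptions, {len(jobs)} jobs, {len(layers)} layers, total {card.total}")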