From 6475810170a878ce4273c86a58f07ce677f4f95d Mon Sep 17 00:00:00 2001 From: Farouk Adeleke Date: Wed, 29 Oct 2025 17:55:28 -0400 Subject: [PATCH] (no commit message) --- agent.json | 37 +++++++ agent/__init__.py | 3 + agent/modules.py | 239 ++++++++++++++++++++++++++++++++++++++++++++++ config.json | 5 +- main.py | 29 +++++- 5 files changed, 307 insertions(+), 6 deletions(-) create mode 100644 agent/__init__.py create mode 100644 agent/modules.py diff --git a/agent.json b/agent.json index d3f1728..4fbe0e2 100644 --- a/agent.json +++ b/agent.json @@ -1,4 +1,41 @@ { + "signature_generator.generator": { + "traces": [], + "train": [], + "demos": [], + "signature": { + "instructions": "Given the fields `prompt`, produce the fields `task_description`, `signature_fields`, `signature_name`.", + "fields": [ + { + "prefix": "Prompt:", + "description": "Natural language description of the desired functionality" + }, + { + "prefix": "Task Description:", + "description": "Clear description of what the signature accomplishes" + }, + { + "prefix": "Signature Fields:", + "description": "List of input and output fields for the signature" + }, + { + "prefix": "Signature Name:", + "description": "Suggested class name for the signature (PascalCase)" + } + ] + }, + "lm": { + "model": "gemini/gemini-2.5-pro-preview-03-25", + "model_type": "chat", + "cache": true, + "num_retries": 3, + "finetuning_model": null, + "launch_kwargs": {}, + "train_kwargs": {}, + "temperature": 0.7, + "max_tokens": 4096 + } + }, "metadata": { "dependency_versions": { "python": "3.11", diff --git a/agent/__init__.py b/agent/__init__.py new file mode 100644 index 0000000..98962fa --- /dev/null +++ b/agent/__init__.py @@ -0,0 +1,3 @@ +from .modules import SignatureGenerator + +__all__ = ["SignatureGenerator"] diff --git a/agent/modules.py b/agent/modules.py new file mode 100644 index 0000000..d6e3615 --- /dev/null +++ b/agent/modules.py @@ -0,0 +1,239 @@ +from enum import Enum +from typing import Any, Dict, List, Optional, Literal +from pydantic import BaseModel, Field, ValidationError +import dspy + + +class FieldType(str, Enum): + STRING = "str" + INTEGER = "int" + FLOAT = "float" + BOOLEAN = "bool" + LIST_STRING = "list[str]" + LIST_INT = "list[int]" + LIST_FLOAT = "list[float]" + DICT_STR_STR = "dict[str, str]" + DICT_STR_INT = "dict[str, int]" + DICT_STR_ANY = "dict[str, Any]" + IMAGE = "dspy.Image" + AUDIO = "dspy.Audio" + LITERAL = "Literal" + OPTIONAL_STR = "Optional[str]" + OPTIONAL_INT = "Optional[int]" + OPTIONAL_FLOAT = "Optional[float]" + + +class FieldRole(str, Enum): + INPUT = "input" + OUTPUT = "output" + + +class GeneratedField(BaseModel): + name: str = Field(description="The field name (snake_case, descriptive)") + type: FieldType = Field(description="The Python type for this field") + role: FieldRole = Field(description="Whether this is an input or output field") + description: str = Field(description="Description of what this field represents") + literal_values: Optional[List[str]] = Field( + default=None, description="For Literal types, the allowed values" + ) + default_value: Optional[str] = Field( + default=None, + description="Default value for the field (as string representation)", + ) + + def to_dspy_field_code(self) -> str: + if self.type == FieldType.LITERAL and self.literal_values: + type_annotation = ( + f"Literal[{', '.join(repr(v) for v in self.literal_values)}]" + ) + else: + type_annotation = self.type.value + + field_type = "InputField" if self.role == FieldRole.INPUT else "OutputField" + + if self.description: + return f'{self.name}: {type_annotation} = dspy.{field_type}(desc="{self.description}")' + else: + return f"{self.name}: {type_annotation} = dspy.{field_type}()" + + +class SignatureGeneration(dspy.Signature): + prompt: str = dspy.InputField( + desc="Natural language description of the desired functionality" + ) + task_description: str = dspy.OutputField( + desc="Clear description of what the signature accomplishes" + ) + signature_fields: list[GeneratedField] = dspy.OutputField( + desc="List of input and output fields for the signature" + ) + signature_name: str = dspy.OutputField( + desc="Suggested class name for the signature (PascalCase)" + ) + + +class SignatureGenerator(dspy.Module): + def __init__(self): + super().__init__() + self.generator = dspy.Predict(SignatureGeneration) + + def forward(self, prompt: str): + """Generate DSPy signature and return raw prediction attributes""" + result = self.generator(prompt=prompt) + + return dspy.Prediction( + signature_name=result.signature_name, + task_description=result.task_description, + signature_fields=result.signature_fields, + reasoning=result.reasoning if hasattr(result, "reasoning") else None, + ) + + def generate_signature(self, prompt: str) -> Dict[str, Any]: + """Legacy method for backward compatibility - returns formatted dict""" + try: + result = self.forward(prompt=prompt) + + return { + "signature_name": result.signature_name, + "task_description": result.task_description, + "fields": [field.model_dump() for field in result.signature_fields], + "code": self.generate_code(result), + "reasoning": result.reasoning, + } + except ValidationError as e: + error_msg = f"Data validation error from Pydantic: {e}" + return self._format_error(error_msg) + except Exception as e: + # Catch other potential errors (e.g., from dspy) + error_msg = f"An unexpected error occurred: {e}" + return self._format_error(error_msg) + + def _format_error(self, error_message: str) -> Dict[str, Any]: + """Helper to create a standardized error dictionary.""" + return { + "error": error_message, + "signature_name": None, + "task_description": None, + "fields": [], + "code": None, + } + + @classmethod + def create_signature_class(cls, prediction: dspy.Prediction) -> type: + """ + Dynamically creates a dspy.Signature class from a prediction object. + + Args: + prediction: An object with attributes `signature_name`, `task_description`, + and `signature_fields`. + + Returns: + A new class that inherits from dspy.Signature. + """ + class_name = prediction.signature_name + docstring = prediction.task_description + + class_attrs = {"__doc__": docstring, "__annotations__": {}} + + for field in prediction.signature_fields: + field_name = field.name + py_type = cls._get_python_type_from_field(field) + + dspy_field_class = ( + dspy.InputField if field.role == FieldRole.INPUT else dspy.OutputField + ) + dspy_field_instance = dspy_field_class(desc=field.description) + + class_attrs[field_name] = dspy_field_instance + class_attrs["__annotations__"][field_name] = py_type + + DynamicSignature = type(class_name, (dspy.Signature,), class_attrs) + return DynamicSignature + + @staticmethod + def _get_python_type_from_field(field: "GeneratedField") -> type: + """Converts a GeneratedField into a Python type for annotations.""" + type_str = field.type.value + + type_map = { + "str": str, + "int": int, + "float": float, + "bool": bool, + "list[str]": List[str], + "list[int]": List[int], + "list[float]": List[float], + "dict[str, str]": Dict[str, str], + "dict[str, int]": Dict[str, int], + "dict[str, Any]": Dict[str, Any], + "Optional[str]": Optional[str], + "Optional[int]": Optional[int], + "Optional[float]": Optional[float], + "dspy.Image": dspy.Image, + "dspy.Audio": dspy.Audio, + } + + if field.type == FieldType.LITERAL and field.literal_values: + return Literal[tuple(field.literal_values)] + + if type_str in type_map: + return type_map[type_str] + + raise TypeError( + f"Unsupported field type for dynamic class creation: {type_str}" + ) + + @classmethod + def generate_code(cls, prediction) -> str: + """Generate Python code from a signature prediction""" + imports = cls.get_required_imports(prediction.signature_fields) + + code_lines = [] + code_lines.extend(imports) + code_lines.append("") + code_lines.append(f"class {prediction.signature_name}(dspy.Signature):") + code_lines.append(f' """{prediction.task_description}"""') + code_lines.append("") + + for field in prediction.signature_fields: + code_lines.append(f" {field.to_dspy_field_code()}") + + return "\n".join(code_lines) + + @classmethod + def get_required_imports(cls, fields: List[GeneratedField]) -> List[str]: + """Determine required imports based on field types""" + imports = ["import dspy"] + typing_imports = set() + + for field in fields: + if field.type == FieldType.LITERAL: + typing_imports.add("Literal") + elif field.type in [ + FieldType.OPTIONAL_STR, + FieldType.OPTIONAL_INT, + FieldType.OPTIONAL_FLOAT, + ]: + typing_imports.add("Optional") + elif field.type in [ + FieldType.LIST_STRING, + FieldType.LIST_INT, + FieldType.LIST_FLOAT, + ]: + typing_imports.add("List") + elif field.type in [ + FieldType.DICT_STR_STR, + FieldType.DICT_STR_INT, + FieldType.DICT_STR_ANY, + ]: + typing_imports.add("Dict") + + if field.type == FieldType.DICT_STR_ANY: + typing_imports.add("Any") + + if typing_imports: + imports.append( + f"from typing import {', '.join(sorted(list(typing_imports)))}" + ) + + return imports diff --git a/config.json b/config.json index 831713f..5fe6a03 100644 --- a/config.json +++ b/config.json @@ -1,4 +1,5 @@ { - "lm": "gpt-4o", - "max_tokens": 1024 + "lm": "gemini/gemini-2.5-pro-preview-03-25", + "max_tokens": 4096, + "temperature": 0.7 } \ No newline at end of file diff --git a/main.py b/main.py index 7be506a..57b6bb1 100644 --- a/main.py +++ b/main.py @@ -1,23 +1,44 @@ from modaic import PrecompiledAgent, PrecompiledConfig +from agent import SignatureGenerator +import dspy + class PromptToSignatureConfig(PrecompiledConfig): - lm: str = "gpt-4o" - max_tokens: int = 1024 + lm: str = "gemini/gemini-2.5-pro-preview-03-25" + max_tokens: int = 4096 + temperature: float = 0.7 + class PromptToSignatureAgent(PrecompiledAgent): config: PromptToSignatureConfig def __init__(self, config: PromptToSignatureConfig, **kwargs): super().__init__(config, **kwargs) + self.signature_generator = SignatureGenerator() - def forward(self, prompt: str) -> str: - return "hello world" + lm = dspy.LM( + model=config.lm, + max_tokens=config.max_tokens, + temperature=config.temperature, + ) + + self.signature_generator.set_lm(lm) + + def forward(self, prompt: str, as_dict: bool = False) -> dspy.Prediction: + # returns dspy.Prediction object or dict + return ( + self.signature_generator.generate_signature(prompt) + if as_dict + else self.signature_generator(prompt) + ) agent = PromptToSignatureAgent(PromptToSignatureConfig()) + def main(): agent.push_to_hub("fadeleke/prompt-to-signature", with_code=True) + if __name__ == "__main__": main()