From acb6d78c8fc4f4d15f45258e3019d7c5f1a2d22b Mon Sep 17 00:00:00 2001
From: Farouk Adeleke
Date: Thu, 27 Nov 2025 18:47:26 -0500
Subject: [PATCH] Uncompiled CoTWithThoughtSimplifiedBaleen as baseline

---
 README.md               |  30 +++-
 agent.json              |  91 ++++++++++
 auto_classes.json       |   4 +
 config.json             |   4 +
 get_data.py             | 101 +++++++++++
 main.py                 | 253 +++++++++++++++++++++++++++
 opentom_evaluator.py    | 367 ++++++++++++++++++++++++++++++++++++++++
 pyproject.toml          |   7 +
 src/__init__.py         |   0
 src/cot.py              |  34 ++++
 src/cot_with_thought.py |  51 ++++++
 11 files changed, 941 insertions(+), 1 deletion(-)
 create mode 100644 agent.json
 create mode 100644 auto_classes.json
 create mode 100644 config.json
 create mode 100644 get_data.py
 create mode 100644 main.py
 create mode 100644 opentom_evaluator.py
 create mode 100644 pyproject.toml
 create mode 100644 src/__init__.py
 create mode 100644 src/cot.py
 create mode 100644 src/cot_with_thought.py

diff --git a/README.md b/README.md
index 4bfa834..e1a9cdd 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,30 @@
-# CoTWithThoughtSimplifiedBaleen
+# DSPy OpenTOM
+This repo contains scripts for optimizing DSPy modules for the OpenTOM Benchmark. We support plain Chain of Thought and an experimental variant that first generates a "thought" about the context to help answer the question (spoiler -- it didn't do better than plain Chain of Thought compiled with `BootstrapFewShotWithRandomSearch`).
+
+CLI Usage:
+```
+usage: main.py [-h] [--student STUDENT] [--teacher TEACHER] [--train_size TRAIN_SIZE] [--download_dataset DOWNLOAD_DATASET]
+               [--question_types [QUESTION_TYPES ...]]
+               experiment_title dspy_method dspy_optimizer
+
+Run DSPY method.
+
+positional arguments:
+  experiment_title      Title of new experiment
+  dspy_method           The DSPY method to run
+  dspy_optimizer        The DSPY optimizer to use
+
+options:
+  -h, --help            show this help message and exit
+  --student STUDENT     The LLM to optimize prompts for
+  --teacher TEACHER     Teacher LLM for optimizing prompts. Defaults to Student LLM
+  --train_size TRAIN_SIZE
+                        Number of training examples to use for optimization
+  --download_dataset DOWNLOAD_DATASET
+                        Download dataset
+  --question_types [QUESTION_TYPES ...]
+                        Question types. 
Defaults to all +``` + +Come chat with us in our [discord](https://discorg.gg/plasticlabs) or in the [DSPy thread](https://discord.com/channels/1161519468141355160/1214629969318252574) diff --git a/agent.json b/agent.json new file mode 100644 index 0000000..217a23f --- /dev/null +++ b/agent.json @@ -0,0 +1,91 @@ +{ + "generate_thought.predict": { + "traces": [], + "train": [], + "demos": [], + "signature": { + "instructions": "Generate thoughts about questions", + "fields": [ + { + "prefix": "Context:", + "description": "may contain relevant facts and psychological insights" + }, + { + "prefix": "Question:", + "description": "${question}" + }, + { + "prefix": "Reasoning: Let's think step by step in order to", + "description": "${reasoning}" + }, + { + "prefix": "Thought:", + "description": "a thought that might help answer the question" + } + ] + }, + "lm": { + "model": "gpt-3.5-turbo", + "model_type": "chat", + "cache": true, + "num_retries": 3, + "finetuning_model": null, + "launch_kwargs": {}, + "train_kwargs": {}, + "temperature": null, + "max_tokens": 1000 + } + }, + "generate_answer.predict": { + "traces": [], + "train": [], + "demos": [], + "signature": { + "instructions": "Generate answers to the questions", + "fields": [ + { + "prefix": "Context:", + "description": "may contain relevant facts and psychological insights" + }, + { + "prefix": "Question:", + "description": "${question}" + }, + { + "prefix": "Thought:", + "description": "a thought that might help answer the question" + }, + { + "prefix": "Answer Choices:", + "description": "${answer_choices}" + }, + { + "prefix": "Reasoning: Let's think step by step in order to", + "description": "${reasoning}" + }, + { + "prefix": "Answer:", + "description": "often between 1 and 5 words" + } + ] + }, + "lm": { + "model": "gpt-3.5-turbo", + "model_type": "chat", + "cache": true, + "num_retries": 3, + "finetuning_model": null, + "launch_kwargs": {}, + "train_kwargs": {}, + "temperature": null, + "max_tokens": 1000 + } + }, + "metadata": { + "dependency_versions": { + "python": "3.13", + "dspy": "3.0.4", + "cloudpickle": "3.1" + } + } +} \ No newline at end of file diff --git a/auto_classes.json b/auto_classes.json new file mode 100644 index 0000000..eb24fa3 --- /dev/null +++ b/auto_classes.json @@ -0,0 +1,4 @@ +{ + "AutoConfig": "src.cot_with_thought.CoTWithThoughtSimplifiedBaleenConfig", + "AutoAgent": "src.cot_with_thought.CoTWithThoughtSimplifiedBaleen" +} \ No newline at end of file diff --git a/config.json b/config.json new file mode 100644 index 0000000..03849bf --- /dev/null +++ b/config.json @@ -0,0 +1,4 @@ +{ + "model": "gpt-3.5-turbo", + "max_tokens": 1000 +} \ No newline at end of file diff --git a/get_data.py b/get_data.py new file mode 100644 index 0000000..74347b6 --- /dev/null +++ b/get_data.py @@ -0,0 +1,101 @@ +import dspy +import requests +import pickle +import json +import random +from collections import defaultdict +import pandas as pd + + +# this is the one that they sampled 100 existing OpenToM plots to produce "extra long" narratives +# URL = "https://raw.githubusercontent.com/SeacowX/OpenToM/main/data/opentom_long.json" +URL = "https://raw.githubusercontent.com/SeacowX/OpenToM/main/data/opentom.json" + + +def default_factory(): + return [] + + +def load_dataset(): + response = requests.get(URL).json() + + df = pd.DataFrame(response) + + # Extract 'type' and 'answer' into separate columns + df["type"] = df["question"].apply(lambda x: x["type"]) + df["answer"] = df["question"].apply(lambda x: x["answer"]) + + 
unique_answers_by_type = df.groupby("type")["answer"].unique() + + # convert the dataset to what DSPy expects (list of Example objects) + dataset = [] + + for index, row in df.iterrows(): + context = row["narrative"] + question = row["question"]["question"] + answer = row["question"]["answer"] + type = row["question"]["type"] + plot_info = json.dumps( + row["plot_info"] + ) # Keeping each example field as a string might be a good idea + + # update the type value if location is coarse or fine + if "location" in type: + location_granularity = ( + "fine" + if answer.lower().strip() != "yes" and answer.lower().strip() != "no" + else "coarse" + ) + type = f"{type}-{location_granularity}" + + # Answer choices + if "location" in type and ( + answer.lower().strip() != "yes" and answer.lower().strip() != "no" + ): # don't provide answer choices for fine grained location questions + answer_choices = "n/a, list a specific location" + elif "location" in type: + answer_choices = "No, Yes" + else: + answer_choices = ", ".join(unique_answers_by_type[type]) + + dataset.append( + dspy.Example( + context=context, + question=question, + answer=answer, + type=type, + plot_info=plot_info, + answer_choices=answer_choices, + ).with_inputs("context", "question", "answer_choices") + ) + + # split datasets by question types + datasets = defaultdict(default_factory) + + for example in dataset: + datasets[example.type].append(example) + + datasets.keys() + [len(dataset) for dataset in datasets.values()] + + # create train test split + for question_type, dataset in datasets.items(): + random.shuffle(dataset) + + datasets[question_type] = { + "train": dataset[int(len(dataset) * 0.8) :], # 80% test, 20% train + "test": dataset[: int(len(dataset) * 0.8)], + } + + print(f"Train {question_type}: {len(datasets[question_type]['train'])}") + print(f"Test {question_type}: {len(datasets[question_type]['test'])}") + + # Serialize and save the datasets object to a file + with open("datasets.pkl", "wb") as file: + pickle.dump(datasets, file) + + print("🫡 Datasets object has been saved to 'datasets.pkl' 🫡") + + +if __name__ == "__main__": + load_dataset() diff --git a/main.py b/main.py new file mode 100644 index 0000000..c083e30 --- /dev/null +++ b/main.py @@ -0,0 +1,253 @@ +# run with python main.py cot + +import pickle +import time +import argparse +from typing import Optional +from opentom_evaluator import OpenToMEvaluatorDspy +import dspy +from dspy.teleprompt import BootstrapFewShotWithRandomSearch +from dspy.evaluate.evaluate import Evaluate +from src.cot import CoTSimplifiedBaleen, CoTSimplifiedBaleenConfig +from src.cot_with_thought import CoTWithThoughtSimplifiedBaleen, CoTWithThoughtSimplifiedBaleenConfig +from get_data import default_factory, load_dataset +from collections import defaultdict +from dotenv import load_dotenv +import neptune +import numpy as np + +load_dotenv() + +# initialize neptune +run = neptune.init_run( + project="modaic/dspy-opentom", + capture_hardware_metrics=False, + capture_stderr=True, + capture_stdout=True, + capture_traceback=True, +) + +EVAL_QUESTION_TYPES = [ + "attitude", + "multihop-fo", + "multihop-so", + "location-fo-coarse", + "location-fo-fine", + "location-so-coarse", + "location-so-fine", +] + + +def dump_state(data, filename): + with open(filename, "wb") as file: + pickle.dump(data, file) + + +def main( + dspy_method, + dspy_optimizer, + download_dataset, + question_types, + teacher_lm, + train_size, +): + # load dataset + if download_dataset: + load_dataset() + + # read in the 
datasets pickle object
+    with open("datasets.pkl", "rb") as file:
+        datasets = pickle.load(file)
+
+    if dspy_method == "cot":
+        module_type = CoTSimplifiedBaleen(CoTSimplifiedBaleenConfig())
+        module_name = "CoTSimplifiedBaleen"
+    elif dspy_method == "cot_with_thought":
+        module_type = CoTWithThoughtSimplifiedBaleen(CoTWithThoughtSimplifiedBaleenConfig())
+        module_name = "CoTWithThoughtSimplifiedBaleen"
+    else:
+        raise Exception(f"Dspy method '{dspy_method}' is not valid")
+
+    module_type.push_to_hub(f"vintro/{module_name}", with_code=True, commit_message=f"Uncompiled {module_name} as baseline")
+    modules = {}
+    # define modules for each question type
+    for question_type in question_types:
+        print(f"TYPE: {question_type}")
+        evaluator = OpenToMEvaluatorDspy(model_name="(training set) compiled baleen")
+
+        if dspy_optimizer == "bootstrap_fewshot_with_random_search":
+            optimizer = BootstrapFewShotWithRandomSearch(
+                metric=evaluator.dspy_metric,
+                num_candidate_programs=25,
+                num_threads=1,
+                teacher_settings=dict(lm=teacher_lm),
+            )
+            compiled_baleen = optimizer.compile(
+                module_type, trainset=datasets[question_type]["train"][:train_size]
+            )
+        # elif dspy_optimizer == "signature_optimizer":  # Signature Optimizer is deprecated TODO: add a new one like GEPA
+        #     optimizer = SignatureOptimizer(
+        #         metric=evaluator.dspy_metric,
+        #         breadth=10,
+        #         depth=3,
+        #         init_temperature=1.4,
+        #         verbose=True,
+        #         track_stats=True,
+        #         prompt_model=teacher_lm,
+        #     )
+        #     eval_kwargs = dict(num_threads=1, display_progress=True, display_table=0)
+        #     compiled_baleen = optimizer.compile(
+        #         module_type,
+        #         devset=datasets[question_type]["train"][:train_size],
+        #         eval_kwargs=eval_kwargs,
+        #     )
+        else:
+            raise Exception(f"Invalid dspy optimizer type: {dspy_optimizer}")
+
+        modules[question_type] = compiled_baleen
+        compiled_baleen.push_to_hub(f"vintro/{module_name}-{question_type}", with_code=True, commit_message=f"Compiled {module_name} with {dspy_optimizer} for {question_type}")
+        time.sleep(10)
+
+    uncompiled_baleen = CoTSimplifiedBaleen(
+        CoTSimplifiedBaleenConfig()
+    )  # regular cot is always the uncompiled baseline
+
+    print("Beginning Evaluation")
+    for question_type in question_types:
+        compiled_baleen = modules[question_type]
+
+        # Evaluation Procedure: Calculate the F1 Score for a randomly drawn batch of 50 questions 5 times and average the F1 Scores
+        batch_size = 50
+        num_batches = 5
+
+        assert len(datasets[question_type]["test"]) >= batch_size * num_batches
+        test = datasets[question_type]["test"][: batch_size * num_batches]
+        # slice into num_batches non-overlapping batches of batch_size questions each
+        test_sets = [
+            test[i * batch_size : (i + 1) * batch_size] for i in range(num_batches)
+        ]
+
+        uncompiled_f1_scores = []
+        compiled_f1_scores = []
+
+        for test in test_sets:
+            # Set up the `evaluate_on_opentom` function.
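+            # Evaluate runs the module on every example in this batch and applies the
+            # metric to each prediction; the OpenToMEvaluatorDspy instances below also
+            # accumulate per-class counts so their f1_score() can be read afterwards.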
+            evaluate_on_opentom = Evaluate(
+                devset=test, num_threads=1, display_progress=True, display_table=0
+            )
+
+            uncompiled_baleen_evaluator = OpenToMEvaluatorDspy(
+                model_name="uncompiled_baleen"
+            )
+            evaluate_on_opentom(
+                uncompiled_baleen,
+                metric=uncompiled_baleen_evaluator.dspy_metric,
+                display=True,
+            )
+            uncompiled_f1_scores.append(
+                uncompiled_baleen_evaluator.f1_score()[question_type]["macro_averaged"]
+            )
+
+            compiled_baleen_evaluator = OpenToMEvaluatorDspy(
+                model_name="compiled_baleen"
+            )
+            evaluate_on_opentom(
+                compiled_baleen,
+                metric=compiled_baleen_evaluator.dspy_metric,
+                display=True,
+            )
+            compiled_f1_scores.append(
+                compiled_baleen_evaluator.f1_score()[question_type]["macro_averaged"]
+            )
+
+        # overall f1 scores
+        uncompiled_mean_f1 = np.mean(uncompiled_f1_scores)
+        uncompiled_std_f1 = np.std(uncompiled_f1_scores)
+
+        compiled_mean_f1 = np.mean(compiled_f1_scores)
+        compiled_std_f1 = np.std(compiled_f1_scores)
+
+        run[f"evaluation/{question_type}/uncompiled/mean_macro_averaged_f1"] = (
+            uncompiled_mean_f1
+        )
+        run[f"evaluation/{question_type}/uncompiled/std_macro_averaged_f1"] = (
+            uncompiled_std_f1
+        )
+        run[f"evaluation/{question_type}/compiled/mean_macro_averaged_f1"] = (
+            compiled_mean_f1
+        )
+        run[f"evaluation/{question_type}/compiled/std_macro_averaged_f1"] = (
+            compiled_std_f1
+        )
+
+        print(
+            f"Mean Macro Averaged F1 Scores (± std dev.) - {question_type} - Aggregated from {num_batches} batches of {batch_size} questions"
+        )
+        print(f"uncompiled: {uncompiled_mean_f1:.3f} ± {uncompiled_std_f1:.3f}")
+        print(f"compiled: {compiled_mean_f1:.3f} ± {compiled_std_f1:.3f}")
+
+    dump_state(modules, "cot_modules.pkl")
+    run["cot_modules"].upload("cot_modules.pkl")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Run DSPY method.")
+
+    # dspy arguments
+    parser.add_argument("experiment_title", type=str, help="Title of new experiment")
+    parser.add_argument("dspy_method", type=str, help="The DSPY method to run")
+    parser.add_argument("dspy_optimizer", type=str, help="The DSPY optimizer to use")
+    parser.add_argument(
+        "--student",
+        default="gpt-3.5-turbo",
+        type=str,
+        help="The LLM to optimize prompts for",
+    )
+    parser.add_argument(
+        "--teacher",
+        default=None,
+        type=str,
+        help="Teacher LLM for optimizing prompts. Defaults to Student LLM",
+    )
+    parser.add_argument(
+        "--train_size",
+        default=50,
+        type=int,
+        help="Number of training examples to use for optimization",
+    )
+    parser.add_argument(
+        "--download_dataset", default=True, type=bool, help="Download dataset"
+    )
+    parser.add_argument(
+        "--question_types",
+        default=EVAL_QUESTION_TYPES,
+        nargs="*",
+        help="Question types. 
Defaults to all", + ) + + args = parser.parse_args() + + # setup LLMs + student_lm = dspy.LM(model=args.student, max_tokens=1000) + args.teacher = args.student if args.teacher is None else args.teacher + teacher_lm = dspy.LM(model=args.teacher, max_tokens=1000) + dspy.settings.configure(lm=student_lm) + + # validate question types + question_types = args.question_types + assert all( + [question_type in EVAL_QUESTION_TYPES for question_type in question_types] + ) + args.question_types = ", ".join( + question_types + ) # turn list into string for neptune logging + + # log run parameters + run["parameters"] = args + run["sys/name"] = args.experiment_title + + main( + args.dspy_method, + args.dspy_optimizer, + args.download_dataset, + question_types, + teacher_lm, + args.train_size, + ) diff --git a/opentom_evaluator.py b/opentom_evaluator.py new file mode 100644 index 0000000..817a231 --- /dev/null +++ b/opentom_evaluator.py @@ -0,0 +1,367 @@ +# taken from https://github.com/seacowx/OpenToM/blob/main/src/evaluate/opentom_evaluator.py +# modified for usability + +from collections import defaultdict +import json +import traceback + + +class OpenToMEvaluatorDspy: + def __init__(self, model_name="") -> None: + self.true_positives = defaultdict(lambda: 0) + self.false_positives = defaultdict(lambda: 0) + self.false_negatives = defaultdict(lambda: 0) + self.model_name = model_name + + def dspy_metric(self, example, pred_answer, trace=None): + type = example.type + + eval_result = self.check_answer(example, pred_answer.answer) + if ( + eval_result == None + ): # Hm what is the correct value to return as a dspy metric when there's an invalid example? + return None + gt, pred = eval_result # ground truth answer class, predicted answer class + + # store positive/negative results by class so we can calculate the f1 scores later + if gt == pred: + self.true_positives[f"{type}_{pred}"] += 1 + else: + self.false_positives[f"{type}_{pred}"] += 1 + self.false_negatives[f"{type}_{gt}"] += 1 + + # print("done", example.type, gt, pred, example.answer, pred_answer.answer) + + return gt == pred + + # this method was added to make dspy evaluation easier + def check_answer( + self, + example, + pred_answer, + cot_flag=False, + perspective="all", + ): + mover, affected_char, eoi, original_place, move_to_place = json.loads( + example.plot_info + ).values() + + cur_question_type = example.type + question_content = example.question + + gt_answer = example.answer.strip() + pred_answer = pred_answer.strip() + + # NOTE: evaluate based on the character + if perspective == "observer": + if mover in question_content and affected_char not in question_content: + return None + + if mover in question_content and affected_char in question_content: + question_tokens = ( + question_content.replace("'s", "").replace(",", "").split() + ) + + mover_idx = question_tokens.index(mover) + affected_char_idx = question_tokens.index(affected_char) + + if mover_idx < affected_char_idx: + return None + + elif perspective == "mover": + if mover not in question_content and affected_char in question_content: + return None + + if mover in question_content and affected_char in question_content: + question_tokens = ( + question_content.replace("'s", "").replace(",", "").split() + ) + + mover_idx = question_tokens.index(mover) + affected_char_idx = question_tokens.index(affected_char) + + if mover_idx > affected_char_idx: + return None + + if cot_flag: + pred_answer = self.parse_cot_answer(pred_answer) + + if cur_question_type == 
"location-fo-coarse": + gt, pred = self.check_answer_for_cg_location(pred_answer, gt_answer) + return gt, pred + + elif cur_question_type == "location-fo-fine": + gt, pred = self.check_answer_for_fg_location( + pred_answer, gt_answer, original_place, move_to_place + ) + return gt, pred + + elif cur_question_type == "location-so-coarse": + gt, pred = self.check_answer_for_cg_location(pred_answer, gt_answer) + return gt, pred + + elif cur_question_type == "location-so-fine": + gt, pred = self.check_answer_for_fg_location( + pred_answer, gt_answer, original_place, move_to_place + ) + return gt, pred + + elif cur_question_type == "multihop-fo": + if "fullness" in question_content: + gt, pred = self.check_fullness_answer(pred_answer, gt_answer) + return gt, pred + + elif "accessibility" in question_content: + if "|" in gt_answer: + gt_answer = "equally accessible" + + if isinstance(gt_answer, list): + gt_answer = [ele for ele in gt_answer if ele != "corrupted"] + assert len(gt_answer) == 1 + gt_answer = gt_answer[0] + + gt, pred = self.check_accessibility_answer(pred_answer, gt_answer) + return gt, pred + + elif cur_question_type == "multihop-so": + if "fullness" in question_content: + gt, pred = self.check_fullness_answer(pred_answer, gt_answer) + return gt, pred + + elif "accessibility" in question_content: + if "|" in gt_answer: + gt_answer = "equally accessible" + + if isinstance(gt_answer, list): + gt_answer = [ele for ele in gt_answer if ele != "corrupted"] + assert len(gt_answer) == 1 + gt_answer = gt_answer[0] + + gt, pred = self.check_accessibility_answer(pred_answer, gt_answer) + return gt, pred + + elif cur_question_type == "attitude": + gt, pred = self.check_attitude_answer(pred_answer, gt_answer) + return gt, pred + + def f1_score(self): + true_positives = self.true_positives + false_positives = self.false_positives + false_negatives = self.false_negatives + f1_scores = defaultdict(lambda: {"by_class": {}}) + + for _class in ( + true_positives.keys() | false_positives.keys() | false_negatives.keys() + ): + question_type, _ = _class.split("_") + class_true_positives = true_positives[_class] + class_false_positives = false_positives[_class] + class_false_negatives = false_negatives[_class] + class_precision = ( + class_true_positives / (class_true_positives + class_false_positives) + if class_true_positives > 0.0 + else 0.0 + ) # avoid dividing by zero + class_recall = ( + class_true_positives / (class_true_positives + class_false_negatives) + if class_true_positives > 0.0 + else 0.0 + ) + class_f1_score = ( + (2 * class_precision * class_recall) / (class_precision + class_recall) + if class_precision > 0.0 or class_recall > 0.0 + else 0.0 + ) + f1_scores[question_type]["by_class"][_class] = class_f1_score + + for question_type, type_f1_scores in f1_scores.items(): + type_f1_scores = type_f1_scores["by_class"] + macro_averaged_f1_score = sum(list(type_f1_scores.values())) / len( + type_f1_scores + ) + f1_scores[question_type]["macro_averaged"] = macro_averaged_f1_score + + return f1_scores + + # pretty print macro averaged f1 scores for each question type + def print_f1_results(self, round_decimal=2, print_header=False): + f1_scores = self.f1_score() + if print_header: + print("Macro Averaged F1 Scores by question type") + + print(self.model_name, end=" - ") + for question_type, type_f1_scores in f1_scores.items(): + print( + f"{question_type}: {round(type_f1_scores['macro_averaged'], ndigits=round_decimal + 2) * 100}", + end="\t", + ) + print() + + @staticmethod + def 
remove_determinant(word: str) -> str: + determinants = ["a", "an", "the"] + for det in determinants: + if word.startswith(det): + return word[len(det) :].strip() + return word + + @staticmethod + def compute_lexical_overlap(pred: str, location: str) -> float: + pred = pred.lower().replace("_", " ").replace("'s", "") + location = location.lower().replace("_", " ").replace("'s", "") + score = 0 + pred = pred.replace(".", "").split() + location = location.split() + visited_word = [] + + for word in pred: + if word in location and word not in visited_word: + score += 1 + visited_word.append(word) + + return score / len(location) + + def parse_cot_answer(self, answer: str) -> str: + # cot typically generate answer in the last sentence or paragraph + if "\n" in answer: + answer = answer.split("\n")[-1] + else: + answer = answer.split("Therefore")[-1] + return answer + + def check_answer_for_fg_location( + self, prediction: str, answer: str, original_place: str, move_to_place: str + ) -> list: + # truncate prediction as some of them contain explanations + answer = self.remove_determinant(answer).lower() + original_place = self.remove_determinant(original_place).lower() + move_to_place = self.remove_determinant(move_to_place).lower() + gt_label, pred_label = None, None + original_place_score = self.compute_lexical_overlap(prediction, original_place) + move_to_place_score = self.compute_lexical_overlap(prediction, move_to_place) + + if original_place_score == move_to_place_score: + pred_label = 3 + if original_place_score > move_to_place_score: + pred_label = 1 + elif original_place_score < move_to_place_score: + pred_label = 2 + + if original_place == answer: + gt_label = 1 + elif move_to_place == answer: + gt_label = 2 + + return [gt_label, pred_label] + + def check_answer_for_cg_location(self, prediction: str, answer: str) -> list: + prediction = prediction.lower() + answer = answer.lower() + + if "no" in prediction and "yes" not in prediction: + pred_label = 0 + elif "yes" in prediction and "no" not in prediction: + pred_label = 1 + else: + pred_label = -1 + + if "no" in answer: + gt_label = 0 + elif "yes" in answer: + gt_label = 1 + + return [gt_label, pred_label] + + def check_fullness_answer(self, prediction: str, answer: str) -> list: + prediction = prediction.replace(".", "").lower() + less_full_answer_list = ["less full", "emptier", "more empty"] + more_full_answer_list = ["more full", "fuller"] + pred_label, gt_label = None, None + for less_full_ans in less_full_answer_list: + if less_full_ans in prediction: + pred_label = 1 + + if not pred_label: + for more_full_ans in more_full_answer_list: + if more_full_ans in prediction: + pred_label = 2 + + if not pred_label: + if "equally full" in prediction: + pred_label = 3 + + if not pred_label: + pred_label = -1 # corrupted + + if answer == "less full": + gt_label = 1 + elif answer == "more full": + gt_label = 2 + elif answer == "equally full": + gt_label = 3 + + return [gt_label, pred_label] + + def check_accessibility_answer(self, prediction: str, answer: str) -> list: + prediction = prediction.replace(".", "").lower() + pred_label, gt_label = None, None + if "more accessible" in prediction: + pred_label = 1 + elif "less accessible" in prediction: + pred_label = 2 + elif "equally accessible" in prediction: + pred_label = 3 + else: + pred_label = -1 # corrupted + + if answer == "more accessible": + gt_label = 1 + elif answer == "less accessible": + gt_label = 2 + else: + gt_label = 3 + + return [gt_label, pred_label] + + def 
check_attitude_answer(self, prediction: str, answer: str) -> list: + prediction = prediction.lower() + answer = answer.lower() + answer_map = {"a": "positive", "b": "neutral", "c": "negative"} + prediction_token = ( + prediction.split("\n\n")[-1].split(":")[-1].split(".")[0].strip().lower() + ) + gt_label, pred_label = None, None + + if answer == "positive": + gt_label = 1 + elif answer == "negative": + gt_label = 2 + else: + gt_label = 3 + + try: + prediction = answer_map[prediction_token] + if prediction == "positive": + pred_label = 1 + elif prediction == "negative": + pred_label = 2 + else: + pred_label = 3 + + except: + if "positive" in prediction_token and "negative" in prediction_token: + pred_label = -1 + elif "positive" in prediction_token and "neutral" in prediction_token: + pred_label = -1 + elif "neutral" in prediction_token and "negative" in prediction_token: + pred_label = -1 + elif "positive" in prediction_token: + pred_label = 1 + elif "negative" in prediction_token: + pred_label = 2 + elif "neutral" in prediction_token: + pred_label = 3 + else: + pred_label = -1 + + return [gt_label, pred_label] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..0c8996c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[project] +name = "CoTWithThoughtSimplifiedBaleen" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.13" +dependencies = ["dspy>=3.0.4", "jupyter>=1.1.1", "modaic>=0.4.1", "neptune>=1.14.0"] diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cot.py b/src/cot.py new file mode 100644 index 0000000..c4dc684 --- /dev/null +++ b/src/cot.py @@ -0,0 +1,34 @@ +import dspy +from modaic import PrecompiledAgent, PrecompiledConfig + + +# DSPy code +class GenerateAnswer(dspy.Signature): + """Generate answers to the questions""" + + context = dspy.InputField( + desc="may contain relevant facts and psychological insights" + ) + question = dspy.InputField() + answer_choices = dspy.InputField() + answer = dspy.OutputField(desc="often between 1 and 5 words") + + +class CoTSimplifiedBaleenConfig(PrecompiledConfig): + model: str = "gpt-3.5-turbo" + max_tokens: int = 1000 + + +class CoTSimplifiedBaleen(PrecompiledAgent): + config: CoTSimplifiedBaleenConfig + + def __init__(self, config: CoTSimplifiedBaleenConfig, **kwargs): + super().__init__(config, **kwargs) + self.generate_answer = dspy.ChainOfThought(GenerateAnswer) + self.generate_answer.set_lm(dspy.LM(model=config.model, max_tokens=config.max_tokens)) + + def forward(self, question, context, answer_choices): + pred = self.generate_answer( + context=context, question=question, answer_choices=answer_choices + ) + return dspy.Prediction(context=context, answer=pred.answer) diff --git a/src/cot_with_thought.py b/src/cot_with_thought.py new file mode 100644 index 0000000..e6c35f8 --- /dev/null +++ b/src/cot_with_thought.py @@ -0,0 +1,51 @@ +import dspy +from modaic import PrecompiledAgent, PrecompiledConfig + + +# DSPy code +class GenerateAnswer(dspy.Signature): + """Generate answers to the questions""" + + context = dspy.InputField( + desc="may contain relevant facts and psychological insights" + ) + question = dspy.InputField() + thought = dspy.InputField(desc="a thought that might help answer the question") + answer_choices = dspy.InputField() + answer = dspy.OutputField(desc="often between 1 and 5 words") + + +class GenerateThought(dspy.Signature): + """Generate thoughts about questions""" + + 
context = dspy.InputField( + desc="may contain relevant facts and psychological insights" + ) + question = dspy.InputField() + thought = dspy.OutputField(desc="a thought that might help answer the question") + + +class CoTWithThoughtSimplifiedBaleenConfig(PrecompiledConfig): + model: str = "gpt-3.5-turbo" + max_tokens: int = 1000 + + +class CoTWithThoughtSimplifiedBaleen(PrecompiledAgent): + config: CoTWithThoughtSimplifiedBaleenConfig + + def __init__(self, config: CoTWithThoughtSimplifiedBaleenConfig, **kwargs): + super().__init__(config, **kwargs) + self.generate_thought = dspy.ChainOfThought(GenerateThought) + self.generate_answer = dspy.ChainOfThought(GenerateAnswer) + self.generate_thought.set_lm(dspy.LM(model=config.model, max_tokens=config.max_tokens)) + self.generate_answer.set_lm(dspy.LM(model=config.model, max_tokens=config.max_tokens)) + + def forward(self, question, context, answer_choices): + pred_thought = self.generate_thought(context=context, question=question) + pred = self.generate_answer( + context=context, + question=question, + thought=pred_thought.thought, + answer_choices=answer_choices, + ) + return dspy.Prediction(context=context, answer=pred.answer)
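+
+
+# Usage sketch (illustrative only; the question, narrative, and choices below are
+# hypothetical). The agent is called like a DSPy module, which is how main.py's
+# Evaluate harness and the optimizer invoke it:
+#
+#   config = CoTWithThoughtSimplifiedBaleenConfig()
+#   agent = CoTWithThoughtSimplifiedBaleen(config)
+#   pred = agent(
+#       question="Does Anne think the rubber duck is in its initial location?",
+#       context="<OpenToM narrative>",
+#       answer_choices="No, Yes",
+#   )
+#   print(pred.answer)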