forked from dCaples/AutoDidact
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple_qa.py
executable file
·195 lines (161 loc) · 6.66 KB
/
simple_qa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python3
"""
Simple command-line Q&A environment for testing with search functionality.
"""
import sys
import json
import random
import time
import asyncio
from typing import Dict, Any
# Import our search module (ensure these functions follow the new interfaces)
from search_module import search, get_question_answer, get_question_count
class SimpleQAEnvironment:
    """Simple command-line environment for Q&A with search capability.

    The environment keeps a running score, the record of the question that is
    currently being worked on, and the list of already answered questions.
    All of that state is held in plain dicts/lists so ``save_session`` can
    dump it to JSON.
    """

    def __init__(self):
        # Running tally of verified answers.
        self.score = {"correct": 0, "incorrect": 0, "total": 0}
        # One record per answered question, in the order they were answered.
        self.session_data = []
        # Record of the question currently in play, or None when there is none.
        self.current_question = None

    def display_welcome(self):
        """Display welcome message and instructions."""
        print("\n===== Search & Answer Environment =====")
        print("Answer questions using the search tool to find relevant information.")
        print("Type 'q' to quit, 'h' for help.\n")

    def display_help(self):
        """Display help information."""
        print("\n===== Commands =====")
        print("n          - Get a new question")
        print("s <query>  - Search for information (e.g., s program launch date)")
        print("a <answer> - Submit your answer")
        print("h          - Display this help message")
        print("q          - Quit the program\n")

    def display_question(self, question: str):
        """Display the current question."""
        print("\n===== QUESTION =====")
        print(question)
        print("=====================\n")

    def get_new_question(self) -> str:
        """Pick a random question, make it the current one, and return its text.

        The new record replaces ``self.current_question``; a previous question
        that was never answered is dropped without being added to the session.

        Returns:
            The text of the selected question.

        Raises:
            ValueError: If ``get_question_count()`` reports no questions.
        """
        total_questions = get_question_count()
        if total_questions <= 0:
            # random would otherwise fail with an opaque "empty range" error.
            raise ValueError("No questions available.")
        question_id = random.randrange(total_questions)

        # get_question_answer returns a dict with "question" and "answer" keys.
        qa = get_question_answer(question_id)
        self.current_question = {
            "id": question_id,
            "question": qa["question"],
            "correct_answer": qa["answer"],
            "start_time": time.time(),
            "searches": [],
        }
        return self.current_question["question"]

    def perform_search(self, query: str):
        """Run ``search`` for *query*, print the results, and log the query.

        Empty or whitespace-only queries are rejected with a hint. Errors
        raised by the search are printed rather than propagated so the command
        loop keeps running.
        """
        query = (query or "").strip()
        if not query:
            print("Please provide a search query.")
            return

        try:
            print("\n===== SEARCH RESULTS =====")
            results = search(query)
            print(results)
            print("==========================\n")
            # Record search in current question data if available.
            if self.current_question is not None:
                self.current_question["searches"].append(query)
        except Exception as e:
            print(f"Error searching: {e}")

    async def process_answer(self, user_answer: str):
        """Verify *user_answer* against the current question and update the score.

        On successful verification the question record (with the answer, the
        timing and the verdict) is appended to ``self.session_data`` and the
        current question is cleared. If verification raises, the error is
        printed and the question stays current so the user can retry.
        """
        if self.current_question is None:
            print("Please get a question first.")
            return

        user_answer = (user_answer or "").strip()
        if not user_answer:
            print("Please provide an answer.")
            return

        # Record answer and calculate time taken.
        current = self.current_question
        current["user_answer"] = user_answer
        current["end_time"] = time.time()
        current["time_taken"] = current["end_time"] - current["start_time"]

        print("\nVerifying your answer...")
        try:
            # NOTE(review): neither `verify` nor `router` is defined or imported
            # in the visible part of this file. Unless they are provided
            # elsewhere, this call raises NameError, which is reported below as
            # a verification error -- confirm where they should come from.
            correct = await verify(
                user_answer,
                current["question"],
                current["correct_answer"],
                router
            )
        except Exception as e:
            print(f"Error verifying answer: {e}")
            return

        # Update score and inform the user.
        self.score["total"] += 1
        if correct:
            self.score["correct"] += 1
            print("\n✓ Your answer is CORRECT!")
        else:
            self.score["incorrect"] += 1
            print("\n✗ Your answer is INCORRECT.")
            print(f"\nThe correct answer is:\n{current['correct_answer']}")

        print(f"\nScore: {self.score['correct']}/{self.score['total']}")

        # Record the result and add the current question to the session data.
        current["is_correct"] = correct
        self.session_data.append(current)

        # Clear the current question.
        self.current_question = None

    def save_session(self):
        """Save the session data to a timestamped JSON file.

        Does nothing when no question has been answered. Failures are printed
        instead of raised because this runs while the program is exiting.
        """
        if not self.session_data:
            return

        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"qa_session_{timestamp}.json"

        session_data = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "score": self.score,
            "questions": self.session_data
        }

        try:
            # Serialize before opening the file so a serialization failure
            # cannot leave a truncated JSON file behind.
            payload = json.dumps(session_data, indent=2)
            with open(filename, 'w', encoding="utf-8") as f:
                f.write(payload)
            print(f"\nSession data saved to {filename}")
        except Exception as e:
            print(f"Error saving session data: {e}")

    async def run(self):
        """Run the main command loop until the user quits or input ends.

        The session is saved whenever the loop is left, including on
        end-of-input, Ctrl-C or an unexpected error, so answered questions are
        not lost.
        """
        self.display_welcome()

        try:
            while True:
                try:
                    command = input("\n> ").strip()
                except EOFError:
                    # Closed or exhausted stdin is treated like 'q'.
                    break

                if not command:
                    continue

                # Split into a one-letter verb and its (possibly empty) argument
                # so a bare "s" or "a" reaches the helper that explains what is
                # missing instead of being reported as unknown.
                lowered = command.lower()
                verb, _, argument = command.partition(" ")
                verb = verb.lower()
                argument = argument.strip()

                if lowered == 'q':
                    break
                elif lowered == 'h':
                    self.display_help()
                elif lowered == 'n':
                    try:
                        question = self.get_new_question()
                    except Exception as e:
                        # Report and keep going, like the other commands do.
                        print(f"Error getting question: {e}")
                    else:
                        self.display_question(question)
                elif verb == 's':
                    self.perform_search(argument)
                elif verb == 'a':
                    await self.process_answer(argument)
                else:
                    print("Unknown command. Type 'h' for help.")
        finally:
            # Save session data on exit, whatever caused the exit.
            self.save_session()

        print("\nThank you for using the Q&A environment!")
async def main():
    """Create a Q&A environment and drive its command loop until it returns."""
    await SimpleQAEnvironment().run()
if __name__ == "__main__":
    # Script entry point: run the async main() and turn Ctrl-C or any other
    # failure into a short message instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram terminated by user.")
    except Exception as exc:
        print("\nError: " + str(exc))