Last active
May 6, 2025 02:48
-
-
Save ehartford/8b2a97f10851173a204fb39b59654b26 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -----------------------------------------------------------
# ultra_math_dataset.py
# Token-lean “visible-scratch-pad” dataset generator
# Revision 7 – Final minor cleanup (removed unreachable code)
# -----------------------------------------------------------
import json, math, random, uuid | |
from decimal import Decimal, InvalidOperation | |
from fractions import Fraction | |
# Requires Python 3.9+ for math.lcm / math.isqrt preferred paths
DELIM = "¦"  # uncommon single token; joins the fields of each step string
# -----------------------------------------------------------
# definitive op-code legend
# -----------------------------------------------------------
# Arithmetic : D M S A B R C L I PDEC F
# Algebra : G DISC ROOT Q1 Q2
# Geometry : E ROOT (reuse)
# Final Answer: Z (Contains the final formatted answer string)
# -----------------------------------------------------------
# ---------- helpers ---------- | |
def step(op, x="", y="", z="", o=""):
    """Encode one scratch-pad step as a DELIM-joined string.

    ``op`` is used as given; ``x``, ``y``, ``z`` and ``o`` are converted with
    ``str()``.  Trailing fields that are empty strings are dropped so short
    steps stay short (e.g. ``step("R", 3)`` yields ``"R¦3"``).
    """
    fields = [op] + [str(arg) for arg in (x, y, z, o)]
    # Find how many leading fields to keep once trailing empties are trimmed.
    keep = len(fields)
    while keep > 0 and fields[keep - 1] == "":
        keep -= 1
    return DELIM.join(fields[:keep])
def jid() -> str:
    """Return a fresh random (version 4) UUID in its canonical string form."""
    fresh = uuid.uuid4()
    return f"{fresh}"
def write_jsonl(fp, obj):
    """Write ``obj`` to ``fp`` as one JSON line.

    Non-ASCII characters are written as-is (``ensure_ascii=False``) and the
    record is terminated with a single newline.
    """
    encoded = json.dumps(obj, ensure_ascii=False)
    fp.write(f"{encoded}\n")
# ---------- arithmetic ---------- | |
def long_division():
    """Generate one long-division example with a digit-by-digit scratch pad.

    A dividend in 10..9999 and a divisor in 2..99 are drawn from ``random``.
    Emitted steps: ``B`` (bring down a digit onto a non-zero remainder),
    ``D``/``M``/``S`` (divide, multiply back, subtract), ``R`` (remainder,
    only when the last step does not already show it) and a closing ``Z``
    carrying the final answer.

    Returns a dict with keys ``problem_id``, ``operation``, ``problem``,
    ``steps`` and ``final_answer``.
    """
    dividend = random.randint(10, 9999)
    divisor = random.randint(2, 99)
    steps = []

    if dividend < divisor:
        # trivial remainder-only case
        steps.append(step("R", dividend))
        final_answer_str = f"0 R{dividend}"
    else:
        digits = str(dividend)
        last_index = len(digits) - 1
        quotient_digits = ""
        rem = 0
        # Remainder shown by the most recent step when that step is an "S";
        # None when the most recent step is something else.
        shown_rem = None

        for i, digit in enumerate(digits):
            cur = rem * 10 + int(digit)
            if i > 0 and rem > 0:
                steps.append(step("B", rem, digit, cur))
                shown_rem = None

            if cur >= divisor:
                q_dig = cur // divisor
                prod = q_dig * divisor
                rem = cur - prod
                steps.append(step("D", cur, divisor, q_dig))
                steps.append(step("M", q_dig, divisor, prod))
                steps.append(step("S", cur, prod, rem))
                quotient_digits += str(q_dig)
                shown_rem = rem
                continue

            # Partial dividend too small: record a 0 quotient digit (leading
            # zeros are discarded later by int()) and carry the value forward.
            if quotient_digits or i > 0:
                quotient_digits += "0"
            rem = cur
            if i == last_index:
                if not quotient_digits:
                    quotient_digits = "0"
                break

        # Only add an explicit remainder step when the last subtraction did
        # not already end on that remainder.
        if rem > 0 and shown_rem != rem:
            steps.append(step("R", rem))

        remainder_suffix = f" R{rem}" if rem > 0 else ""
        final_answer_str = f"{int(quotient_digits)}" + remainder_suffix

    steps.append(step("Z", final_answer_str))
    return {
        "problem_id": jid(),
        "operation": "long_division",
        "problem": f"{dividend} ÷ {divisor}",
        "steps": steps,
        "final_answer": final_answer_str,
    }
def decimal_mult():
    """Generate one decimal-multiplication example.

    Two factors in 0.1..99.9, each rounded to 1 or 2 places, are multiplied
    as integers (decimal points removed); the point is then re-inserted
    according to the combined number of decimal places.  Steps emitted:
    ``M`` (integer product), ``C`` (decimal-place count), ``PDEC`` (placing
    the point) and the closing ``Z``.

    Returns a dict with keys ``problem_id``, ``operation``, ``problem``,
    ``steps`` and ``final_answer``.
    """

    def places_in(text):
        # Number of digits after the decimal point in a numeric string.
        return len(text.split(".")[1]) if "." in text else 0

    # Draw order (uniform, randint, uniform, randint) is kept so seeded runs
    # reproduce the same factors.
    a = round(random.uniform(0.1, 99.9), random.randint(1, 2))
    b = round(random.uniform(0.1, 99.9), random.randint(1, 2))
    a_txt = str(a)
    b_txt = str(b)

    dp = places_in(a_txt) + places_in(b_txt)
    a_int = int(a_txt.replace(".", ""))
    b_int = int(b_txt.replace(".", ""))
    int_product = a_int * b_int

    # Exact product via Decimal, built from the same strings shown to the reader.
    exact = Decimal(a_txt) * Decimal(b_txt)
    try:
        whole = exact == exact.to_integral_value()
    except InvalidOperation:
        whole = False

    if whole:
        res = str(exact.to_integral_value())
    else:
        res = f"{exact:.{dp}f}".rstrip("0").rstrip(".")
    if res == ".":
        res = "0"

    steps = [
        step("M", a_int, b_int, int_product),
        step("C", dp, "dp"),
        step("PDEC", int_product, dp, res),
        step("Z", res),
    ]
    return {
        "problem_id": jid(),
        "operation": "decimal_mul",
        "problem": f"{a} × {b}",
        "steps": steps,
        "final_answer": res,
    }
# ---- fractions (± × ÷) ---- | |
def _fraction_op(symbol):
    """Generate one fraction example for ``symbol`` (``+``, ``-``, ``*`` or ``/``).

    Numerators are drawn from 1..9 and denominators from 2..9.  The operands
    are shown exactly as drawn (e.g. ``2/4``, not ``1/2``) in both the problem
    text and the steps, so every number the scratch pad manipulates is visible
    in the problem.  Reduction to lowest terms happens only in the ``F`` step.

    Steps emitted:
      * ``+``/``-``: ``L`` (least common denominator, when the denominators
        differ), ``C`` (convert an operand to the LCD, when needed), then
        ``A`` or ``S`` on the converted numerators.
      * ``*``: ``M`` for the numerators, ``M`` for the denominators.
      * ``/``: ``I`` (invert the divisor), then two ``M`` steps.
      * ``F`` when the raw result differs from its simplified form, and a
        closing ``Z`` with the final answer.

    Returns a dict with keys ``problem_id``, ``operation``, ``problem``,
    ``steps`` and ``final_answer``.

    Raises:
        KeyError: if ``symbol`` is not one of the four supported operators.
    """
    # Look the operator up first so an unsupported symbol fails immediately
    # (with the same KeyError as before) instead of running a wrong branch.
    op_name = {"+": "add", "-": "sub", "*": "mul", "/": "div"}[symbol]

    n1, d1 = random.randint(1, 9), random.randint(2, 9)
    n2, d2 = random.randint(1, 9), random.randint(2, 9)

    # Fraction() normalises to lowest terms, so it must not be used for
    # display: the steps below operate on the numbers as drawn.
    lhs_str = f"{n1}/{d1}"
    rhs_str = f"{n2}/{d2}"
    steps = []

    if symbol in ("+", "-"):
        lcd = d1 * d2 // math.gcd(d1, d2)
        if d1 != d2:
            steps.append(step("L", d1, d2, lcd))
        n1c = n1 * (lcd // d1)
        n2c = n2 * (lcd // d2)
        if d1 != lcd:
            steps.append(step("C", lhs_str, lcd, f"{n1c}/{lcd}"))
        if d2 != lcd:
            steps.append(step("C", rhs_str, lcd, f"{n2c}/{lcd}"))
        out_num = n1c + n2c if symbol == "+" else n1c - n2c
        out_den = lcd
        steps.append(step("A" if symbol == "+" else "S", n1c, n2c, out_num))
    elif symbol == "*":
        out_num, out_den = n1 * n2, d1 * d2
        steps.append(step("M", n1, n2, out_num))
        steps.append(step("M", d1, d2, out_den))
    else:  # division '/'
        # Dividing by n2/d2 is multiplying by d2/n2 (n2 >= 1, so never zero).
        steps.append(step("I", rhs_str, f"{d2}/{n2}"))
        out_num, out_den = n1 * d2, d1 * n2
        steps.append(step("M", n1, d2, out_num))
        steps.append(step("M", d1, n2, out_den))

    final_answer_str = str(Fraction(out_num, out_den))
    pre_simp_str = f"{out_num}/{out_den}"
    if final_answer_str != pre_simp_str:
        steps.append(step("F", pre_simp_str, final_answer_str))
    steps.append(step("Z", final_answer_str))

    return dict(
        problem_id=jid(),
        operation=f"fraction_{op_name}",
        problem=f"{lhs_str} {symbol} {rhs_str}",
        steps=steps,
        final_answer=final_answer_str,
    )
# shortcuts | |
# Named functions rather than lambdas so that ``__name__`` identifies the
# generator (build_dataset prints it when a generator fails).
def frac_add():
    """Generate a fraction-addition example."""
    return _fraction_op("+")


def frac_sub():
    """Generate a fraction-subtraction example."""
    return _fraction_op("-")


def frac_mul():
    """Generate a fraction-multiplication example."""
    return _fraction_op("*")


def frac_div():
    """Generate a fraction-division example."""
    return _fraction_op("/")
# ---------- algebra ---------- | |
def linear_simple():
    """Generate one linear equation ``m*x + b = y`` with an integer solution.

    ``m`` is a non-zero integer in -9..9; ``x`` and ``b`` are integers in
    -10..10; ``y`` is derived from them.  Steps emitted: ``S`` (move ``b``
    across), ``D`` (divide by ``m``) and the closing ``Z``.

    Returns a dict with keys ``problem_id``, ``operation``, ``problem``,
    ``steps`` and ``final_answer``.
    """
    m = random.choice([*range(-9, 0), *range(1, 10)])
    x = random.randint(-10, 10)
    b = random.randint(-10, 10)

    y = m * x + b
    rhs1 = y - b
    sol = Fraction(rhs1, m)
    final_answer_str = f"x={sol}"

    # A coefficient of +/-1 is written without the digit.
    if m == 1:
        x_term = "x"
    elif m == -1:
        x_term = "-x"
    else:
        x_term = f"{m}x"
    const_term = f"{b:+d}" if b != 0 else ""
    problem = f"Solve {x_term}{const_term} = {y}"

    steps = [
        step("S", y, b, rhs1),
        step("D", rhs1, m, final_answer_str),
        step("Z", final_answer_str),
    ]
    return {
        "problem_id": jid(),
        "operation": "linear_eq_simple",
        "problem": problem,
        "steps": steps,
        "final_answer": final_answer_str,
    }
def quadratic():
    """Generate one quadratic ``a*x² + b*x + c = 0`` with two distinct integer roots.

    The roots are two different integers from -6..6 and ``a`` is 1..3; ``b``
    and ``c`` are expanded from ``a*(x - r1)*(x - r2)``.  Steps emitted:
    ``DISC`` (b², 4ac, discriminant), ``ROOT`` (its integer square root),
    ``Q1``/``Q2`` (the two quadratic-formula evaluations) and the closing
    ``Z``.  If the discriminant were negative or not a perfect square the
    function retries by calling itself.

    Returns a dict with keys ``problem_id``, ``operation``, ``problem``,
    ``steps`` and ``final_answer``.
    """
    r1, r2 = random.sample(range(-6, 7), 2)
    a = random.randint(1, 3)
    b = -a * (r1 + r2)
    c = a * r1 * r2

    disc = b * b - 4 * a * c
    if disc < 0:
        return quadratic()
    try:
        sqrt_disc = math.isqrt(disc)
        perfect_square = sqrt_disc * sqrt_disc == disc
    except AttributeError:  # Fallback for Python < 3.8
        float_root = math.sqrt(disc)
        perfect_square = float_root == int(float_root)
        sqrt_disc = int(float_root)
    if not perfect_square:
        return quadratic()

    denom = 2 * a
    root1 = max(r1, r2)
    root2 = min(r1, r2)
    final_answer_str = f"x={root1}, x={root2}"

    # Build the polynomial in standard notation, omitting unit coefficients
    # and zero terms.
    if a == 1:
        expr = "x²"
    elif a == -1:
        expr = "-x²"
    else:
        expr = f"{a}x²"
    if b != 0:
        b_sign = "+" if b > 0 else "-"
        b_mag = "" if abs(b) == 1 else str(abs(b))
        expr += f"{b_sign}{b_mag}x"
    if c != 0:
        expr += f"{c:+d}"
    expr = expr.lstrip("+")
    problem = f"Solve {expr} = 0"

    steps = [
        step("DISC", b * b, 4 * a * c, disc),
        step("ROOT", disc, sqrt_disc),
        step("Q1", -b, sqrt_disc, denom, root1),
        step("Q2", -b, sqrt_disc, denom, root2),
        step("Z", final_answer_str),
    ]
    return {
        "problem_id": jid(),
        "operation": "quadratic_eq",
        "problem": problem,
        "steps": steps,
        "final_answer": final_answer_str,
    }
# ---------- geometry / trig ---------- | |
def pythag_hyp():
    """Generate one "find the hypotenuse" example from a scaled Pythagorean triple.

    A base triple is picked from a fixed list and scaled by an integer factor
    in 1..5.  Steps emitted: ``E`` (square each leg), ``A`` (sum of squares),
    ``ROOT`` (square root of the sum) and the closing ``Z``.

    Returns a dict with keys ``problem_id``, ``operation``, ``problem``,
    ``steps`` and ``final_answer``.
    """
    triples = [(3, 4, 5), (5, 12, 13), (7, 24, 25), (8, 15, 17), (9, 40, 41)]
    base = random.choice(triples)
    k = random.randint(1, 5)
    a, b, c_ans = (side * k for side in base)

    a_sq = a * a
    b_sq = b * b
    sum_sq = a_sq + b_sq
    final_answer_str = str(c_ans)

    steps = [
        step("E", a, 2, a_sq),
        step("E", b, 2, b_sq),
        step("A", a_sq, b_sq, sum_sq),
        step("ROOT", sum_sq, c_ans),
        step("Z", final_answer_str),
    ]
    return {
        "problem_id": jid(),
        "operation": "pythag_hyp",
        "problem": f"Find hypotenuse: legs {a} and {b}",
        "steps": steps,
        "final_answer": final_answer_str,
    }
# ---------- dataset driver ---------- | |
# Every example generator; build_dataset picks one of these at random
# for each attempt.
GENERATORS = [
    # arithmetic
    long_division,
    decimal_mult,
    # fractions
    frac_add,
    frac_sub,
    frac_mul,
    frac_div,
    # algebra
    linear_simple,
    quadratic,
    # geometry
    pythag_hyp,
]
def build_dataset(n=10_000, path="math_visible_dataset_v7.jsonl", seed=42):
    """Write up to ``n`` generated examples to ``path`` as JSON Lines.

    Each attempt picks a generator at random from ``GENERATORS``.  A failing
    attempt is reported and skipped; generation stops after ``n`` successes
    or after ``int(n * 1.1) + 20`` attempts, whichever comes first.

    Args:
        n: Number of examples to generate.
        path: Output file; it is overwritten.
        seed: Value passed to ``random.seed`` before generating (default 42,
            the previously hard-coded value).  Pass ``None`` to leave the
            module-level random state untouched.
    """
    if seed is not None:
        random.seed(seed)
    count = 0
    attempts = 0
    max_attempts = int(n * 1.1) + 20
    print(f"Attempting to generate {n} examples...")
    with open(path, "w", encoding="utf-8") as fp:
        while count < n and attempts < max_attempts:
            attempts += 1
            # Selected outside the try so the error handler below can always
            # name the generator that failed.
            gen = random.choice(GENERATORS)
            try:
                example = gen()
                if example:
                    write_jsonl(fp, example)
                    count += 1
                    if count % 1000 == 0 and count > 0:
                        print(f"... successfully generated {count}/{n} examples")
            except Exception as e:
                # Best effort: report the failure and move on to the next attempt.
                print(f"ERROR: Generator {gen.__name__} failed: {e}. Skipping attempt {attempts}.")
    print(f"✔ Successfully wrote {count} lines → {path} (after {attempts} attempts)")
    if count < n:
        print(f"WARN: Target of {n} examples not reached ({count}/{n}).")
# ---------- quick self-test ---------- | |
if __name__ == "__main__":
    # Quick self-test: build a small dataset, print its first record, check
    # that record's final "Z" step, and count the lines written.  Failures
    # are reported on stdout rather than raised.
    test_file = "math_visible_dataset_v7_test.jsonl"
    test_count = 50
    build_dataset(test_count, path=test_file)
    try:
        written_count = 0
        with open(test_file, "r", encoding="utf-8") as f:
            first_line = f.readline()
            if not first_line:
                print("\nTest file appears empty!")
            else:
                written_count += 1
                example = json.loads(first_line)
                print("\nFirst example generated:")
                print(json.dumps(example, indent=2))

                assert len(example["steps"]) > 0, "Steps list is empty"
                final_step_str = example["steps"][-1]
                assert DELIM in final_step_str, f"Delimiter '{DELIM}' not found in final step: {final_step_str}"
                assert final_step_str.startswith("Z" + DELIM), f"Final step does not start with Z{DELIM}: {final_step_str}"
                assert final_step_str.split(DELIM)[1] == example["final_answer"], "Final step Z value mismatch with final_answer field"
                print("\nBasic checks passed for the first example.")

                # Count the remaining lines without parsing them.
                written_count += sum(1 for _ in f)
        print(f"\nSelf-test generated {written_count} examples (requested {test_count}).")
        assert written_count > 0
    except FileNotFoundError:
        print(f"\nERROR: Test file '{test_file}' not found during self-check.")
    except Exception as e:
        print(f"\nError during self-check: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment