sglang_oai_requests.py
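"""Label the agentica-org/DeepScaleR-Preview-Dataset with responses from a model
served by sglang through its OpenAI-compatible API. Each generated
(answer, reasoning) pair is checkpointed to a parquet file as it arrives, so an
interrupted run can resume where it left off."""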
import concurrent.futures
import os, sys, os.path as osp

import openai
import pandas as pd
from tqdm import tqdm

# Download the parquet file from Hugging Face
from huggingface_hub import hf_hub_download
from sglang.utils import launch_server_cmd, wait_for_server, terminate_process
# The server is launched (and the OpenAI client created) below, once the model is known
repo_id = "agentica-org/DeepScaleR-Preview-Dataset"
file_path = "default/train/0000.parquet"
revision = "refs/convert/parquet"  # the Hub's auto-converted parquet branch
# Download the file
print(f"Downloading {file_path} from {repo_id}...")
fpath = hf_hub_download(repo_id=repo_id, filename=file_path, repo_type="dataset", revision=revision)
print(f"Downloaded file to {fpath}")
prefix = osp.basename(fpath).split(".")[0]
if len(sys.argv) > 1:
    model = sys.argv[1]
else:
    # model = "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
    model = "default"
suffix = model.split("/")[-1]
output_dir = repo_id.replace("/", "--")
output_parquet = f"dpsk-labeling/{output_dir}/{prefix}_with_{suffix}-generated.parquet"
df = pd.read_parquet(fpath)
if os.path.exists(output_parquet):
    print(f"Loading existing generated responses from {output_parquet}")
    df = pd.read_parquet(output_parquet)
print(f"Results will be written to {output_parquet}")
# Add columns for the generated responses and the raw prompts, if missing
if suffix not in df.columns:
    df[suffix] = None
if "raw_prompt" not in df.columns:
    df["raw_prompt"] = None
# Launch the sglang server for the chosen model, then point an
# OpenAI-compatible client at it
server_process, port = launch_server_cmd(
    f"python -m sglang.launch_server --model-path {model} --trust-remote-code --tp-size 8"
)
wait_for_server(f"http://localhost:{port}")
client = openai.Client(base_url=f"http://127.0.0.1:{port}/v1", api_key="None")
print(f"Server started on http://localhost:{port}")

# Check that the client is healthy and alive
try:
    health_check = client.models.list()
    print("\nClient Health Check:")
    print("-------------------")
    print(f"Connection to port {port} successful")
    print(f"Available models: {[m.id for m in health_check.data]}")
    print("Client is healthy and ready to process requests")
except Exception as e:
    print("\nClient Health Check Failed:")
    print("-------------------------")
    print(f"Error connecting to server at port {port}: {str(e)}")
    print("Please ensure the server is running and accessible")
    sys.exit(1)
progress = tqdm(total=len(df), desc="Data Generation Progress") # Initialize progress bar
def generate_response(index, question):
    print(f"[filled {model}] Question {index} / {len(df)}: ")
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "user", "content": question},
        ],
        temperature=0.6,
        top_p=0.95,
        stream=False,  # non-streaming
        extra_body={"separate_reasoning": True},  # sglang: split reasoning from the answer
    )
    reasoning_content = [choice.message.reasoning_content for choice in response.choices]
    message_content = [choice.message.content for choice in response.choices]
    # One (answer, reasoning) tuple per returned choice (a single tuple with the default n=1)
    total_content = list(zip(message_content, reasoning_content))
    df.at[index, suffix] = total_content
    df.at[index, "raw_prompt"] = question
    print("=" * 50)
    print(f"[generate_response] Question {index} / {len(df)}: {question}")
    print(f"[generate_response] Response: done")
    return index, question, total_content
def save_df(df, output_path):
    # Write to a temp file first and rename into place, so a crash mid-write
    # never leaves a truncated parquet behind
    output_path_temp = output_path + ".temp"
    os.makedirs(osp.dirname(output_path), exist_ok=True)
    df.to_parquet(output_path_temp)
    os.rename(output_path_temp, output_path)
    print(f"Saved df to {output_path}")
def main():
    global df
    print("\nProcessing questions with the model:")
    print("------------------------------------")
    with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
        tasks = []
        total = len(df)
        labeled_count = 0
        for index, row in df.iterrows():
            # Skip rows that already carry a response, so interrupted runs resume
            if row[suffix] is not None:
                labeled_count += 1
                print(f"[skip] Question {index} / {len(df)}: already generated")
                progress.update(1)
                continue
            # Get the question text, clean it, and hand it to a worker
            tasks.append(executor.submit(generate_response, index, row["problem"].strip()))
        count = 0
        for future in concurrent.futures.as_completed(tasks):
            index, question, message_content = future.result()
            count += 1
            print(f"[post process] Question {index} |{count + labeled_count} / {total}| : {question}")
            # Checkpoint every 10 completions. pandas makes no formal thread-safety
            # promise, but workers only ever write disjoint cells via .at
            if count % 10 == 0:
                print(f"Saving df to {output_parquet}")
                save_df(df, output_parquet)
            progress.update(1)
    # Sort only after every worker has finished: rebinding the global df while
    # workers are still writing to it would silently drop some of their results
    df = df.sort_values(by="raw_prompt")
    print("sorting done")
    save_df(df, output_parquet)
    terminate_process(server_process)  # shut the sglang server down cleanly
    return 0
if __name__ == "__main__":
    main()
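# Example invocation (the model id is illustrative, taken from the commented-out
# default above; --tp-size 8 assumes a node with 8 GPUs):
#   python sglang_oai_requests.py deepseek-ai/DeepSeek-R1-Distill-Qwen-32B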