sglang_oai_requests.py
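# What this script does (summary added for orientation): label the
# DeepScaleR-Preview-Dataset with model-generated responses. It downloads
# the train parquet from Hugging Face, launches a local SGLang server,
# fans the questions out through an OpenAI-compatible client with a
# thread pool, and checkpoints the results back to parquet as they arrive.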
import pandas as pd
import os, sys, os.path as osp
import concurrent.futures

import openai
from tqdm import tqdm
from sglang.utils import launch_server_cmd, wait_for_server, print_highlight, terminate_process

# Download the parquet file from Hugging Face
from huggingface_hub import hf_hub_download
repo_id = "agentica-org/DeepScaleR-Preview-Dataset"
file_path = "default/train/0000.parquet"
revision = "refs/convert/parquet"

# Download the file
print(f"Downloading {file_path} from {repo_id}...")
fpath = hf_hub_download(repo_id=repo_id, filename=file_path, repo_type="dataset", revision=revision)
print(f"Downloaded file to {fpath}")
prefix = osp.basename(fpath).split(".")[0]

if len(sys.argv) > 1:
    model = sys.argv[1]
else:
    # model = "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
    model = "default"

suffix = model.split("/")[-1]
output_dir = repo_id.replace("/", "--")
output_parquet = f"dpsk-labeling/{output_dir}/{prefix}_with_{suffix}-generated.parquet"

df = pd.read_parquet(fpath)
if os.path.exists(output_parquet):
    print(f"Loading existing generated responses from {output_parquet}")
    df = pd.read_parquet(output_parquet)
print(f"Results will be saved to {output_parquet}")

# Add columns for the generated responses and the raw prompts
if suffix not in df.columns:
    df[suffix] = None
if "raw_prompt" not in df.columns:
    df["raw_prompt"] = None
# Launch the SGLang server and point an OpenAI-compatible client at it
server_process, port = launch_server_cmd(
    f"python -m sglang.launch_server --model {model} --trust-remote-code --tp-size 8"
)
wait_for_server(f"http://localhost:{port}")
client = openai.Client(base_url=f"http://127.0.0.1:{port}/v1", api_key="None")
print(f"Server started on http://localhost:{port}")

# Check that the client is healthy and alive
try:
    health_check = client.models.list()
    print("\nClient Health Check:")
    print("-------------------")
    print(f"Connection to port {port} successful")
    print(f"Available models: {[m.id for m in health_check.data]}")
    print("Client is healthy and ready to process requests")
except Exception as e:
    print("\nClient Health Check Failed:")
    print("-------------------------")
    print(f"Error connecting to server at port {port}: {e}")
    print("Please ensure the server launched correctly and is accessible")
    terminate_process(server_process)
    sys.exit(1)
progress = tqdm(total=len(df), desc="Data Generation Progress")  # Initialize progress bar

def generate_response(index, question):
    print(f"[dispatch {model}] Question {index} / {len(df)}")
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "user", "content": question},
        ],
        temperature=0.6,
        top_p=0.95,
        stream=False,  # Non-streaming
        # SGLang extension: return the chain of thought separately in
        # choice.message.reasoning_content instead of inline in the answer
        extra_body={"separate_reasoning": True},
    )
    reasoning_content = [choice.message.reasoning_content for choice in response.choices]
    message_content = [choice.message.content for choice in response.choices]
    total_content = list(zip(message_content, reasoning_content))
    df.at[index, suffix] = total_content
    df.at[index, "raw_prompt"] = question
    print("=" * 50)
    print(f"[generate_response] Question {index} / {len(df)}: {question}")
    print("[generate_response] Response: done")
    return index, question, total_content

def save_df(df, output_path):
    # Write to a temp file, then rename, so an interrupted save
    # never leaves a truncated parquet behind
    output_path_temp = output_path + ".temp"
    os.makedirs(osp.dirname(output_path), exist_ok=True)
    df.to_parquet(output_path_temp)
    os.rename(output_path_temp, output_path)
    print(f"Saved df to {output_path}")
def main():
    global df
    # Sort before any worker threads start, so the dataframe is never
    # rebound while generate_response is writing into it
    df = df.sort_values(by="raw_prompt")
    print("sorting done")
    print("\nProcessing questions with the model:")
    print("----------------------------------")
    with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
        tasks = []
        total = len(df)
        labeled_count = 0
        for index, row in df.iterrows():
            # Skip rows that already have a generated response
            if row[suffix] is not None:
                labeled_count += 1
                print(f"[skip] Question {index} / {len(df)}: already generated")
                progress.update(1)
                continue
            # Get the question text and clean it
            question = row["problem"].strip()
            tasks.append(executor.submit(generate_response, index, question))

        count = 0
        for future in concurrent.futures.as_completed(tasks):
            index, question, message_content = future.result()
            count += 1
            print(f"[post process] Question {index} |{count + labeled_count} / {total}| : {question}")
            # Periodic checkpoint (best-effort: pandas makes no hard
            # thread-safety guarantees for concurrent writes)
            if count % 10 == 0:
                print(f"Saving df to {output_parquet}")
                save_df(df, output_parquet)
            progress.update(1)

    save_df(df, output_parquet)
    return 0

if __name__ == "__main__":
    main()
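
A typical invocation, assuming an 8-GPU node (the launch command pins --tp-size 8) and passing the model name as the first CLI argument; the DeepSeek distill name below is the one left commented out in the script, shown here only as an illustration:

python sglang_oai_requests.py deepseek-ai/DeepSeek-R1-Distill-Qwen-32B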