Skip to content

Instantly share code, notes, and snippets.

@adilcpm
Created May 28, 2025 17:56
Show Gist options
  • Save adilcpm/92bee38a61e940c4583864f9502a898c to your computer and use it in GitHub Desktop.
Save adilcpm/92bee38a61e940c4583864f9502a898c to your computer and use it in GitHub Desktop.
Bybit clock offset
use std::{collections::HashMap, ops::Add, time::Duration};
use arber::{
config::get_config,
exec::bybit::structs::{AuthArgs, BybitWSAuthRequest},
},
};
use bybit::ws::response::PrivateResponse;
use bytes::BufMut;
use chrono::Utc;
use futures::{SinkExt, StreamExt};
use hmac::{Hmac, Mac};
use sha2::Sha256;
use tokio::time::interval;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{error, info};
use utils::now_ms;
use yellowstone_grpc_proto::{
geyser::SubscribeRequestFilterTransactions,
prelude::{subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestPing},
};
// HMAC keyed with SHA-256; used below to sign the WebSocket auth request.
type HmacSha256 = Hmac<Sha256>;
// WebSocket endpoint this test connects to.
const PRIVATE_WS_ENDPOINT: &str = "wss://stream.bybit.com:443/v5/private";
// Fixed prefix of the signed payload; the expiry timestamp (ms) is appended to it before signing.
const SIGNATURE_PART1: &str = "GET/realtime";
// Value of the `op` field in the authentication request.
const AUTH_CHANNEL: &str = "auth";
/// One ping/pong exchange, kept so the sample with the smallest round-trip
/// time can later be picked for the clock-offset estimate.
#[derive(Debug, Clone)]
struct RttSample {
// Round-trip time in ms: local time at pong receipt minus local time at ping send.
rtt_ms: u64,
// Timestamp parsed from the first `args` entry of the pong message.
server_timestamp: u64,
// Local `now_ms()` value captured just before the ping was sent.
ping_timestamp: u64,
}
pub async fn bybit_ws_test() {
tracing_subscriber::fmt()
.with_writer(std::io::stdout)
.with_max_level(tracing::Level::INFO)
.init();
let api_key: String = get_config().bybit_api_key_trade.clone();
let secret: String = get_config().bybit_api_secret_trade.clone();
let (ws_stream, _response) = connect_async(PRIVATE_WS_ENDPOINT).await.unwrap();
let (mut write, mut read) = ws_stream.split();
let expire = Utc::now().timestamp_millis().add(10000) as u64;
let mut mac =
HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");
let mut buffer: Vec<u8> = Vec::new();
buffer.put(SIGNATURE_PART1.as_bytes());
buffer.put(expire.to_string().as_bytes());
mac.update(&buffer);
let result = mac.finalize();
let code_bytes = result.into_bytes();
let signature_start_login = hex::encode(code_bytes);
let auth_message = BybitWSAuthRequest {
op: AUTH_CHANNEL,
args: &[
AuthArgs::Str(api_key.as_str()),
AuthArgs::Int(expire),
AuthArgs::Str(signature_start_login.as_str()),
],
};
let auth_message_bytes = serde_json::to_vec(&auth_message).unwrap();
write.send(Message::Binary(auth_message_bytes)).await.unwrap();
let mut ping_interval = interval(Duration::from_secs(10));
let mut ping_send_timestamp_ms = now_ms();
// Store multiple RTT samples for better clock sync estimation
let mut rtt_samples: Vec<RttSample> = Vec::new();
const MAX_SAMPLES: usize = 10; // Keep last 10 samples
const MIN_SAMPLES_FOR_CALCULATION: usize = 3; // Need at least 3 samples
loop {
tokio::select! {
// Branch 1: Handle incoming messages
msg = read.next() => {
match msg {
Some(Ok(message)) => match message {
Message::Text(text) => {
match serde_json::from_str(&text) {
Ok(res) => {
match res {
PrivateResponse::Pong(res) => {
let server_ts_ms = res.args[0].parse::<u64>().unwrap();
let local_ts_ms = now_ms();
let rtt = local_ts_ms - ping_send_timestamp_ms;
// Store the new sample
let sample = RttSample {
rtt_ms: rtt,
server_timestamp: server_ts_ms,
ping_timestamp: ping_send_timestamp_ms,
};
rtt_samples.push(sample);
// Keep only the last MAX_SAMPLES
if rtt_samples.len() > MAX_SAMPLES {
rtt_samples.remove(0);
}
info!("Pong timestamp ms: {:?}", server_ts_ms);
info!("Local timestamp ms: {:?}", local_ts_ms);
info!("Current RTT: {:?} ms", rtt);
info!("Raw offset: {:?} ms", server_ts_ms as i64 - local_ts_ms as i64);
// Calculate offset using minimum RTT if we have enough samples
if rtt_samples.len() >= MIN_SAMPLES_FOR_CALCULATION {
// Find the sample with minimum RTT
let min_rtt_sample = rtt_samples.iter()
.min_by_key(|sample| sample.rtt_ms)
.unwrap();
let min_rtt = min_rtt_sample.rtt_ms;
let one_way_min = min_rtt / 2;
// Use the minimum RTT sample for offset calculation
let estimated_local_time_at_server_response =
min_rtt_sample.ping_timestamp + one_way_min;
let offset_min_rtt = min_rtt_sample.server_timestamp as i64 -
estimated_local_time_at_server_response as i64;
// Also calculate current sample offset for comparison
let one_way_current = rtt / 2;
let estimated_local_time_current = ping_send_timestamp_ms + one_way_current;
let offset_current = server_ts_ms as i64 - estimated_local_time_current as i64;
info!("=== RTT ANALYSIS ===");
info!("Samples collected: {}", rtt_samples.len());
info!("Current RTT: {} ms", rtt);
info!("Minimum RTT: {} ms", min_rtt);
info!("RTT improvement: {} ms", rtt as i64 - min_rtt as i64);
info!("Current sample offset: {} ms", offset_current);
info!("Min RTT offset: {} ms", offset_min_rtt);
info!("Offset difference: {} ms", offset_current - offset_min_rtt);
// Show RTT distribution
let mut rtts: Vec<u64> = rtt_samples.iter().map(|s| s.rtt_ms).collect();
rtts.sort();
info!("RTT distribution: {:?}", rtts);
} else {
info!("Collecting samples... ({}/{})", rtt_samples.len(), MIN_SAMPLES_FOR_CALCULATION);
}
}
_ => info!("RECEIVED OTHER MESSAGE : {:?}", res),
}
}
Err(e) => {
error!("Error parsing JSON: {}", e);
}
}
}
_ => info!("RECEIVED OTHER MESSAGE : {:?}", message),
},
Some(Err(e)) => {
error!("Error reading from from exec feed WebSocket v2: {}", e);
break;
}
None => {
info!("WebSocket stream ended");
break;
}
}
},
// Branch 2: Send ping at regular intervals
_ = ping_interval.tick() => {
ping_send_timestamp_ms = now_ms();
if let Err(e) = write.send(Message::Text("{\"op\":\"ping\"}".to_string())).await {
error!("Failed to send ping: {:?}", e);
break;
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment