#!/usr/bin/env python
# coding: utf-8

# In[1]:

import os
import json
import numpy as np
import argparse
from tqdm import tqdm

parser = argparse.ArgumentParser(description='Preprocess poses into instant-ngp format.')
parser.add_argument("--scene", help="path to the scene dataset", required=True)
parser.add_argument("--skip_training_frames", help="select 1 frame every N frames for training; the skipped frames become the test frames", type=int, default=4)
parser.add_argument("--skip_test_frames", help="select 1 frame every N test frames", type=int, default=16)
parser.add_argument("--method", help="the method used to decide block assignment", type=str, default="linear", choices=['linear', 'kmeans'])
parser.add_argument("--n_training_frames", help="number of frames for training", type=int, default=None)
parser.add_argument("--n_blocks", help="number of blocks", type=int, default=8)
args = parser.parse_args()
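
# As wired up below, the script reads <scene>/raw/transforms.json and writes
# <scene>/training_transforms.json plus <scene>/test/test_transforms.json.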
# ## Automatic rescale & offset
#
# One thing that makes instant-ngp hard to use is determining the `scale` and `offset` manually.
# We use this script to automatically scale and translate an existing dataset.
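#
# Each frame record in the output files has the shape below (values are
# illustrative, not from a real scene):
#   {"file_path": "images/000123.png",
#    "transform_matrix": [[...], [...], [...], [0.0, 0.0, 0.0, 1.0]],
#    "block_id": 0}
# and the top-level JSON additionally carries "n_blocks".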
# In[2]:

def closest_point_2_lines(oa, da, ob, db):
    """Return the point closest to both rays of the form o + t*d, and a weight
    factor that goes to 0 when the lines are parallel.
    """
    da = da / np.linalg.norm(da)
    db = db / np.linalg.norm(db)
    c = np.cross(da, db)
    denom = np.linalg.norm(c)**2
    t = ob - oa
    ta = np.linalg.det([t, db, c]) / (denom + 1e-10)
    tb = np.linalg.det([t, da, c]) / (denom + 1e-10)
    if ta > 0:
        ta = 0
    if tb > 0:
        tb = 0
    return (oa + ta*da + ob + tb*db) * 0.5, denom
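
# A quick sanity check (illustrative only, not part of the pipeline): two
# cameras at (1, 0, 0) and (0, 1, 0) whose rays both pass through the origin
# at t = -1 should yield the origin with weight 1:
#   p, w = closest_point_2_lines(np.array([1., 0., 0.]), np.array([1., 0., 0.]),
#                                np.array([0., 1., 0.]), np.array([0., 1., 0.]))
#   # p == [0, 0, 0], w == 1.0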
# In[3]:

SRC_PATH = os.path.join(args.scene, 'raw/transforms.json')
TRAINING_PATH = os.path.join(args.scene, 'training_transforms.json')
TEST_PATH = os.path.join(args.scene, 'test/test_transforms.json')
os.makedirs(os.path.join(args.scene, 'test'), exist_ok=True)

with open(SRC_PATH) as f:
    out = json.load(f)

training_frames = []
test_frames = []
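
# Split the frames: every skip_training_frames-th frame goes to training and
# the remaining frames become test frames (so skip_training_frames - 1 test
# frames sit between consecutive training frames).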
for i, f in enumerate(out["frames"]):
    if i % args.skip_training_frames == 0:
        training_frames.append(
            {
                'file_path': f["file_path"],
                'transform_matrix': np.array(f["transform_matrix"]),
            }
        )
    else:
        test_frames.append(
            {
                'file_path': os.path.join('../images', f["file_path"].split('/')[-1]),
                'transform_matrix': np.array(f["transform_matrix"]),
            }
        )
    if args.n_training_frames is not None and len(training_frames) == args.n_training_frames:
        break

# Block assignment
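# 'linear' chops the (time-ordered) trajectory into contiguous chunks of
# roughly equal size; 'kmeans' instead clusters the camera positions in 3D,
# so blocks are spatial rather than temporal.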
if args.method == 'linear':
    # Ceiling division keeps the blocks evenly sized and uses all n_blocks
    # ids when the frame count allows.
    n_frames_per_block = -(-len(training_frames) // args.n_blocks)
    # Assign a block_id to each training frame.
    for idx, frame in enumerate(training_frames):
        frame['block_id'] = idx // n_frames_per_block
    # Map each test frame back to the training frame of its group (each group
    # contributes skip_training_frames - 1 test frames) to pick its block_id.
    for idx, frame in enumerate(test_frames):
        frame['block_id'] = (idx // (args.skip_training_frames - 1)) // n_frames_per_block
elif args.method == 'kmeans':
    from sklearn.cluster import KMeans

    # Get the translations (camera positions).
    ts = []
    for frame in training_frames:
        ts.append(frame['transform_matrix'][:3, -1])

    # Fit KMeans.
    kmeans = KMeans(n_clusters=args.n_blocks, random_state=0).fit(ts)

    # Assign the block_id predicted by KMeans to each frame.
    for idx, frame in enumerate(training_frames):
        frame['block_id'] = int(kmeans.labels_[idx])
    for idx, frame in enumerate(test_frames):
        pos = frame['transform_matrix'][:3, -1]
        frame['block_id'] = int(kmeans.predict([pos])[0])
else:
    raise NotImplementedError(f"Unsupported method {args.method}.")
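
# Example invocation (the script name is hypothetical; pass the directory
# that contains raw/transforms.json):
#   python preprocess_block_nerf.py --scene data/my_scene --method kmeans --n_blocks 8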
# In[4]:

# Find a central point all the cameras are looking at.
# If we have more than 1000 frames, subsample 1/4 of them for this step,
# otherwise it is too slow (the search below is quadratic in the frame count).
print("computing center of attention...")
n_frames = len(training_frames)
if n_frames > 1000:
    print(f"too many frames ({n_frames}), subsampling {n_frames // 4} frames to compute the center of attention ...")
    subsampled_frames = training_frames[::4]
else:
    subsampled_frames = training_frames

totw = 0.0
totp = np.array([0.0, 0.0, 0.0])
for f in tqdm(subsampled_frames):
    mf = f["transform_matrix"][0:3, :]
    for g in subsampled_frames:
        mg = g["transform_matrix"][0:3, :]
        # Intersect rays along each camera's z-axis (columns of a camera-to-world
        # matrix are the x, y, z axes, then the position).
        p, w = closest_point_2_lines(mf[:, 3], mf[:, 2], mg[:, 3], mg[:, 2])
        if w > 0.01:
            totp += p * w
            totw += w
totp /= totw
print(totp)  # the cameras are looking at totp

# Normalize both training and test frames and store them.
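# Each camera position p is mapped to (p - totp) * 4.0 / avglen, i.e. it is
# recentered on the center of attention and scaled so the average training
# camera ends up 4 units from the origin (the "nerf sized" convention).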
for split in ['training', 'test']:
    if split == 'training':
        out["frames"] = training_frames
        output_path = TRAINING_PATH
    elif split == 'test':
        out['frames'] = test_frames[::args.skip_test_frames]
        # Sort frames by their block_id.
        out['frames'] = sorted(out['frames'], key=lambda x: x['block_id'])
        output_path = TEST_PATH
    out["n_blocks"] = args.n_blocks

    for f in out["frames"]:
        f["transform_matrix"][0:3, 3] -= totp
    # Compute the scale on the training split only and reuse it for the test
    # split, so both splits share one coordinate frame.
    if split == 'training':
        avglen = 0.
        for f in out["frames"]:
            avglen += np.linalg.norm(f["transform_matrix"][0:3, 3])
        avglen /= len(out["frames"])
        print("avg camera distance from origin", avglen)
    for f in out["frames"]:
        f["transform_matrix"][0:3, 3] *= 4.0 / avglen  # scale to "nerf sized"
    for f in out["frames"]:
        f["transform_matrix"] = f["transform_matrix"].tolist()

    nframes = len(out["frames"])
    print(nframes, "frames")
    print(f"writing {split} data to {output_path}.")
    with open(output_path, "w") as outfile:
        json.dump(out, outfile, indent=2)