Created
April 8, 2022 18:07
-
-
Save piyueh/403e40995c0b000e0dd0272a847c789e to your computer and use it in GitHub Desktop.
A simple OpenFOAM data reader with double-precision floats
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# vim:fenc=utf-8 | |
# | |
# Copyright © 2022 Pi-Yueh Chuang <[email protected]> | |
# | |
# Distributed under terms of the BSD 3-Clause license. | |
"""Test. | |
""" | |
import re | |
import struct | |
import pathlib | |
import numpy | |
import pyvista | |
import vtk | |
def parse_lists(content, datatype, factor=1): | |
"""Find and parse lists in a binary string. | |
""" | |
dtlen = struct.calcsize(datatype) | |
# reading all numbers of arrays | |
results = re.finditer(b"^(\d+)\n\(", content, re.DOTALL | re.MULTILINE) | |
assert results is not None, "Failed to read arrays." | |
# `meta` contains tuples. Each tuple represents the following info of a list: | |
# 1st element: number of elements in a list | |
# 2nd element: beginning index of the list; offset by 2 due to leading "\n(" | |
meta = list(map(lambda inp: (int(inp.group(1)), inp.end(1)+2), results)) | |
# read all lists | |
lists = map( | |
lambda inp: struct.unpack( | |
f"{inp[0]*factor}{datatype}", | |
content[inp[1]:inp[1]+inp[0]*factor*dtlen] | |
), | |
meta | |
) | |
return list(lists) | |
def parse_lists_in_file(filename, datatype, factor=1): | |
"""Find and parse lists in a binary file. | |
""" | |
filename = pathlib.Path(filename).expanduser().resolve() | |
with open(filename, "rb") as fobj: | |
content = fobj.read() | |
return parse_lists(content, datatype, factor) | |
def read_points(casedir): | |
"""Read points' coordinates. | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "points"), "d", 3) | |
assert len(data) == 1, f"Number of lists in file `points` shoule be 1. Found {len(data)}." | |
data = numpy.array(data[0]).reshape(-1, 3) | |
return data | |
def read_faces(casedir): | |
"""Read faces' definitions. | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "faces"), "i") | |
assert len(data) == 2, f"Number of lists in file `faces` shoule be 2. Found {len(data)}." | |
data = [data[1][bg:ed] for bg, ed in zip(data[0][:-1], data[0][1:])] # reshape to 2D | |
return data | |
def read_owner(casedir): | |
"""Read faces' owners. | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "owner"), "i") | |
assert len(data) == 1, f"Number of lists in file `owner` shoule be 1. Found {len(data)}." | |
return data[0] | |
def read_neighbour(casedir): | |
"""Read faces' neighbours. | |
Note the spelling of "neighbour". | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "neighbour"), "i") | |
assert len(data) == 1, f"Number of lists in file `neighbour` shoule be 1. Found {len(data)}." | |
return data[0] | |
def read_boundary(casedir): | |
"""Read faces' neighbours. | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
with open(casedir.joinpath("constant", "polyMesh", "boundary"), "rb") as fobj: | |
content = fobj.read() | |
# reading the number of boundaries | |
result = re.search(b"^(\d+)\n\(", content, re.DOTALL | re.MULTILINE) # should be only 1 match | |
assert result is not None, "Failed to read the number of boundaries." | |
nbc = int(result.group(1)) | |
content = content[result.end(1)+2:] | |
# read each boundary | |
boundaries = {} | |
results = re.finditer(b"(\w+)\s+\{(.*?)}\s", content, re.DOTALL | re.MULTILINE) | |
boundaries = {result.group(1).decode(): result.group(2).decode() for result in results} | |
for name, boundary in boundaries.items(): # note that boundary is pure str; not binary str! | |
results = re.findall("(\w+)\s+(.*?);$", boundary, re.DOTALL | re.MULTILINE) | |
boundaries[name] = {key: val for key, val in results} | |
# convert integers | |
boundaries[name]["nFaces"] = int(boundaries[name]["nFaces"]) | |
boundaries[name]["startFace"] = int(boundaries[name]["startFace"]) | |
return boundaries | |
def read_cellzones(casedir): | |
"""Read cellzones' definitions. | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
with open(casedir.joinpath("constant", "polyMesh", "cellZones"), "rb") as fobj: | |
content = fobj.read() | |
# get the number of zones (re.search guarantees the result is the first match) | |
result = re.search(b"^(\d+)\s+\(", content, re.DOTALL | re.MULTILINE) | |
assert result is not None, "Failed to get the number of zones." | |
# number of zones and the content after the number of zones | |
nzones = int(result.group(1)) | |
content = content[result.end(1)+2:] # offset 2 characters due to "\n(" | |
# get the names of each zone | |
names = re.findall(b"^(\w+)\s+\{", content, re.DOTALL | re.MULTILINE) | |
# get the lists from the content | |
arrys = parse_lists(content, "i") | |
result = {name.decode(): arry for name, arry in zip(names, arrys)} | |
return result | |
def read_times(casedir): | |
"""Read all time values of this simulation case. | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
times = [] | |
for pth in casedir.iterdir(): | |
result = re.fullmatch(r"^(\d+(?:$|\.\d+$))", pth.name) | |
if result is None: continue | |
if result.group(1) == "0": | |
times.append("0") | |
continue | |
tfile = list(pth.glob("**/time")) | |
assert len(tfile) == 1 | |
tfile = tfile[0] | |
with open(tfile, "r") as fobj: # time file seems to always be ASCII format | |
content = fobj.read() | |
result = re.search(r"\sname\s+\"(\d+(?:\.\d+)?)\";", content) | |
assert str(pth.relative_to(casedir)) == result.group(1) | |
times.append(str(pth.relative_to(casedir))) | |
return times | |
def read_snapshot(casedir, time, field): | |
"""Read in simulation at a specific simulation time. | |
""" | |
# in case we forget to use a str sometimes | |
assert isinstance(time, str), "Please make the time value a str." | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
solnfile = casedir.joinpath(time, field) | |
assert solnfile.exists() and solnfile.is_file(), f"{solnfile} did not exist or is not a file." | |
with open(solnfile, "rb") as fobj: | |
content = fobj.read() | |
# regex patterns (use plain str first) | |
floatptn = r"(?:\d+(?:\.\d+)?)" | |
vectorptn = rf"\({floatptn}\s+{floatptn}\s+{floatptn}\)" | |
listptn = r"List<(\S+)>\s+.*?\)\s?" | |
ptn = rf"\sinternalField\s+(\S+)\s+({floatptn}|{vectorptn}|{listptn});\s" | |
# get the type of value for internalField | |
result = re.search(ptn.encode(), content, re.DOTALL) | |
assert result is not None, f"Error parsing solution file {solnfile}" | |
# type | |
field_type = result.group(1).decode() # convert binary str to a plain str | |
if "nonuniform" in field_type: | |
if b"scalar" in result.group(2): | |
soln = numpy.array( | |
parse_lists(content[result.start(2):], "d")[0], | |
dtype=float | |
) | |
elif b"vector" in result.group(2): | |
soln = numpy.array( | |
parse_lists(content[result.start(2):], "d", 3)[0], | |
dtype=float | |
).reshape(-1, 3) | |
else: | |
raise NotImplementedError(f"{result.group(2)} has not been implemented.") | |
elif "uniform" in field_type: | |
try: | |
soln = float(result.group(2)) | |
except ValueError as err: | |
if not str(err).startswith("could not convert"): | |
raise | |
soln = numpy.array([float(val) for val in result.group(2).strip(b"() \t").split()]) | |
else: | |
raise RuntimeError("I don't know there're other field types.") | |
return soln | |
def get_cell_faces(ncells, owners, neighbour): | |
"""Calculate the face IDs of cells. | |
""" | |
# get info from the file `owner` | |
face_owners = numpy.array(owners) | |
face_idxs = face_owners.argsort().tolist() | |
face_owners = face_owners[face_idxs].tolist() | |
cell_w_faces, cell_face_counts = numpy.unique(face_owners, return_counts=True) | |
cum_cell_face_counts = numpy.zeros(ncells+1, dtype=int) | |
cum_cell_face_counts[cell_w_faces+1] = cell_face_counts | |
cum_cell_face_counts = numpy.cumsum(cum_cell_face_counts) | |
# merge the data from `neighbour` and `owner` | |
cell_faces = list(map( | |
lambda inp: face_idxs[inp[0]:inp[1]], | |
zip(cum_cell_face_counts[:-1], cum_cell_face_counts[1:]) | |
)) | |
# slow for-loop; need imporvement | |
for face, cell in enumerate(neighbour): | |
cell_faces[cell].append(face) | |
return cell_faces | |
def get_cell_vertices(cell_faces, faces): | |
"""Calculate the vertex IDs of cells (unordered). | |
""" | |
cell_verts = [] | |
# slow; but I can live w/ it for now | |
for vals in cell_faces: | |
verts = set() | |
for val in vals: | |
verts.update(faces[val]) | |
verts = list(verts) | |
verts.sort() | |
cell_verts.append(verts) | |
return cell_verts | |
def get_cell_data_vtk(cell_faces, faces, points): | |
"""Calculate the vertex IDs of cells in preparation for VTK data objects. | |
""" | |
cell_verts = [] | |
cell_types = [] | |
# slow; but I can live w/ it for now | |
for vals in cell_faces: | |
local_faces = [list(faces[val]) for val in vals] | |
verts = get_hexahedron_nodes(local_faces) | |
verts = reorder_nodes(verts, points[verts]) | |
nverts = len(set(verts)) | |
assert nverts == 8, f"Currently only support hexahedrons. Got {nverts} vertices: {verts}" | |
cell_types.append(vtk.VTK_HEXAHEDRON) | |
cell_verts.append(nverts) | |
cell_verts.extend(verts) | |
return numpy.array(cell_types, dtype=int), numpy.array(cell_verts, dtype=int) | |
def get_vtk_unstructuredgrid(casedir, times=None, fields=("U", "p")): | |
"""Get a pyvista.UnstructuredGrid. | |
""" | |
casedir = pathlib.Path(casedir).expanduser().resolve() | |
print("Reading mesh vertices") | |
points = read_points(casedir) | |
print("Reading faces' definition") | |
faces = read_faces(casedir) | |
print("Reading faces' owners") | |
owners = read_owner(casedir) | |
print("Reading faces' neighbors") | |
neighbour = read_neighbour(casedir) | |
print("Calculating cells' definitions w.r.t. faces") | |
cell_faces = get_cell_faces(max(owners)+1, owners, neighbour) | |
print("Calculating cells' definitions w.r.t. vertices") | |
cell_types, cell_verts = get_cell_data_vtk(cell_faces, faces, points) | |
print("Generating pyvista.UnstructuredGrid") | |
mesh = pyvista.UnstructuredGrid(cell_verts, cell_types, points) | |
# get all available times if users did not provide any | |
if times is None: | |
times = read_times(casedir) | |
elif numpy.isscalar(times): | |
times = [str(times)] | |
elif isinstance(times, str): | |
times = [times] | |
elif not hasattr(times, "__iter__"): | |
raise ValueError("`times` got the wrong type of value.") | |
for time in times: | |
for field in fields: | |
print(f"Reading field {field} at time {time}") | |
data = read_snapshot(casedir, time, field) | |
if isinstance(data, float): | |
data = numpy.full(mesh.n_cells, data, dtype=float) | |
elif data.ndim == 1 and data.size ==3 and mesh.n_cells != 3: | |
data = numpy.tile(data, (mesh.n_cells, 1)) | |
mesh.cell_data[f"{time}/{field}"] = data | |
if data.ndim == 1: | |
mesh.set_active_scalars(f"{time}/{field}", "cell") | |
elif data.ndim == 2: | |
mesh.set_active_vectors(f"{time}/{field}", "cell") | |
else: | |
raise NotImplementedError("Haven't implemented tensors or higher rank data format") | |
return mesh | |
def get_hexahedron_nodes(faces): | |
"""Given the faces of a hexahedron, return an arbitrary pair of face that do share an edge. | |
""" | |
assert len(faces) == 6, f"A hexahedron only has 6 faces. Got {len(faces)}." | |
assert all(len(face) == 4 for face in faces), f"Some faces don't have exactly 4 nodes: {faces}." | |
# let the first face encountered as the first face in the pair | |
face_1 = faces[0] | |
# check the disjoint status against other faces | |
disjoint = [set(face_1).isdisjoint(set(face)) for face in faces[1:]] | |
assert disjoint.count(False) == 4, f"A face doesn't have 4 neighbors: {faces}" | |
# the only one with True is the face 2 | |
face_2_idx = disjoint.index(True)+1 | |
face_2 = faces[face_2_idx] | |
# pop out the two faces for convenience and use sets | |
faces = [set(face) for face in faces] | |
faces[0] = set() | |
faces[face_2_idx] = set() | |
# new lists to store sorted nodes for face 1 | |
face_1_new = [face_1.pop(0)] | |
# sort face_1 | |
counter = 0 | |
while len(face_1) != 0: | |
node_1 = face_1_new[-1] | |
node_2 = face_1.pop(0) | |
results = [{node_1, node_2} < face for face in faces] | |
if not any(results): | |
face_1.append(node_2) | |
continue | |
if results.count(True) != 1: | |
raise RuntimeError(f"Shouldn't be > 1. Got {results}.") | |
face_1_new.append(node_2) | |
counter += 1 | |
if counter == 1000: | |
raise RuntimeError("Infinite loop.") | |
face_2_new = [] | |
# sort face_2 following face_1's order | |
for node_1 in face_1_new: | |
counter = 0 | |
while len(face_2) != 0: | |
counter += 1 | |
if counter == 1000: | |
raise RuntimeError("Infinite loop.") | |
node_2 = face_2.pop(0) | |
results = [{node_1, node_2} < face for face in faces] | |
if not any(results) or results.count(True) == 1: | |
face_2.append(node_2) | |
continue | |
if results.count(True) > 2: | |
raise RuntimeError(f"Shouldn't be > 2. Got {results}.") | |
face_2_new.append(node_2) | |
break | |
return face_1_new + face_2_new | |
def reorder_nodes(cell, points): | |
"""Reorder nodes so that the node 0 to 3 corresponds to the face with inward normal vector. | |
Note: this function assumes the nodes in the cell are sorted by get_hexahedron_nodes. | |
""" | |
face_1 = cell[:4] | |
face_2 = cell[4:] | |
points_1 = points[:4] | |
points_2 = points[4:] | |
normal_1 = numpy.cross(points_1[1]-points_1[0], points_1[2]-points_1[0]) | |
normal_2 = numpy.cross(points_2[1]-points_2[0], points_2[2]-points_2[0]) | |
assert normal_1.dot(normal_2) > 0, f"\n{points_1}\n{points_2}" | |
vec04 = points_2[0] - points_1[0] | |
proj = vec04.dot(normal_1) | |
if proj < 0.: | |
face = face_2 + face_1 | |
return face | |
def coplanar_nodes_argsort(nodes): | |
"""Ruturn sorted indices that sort coplanar nodes either clockwise or counter clockwise. | |
""" | |
assert nodes.shape == (4, 3), f"Nodes' shape should be (4, 3). Got {nodes.shape}" | |
center = numpy.sum(nodes, 0) | |
vecs = nodes - center | |
lengths = numpy.linalg.norm(vecs, axis=1) | |
x = vecs[0] / lengths[0] | |
z = numpy.cross(x, vecs[1]) / lengths[1] | |
y = numpy.cross(z, x) | |
assert abs(numpy.linalg.norm(y)-1.0) <= 1e-10 | |
assert abs(numpy.linalg.norm(z)-1.0) <= 1e-10 | |
assert numpy.allclose(numpy.dot(vecs, z), 0.0, 1e-10, 1e-10) | |
lx = numpy.dot(vecs, x) | |
ly = numpy.dot(vecs, y) | |
theta = numpy.arctan2(ly, lx) | |
return numpy.argsort(theta) | |
if __name__ == "__main__": | |
import pprint | |
loc = pathlib.Path.home().joinpath("pipeflow", "run", "airflow-OF-case-16-8-600-binary") | |
mesh = get_vtk_unstructuredgrid(loc) | |
print(mesh) | |
# print(mesh.cell_data) | |
# mesh.save("./test.vtk") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment