Created
September 30, 2017 16:17
-
-
Save gaborvecsei/c7e91d6027597a0b8b05b233198cfe5d to your computer and use it in GitHub Desktop.
OpenCV threaded Camera object for higher FPS and processing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import cv2 | |
class Camera(object): | |
""" | |
Base Camera object | |
""" | |
def __init__(self): | |
self._cam = None | |
self._frame = None | |
self._frame_width = None | |
self._frame_height = None | |
self._ret = False | |
self.auto_undistortion = False | |
self._camera_matrix = None | |
self._distortion_coefficients = None | |
self._is_running = False | |
def _init_camera(self): | |
""" | |
This is the first for creating our camera | |
We should override this! | |
""" | |
pass | |
def start_camera(self): | |
""" | |
Start the running of the camera, without this we can't capture frames | |
Camera runs on a separate thread so we can reach a higher FPS | |
""" | |
self._init_camera() | |
self._is_running = True | |
threading.Thread(target=self._update_camera, args=()).start() | |
def _read_from_camera(self): | |
""" | |
This method is responsible for grabbing frames from the camera | |
We should override this! | |
""" | |
if self._cam is None: | |
raise Exception("Camera is not started!") | |
def _update_camera(self): | |
""" | |
Grabs the frames from the camera | |
""" | |
while True: | |
if self._is_running: | |
self._ret, self._frame = self._read_from_camera() | |
else: | |
break | |
def get_frame_width_and_height(self): | |
""" | |
Returns the width and height of the grabbed images | |
:return (int int): width and height | |
""" | |
return self._frame_width, self._frame_height | |
def read(self): | |
""" | |
With this you can grab the last frame from the camera | |
:return (boolean, np.array): return value and frame | |
""" | |
return self._ret, self._frame | |
def release_camera(self): | |
""" | |
Stop the camera | |
""" | |
self._is_running = False | |
def is_running(self): | |
return self._is_running | |
def set_calibration_matrices(self, camera_matrix, distortion_coefficients): | |
self._camera_matrix = camera_matrix | |
self._distortion_coefficients = distortion_coefficients | |
def activate_auto_undistortion(self): | |
self.auto_undistortion = True | |
def deactivate_auto_undistortion(self): | |
self.auto_undistortion = False | |
def _undistort_image(self, image): | |
if self._camera_matrix is None or self._distortion_coefficients is None: | |
import warnings | |
warnings.warn("Undistortion has no effect because <camera_matrix>/<distortion_coefficients> is None!") | |
return image | |
h, w = image.shape[:2] | |
new_camera_matrix, roi = cv2.getOptimalNewCameraMatrix(self._camera_matrix, | |
self._distortion_coefficients, (w, h), | |
1, | |
(w, h)) | |
undistorted = cv2.undistort(image, self._camera_matrix, self._distortion_coefficients, None, | |
new_camera_matrix) | |
return undistorted | |
class WebCamera(Camera): | |
""" | |
Simple Webcamera | |
""" | |
def __init__(self, video_src=0): | |
""" | |
:param video_src (int): camera source code (it should be 0 or 1, or the filename) | |
""" | |
super().__init__() | |
self._video_src = video_src | |
def _init_camera(self): | |
super()._init_camera() | |
self._cam = cv2.VideoCapture(self._video_src) | |
self._ret, self._frame = self._cam.read() | |
if not self._ret: | |
raise Exception("No camera feed") | |
self._frame_height, self._frame_width, c = self._frame.shape | |
return self._ret | |
def _read_from_camera(self): | |
super()._read_from_camera() | |
self._ret, self._frame = self._cam.read() | |
if self._ret: | |
if self.auto_undistortion: | |
self._frame = self._undistort_image(self._frame) | |
return True, self._frame | |
else: | |
return False, None | |
def release_camera(self): | |
super().release_camera() | |
self._cam.release() | |
# Example usage: | |
if __name__ == "__main__": | |
webcam = WebCamera(video_src=0) | |
webcam.start_camera() | |
while True: | |
ret, frame = webcam.read() | |
cv2.imshow("webcam test", frame) | |
key = cv2.waitKey(1) | |
if key == 27: | |
break | |
webcam.release_camera() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment