A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

所用库
multiprocessing
gc
opencv-python
abc
1
2
3
4
实现方法
不熟悉抽象基类的话可以直接看总结
将主要的实现逻辑写成一个抽象基类ABVideoCapture,其子类只用关心要做的图像算法,并实现到.process_image(image)这一方法中,就可以定制一个自带算法处理的实时VideoCapture类
ABVideoCapture也可以选择重构.write静态方法以支持从其他的数据源中读取图片。这里边有鸭子类型带来的好处
ABVideoCapture.read_gen()是一个生成器函数,也是整个实例可以快速运行的核心,ABVideoCapture.read()和ABVideoCapture.__iter__()都依赖于它。实例会维护一个由它返回的生成器ABVideoCapture.__read_gen。它可以一个一个的生成经过自定义算法处理过后的缓存栈顶的图片。
抽象基类实现了迭代器协议__iter__和上下文管理器协议__enter__、__exit__。
实现代码
import gc
import abc
from multiprocessing import Process, Manager

import cv2


# 定义抽象基类,此类不能直接实例化
# 做好框架
# 其子类只用实现.process_image方法,返回任意图像算法处理后的从缓存栈中读取的图片
class ABVideoCapture(abc.ABC):
    def __init__(self, cam, top=100):
        self.stack = Manager().list()
        self.max_cache = top
        self.write_process = Process(target=self.__class__.write, args=(self.stack, cam, top))
        self.write_process.start()
        self.__read_gen = self.read_gen()

    @abc.abstractmethod
    def process_image(self, image):
        """
        对输入的图片进行处理并返回处理后的图片
        """
外汇常见问题https://www.kaifx.cn/lists/question/

    def read_gen(self):
        while True:
            if len(self.stack) != 0:
                img = self.process_image(self.stack.pop())
                yield img

    def read(self):
        try:
            return True, next(self.__read_gen)
        except StopIteration:
            return False, None
        except TypeError:
            raise TypeError('{}.read_gen必须为生成器函数'.format(self.__class__.__name__))

    def __iter__(self):
        yield from self.__read_gen

    def release(self):
        self.write_process.terminate()

    def __del__(self):
        self.release()

    @staticmethod
    def write(stack, cam, top):
        """向共享缓冲栈中写入数据"""
        cap = cv2.VideoCapture(cam)
        while True:
            _, img = cap.read()
            if _:
                stack.append(img)
                # 每到一定容量清空一次缓冲栈
                # 利用gc库,手动清理内存垃圾,防止内存溢出
                if len(stack) >= top:
                    del stack[:]
                    gc.collect()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


# 继承ABVideoCapture,对缓存栈中的图片不做处理直接返回
class VideoCapture(ABVideoCapture):
    def process_image(self, image):
        # 这里对图像的处理算法可以随意制定
        return image

示例一(经典用法)
camera_addr = 0

cap = VideoCapture(camera_addr)
while True:
    _, img = cap.read()
    if _:
        cv2.imshow('img', img)
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()
1
示例二(上下文+迭代器)
camera_addr = 0
with VideoCapture(camera_addr) as cap:
    for img in cap:
        cv2.imshow('img', img)
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
cv2.destroyAllWindows()

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马