class VideoReader: def __init__(self, source_path, step=1, start=0, stop=None, dimension=DimensionType.DIM_2D): super().__init__( source_path=source_path, step=step, start=start, stop=stop + 1 if stop is not None else stop, dimension=dimension, ) def _has_frame(self, i): if i >= self._start: if (i - self._start) % self._step == 0: if self._stop is None or i < self._stop: return True return False def _decode(self, container): frame_num = 0 for packet in container.demux(): if packet.stream.type == 'video': for image in packet.decode(): frame_num += 1 if self._has_frame(frame_num - 1): if packet.stream.metadata.get('rotate'): old_image = image image = av.Videoframe().from_ndarray( rotate_image( image.to_ndarray(format='bgr24'), 360 - int(container.streams.video[0].metadata.get('rotate')) ), format ='bgr24' ) image.pts = old_image.pts yield (image, self._source_path[0], image.pts) def __iter__(self): container = self._get_av_container() source_video_stream = container.streams.video[0] source_video_stream.thread_type = 'AUTO' return self._decode(container) def get_progress(self, pos): container = self._get_av_container() # Not for all containers return real value stream = container.streams.video[0] return pos / stream.duration if stream.duration else None def _get_av_container(self): if isinstance(self._source_path[0], io.BytesIO): self._source_path[0].seek(0) # required for re-reading return av.open(self._source_path[0]) def get_preview(self): container = self._get_av_container() stream = container.streams.video[0] preview = next(container.decode(stream)) return self._get_preview(preview.to_image() if not stream.metadata.get('rotate') else av.Videoframe().from_ndarray( rotate_image( preview.to_ndarray(format='bgr24'), 360 - int(container.streams.video[0].metadata.get('rotate')) ), format ='bgr24' ).to_image() ) def get_image_size(self, i): image = (next(iter(self)))[0] return image.width, image.height
官网PyAV documentation — PyAV 8.0.1 documentation
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)