fully functional multithreading

This commit is contained in:
2023-03-27 21:43:27 +02:00
parent 282691a8e6
commit b3a83fdbad
25 changed files with 157 additions and 2905 deletions

View File

@@ -16,6 +16,7 @@ import configparser
import time import time
import shutil import shutil
import subprocess import subprocess
import multiprocessing
# Loading the config file to get user preferred temp path # Loading the config file to get user preferred temp path
@@ -30,7 +31,7 @@ class Handler:
self.tmppath = "" self.tmppath = ""
self.videometa = {} self.videometa = {}
def handler(self, fsrpath, filepath, quality_mode, quality_setting, output_path): def handler(self, fsrpath, filepath, quality_mode, quality_setting, output_path, threads=4):
# Function to be called when using this class as this function automatically determines if file is video or image # Function to be called when using this class as this function automatically determines if file is video or image
print( '\n\nFSRImageVideoUpscalerFrontend - V1.1.0\n\nCopyright 2023 FSRImageVideoUpscalerFrontend contributors\n\n\n\n' ); print( '\n\nFSRImageVideoUpscalerFrontend - V1.1.0\n\nCopyright 2023 FSRImageVideoUpscalerFrontend contributors\n\n\n\n' );
@@ -59,7 +60,7 @@ class Handler:
# Determining filetype # Determining filetype
if str(filepath)[len(filepath) - 4:] == ".mp4" or str(filepath)[len(filepath) - 4:] == ".mkv" or str(filepath)[len(filepath) - 4:] == ".MP4": if str(filepath)[len(filepath) - 4:] == ".mp4" or str(filepath)[len(filepath) - 4:] == ".mkv" or str(filepath)[len(filepath) - 4:] == ".MP4":
print("upscaling video") print("upscaling video")
self.video_scaling(fsrpath, filepath, quality_mode, quality_setting, output_path) self.video_scaling(fsrpath, filepath, quality_mode, quality_setting, output_path, threads)
elif str(filepath)[len(filepath) - 4:] == ".JPG" or str(filepath)[len(filepath) - 4:] == ".png" or str(filepath)[len(filepath) - 4:] == ".jpg" or str(filepath)[len(filepath) - 5:] == ".jpeg": elif str(filepath)[len(filepath) - 4:] == ".JPG" or str(filepath)[len(filepath) - 4:] == ".png" or str(filepath)[len(filepath) - 4:] == ".jpg" or str(filepath)[len(filepath) - 5:] == ".jpeg":
print("upscaling image") print("upscaling image")
self.photo_scaling(fsrpath, filepath, quality_mode, quality_setting, output_path) self.photo_scaling(fsrpath, filepath, quality_mode, quality_setting, output_path)
@@ -90,12 +91,10 @@ class Handler:
os.system(self.command) os.system(self.command)
print("photo upscaled") print("photo upscaled")
def video_scaling(self, fsrpath, filepath, quality_mode, quality_setting, output_path): def video_scaling(self, fsrpath, filepath, quality_mode, quality_setting, output_path, threads):
# DO NOT CALL THIS! Use Handler().handler() instead! # DO NOT CALL THIS! Use Handler().handler() instead!
# if ( sys.platform == 'win32' ):
# self.videometa = ffmpeg.probe(str(filepath))["streams"].pop(0)
# else:
self.videometa = ffmpeg.probe(str(filepath))["streams"].pop(0) self.videometa = ffmpeg.probe(str(filepath))["streams"].pop(0)
# Retrieving Video metadata # Retrieving Video metadata
self.duration = self.videometa.get("duration") self.duration = self.videometa.get("duration")
self.frames = self.videometa.get("nb_frames") self.frames = self.videometa.get("nb_frames")
@@ -105,7 +104,8 @@ class Handler:
self.infos = str(self.videometa.get("r_frame_rate")) self.infos = str(self.videometa.get("r_frame_rate"))
self.framerate = float(self.infos[:len(self.infos) - 2]) self.framerate = float(self.infos[:len(self.infos) - 2])
print('\n\nframe rate is: ', self.framerate, '\n\n') print( '\n\nframe rate is: ', self.framerate, '\n\n' )
print( '\n\nRunning with ', threads, ' threads\n\n' )
# Splitting video into frames # Splitting video into frames
try: try:
@@ -128,11 +128,11 @@ class Handler:
print("OS CURRENTLY UNSUPPORTED!") print("OS CURRENTLY UNSUPPORTED!")
return False return False
os.system(self.command) os.system( self.command )
print("video split") print( 'video split' )
# Locate Images and assemble FSR-Command # Locate Images and assemble FSR-Command
self.files = "" self.file_list = []
self.filelist = os.listdir(self.tmppath) self.filelist = os.listdir(self.tmppath)
self.filelist.pop(0) self.filelist.pop(0)
self.filelist.sort() self.filelist.sort()
@@ -140,9 +140,9 @@ class Handler:
for self.file in self.filelist: for self.file in self.filelist:
self.number += 1 self.number += 1
if ( self.os_type == 'win32' ): if ( self.os_type == 'win32' ):
self.files += f"{self.tmppath}{self.file} {self.tmppath}sc\\ig{str(self.number).zfill(8)}.png " self.file_list.append( f"{self.tmppath}{self.file} {self.tmppath}sc\\ig{str(self.number).zfill(8)}.png " );
else: else:
self.files += f"{self.tmppath}{self.file} {self.tmppath}sc/ig{str(self.number).zfill(8)}.png " self.file_list.append( f"{self.tmppath}{self.file} {self.tmppath}sc/ig{str(self.number).zfill(8)}.png " );
if ( self.os_type == 'win32' ): if ( self.os_type == 'win32' ):
self.maxlength = 8000 self.maxlength = 8000
@@ -150,105 +150,141 @@ class Handler:
self.maxlength = 31900 self.maxlength = 31900
self.pos = 1 self.pos = 1
# Refactoring of commands that are longer than 32K characters
self.fileout = []
if len(self.files) > self.maxlength:
while self.files[self.maxlength - self.pos:self.maxlength - self.pos + 1] != " ":
self.pos += 1
self.file_processing = self.files[:self.maxlength - self.pos]
if self.file_processing[len(self.file_processing) - 14:len(self.file_processing) - 12] == "ex":
self.pos += 5
else:
pass
while self.files[self.maxlength - self.pos:self.maxlength - self.pos + 1] != " ":
self.pos += 1
self.fileout.append(self.files[:self.maxlength - self.pos])
self.filesopt = self.files[self.maxlength - self.pos:]
self.posx = 0
self.posy = self.maxlength
# Command refactoring for commands that are longer than 64K characters
if len(self.filesopt) > self.maxlength:
while len(self.filesopt) > self.maxlength:
self.posx += self.maxlength - self.pos
self.posy += self.maxlength - self.pos
self.pos = 1
while self.files[self.posy - self.pos:self.posy - self.pos + 1] != " ":
self.pos += 1
self.file_processing = self.files[self.posx:self.posy - self.pos]
if self.file_processing[len(self.file_processing) - 14:len(self.file_processing) - 12] == "ex":
self.pos += 5
else:
pass
while self.files[self.posy - self.pos:self.posy - self.pos + 1] != " ":
self.pos += 1
self.file_processing = self.files[self.posx:self.posy - self.pos]
self.fileout.append(self.file_processing)
self.filesopt = self.files[self.posy - self.pos:]
self.fileout.append(self.filesopt)
else:
self.fileout.append(self.files[self.maxlength - self.pos:])
else:
self.fileout.append(self.files)
try: try:
os.mkdir(f"{self.tmppath}sc") os.mkdir(f"{self.tmppath}sc")
except FileExistsError: except FileExistsError:
pass pass
############################################
#
# Thread optimisation: Divide workload up into different threads & upscale using helper function
#
############################################
self.threads = threads
if ( threads > multiprocessing.cpu_count() ):
self.threads = multiprocessing.cpu_count();
print( f'\n\nUsing { self.threads } threads\n\n' );
time.sleep( 2 );
self.proc_list = [];
self.file_list_length = len( self.file_list );
for i in range( self.threads ):
self.files = '';
for _ in range( int( self.file_list_length // self.threads ) ):
self.files += self.file_list.pop( 0 );
print("prepared commands") if ( i == self.threads - 1 ):
for element in self.file_list:
# Upscaling images self.files += element;
print("\n\n\nUpscaling images... \n\n\n") proc = multiprocessing.Process( name=i, target=self.upscalerEngine, args=( quality_mode, self.files, fsrpath, quality_setting, i, ) );
while self.fileout != []: proc.start();
self.files_handle = self.fileout.pop(0) self.proc_list.append( proc );
if quality_mode == "default":
if self.os_type == "linux":
self.command_us = f"wine {fsrpath} -QualityMode {quality_setting} {self.files_handle}"
elif self.os_type == "win32":
self.command_us = f"FidelityFX_CLI -QualityMode {quality_setting} {self.files_handle}"
else:
print("OS CURRENTLY UNSUPPORTED!")
return False
else:
if self.os_type == "linux":
self.command_us = f"wine {fsrpath} -Scale {quality_setting} {quality_setting} {self.files_handle}"
elif self.os_type == "win32":
self.command_us = f"FidelityFX_CLI -Scale {quality_setting} {quality_setting} {self.files_handle}"
else:
print("OS CURRENTLY UNSUPPORTED!")
return False
print( self.command_us )
os.system(self.command_us)
time.sleep(3)
# await completion of all jobs
for proc in self.proc_list:
proc.join();
# get Video's audio # get Video's audio
print("Finished Upscaling individual images. \n\n\nRetrieving Video's audio to append") print( 'Finished Upscaling individual images. \n\n\nRetrieving Video audio to append' )
time.sleep( 2 );
try: try:
os.remove(f"{self.tmppath}audio.aac") os.remove(f"{self.tmppath}audio.aac")
os.remove(f"{output_path}") os.remove(f"{output_path}")
except FileNotFoundError: except FileNotFoundError:
pass pass
if self.os_type == "linux": if self.os_type == 'linux':
self.command = f"ffmpeg -i {self.filepath} -vn -acodec copy {self.tmppath}audio.aac" self.command = f'ffmpeg -i {self.filepath} -vn -acodec copy {self.tmppath}audio.aac'
elif self.os_type == "win32": elif self.os_type == 'win32':
self.command = f"ffmpeg -i {self.filepath} -vn -acodec copy {self.tmppath}audio.aac" self.command = f'ffmpeg -i {self.filepath} -vn -acodec copy {self.tmppath}audio.aac'
else: else:
print("OS CURRENTLY UNSUPPORTED!") print( 'OS CURRENTLY UNSUPPORTED!' )
return False return False
os.system(self.command) os.system( self.command )
# reassemble Video # reassemble Video
print("Reassembling Video... with framerate @", self.framerate) print( 'Reassembling Video... with framerate @', self.framerate )
if self.os_type == "linux": if self.os_type == 'linux':
self.command = f"ffmpeg -framerate {self.framerate} -i {self.tmppath}sc/ig%08d.png {output_path} -i {self.tmppath}audio.aac" self.command = f'ffmpeg -framerate {self.framerate} -i {self.tmppath}sc/ig%08d.png {output_path} -i {self.tmppath}audio.aac'
elif self.os_type == "win32": elif self.os_type == 'win32':
self.command = f"ffmpeg -framerate {self.framerate} -i \"{self.tmppath}sc\\ig%08d.png\" {output_path} -i {self.tmppath}audio.aac" self.command = f'ffmpeg -framerate {self.framerate} -i \"{self.tmppath}sc\\ig%08d.png\" {output_path} -i {self.tmppath}audio.aac'
else: else:
print("OS CURRENTLY UNSUPPORTED!") print( 'OS CURRENTLY UNSUPPORTED!' );
return False return False
os.system(self.command) os.system( self.command )
print("\n\n\n DONE \n\n\n\n") print( '\n\n---------------------------------------------------------------------------------\n\nDONE \n\nFSRImageVideoUpscalerFrontend V1.1.0\n\nCopyright 2023 FSRImageVideoUpscalerFrontend contributors\nThis application comes with absolutely no warranty to the extent permitted by applicable law\n\n' )
def upscalerEngine ( self, quality_mode, files, fsrpath, quality_setting, number ):
files = files;
# Refactoring of commands that are longer than 32K characters
fileout = [];
pos = 0;
if len( files ) > self.maxlength:
while files[self.maxlength - pos:self.maxlength - pos + 1] != ' ':
pos += 1
file_processing = files[:self.maxlength - pos]
if file_processing[len(file_processing) - 14:len(file_processing) - 12] == 'ex':
pos += 5
else:
pass
while files[self.maxlength - pos:self.maxlength - pos + 1] != ' ':
pos += 1
fileout.append(files[:self.maxlength - pos])
filesopt = files[self.maxlength - pos:]
posx = 0
posy = self.maxlength
# Command refactoring for commands that are longer than 64K characters
if len(filesopt) > self.maxlength:
while len(filesopt) > self.maxlength:
posx += self.maxlength - pos
posy += self.maxlength - pos
pos = 1
while files[posy - pos:posy - pos + 1] != ' ':
pos += 1
file_processing = files[posx:posy - pos]
if file_processing[len(file_processing) - 14:len(file_processing) - 12] == 'ex':
pos += 5
else:
pass
while files[posy - pos:posy - pos + 1] != ' ':
pos += 1
file_processing = files[posx:posy - pos]
fileout.append(file_processing)
filesopt = files[posy - pos:]
fileout.append(filesopt)
else:
fileout.append(files[self.maxlength - pos:])
else:
fileout.append(files)
# Upscaling images
print( '\n\n\nUpscaling images... \n\n\n\n\n\n PROCESS: ', number, '\n\n\n' )
while len( fileout ) > 0:
files_handle = fileout.pop(0)
if quality_mode == 'default':
if self.os_type == 'linux':
command_us = f'wine {fsrpath} -QualityMode {quality_setting} {files_handle}'
elif self.os_type == 'win32':
command_us = f'FidelityFX_CLI -QualityMode {quality_setting} {files_handle}'
else:
print( 'OS CURRENTLY UNSUPPORTED!' )
return False
else:
if self.os_type == "linux":
command_us = f'wine {fsrpath} -Scale {quality_setting} {quality_setting} {files_handle}'
elif self.os_type == "win32":
command_us = f'FidelityFX_CLI -Scale {quality_setting} {quality_setting} {files_handle}'
else:
print( 'OS CURRENTLY UNSUPPORTED!' )
return False
sub = subprocess.Popen( command_us, shell=True );
sub.wait();
time.sleep(3)
print( '\n\nCompleted executing Job\n\n\n PROCESS: ', number, '\n\n\n' );

Binary file not shown.

View File

@@ -1,22 +0,0 @@
from __future__ import unicode_literals
from . import nodes
from . import _ffmpeg
from . import _filters
from . import _probe
from . import _run
from . import _view
from .nodes import *
from ._ffmpeg import *
from ._filters import *
from ._probe import *
from ._run import *
from ._view import *
__all__ = (
nodes.__all__
+ _ffmpeg.__all__
+ _probe.__all__
+ _run.__all__
+ _view.__all__
+ _filters.__all__
)

View File

@@ -1,95 +0,0 @@
from __future__ import unicode_literals
from past.builtins import basestring
from ._utils import basestring
from .nodes import (
filter_operator,
GlobalNode,
InputNode,
MergeOutputsNode,
OutputNode,
output_operator,
)
def input(filename, **kwargs):
"""Input file URL (ffmpeg ``-i`` option)
Any supplied kwargs are passed to ffmpeg verbatim (e.g. ``t=20``,
``f='mp4'``, ``acodec='pcm'``, etc.).
To tell ffmpeg to read from stdin, use ``pipe:`` as the filename.
Official documentation: `Main options <https://ffmpeg.org/ffmpeg.html#Main-options>`__
"""
kwargs['filename'] = filename
fmt = kwargs.pop('f', None)
if fmt:
if 'format' in kwargs:
raise ValueError("Can't specify both `format` and `f` kwargs")
kwargs['format'] = fmt
return InputNode(input.__name__, kwargs=kwargs).stream()
@output_operator()
def global_args(stream, *args):
"""Add extra global command-line argument(s), e.g. ``-progress``."""
return GlobalNode(stream, global_args.__name__, args).stream()
@output_operator()
def overwrite_output(stream):
"""Overwrite output files without asking (ffmpeg ``-y`` option)
Official documentation: `Main options <https://ffmpeg.org/ffmpeg.html#Main-options>`__
"""
return GlobalNode(stream, overwrite_output.__name__, ['-y']).stream()
@output_operator()
def merge_outputs(*streams):
"""Include all given outputs in one ffmpeg command line"""
return MergeOutputsNode(streams, merge_outputs.__name__).stream()
@filter_operator()
def output(*streams_and_filename, **kwargs):
"""Output file URL
Syntax:
`ffmpeg.output(stream1[, stream2, stream3...], filename, **ffmpeg_args)`
Any supplied keyword arguments are passed to ffmpeg verbatim (e.g.
``t=20``, ``f='mp4'``, ``acodec='pcm'``, ``vcodec='rawvideo'``,
etc.). Some keyword-arguments are handled specially, as shown below.
Args:
video_bitrate: parameter for ``-b:v``, e.g. ``video_bitrate=1000``.
audio_bitrate: parameter for ``-b:a``, e.g. ``audio_bitrate=200``.
format: alias for ``-f`` parameter, e.g. ``format='mp4'``
(equivalent to ``f='mp4'``).
If multiple streams are provided, they are mapped to the same
output.
To tell ffmpeg to write to stdout, use ``pipe:`` as the filename.
Official documentation: `Synopsis <https://ffmpeg.org/ffmpeg.html#Synopsis>`__
"""
streams_and_filename = list(streams_and_filename)
if 'filename' not in kwargs:
if not isinstance(streams_and_filename[-1], basestring):
raise ValueError('A filename must be provided')
kwargs['filename'] = streams_and_filename.pop(-1)
streams = streams_and_filename
fmt = kwargs.pop('f', None)
if fmt:
if 'format' in kwargs:
raise ValueError("Can't specify both `format` and `f` kwargs")
kwargs['format'] = fmt
return OutputNode(streams, output.__name__, kwargs=kwargs).stream()
__all__ = ['input', 'merge_outputs', 'output', 'overwrite_output']

View File

@@ -1,510 +0,0 @@
from __future__ import unicode_literals
from .nodes import FilterNode, filter_operator
from ._utils import escape_chars
@filter_operator()
def filter_multi_output(stream_spec, filter_name, *args, **kwargs):
"""Apply custom filter with one or more outputs.
This is the same as ``filter`` except that the filter can produce more than one
output.
To reference an output stream, use either the ``.stream`` operator or bracket
shorthand:
Example:
```
split = ffmpeg.input('in.mp4').filter_multi_output('split')
split0 = split.stream(0)
split1 = split[1]
ffmpeg.concat(split0, split1).output('out.mp4').run()
```
"""
return FilterNode(
stream_spec, filter_name, args=args, kwargs=kwargs, max_inputs=None
)
@filter_operator()
def filter(stream_spec, filter_name, *args, **kwargs):
"""Apply custom filter.
``filter_`` is normally used by higher-level filter functions such as ``hflip``,
but if a filter implementation is missing from ``ffmpeg-python``, you can call
``filter_`` directly to have ``ffmpeg-python`` pass the filter name and arguments
to ffmpeg verbatim.
Args:
stream_spec: a Stream, list of Streams, or label-to-Stream dictionary mapping
filter_name: ffmpeg filter name, e.g. `colorchannelmixer`
*args: list of args to pass to ffmpeg verbatim
**kwargs: list of keyword-args to pass to ffmpeg verbatim
The function name is suffixed with ``_`` in order avoid confusion with the standard
python ``filter`` function.
Example:
``ffmpeg.input('in.mp4').filter('hflip').output('out.mp4').run()``
"""
return filter_multi_output(stream_spec, filter_name, *args, **kwargs).stream()
@filter_operator()
def filter_(stream_spec, filter_name, *args, **kwargs):
"""Alternate name for ``filter``, so as to not collide with the
built-in python ``filter`` operator.
"""
return filter(stream_spec, filter_name, *args, **kwargs)
@filter_operator()
def split(stream):
return FilterNode(stream, split.__name__)
@filter_operator()
def asplit(stream):
return FilterNode(stream, asplit.__name__)
@filter_operator()
def setpts(stream, expr):
"""Change the PTS (presentation timestamp) of the input frames.
Args:
expr: The expression which is evaluated for each frame to construct its
timestamp.
Official documentation: `setpts, asetpts <https://ffmpeg.org/ffmpeg-filters.html#setpts_002c-asetpts>`__
"""
return FilterNode(stream, setpts.__name__, args=[expr]).stream()
@filter_operator()
def trim(stream, **kwargs):
"""Trim the input so that the output contains one continuous subpart of the input.
Args:
start: Specify the time of the start of the kept section, i.e. the frame with
the timestamp start will be the first frame in the output.
end: Specify the time of the first frame that will be dropped, i.e. the frame
immediately preceding the one with the timestamp end will be the last frame
in the output.
start_pts: This is the same as start, except this option sets the start
timestamp in timebase units instead of seconds.
end_pts: This is the same as end, except this option sets the end timestamp in
timebase units instead of seconds.
duration: The maximum duration of the output in seconds.
start_frame: The number of the first frame that should be passed to the output.
end_frame: The number of the first frame that should be dropped.
Official documentation: `trim <https://ffmpeg.org/ffmpeg-filters.html#trim>`__
"""
return FilterNode(stream, trim.__name__, kwargs=kwargs).stream()
@filter_operator()
def overlay(main_parent_node, overlay_parent_node, eof_action='repeat', **kwargs):
"""Overlay one video on top of another.
Args:
x: Set the expression for the x coordinates of the overlaid video on the main
video. Default value is 0. In case the expression is invalid, it is set to
a huge value (meaning that the overlay will not be displayed within the
output visible area).
y: Set the expression for the y coordinates of the overlaid video on the main
video. Default value is 0. In case the expression is invalid, it is set to
a huge value (meaning that the overlay will not be displayed within the
output visible area).
eof_action: The action to take when EOF is encountered on the secondary input;
it accepts one of the following values:
* ``repeat``: Repeat the last frame (the default).
* ``endall``: End both streams.
* ``pass``: Pass the main input through.
eval: Set when the expressions for x, and y are evaluated.
It accepts the following values:
* ``init``: only evaluate expressions once during the filter initialization
or when a command is processed
* ``frame``: evaluate expressions for each incoming frame
Default value is ``frame``.
shortest: If set to 1, force the output to terminate when the shortest input
terminates. Default value is 0.
format: Set the format for the output video.
It accepts the following values:
* ``yuv420``: force YUV420 output
* ``yuv422``: force YUV422 output
* ``yuv444``: force YUV444 output
* ``rgb``: force packed RGB output
* ``gbrp``: force planar RGB output
Default value is ``yuv420``.
rgb (deprecated): If set to 1, force the filter to accept inputs in the RGB
color space. Default value is 0. This option is deprecated, use format
instead.
repeatlast: If set to 1, force the filter to draw the last overlay frame over
the main input until the end of the stream. A value of 0 disables this
behavior. Default value is 1.
Official documentation: `overlay <https://ffmpeg.org/ffmpeg-filters.html#overlay-1>`__
"""
kwargs['eof_action'] = eof_action
return FilterNode(
[main_parent_node, overlay_parent_node],
overlay.__name__,
kwargs=kwargs,
max_inputs=2,
).stream()
@filter_operator()
def hflip(stream):
"""Flip the input video horizontally.
Official documentation: `hflip <https://ffmpeg.org/ffmpeg-filters.html#hflip>`__
"""
return FilterNode(stream, hflip.__name__).stream()
@filter_operator()
def vflip(stream):
"""Flip the input video vertically.
Official documentation: `vflip <https://ffmpeg.org/ffmpeg-filters.html#vflip>`__
"""
return FilterNode(stream, vflip.__name__).stream()
@filter_operator()
def crop(stream, x, y, width, height, **kwargs):
"""Crop the input video.
Args:
x: The horizontal position, in the input video, of the left edge of
the output video.
y: The vertical position, in the input video, of the top edge of the
output video.
width: The width of the output video. Must be greater than 0.
height: The height of the output video. Must be greater than 0.
Official documentation: `crop <https://ffmpeg.org/ffmpeg-filters.html#crop>`__
"""
return FilterNode(
stream, crop.__name__, args=[width, height, x, y], kwargs=kwargs
).stream()
@filter_operator()
def drawbox(stream, x, y, width, height, color, thickness=None, **kwargs):
"""Draw a colored box on the input image.
Args:
x: The expression which specifies the top left corner x coordinate of the box.
It defaults to 0.
y: The expression which specifies the top left corner y coordinate of the box.
It defaults to 0.
width: Specify the width of the box; if 0 interpreted as the input width. It
defaults to 0.
height: Specify the height of the box; if 0 interpreted as the input height. It
defaults to 0.
color: Specify the color of the box to write. For the general syntax of this
option, check the "Color" section in the ffmpeg-utils manual. If the
special value invert is used, the box edge color is the same as the video
with inverted luma.
thickness: The expression which sets the thickness of the box edge. Default
value is 3.
w: Alias for ``width``.
h: Alias for ``height``.
c: Alias for ``color``.
t: Alias for ``thickness``.
Official documentation: `drawbox <https://ffmpeg.org/ffmpeg-filters.html#drawbox>`__
"""
if thickness:
kwargs['t'] = thickness
return FilterNode(
stream, drawbox.__name__, args=[x, y, width, height, color], kwargs=kwargs
).stream()
@filter_operator()
def drawtext(stream, text=None, x=0, y=0, escape_text=True, **kwargs):
"""Draw a text string or text from a specified file on top of a video, using the
libfreetype library.
To enable compilation of this filter, you need to configure FFmpeg with
``--enable-libfreetype``. To enable default font fallback and the font option you
need to configure FFmpeg with ``--enable-libfontconfig``. To enable the
text_shaping option, you need to configure FFmpeg with ``--enable-libfribidi``.
Args:
box: Used to draw a box around text using the background color. The value must
be either 1 (enable) or 0 (disable). The default value of box is 0.
boxborderw: Set the width of the border to be drawn around the box using
boxcolor. The default value of boxborderw is 0.
boxcolor: The color to be used for drawing box around text. For the syntax of
this option, check the "Color" section in the ffmpeg-utils manual. The
default value of boxcolor is "white".
line_spacing: Set the line spacing in pixels of the border to be drawn around
the box using box. The default value of line_spacing is 0.
borderw: Set the width of the border to be drawn around the text using
bordercolor. The default value of borderw is 0.
bordercolor: Set the color to be used for drawing border around text. For the
syntax of this option, check the "Color" section in the ffmpeg-utils
manual. The default value of bordercolor is "black".
expansion: Select how the text is expanded. Can be either none, strftime
(deprecated) or normal (default). See the Text expansion section below for
details.
basetime: Set a start time for the count. Value is in microseconds. Only
applied in the deprecated strftime expansion mode. To emulate in normal
expansion mode use the pts function, supplying the start time (in seconds)
as the second argument.
fix_bounds: If true, check and fix text coords to avoid clipping.
fontcolor: The color to be used for drawing fonts. For the syntax of this
option, check the "Color" section in the ffmpeg-utils manual. The default
value of fontcolor is "black".
fontcolor_expr: String which is expanded the same way as text to obtain dynamic
fontcolor value. By default this option has empty value and is not
processed. When this option is set, it overrides fontcolor option.
font: The font family to be used for drawing text. By default Sans.
fontfile: The font file to be used for drawing text. The path must be included.
This parameter is mandatory if the fontconfig support is disabled.
alpha: Draw the text applying alpha blending. The value can be a number between
0.0 and 1.0. The expression accepts the same variables x, y as well. The
default value is 1. Please see fontcolor_expr.
fontsize: The font size to be used for drawing text. The default value of
fontsize is 16.
text_shaping: If set to 1, attempt to shape the text (for example, reverse the
order of right-to-left text and join Arabic characters) before drawing it.
Otherwise, just draw the text exactly as given. By default 1 (if supported).
ft_load_flags: The flags to be used for loading the fonts. The flags map the
corresponding flags supported by libfreetype, and are a combination of the
following values:
* ``default``
* ``no_scale``
* ``no_hinting``
* ``render``
* ``no_bitmap``
* ``vertical_layout``
* ``force_autohint``
* ``crop_bitmap``
* ``pedantic``
* ``ignore_global_advance_width``
* ``no_recurse``
* ``ignore_transform``
* ``monochrome``
* ``linear_design``
* ``no_autohint``
Default value is "default". For more information consult the documentation
for the FT_LOAD_* libfreetype flags.
shadowcolor: The color to be used for drawing a shadow behind the drawn text.
For the syntax of this option, check the "Color" section in the ffmpeg-utils
manual. The default value of shadowcolor is "black".
shadowx: The x offset for the text shadow position with respect to the position
of the text. It can be either positive or negative values. The default value
is "0".
shadowy: The y offset for the text shadow position with respect to the position
of the text. It can be either positive or negative values. The default value
is "0".
start_number: The starting frame number for the n/frame_num variable. The
default value is "0".
tabsize: The size in number of spaces to use for rendering the tab. Default
value is 4.
timecode: Set the initial timecode representation in "hh:mm:ss[:;.]ff" format.
It can be used with or without text parameter. timecode_rate option must be
specified.
rate: Set the timecode frame rate (timecode only).
timecode_rate: Alias for ``rate``.
r: Alias for ``rate``.
tc24hmax: If set to 1, the output of the timecode option will wrap around at 24
hours. Default is 0 (disabled).
text: The text string to be drawn. The text must be a sequence of UTF-8 encoded
characters. This parameter is mandatory if no file is specified with the
parameter textfile.
textfile: A text file containing text to be drawn. The text must be a sequence
of UTF-8 encoded characters. This parameter is mandatory if no text string
is specified with the parameter text. If both text and textfile are
specified, an error is thrown.
reload: If set to 1, the textfile will be reloaded before each frame. Be sure
to update it atomically, or it may be read partially, or even fail.
x: The expression which specifies the offset where text will be drawn within
the video frame. It is relative to the left border of the output image. The
default value is "0".
y: The expression which specifies the offset where text will be drawn within
the video frame. It is relative to the top border of the output image. The
default value is "0". See below for the list of accepted constants and
functions.
Expression constants:
The parameters for x and y are expressions containing the following constants
and functions:
- dar: input display aspect ratio, it is the same as ``(w / h) * sar``
- hsub: horizontal chroma subsample values. For example for the pixel format
"yuv422p" hsub is 2 and vsub is 1.
- vsub: vertical chroma subsample values. For example for the pixel format
"yuv422p" hsub is 2 and vsub is 1.
- line_h: the height of each text line
- lh: Alias for ``line_h``.
- main_h: the input height
- h: Alias for ``main_h``.
- H: Alias for ``main_h``.
- main_w: the input width
- w: Alias for ``main_w``.
- W: Alias for ``main_w``.
- ascent: the maximum distance from the baseline to the highest/upper grid
coordinate used to place a glyph outline point, for all the rendered glyphs.
It is a positive value, due to the grid's orientation with the Y axis
upwards.
- max_glyph_a: Alias for ``ascent``.
- descent: the maximum distance from the baseline to the lowest grid
coordinate used to place a glyph outline
point, for all the rendered glyphs. This is a negative value, due to the
grid's orientation, with the Y axis upwards.
- max_glyph_d: Alias for ``descent``.
- max_glyph_h: maximum glyph height, that is the maximum height for all the
glyphs contained in the rendered text, it is equivalent to ascent - descent.
- max_glyph_w: maximum glyph width, that is the maximum width for all the
glyphs contained in the rendered text.
- n: the number of input frame, starting from 0
- rand(min, max): return a random number included between min and max
- sar: The input sample aspect ratio.
- t: timestamp expressed in seconds, NAN if the input timestamp is unknown
- text_h: the height of the rendered text
- th: Alias for ``text_h``.
- text_w: the width of the rendered text
- tw: Alias for ``text_w``.
- x: the x offset coordinates where the text is drawn.
- y: the y offset coordinates where the text is drawn.
These parameters allow the x and y expressions to refer each other, so you can
for example specify ``y=x/dar``.
Official documentation: `drawtext <https://ffmpeg.org/ffmpeg-filters.html#drawtext>`__
"""
if text is not None:
if escape_text:
text = escape_chars(text, '\\\'%')
kwargs['text'] = text
if x != 0:
kwargs['x'] = x
if y != 0:
kwargs['y'] = y
return filter(stream, drawtext.__name__, **kwargs)
@filter_operator()
def concat(*streams, **kwargs):
"""Concatenate audio and video streams, joining them together one after the other.
The filter works on segments of synchronized video and audio streams. All segments
must have the same number of streams of each type, and that will also be the number
of streams at output.
Args:
unsafe: Activate unsafe mode: do not fail if segments have a different format.
Related streams do not always have exactly the same duration, for various reasons
including codec frame size or sloppy authoring. For that reason, related
synchronized streams (e.g. a video and its audio track) should be concatenated at
once. The concat filter will use the duration of the longest stream in each segment
(except the last one), and if necessary pad shorter audio streams with silence.
For this filter to work correctly, all segments must start at timestamp 0.
All corresponding streams must have the same parameters in all segments; the
filtering system will automatically select a common pixel format for video streams,
and a common sample format, sample rate and channel layout for audio streams, but
other settings, such as resolution, must be converted explicitly by the user.
Different frame rates are acceptable but will result in variable frame rate at
output; be sure to configure the output file to handle it.
Official documentation: `concat <https://ffmpeg.org/ffmpeg-filters.html#concat>`__
"""
video_stream_count = kwargs.get('v', 1)
audio_stream_count = kwargs.get('a', 0)
stream_count = video_stream_count + audio_stream_count
if len(streams) % stream_count != 0:
raise ValueError(
'Expected concat input streams to have length multiple of {} (v={}, a={}); got {}'.format(
stream_count, video_stream_count, audio_stream_count, len(streams)
)
)
kwargs['n'] = int(len(streams) / stream_count)
return FilterNode(streams, concat.__name__, kwargs=kwargs, max_inputs=None).stream()
@filter_operator()
def zoompan(stream, **kwargs):
"""Apply Zoom & Pan effect.
Args:
zoom: Set the zoom expression. Default is 1.
x: Set the x expression. Default is 0.
y: Set the y expression. Default is 0.
d: Set the duration expression in number of frames. This sets for how many
number of frames effect will last for single input image.
s: Set the output image size, default is ``hd720``.
fps: Set the output frame rate, default is 25.
z: Alias for ``zoom``.
Official documentation: `zoompan <https://ffmpeg.org/ffmpeg-filters.html#zoompan>`__
"""
return FilterNode(stream, zoompan.__name__, kwargs=kwargs).stream()
@filter_operator()
def hue(stream, **kwargs):
"""Modify the hue and/or the saturation of the input.
Args:
h: Specify the hue angle as a number of degrees. It accepts an expression, and
defaults to "0".
s: Specify the saturation in the [-10,10] range. It accepts an expression and
defaults to "1".
H: Specify the hue angle as a number of radians. It accepts an expression, and
defaults to "0".
b: Specify the brightness in the [-10,10] range. It accepts an expression and
defaults to "0".
Official documentation: `hue <https://ffmpeg.org/ffmpeg-filters.html#hue>`__
"""
return FilterNode(stream, hue.__name__, kwargs=kwargs).stream()
@filter_operator()
def colorchannelmixer(stream, *args, **kwargs):
"""Adjust video input frames by re-mixing color channels.
Official documentation: `colorchannelmixer <https://ffmpeg.org/ffmpeg-filters.html#colorchannelmixer>`__
"""
return FilterNode(stream, colorchannelmixer.__name__, kwargs=kwargs).stream()
__all__ = [
'colorchannelmixer',
'concat',
'crop',
'drawbox',
'drawtext',
'filter',
'filter_',
'filter_multi_output',
'hflip',
'hue',
'overlay',
'setpts',
'trim',
'vflip',
'zoompan',
]

View File

@@ -1,30 +0,0 @@
import json
import subprocess
from ._run import Error
from ._utils import convert_kwargs_to_cmd_line_args
def probe(filename, cmd='ffprobe', timeout=None, **kwargs):
"""Run ffprobe on the specified file and return a JSON representation of the output.
Raises:
:class:`ffmpeg.Error`: if ffprobe returns a non-zero exit code,
an :class:`Error` is returned with a generic error message.
The stderr output can be retrieved by accessing the
``stderr`` property of the exception.
"""
args = [cmd, '-show_format', '-show_streams', '-of', 'json']
args += convert_kwargs_to_cmd_line_args(kwargs)
args += [filename]
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
communicate_kwargs = {}
if timeout is not None:
communicate_kwargs['timeout'] = timeout
out, err = p.communicate(**communicate_kwargs)
if p.returncode != 0:
raise Error('ffprobe', out, err)
return json.loads(out.decode('utf-8'))
__all__ = ['probe']

View File

@@ -1,347 +0,0 @@
from __future__ import unicode_literals
from .dag import get_outgoing_edges, topo_sort
from ._utils import basestring, convert_kwargs_to_cmd_line_args
from builtins import str
from functools import reduce
import copy
import operator
import subprocess
from ._ffmpeg import input, output
from .nodes import (
get_stream_spec_nodes,
FilterNode,
GlobalNode,
InputNode,
OutputNode,
output_operator,
)
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
class Error(Exception):
def __init__(self, cmd, stdout, stderr):
super(Error, self).__init__(
'{} error (see stderr output for detail)'.format(cmd)
)
self.stdout = stdout
self.stderr = stderr
def _get_input_args(input_node):
if input_node.name == input.__name__:
kwargs = copy.copy(input_node.kwargs)
filename = kwargs.pop('filename')
fmt = kwargs.pop('format', None)
video_size = kwargs.pop('video_size', None)
args = []
if fmt:
args += ['-f', fmt]
if video_size:
args += ['-video_size', '{}x{}'.format(video_size[0], video_size[1])]
args += convert_kwargs_to_cmd_line_args(kwargs)
args += ['-i', filename]
else:
raise ValueError('Unsupported input node: {}'.format(input_node))
return args
def _format_input_stream_name(stream_name_map, edge, is_final_arg=False):
prefix = stream_name_map[edge.upstream_node, edge.upstream_label]
if not edge.upstream_selector:
suffix = ''
else:
suffix = ':{}'.format(edge.upstream_selector)
if is_final_arg and isinstance(edge.upstream_node, InputNode):
## Special case: `-map` args should not have brackets for input
## nodes.
fmt = '{}{}'
else:
fmt = '[{}{}]'
return fmt.format(prefix, suffix)
def _format_output_stream_name(stream_name_map, edge):
return '[{}]'.format(stream_name_map[edge.upstream_node, edge.upstream_label])
def _get_filter_spec(node, outgoing_edge_map, stream_name_map):
incoming_edges = node.incoming_edges
outgoing_edges = get_outgoing_edges(node, outgoing_edge_map)
inputs = [
_format_input_stream_name(stream_name_map, edge) for edge in incoming_edges
]
outputs = [
_format_output_stream_name(stream_name_map, edge) for edge in outgoing_edges
]
filter_spec = '{}{}{}'.format(
''.join(inputs), node._get_filter(outgoing_edges), ''.join(outputs)
)
return filter_spec
def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map):
stream_count = 0
for upstream_node in filter_nodes:
outgoing_edge_map = outgoing_edge_maps[upstream_node]
for upstream_label, downstreams in sorted(outgoing_edge_map.items()):
if len(downstreams) > 1:
# TODO: automatically insert `splits` ahead of time via graph transformation.
raise ValueError(
'Encountered {} with multiple outgoing edges with same upstream '
'label {!r}; a `split` filter is probably required'.format(
upstream_node, upstream_label
)
)
stream_name_map[upstream_node, upstream_label] = 's{}'.format(stream_count)
stream_count += 1
def _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map):
_allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_map)
filter_specs = [
_get_filter_spec(node, outgoing_edge_maps[node], stream_name_map)
for node in filter_nodes
]
return ';'.join(filter_specs)
def _get_global_args(node):
return list(node.args)
def _get_output_args(node, stream_name_map):
if node.name != output.__name__:
raise ValueError('Unsupported output node: {}'.format(node))
args = []
if len(node.incoming_edges) == 0:
raise ValueError('Output node {} has no mapped streams'.format(node))
for edge in node.incoming_edges:
# edge = node.incoming_edges[0]
stream_name = _format_input_stream_name(
stream_name_map, edge, is_final_arg=True
)
if stream_name != '0' or len(node.incoming_edges) > 1:
args += ['-map', stream_name]
kwargs = copy.copy(node.kwargs)
filename = kwargs.pop('filename')
if 'format' in kwargs:
args += ['-f', kwargs.pop('format')]
if 'video_bitrate' in kwargs:
args += ['-b:v', str(kwargs.pop('video_bitrate'))]
if 'audio_bitrate' in kwargs:
args += ['-b:a', str(kwargs.pop('audio_bitrate'))]
if 'video_size' in kwargs:
video_size = kwargs.pop('video_size')
if not isinstance(video_size, basestring) and isinstance(video_size, Iterable):
video_size = '{}x{}'.format(video_size[0], video_size[1])
args += ['-video_size', video_size]
args += convert_kwargs_to_cmd_line_args(kwargs)
args += [filename]
return args
@output_operator()
def get_args(stream_spec, overwrite_output=False):
"""Build command-line arguments to be passed to ffmpeg."""
nodes = get_stream_spec_nodes(stream_spec)
args = []
# TODO: group nodes together, e.g. `-i somefile -r somerate`.
sorted_nodes, outgoing_edge_maps = topo_sort(nodes)
input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)]
output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode)]
global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)]
filter_nodes = [node for node in sorted_nodes if isinstance(node, FilterNode)]
stream_name_map = {(node, None): str(i) for i, node in enumerate(input_nodes)}
filter_arg = _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map)
args += reduce(operator.add, [_get_input_args(node) for node in input_nodes])
if filter_arg:
args += ['-filter_complex', filter_arg]
args += reduce(
operator.add, [_get_output_args(node, stream_name_map) for node in output_nodes]
)
args += reduce(operator.add, [_get_global_args(node) for node in global_nodes], [])
if overwrite_output:
args += ['-y']
return args
@output_operator()
def compile(stream_spec, cmd='ffmpeg', overwrite_output=False):
"""Build command-line for invoking ffmpeg.
The :meth:`run` function uses this to build the command line
arguments and should work in most cases, but calling this function
directly is useful for debugging or if you need to invoke ffmpeg
manually for whatever reason.
This is the same as calling :meth:`get_args` except that it also
includes the ``ffmpeg`` command as the first argument.
"""
if isinstance(cmd, basestring):
cmd = [cmd]
elif type(cmd) != list:
cmd = list(cmd)
return cmd + get_args(stream_spec, overwrite_output=overwrite_output)
@output_operator()
def run_async(
stream_spec,
cmd='ffmpeg',
pipe_stdin=False,
pipe_stdout=False,
pipe_stderr=False,
quiet=False,
overwrite_output=False,
cwd=None,
):
"""Asynchronously invoke ffmpeg for the supplied node graph.
Args:
pipe_stdin: if True, connect pipe to subprocess stdin (to be
used with ``pipe:`` ffmpeg inputs).
pipe_stdout: if True, connect pipe to subprocess stdout (to be
used with ``pipe:`` ffmpeg outputs).
pipe_stderr: if True, connect pipe to subprocess stderr.
quiet: shorthand for setting ``capture_stdout`` and
``capture_stderr``.
**kwargs: keyword-arguments passed to ``get_args()`` (e.g.
``overwrite_output=True``).
Returns:
A `subprocess Popen`_ object representing the child process.
Examples:
Run and stream input::
process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p')
.overwrite_output()
.run_async(pipe_stdin=True)
)
process.communicate(input=input_data)
Run and capture output::
process = (
ffmpeg
.input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdout=True, pipe_stderr=True)
)
out, err = process.communicate()
Process video frame-by-frame using numpy::
process1 = (
ffmpeg
.input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdout=True)
)
process2 = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p')
.overwrite_output()
.run_async(pipe_stdin=True)
)
while True:
in_bytes = process1.stdout.read(width * height * 3)
if not in_bytes:
break
in_frame = (
np
.frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3])
)
out_frame = in_frame * 0.3
process2.stdin.write(
frame
.astype(np.uint8)
.tobytes()
)
process2.stdin.close()
process1.wait()
process2.wait()
.. _subprocess Popen: https://docs.python.org/3/library/subprocess.html#popen-objects
"""
args = compile(stream_spec, cmd, overwrite_output=overwrite_output)
stdin_stream = subprocess.PIPE if pipe_stdin else None
stdout_stream = subprocess.PIPE if pipe_stdout else None
stderr_stream = subprocess.PIPE if pipe_stderr else None
if quiet:
stderr_stream = subprocess.STDOUT
stdout_stream = subprocess.DEVNULL
return subprocess.Popen(
args,
stdin=stdin_stream,
stdout=stdout_stream,
stderr=stderr_stream,
cwd=cwd,
)
@output_operator()
def run(
stream_spec,
cmd='ffmpeg',
capture_stdout=False,
capture_stderr=False,
input=None,
quiet=False,
overwrite_output=False,
cwd=None,
):
"""Invoke ffmpeg for the supplied node graph.
Args:
capture_stdout: if True, capture stdout (to be used with
``pipe:`` ffmpeg outputs).
capture_stderr: if True, capture stderr.
quiet: shorthand for setting ``capture_stdout`` and ``capture_stderr``.
input: text to be sent to stdin (to be used with ``pipe:``
ffmpeg inputs)
**kwargs: keyword-arguments passed to ``get_args()`` (e.g.
``overwrite_output=True``).
Returns: (out, err) tuple containing captured stdout and stderr data.
"""
process = run_async(
stream_spec,
cmd,
pipe_stdin=input is not None,
pipe_stdout=capture_stdout,
pipe_stderr=capture_stderr,
quiet=quiet,
overwrite_output=overwrite_output,
cwd=cwd,
)
out, err = process.communicate(input)
retcode = process.poll()
if retcode:
raise Error('ffmpeg', out, err)
return out, err
__all__ = [
'compile',
'Error',
'get_args',
'run',
'run_async',
]

View File

@@ -1,108 +0,0 @@
from __future__ import unicode_literals
from builtins import str
from past.builtins import basestring
import hashlib
import sys
if sys.version_info.major == 2:
# noinspection PyUnresolvedReferences,PyShadowingBuiltins
str = str
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
# `past.builtins.basestring` module can't be imported on Python3 in some environments (Ubuntu).
# This code is copy-pasted from it to avoid crashes.
class BaseBaseString(type):
def __instancecheck__(cls, instance):
return isinstance(instance, (bytes, str))
def __subclasshook__(cls, thing):
# TODO: What should go here?
raise NotImplemented
def with_metaclass(meta, *bases):
class metaclass(meta):
__call__ = type.__call__
__init__ = type.__init__
def __new__(cls, name, this_bases, d):
if this_bases is None:
return type.__new__(cls, name, (), d)
return meta(name, bases, d)
return metaclass('temporary_class', None, {})
if sys.version_info.major >= 3:
class basestring(with_metaclass(BaseBaseString)):
pass
else:
# noinspection PyUnresolvedReferences,PyCompatibility
from builtins import basestring
def _recursive_repr(item):
"""Hack around python `repr` to deterministically represent dictionaries.
This is able to represent more things than json.dumps, since it does not require
things to be JSON serializable (e.g. datetimes).
"""
if isinstance(item, basestring):
result = str(item)
elif isinstance(item, list):
result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item]))
elif isinstance(item, dict):
kv_pairs = [
'{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k]))
for k in sorted(item)
]
result = '{' + ', '.join(kv_pairs) + '}'
else:
result = repr(item)
return result
def get_hash(item):
repr_ = _recursive_repr(item).encode('utf-8')
return hashlib.md5(repr_).hexdigest()
def get_hash_int(item):
return int(get_hash(item), base=16)
def escape_chars(text, chars):
"""Helper function to escape uncomfortable characters."""
text = str(text)
chars = list(set(chars))
if '\\' in chars:
chars.remove('\\')
chars.insert(0, '\\')
for ch in chars:
text = text.replace(ch, '\\' + ch)
return text
def convert_kwargs_to_cmd_line_args(kwargs):
"""Helper function to build command line arguments out of dict."""
args = []
for k in sorted(kwargs.keys()):
v = kwargs[k]
if isinstance(v, Iterable) and not isinstance(v, str):
for value in v:
args.append('-{}'.format(k))
if value is not None:
args.append('{}'.format(value))
continue
args.append('-{}'.format(k))
if v is not None:
args.append('{}'.format(v))
return args

View File

@@ -1,108 +0,0 @@
from __future__ import unicode_literals
from builtins import str
from .dag import get_outgoing_edges
from ._run import topo_sort
import tempfile
from ffmpeg.nodes import (
FilterNode,
get_stream_spec_nodes,
InputNode,
OutputNode,
stream_operator,
)
_RIGHT_ARROW = '\u2192'
def _get_node_color(node):
if isinstance(node, InputNode):
color = '#99cc00'
elif isinstance(node, OutputNode):
color = '#99ccff'
elif isinstance(node, FilterNode):
color = '#ffcc00'
else:
color = None
return color
@stream_operator()
def view(stream_spec, detail=False, filename=None, pipe=False, **kwargs):
try:
import graphviz
except ImportError:
raise ImportError(
'failed to import graphviz; please make sure graphviz is installed (e.g. '
'`pip install graphviz`)'
)
show_labels = kwargs.pop('show_labels', True)
if pipe and filename is not None:
raise ValueError('Can\'t specify both `filename` and `pipe`')
elif not pipe and filename is None:
filename = tempfile.mktemp()
nodes = get_stream_spec_nodes(stream_spec)
sorted_nodes, outgoing_edge_maps = topo_sort(nodes)
graph = graphviz.Digraph(format='png')
graph.attr(rankdir='LR')
if len(list(kwargs.keys())) != 0:
raise ValueError(
'Invalid kwargs key(s): {}'.format(', '.join(list(kwargs.keys())))
)
for node in sorted_nodes:
color = _get_node_color(node)
if detail:
lines = [node.short_repr]
lines += ['{!r}'.format(arg) for arg in node.args]
lines += [
'{}={!r}'.format(key, node.kwargs[key]) for key in sorted(node.kwargs)
]
node_text = '\n'.join(lines)
else:
node_text = node.short_repr
graph.node(
str(hash(node)), node_text, shape='box', style='filled', fillcolor=color
)
outgoing_edge_map = outgoing_edge_maps.get(node, {})
for edge in get_outgoing_edges(node, outgoing_edge_map):
kwargs = {}
up_label = edge.upstream_label
down_label = edge.downstream_label
up_selector = edge.upstream_selector
if show_labels and (
up_label is not None
or down_label is not None
or up_selector is not None
):
if up_label is None:
up_label = ''
if up_selector is not None:
up_label += ":" + up_selector
if down_label is None:
down_label = ''
if up_label != '' and down_label != '':
middle = ' {} '.format(_RIGHT_ARROW)
else:
middle = ''
kwargs['label'] = '{} {} {}'.format(up_label, middle, down_label)
upstream_node_id = str(hash(edge.upstream_node))
downstream_node_id = str(hash(edge.downstream_node))
graph.edge(upstream_node_id, downstream_node_id, **kwargs)
if pipe:
return graph.pipe()
else:
graph.view(filename, cleanup=True)
return stream_spec
__all__ = ['view']

View File

@@ -1,240 +0,0 @@
from __future__ import unicode_literals
from ._utils import get_hash, get_hash_int
from builtins import object
from collections import namedtuple
class DagNode(object):
"""Node in a directed-acyclic graph (DAG).
Edges:
DagNodes are connected by edges. An edge connects two nodes with a label for
each side:
- ``upstream_node``: upstream/parent node
- ``upstream_label``: label on the outgoing side of the upstream node
- ``downstream_node``: downstream/child node
- ``downstream_label``: label on the incoming side of the downstream node
For example, DagNode A may be connected to DagNode B with an edge labelled
"foo" on A's side, and "bar" on B's side:
_____ _____
| | | |
| A >[foo]---[bar]> B |
|_____| |_____|
Edge labels may be integers or strings, and nodes cannot have more than one
incoming edge with the same label.
DagNodes may have any number of incoming edges and any number of outgoing
edges. DagNodes keep track only of their incoming edges, but the entire graph
structure can be inferred by looking at the furthest downstream nodes and
working backwards.
Hashing:
DagNodes must be hashable, and two nodes are considered to be equivalent if
they have the same hash value.
Nodes are immutable, and the hash should remain constant as a result. If a
node with new contents is required, create a new node and throw the old one
away.
String representation:
In order for graph visualization tools to show useful information, nodes must
be representable as strings. The ``repr`` operator should provide a more or
less "full" representation of the node, and the ``short_repr`` property should
be a shortened, concise representation.
Again, because nodes are immutable, the string representations should remain
constant.
"""
def __hash__(self):
"""Return an integer hash of the node."""
raise NotImplementedError()
def __eq__(self, other):
"""Compare two nodes; implementations should return True if (and only if)
hashes match.
"""
raise NotImplementedError()
def __repr__(self, other):
"""Return a full string representation of the node."""
raise NotImplementedError()
@property
def short_repr(self):
"""Return a partial/concise representation of the node."""
raise NotImplementedError()
@property
def incoming_edge_map(self):
"""Provides information about all incoming edges that connect to this node.
The edge map is a dictionary that maps an ``incoming_label`` to
``(outgoing_node, outgoing_label)``. Note that implicitly, ``incoming_node`` is
``self``. See "Edges" section above.
"""
raise NotImplementedError()
DagEdge = namedtuple(
'DagEdge',
[
'downstream_node',
'downstream_label',
'upstream_node',
'upstream_label',
'upstream_selector',
],
)
def get_incoming_edges(downstream_node, incoming_edge_map):
edges = []
for downstream_label, upstream_info in list(incoming_edge_map.items()):
upstream_node, upstream_label, upstream_selector = upstream_info
edges += [
DagEdge(
downstream_node,
downstream_label,
upstream_node,
upstream_label,
upstream_selector,
)
]
return edges
def get_outgoing_edges(upstream_node, outgoing_edge_map):
edges = []
for upstream_label, downstream_infos in sorted(outgoing_edge_map.items()):
for downstream_info in downstream_infos:
downstream_node, downstream_label, downstream_selector = downstream_info
edges += [
DagEdge(
downstream_node,
downstream_label,
upstream_node,
upstream_label,
downstream_selector,
)
]
return edges
class KwargReprNode(DagNode):
"""A DagNode that can be represented as a set of args+kwargs."""
@property
def __upstream_hashes(self):
hashes = []
for downstream_label, upstream_info in list(self.incoming_edge_map.items()):
upstream_node, upstream_label, upstream_selector = upstream_info
hashes += [
hash(x)
for x in [
downstream_label,
upstream_node,
upstream_label,
upstream_selector,
]
]
return hashes
@property
def __inner_hash(self):
props = {'args': self.args, 'kwargs': self.kwargs}
return get_hash(props)
def __get_hash(self):
hashes = self.__upstream_hashes + [self.__inner_hash]
return get_hash_int(hashes)
def __init__(self, incoming_edge_map, name, args, kwargs):
self.__incoming_edge_map = incoming_edge_map
self.name = name
self.args = args
self.kwargs = kwargs
self.__hash = self.__get_hash()
def __hash__(self):
return self.__hash
def __eq__(self, other):
return hash(self) == hash(other)
@property
def short_hash(self):
return '{:x}'.format(abs(hash(self)))[:12]
def long_repr(self, include_hash=True):
formatted_props = ['{!r}'.format(arg) for arg in self.args]
formatted_props += [
'{}={!r}'.format(key, self.kwargs[key]) for key in sorted(self.kwargs)
]
out = '{}({})'.format(self.name, ', '.join(formatted_props))
if include_hash:
out += ' <{}>'.format(self.short_hash)
return out
def __repr__(self):
return self.long_repr()
@property
def incoming_edges(self):
return get_incoming_edges(self, self.incoming_edge_map)
@property
def incoming_edge_map(self):
return self.__incoming_edge_map
@property
def short_repr(self):
return self.name
def topo_sort(downstream_nodes):
marked_nodes = []
sorted_nodes = []
outgoing_edge_maps = {}
def visit(
upstream_node,
upstream_label,
downstream_node,
downstream_label,
downstream_selector=None,
):
if upstream_node in marked_nodes:
raise RuntimeError('Graph is not a DAG')
if downstream_node is not None:
outgoing_edge_map = outgoing_edge_maps.get(upstream_node, {})
outgoing_edge_infos = outgoing_edge_map.get(upstream_label, [])
outgoing_edge_infos += [
(downstream_node, downstream_label, downstream_selector)
]
outgoing_edge_map[upstream_label] = outgoing_edge_infos
outgoing_edge_maps[upstream_node] = outgoing_edge_map
if upstream_node not in sorted_nodes:
marked_nodes.append(upstream_node)
for edge in upstream_node.incoming_edges:
visit(
edge.upstream_node,
edge.upstream_label,
edge.downstream_node,
edge.downstream_label,
edge.upstream_selector,
)
marked_nodes.remove(upstream_node)
sorted_nodes.append(upstream_node)
unmarked_nodes = [(node, None) for node in downstream_nodes]
while unmarked_nodes:
upstream_node, upstream_label = unmarked_nodes.pop()
visit(upstream_node, upstream_label, None, None)
return sorted_nodes, outgoing_edge_maps

View File

@@ -1,380 +0,0 @@
from __future__ import unicode_literals
from past.builtins import basestring
from .dag import KwargReprNode
from ._utils import escape_chars, get_hash_int
from builtins import object
import os
def _is_of_types(obj, types):
valid = False
for stream_type in types:
if isinstance(obj, stream_type):
valid = True
break
return valid
def _get_types_str(types):
return ', '.join(['{}.{}'.format(x.__module__, x.__name__) for x in types])
class Stream(object):
"""Represents the outgoing edge of an upstream node; may be used to create more
downstream nodes.
"""
def __init__(
self, upstream_node, upstream_label, node_types, upstream_selector=None
):
if not _is_of_types(upstream_node, node_types):
raise TypeError(
'Expected upstream node to be of one of the following type(s): {}; got {}'.format(
_get_types_str(node_types), type(upstream_node)
)
)
self.node = upstream_node
self.label = upstream_label
self.selector = upstream_selector
def __hash__(self):
return get_hash_int([hash(self.node), hash(self.label)])
def __eq__(self, other):
return hash(self) == hash(other)
def __repr__(self):
node_repr = self.node.long_repr(include_hash=False)
selector = ''
if self.selector:
selector = ':{}'.format(self.selector)
out = '{}[{!r}{}] <{}>'.format(
node_repr, self.label, selector, self.node.short_hash
)
return out
def __getitem__(self, index):
"""
Select a component (audio, video) of the stream.
Example:
Process the audio and video portions of a stream independently::
input = ffmpeg.input('in.mp4')
audio = input['a'].filter("aecho", 0.8, 0.9, 1000, 0.3)
video = input['v'].hflip()
out = ffmpeg.output(audio, video, 'out.mp4')
"""
if self.selector is not None:
raise ValueError('Stream already has a selector: {}'.format(self))
elif not isinstance(index, basestring):
raise TypeError("Expected string index (e.g. 'a'); got {!r}".format(index))
return self.node.stream(label=self.label, selector=index)
@property
def audio(self):
"""Select the audio-portion of a stream.
Some ffmpeg filters drop audio streams, and care must be taken
to preserve the audio in the final output. The ``.audio`` and
``.video`` operators can be used to reference the audio/video
portions of a stream so that they can be processed separately
and then re-combined later in the pipeline. This dilemma is
intrinsic to ffmpeg, and ffmpeg-python tries to stay out of the
way while users may refer to the official ffmpeg documentation
as to why certain filters drop audio.
``stream.audio`` is a shorthand for ``stream['a']``.
Example:
Process the audio and video portions of a stream independently::
input = ffmpeg.input('in.mp4')
audio = input.audio.filter("aecho", 0.8, 0.9, 1000, 0.3)
video = input.video.hflip()
out = ffmpeg.output(audio, video, 'out.mp4')
"""
return self['a']
@property
def video(self):
"""Select the video-portion of a stream.
Some ffmpeg filters drop audio streams, and care must be taken
to preserve the audio in the final output. The ``.audio`` and
``.video`` operators can be used to reference the audio/video
portions of a stream so that they can be processed separately
and then re-combined later in the pipeline. This dilemma is
intrinsic to ffmpeg, and ffmpeg-python tries to stay out of the
way while users may refer to the official ffmpeg documentation
as to why certain filters drop audio.
``stream.video`` is a shorthand for ``stream['v']``.
Example:
Process the audio and video portions of a stream independently::
input = ffmpeg.input('in.mp4')
audio = input.audio.filter("aecho", 0.8, 0.9, 1000, 0.3)
video = input.video.hflip()
out = ffmpeg.output(audio, video, 'out.mp4')
"""
return self['v']
def get_stream_map(stream_spec):
if stream_spec is None:
stream_map = {}
elif isinstance(stream_spec, Stream):
stream_map = {None: stream_spec}
elif isinstance(stream_spec, (list, tuple)):
stream_map = dict(enumerate(stream_spec))
elif isinstance(stream_spec, dict):
stream_map = stream_spec
return stream_map
def get_stream_map_nodes(stream_map):
nodes = []
for stream in list(stream_map.values()):
if not isinstance(stream, Stream):
raise TypeError('Expected Stream; got {}'.format(type(stream)))
nodes.append(stream.node)
return nodes
def get_stream_spec_nodes(stream_spec):
stream_map = get_stream_map(stream_spec)
return get_stream_map_nodes(stream_map)
class Node(KwargReprNode):
"""Node base"""
@classmethod
def __check_input_len(cls, stream_map, min_inputs, max_inputs):
if min_inputs is not None and len(stream_map) < min_inputs:
raise ValueError(
'Expected at least {} input stream(s); got {}'.format(
min_inputs, len(stream_map)
)
)
elif max_inputs is not None and len(stream_map) > max_inputs:
raise ValueError(
'Expected at most {} input stream(s); got {}'.format(
max_inputs, len(stream_map)
)
)
@classmethod
def __check_input_types(cls, stream_map, incoming_stream_types):
for stream in list(stream_map.values()):
if not _is_of_types(stream, incoming_stream_types):
raise TypeError(
'Expected incoming stream(s) to be of one of the following types: {}; got {}'.format(
_get_types_str(incoming_stream_types), type(stream)
)
)
@classmethod
def __get_incoming_edge_map(cls, stream_map):
incoming_edge_map = {}
for downstream_label, upstream in list(stream_map.items()):
incoming_edge_map[downstream_label] = (
upstream.node,
upstream.label,
upstream.selector,
)
return incoming_edge_map
def __init__(
self,
stream_spec,
name,
incoming_stream_types,
outgoing_stream_type,
min_inputs,
max_inputs,
args=[],
kwargs={},
):
stream_map = get_stream_map(stream_spec)
self.__check_input_len(stream_map, min_inputs, max_inputs)
self.__check_input_types(stream_map, incoming_stream_types)
incoming_edge_map = self.__get_incoming_edge_map(stream_map)
super(Node, self).__init__(incoming_edge_map, name, args, kwargs)
self.__outgoing_stream_type = outgoing_stream_type
self.__incoming_stream_types = incoming_stream_types
def stream(self, label=None, selector=None):
"""Create an outgoing stream originating from this node.
More nodes may be attached onto the outgoing stream.
"""
return self.__outgoing_stream_type(self, label, upstream_selector=selector)
def __getitem__(self, item):
"""Create an outgoing stream originating from this node; syntactic sugar for
``self.stream(label)``. It can also be used to apply a selector: e.g.
``node[0:'a']`` returns a stream with label 0 and selector ``'a'``, which is
the same as ``node.stream(label=0, selector='a')``.
Example:
Process the audio and video portions of a stream independently::
input = ffmpeg.input('in.mp4')
audio = input[:'a'].filter("aecho", 0.8, 0.9, 1000, 0.3)
video = input[:'v'].hflip()
out = ffmpeg.output(audio, video, 'out.mp4')
"""
if isinstance(item, slice):
return self.stream(label=item.start, selector=item.stop)
else:
return self.stream(label=item)
class FilterableStream(Stream):
def __init__(self, upstream_node, upstream_label, upstream_selector=None):
super(FilterableStream, self).__init__(
upstream_node, upstream_label, {InputNode, FilterNode}, upstream_selector
)
# noinspection PyMethodOverriding
class InputNode(Node):
"""InputNode type"""
def __init__(self, name, args=[], kwargs={}):
super(InputNode, self).__init__(
stream_spec=None,
name=name,
incoming_stream_types={},
outgoing_stream_type=FilterableStream,
min_inputs=0,
max_inputs=0,
args=args,
kwargs=kwargs,
)
@property
def short_repr(self):
return os.path.basename(self.kwargs['filename'])
# noinspection PyMethodOverriding
class FilterNode(Node):
def __init__(self, stream_spec, name, max_inputs=1, args=[], kwargs={}):
super(FilterNode, self).__init__(
stream_spec=stream_spec,
name=name,
incoming_stream_types={FilterableStream},
outgoing_stream_type=FilterableStream,
min_inputs=1,
max_inputs=max_inputs,
args=args,
kwargs=kwargs,
)
"""FilterNode"""
def _get_filter(self, outgoing_edges):
args = self.args
kwargs = self.kwargs
if self.name in ('split', 'asplit'):
args = [len(outgoing_edges)]
out_args = [escape_chars(x, '\\\'=:') for x in args]
out_kwargs = {}
for k, v in list(kwargs.items()):
k = escape_chars(k, '\\\'=:')
v = escape_chars(v, '\\\'=:')
out_kwargs[k] = v
arg_params = [escape_chars(v, '\\\'=:') for v in out_args]
kwarg_params = ['{}={}'.format(k, out_kwargs[k]) for k in sorted(out_kwargs)]
params = arg_params + kwarg_params
params_text = escape_chars(self.name, '\\\'=:')
if params:
params_text += '={}'.format(':'.join(params))
return escape_chars(params_text, '\\\'[],;')
# noinspection PyMethodOverriding
class OutputNode(Node):
def __init__(self, stream, name, args=[], kwargs={}):
super(OutputNode, self).__init__(
stream_spec=stream,
name=name,
incoming_stream_types={FilterableStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=None,
args=args,
kwargs=kwargs,
)
@property
def short_repr(self):
return os.path.basename(self.kwargs['filename'])
class OutputStream(Stream):
def __init__(self, upstream_node, upstream_label, upstream_selector=None):
super(OutputStream, self).__init__(
upstream_node,
upstream_label,
{OutputNode, GlobalNode, MergeOutputsNode},
upstream_selector=upstream_selector,
)
# noinspection PyMethodOverriding
class MergeOutputsNode(Node):
def __init__(self, streams, name):
super(MergeOutputsNode, self).__init__(
stream_spec=streams,
name=name,
incoming_stream_types={OutputStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=None,
)
# noinspection PyMethodOverriding
class GlobalNode(Node):
def __init__(self, stream, name, args=[], kwargs={}):
super(GlobalNode, self).__init__(
stream_spec=stream,
name=name,
incoming_stream_types={OutputStream},
outgoing_stream_type=OutputStream,
min_inputs=1,
max_inputs=1,
args=args,
kwargs=kwargs,
)
def stream_operator(stream_classes={Stream}, name=None):
def decorator(func):
func_name = name or func.__name__
[setattr(stream_class, func_name, func) for stream_class in stream_classes]
return func
return decorator
def filter_operator(name=None):
return stream_operator(stream_classes={FilterableStream}, name=name)
def output_operator(name=None):
return stream_operator(stream_classes={OutputStream}, name=name)
__all__ = ['Stream']

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.2 KiB

View File

@@ -1,821 +0,0 @@
from __future__ import unicode_literals
from builtins import bytes
from builtins import range
from builtins import str
import ffmpeg
import os
import pytest
import random
import re
import subprocess
import sys
try:
import mock # python 2
except ImportError:
from unittest import mock # python 3
TEST_DIR = os.path.dirname(__file__)
SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data')
TEST_INPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'in1.mp4')
TEST_OVERLAY_FILE = os.path.join(SAMPLE_DATA_DIR, 'overlay.png')
TEST_OUTPUT_FILE1 = os.path.join(SAMPLE_DATA_DIR, 'out1.mp4')
TEST_OUTPUT_FILE2 = os.path.join(SAMPLE_DATA_DIR, 'out2.mp4')
BOGUS_INPUT_FILE = os.path.join(SAMPLE_DATA_DIR, 'bogus')
subprocess.check_call(['ffmpeg', '-version'])
def test_escape_chars():
assert ffmpeg._utils.escape_chars('a:b', ':') == r'a\:b'
assert ffmpeg._utils.escape_chars('a\\:b', ':\\') == 'a\\\\\\:b'
assert (
ffmpeg._utils.escape_chars('a:b,c[d]e%{}f\'g\'h\\i', '\\\':,[]%')
== 'a\\:b\\,c\\[d\\]e\\%{}f\\\'g\\\'h\\\\i'
)
assert ffmpeg._utils.escape_chars(123, ':\\') == '123'
def test_fluent_equality():
base1 = ffmpeg.input('dummy1.mp4')
base2 = ffmpeg.input('dummy1.mp4')
base3 = ffmpeg.input('dummy2.mp4')
t1 = base1.trim(start_frame=10, end_frame=20)
t2 = base1.trim(start_frame=10, end_frame=20)
t3 = base1.trim(start_frame=10, end_frame=30)
t4 = base2.trim(start_frame=10, end_frame=20)
t5 = base3.trim(start_frame=10, end_frame=20)
assert t1 == t2
assert t1 != t3
assert t1 == t4
assert t1 != t5
def test_fluent_concat():
base = ffmpeg.input('dummy.mp4')
trimmed1 = base.trim(start_frame=10, end_frame=20)
trimmed2 = base.trim(start_frame=30, end_frame=40)
trimmed3 = base.trim(start_frame=50, end_frame=60)
concat1 = ffmpeg.concat(trimmed1, trimmed2, trimmed3)
concat2 = ffmpeg.concat(trimmed1, trimmed2, trimmed3)
concat3 = ffmpeg.concat(trimmed1, trimmed3, trimmed2)
assert concat1 == concat2
assert concat1 != concat3
def test_fluent_output():
ffmpeg.input('dummy.mp4').trim(start_frame=10, end_frame=20).output('dummy2.mp4')
def test_fluent_complex_filter():
in_file = ffmpeg.input('dummy.mp4')
return ffmpeg.concat(
in_file.trim(start_frame=10, end_frame=20),
in_file.trim(start_frame=30, end_frame=40),
in_file.trim(start_frame=50, end_frame=60),
).output('dummy2.mp4')
def test_node_repr():
in_file = ffmpeg.input('dummy.mp4')
trim1 = ffmpeg.trim(in_file, start_frame=10, end_frame=20)
trim2 = ffmpeg.trim(in_file, start_frame=30, end_frame=40)
trim3 = ffmpeg.trim(in_file, start_frame=50, end_frame=60)
concatted = ffmpeg.concat(trim1, trim2, trim3)
output = ffmpeg.output(concatted, 'dummy2.mp4')
assert repr(in_file.node) == 'input(filename={!r}) <{}>'.format(
'dummy.mp4', in_file.node.short_hash
)
assert repr(trim1.node) == 'trim(end_frame=20, start_frame=10) <{}>'.format(
trim1.node.short_hash
)
assert repr(trim2.node) == 'trim(end_frame=40, start_frame=30) <{}>'.format(
trim2.node.short_hash
)
assert repr(trim3.node) == 'trim(end_frame=60, start_frame=50) <{}>'.format(
trim3.node.short_hash
)
assert repr(concatted.node) == 'concat(n=3) <{}>'.format(concatted.node.short_hash)
assert repr(output.node) == 'output(filename={!r}) <{}>'.format(
'dummy2.mp4', output.node.short_hash
)
def test_stream_repr():
in_file = ffmpeg.input('dummy.mp4')
assert repr(in_file) == 'input(filename={!r})[None] <{}>'.format(
'dummy.mp4', in_file.node.short_hash
)
split0 = in_file.filter_multi_output('split')[0]
assert repr(split0) == 'split()[0] <{}>'.format(split0.node.short_hash)
dummy_out = in_file.filter_multi_output('dummy')['out']
assert repr(dummy_out) == 'dummy()[{!r}] <{}>'.format(
dummy_out.label, dummy_out.node.short_hash
)
def test_repeated_args():
out_file = ffmpeg.input('dummy.mp4').output(
'dummy2.mp4', streamid=['0:0x101', '1:0x102']
)
assert out_file.get_args() == [
'-i',
'dummy.mp4',
'-streamid',
'0:0x101',
'-streamid',
'1:0x102',
'dummy2.mp4',
]
def test__get_args__simple():
out_file = ffmpeg.input('dummy.mp4').output('dummy2.mp4')
assert out_file.get_args() == ['-i', 'dummy.mp4', 'dummy2.mp4']
def test_global_args():
out_file = (
ffmpeg.input('dummy.mp4')
.output('dummy2.mp4')
.global_args('-progress', 'someurl')
)
assert out_file.get_args() == [
'-i',
'dummy.mp4',
'dummy2.mp4',
'-progress',
'someurl',
]
def _get_simple_example():
return ffmpeg.input(TEST_INPUT_FILE1).output(TEST_OUTPUT_FILE1)
def _get_complex_filter_example():
split = ffmpeg.input(TEST_INPUT_FILE1).vflip().split()
split0 = split[0]
split1 = split[1]
overlay_file = ffmpeg.input(TEST_OVERLAY_FILE)
overlay_file = ffmpeg.crop(overlay_file, 10, 10, 158, 112)
return (
ffmpeg.concat(
split0.trim(start_frame=10, end_frame=20),
split1.trim(start_frame=30, end_frame=40),
)
.overlay(overlay_file.hflip())
.drawbox(50, 50, 120, 120, color='red', thickness=5)
.output(TEST_OUTPUT_FILE1)
.overwrite_output()
)
def test__get_args__complex_filter():
out = _get_complex_filter_example()
args = ffmpeg.get_args(out)
assert args == [
'-i',
TEST_INPUT_FILE1,
'-i',
TEST_OVERLAY_FILE,
'-filter_complex',
'[0]vflip[s0];'
'[s0]split=2[s1][s2];'
'[s1]trim=end_frame=20:start_frame=10[s3];'
'[s2]trim=end_frame=40:start_frame=30[s4];'
'[s3][s4]concat=n=2[s5];'
'[1]crop=158:112:10:10[s6];'
'[s6]hflip[s7];'
'[s5][s7]overlay=eof_action=repeat[s8];'
'[s8]drawbox=50:50:120:120:red:t=5[s9]',
'-map',
'[s9]',
TEST_OUTPUT_FILE1,
'-y',
]
def test_combined_output():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
i2 = ffmpeg.input(TEST_OVERLAY_FILE)
out = ffmpeg.output(i1, i2, TEST_OUTPUT_FILE1)
assert out.get_args() == [
'-i',
TEST_INPUT_FILE1,
'-i',
TEST_OVERLAY_FILE,
'-map',
'0',
'-map',
'1',
TEST_OUTPUT_FILE1,
]
@pytest.mark.parametrize('use_shorthand', [True, False])
def test_filter_with_selector(use_shorthand):
i = ffmpeg.input(TEST_INPUT_FILE1)
if use_shorthand:
v1 = i.video.hflip()
a1 = i.audio.filter('aecho', 0.8, 0.9, 1000, 0.3)
else:
v1 = i['v'].hflip()
a1 = i['a'].filter('aecho', 0.8, 0.9, 1000, 0.3)
out = ffmpeg.output(a1, v1, TEST_OUTPUT_FILE1)
assert out.get_args() == [
'-i',
TEST_INPUT_FILE1,
'-filter_complex',
'[0:a]aecho=0.8:0.9:1000:0.3[s0];' '[0:v]hflip[s1]',
'-map',
'[s0]',
'-map',
'[s1]',
TEST_OUTPUT_FILE1,
]
def test_get_item_with_bad_selectors():
input = ffmpeg.input(TEST_INPUT_FILE1)
with pytest.raises(ValueError) as excinfo:
input['a']['a']
assert str(excinfo.value).startswith('Stream already has a selector:')
with pytest.raises(TypeError) as excinfo:
input[:'a']
assert str(excinfo.value).startswith("Expected string index (e.g. 'a')")
with pytest.raises(TypeError) as excinfo:
input[5]
assert str(excinfo.value).startswith("Expected string index (e.g. 'a')")
def _get_complex_filter_asplit_example():
split = ffmpeg.input(TEST_INPUT_FILE1).vflip().asplit()
split0 = split[0]
split1 = split[1]
return (
ffmpeg.concat(
split0.filter('atrim', start=10, end=20),
split1.filter('atrim', start=30, end=40),
)
.output(TEST_OUTPUT_FILE1)
.overwrite_output()
)
def test_filter_concat__video_only():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
args = ffmpeg.concat(in1, in2).output('out.mp4').get_args()
assert args == [
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'-filter_complex',
'[0][1]concat=n=2[s0]',
'-map',
'[s0]',
'out.mp4',
]
def test_filter_concat__audio_only():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
args = ffmpeg.concat(in1, in2, v=0, a=1).output('out.mp4').get_args()
assert args == [
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'-filter_complex',
'[0][1]concat=a=1:n=2:v=0[s0]',
'-map',
'[s0]',
'out.mp4',
]
def test_filter_concat__audio_video():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
joined = ffmpeg.concat(in1.video, in1.audio, in2.hflip(), in2['a'], v=1, a=1).node
args = ffmpeg.output(joined[0], joined[1], 'out.mp4').get_args()
assert args == [
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'-filter_complex',
'[1]hflip[s0];[0:v][0:a][s0][1:a]concat=a=1:n=2:v=1[s1][s2]',
'-map',
'[s1]',
'-map',
'[s2]',
'out.mp4',
]
def test_filter_concat__wrong_stream_count():
in1 = ffmpeg.input('in1.mp4')
in2 = ffmpeg.input('in2.mp4')
with pytest.raises(ValueError) as excinfo:
ffmpeg.concat(in1.video, in1.audio, in2.hflip(), v=1, a=1).node
assert (
str(excinfo.value)
== 'Expected concat input streams to have length multiple of 2 (v=1, a=1); got 3'
)
def test_filter_asplit():
out = _get_complex_filter_asplit_example()
args = out.get_args()
assert args == [
'-i',
TEST_INPUT_FILE1,
'-filter_complex',
(
'[0]vflip[s0];'
'[s0]asplit=2[s1][s2];'
'[s1]atrim=end=20:start=10[s3];'
'[s2]atrim=end=40:start=30[s4];'
'[s3][s4]concat=n=2[s5]'
),
'-map',
'[s5]',
TEST_OUTPUT_FILE1,
'-y',
]
def test__output__bitrate():
args = (
ffmpeg.input('in')
.output('out', video_bitrate=1000, audio_bitrate=200)
.get_args()
)
assert args == ['-i', 'in', '-b:v', '1000', '-b:a', '200', 'out']
@pytest.mark.parametrize('video_size', [(320, 240), '320x240'])
def test__output__video_size(video_size):
args = ffmpeg.input('in').output('out', video_size=video_size).get_args()
assert args == ['-i', 'in', '-video_size', '320x240', 'out']
def test_filter_normal_arg_escape():
"""Test string escaping of normal filter args (e.g. ``font`` param of ``drawtext``
filter).
"""
def _get_drawtext_font_repr(font):
"""Build a command-line arg using drawtext ``font`` param and extract the
``-filter_complex`` arg.
"""
args = (
ffmpeg.input('in')
.drawtext('test', font='a{}b'.format(font))
.output('out')
.get_args()
)
assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(
r'\[0\]drawtext=font=a((.|\n)*)b:text=test\[s0\]',
args[3],
re.MULTILINE,
)
assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3])
return match.group(1)
expected_backslash_counts = {
'x': 0,
'\'': 3,
'\\': 3,
'%': 0,
':': 2,
',': 1,
'[': 1,
']': 1,
'=': 2,
'\n': 0,
}
for ch, expected_backslash_count in list(expected_backslash_counts.items()):
expected = '{}{}'.format('\\' * expected_backslash_count, ch)
actual = _get_drawtext_font_repr(ch)
assert expected == actual
def test_filter_text_arg_str_escape():
"""Test string escaping of normal filter args (e.g. ``text`` param of ``drawtext``
filter).
"""
def _get_drawtext_text_repr(text):
"""Build a command-line arg using drawtext ``text`` param and extract the
``-filter_complex`` arg.
"""
args = ffmpeg.input('in').drawtext('a{}b'.format(text)).output('out').get_args()
assert args[:3] == ['-i', 'in', '-filter_complex']
assert args[4:] == ['-map', '[s0]', 'out']
match = re.match(r'\[0\]drawtext=text=a((.|\n)*)b\[s0\]', args[3], re.MULTILINE)
assert match is not None, 'Invalid -filter_complex arg: {!r}'.format(args[3])
return match.group(1)
expected_backslash_counts = {
'x': 0,
'\'': 7,
'\\': 7,
'%': 4,
':': 2,
',': 1,
'[': 1,
']': 1,
'=': 2,
'\n': 0,
}
for ch, expected_backslash_count in list(expected_backslash_counts.items()):
expected = '{}{}'.format('\\' * expected_backslash_count, ch)
actual = _get_drawtext_text_repr(ch)
assert expected == actual
# def test_version():
# subprocess.check_call(['ffmpeg', '-version'])
def test__compile():
out_file = ffmpeg.input('dummy.mp4').output('dummy2.mp4')
assert out_file.compile() == ['ffmpeg', '-i', 'dummy.mp4', 'dummy2.mp4']
assert out_file.compile(cmd='ffmpeg.old') == [
'ffmpeg.old',
'-i',
'dummy.mp4',
'dummy2.mp4',
]
@pytest.mark.parametrize('pipe_stdin', [True, False])
@pytest.mark.parametrize('pipe_stdout', [True, False])
@pytest.mark.parametrize('pipe_stderr', [True, False])
@pytest.mark.parametrize('cwd', [None, '/tmp'])
def test__run_async(mocker, pipe_stdin, pipe_stdout, pipe_stderr, cwd):
process__mock = mock.Mock()
popen__mock = mocker.patch.object(subprocess, 'Popen', return_value=process__mock)
stream = _get_simple_example()
process = ffmpeg.run_async(
stream,
pipe_stdin=pipe_stdin,
pipe_stdout=pipe_stdout,
pipe_stderr=pipe_stderr,
cwd=cwd,
)
assert process is process__mock
expected_stdin = subprocess.PIPE if pipe_stdin else None
expected_stdout = subprocess.PIPE if pipe_stdout else None
expected_stderr = subprocess.PIPE if pipe_stderr else None
(args,), kwargs = popen__mock.call_args
assert args == ffmpeg.compile(stream)
assert kwargs == dict(
stdin=expected_stdin,
stdout=expected_stdout,
stderr=expected_stderr,
cwd=cwd,
)
def test__run():
stream = _get_complex_filter_example()
out, err = ffmpeg.run(stream)
assert out is None
assert err is None
@pytest.mark.parametrize('capture_stdout', [True, False])
@pytest.mark.parametrize('capture_stderr', [True, False])
def test__run__capture_out(mocker, capture_stdout, capture_stderr):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['echo', 'test'])
stream = _get_simple_example()
out, err = ffmpeg.run(
stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr
)
if capture_stdout:
assert out == 'test\n'.encode()
else:
assert out is None
if capture_stderr:
assert err == ''.encode()
else:
assert err is None
def test__run__input_output(mocker):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['cat'])
stream = _get_simple_example()
out, err = ffmpeg.run(stream, input='test'.encode(), capture_stdout=True)
assert out == 'test'.encode()
assert err is None
@pytest.mark.parametrize('capture_stdout', [True, False])
@pytest.mark.parametrize('capture_stderr', [True, False])
def test__run__error(mocker, capture_stdout, capture_stderr):
mocker.patch.object(ffmpeg._run, 'compile', return_value=['ffmpeg'])
stream = _get_complex_filter_example()
with pytest.raises(ffmpeg.Error) as excinfo:
out, err = ffmpeg.run(
stream, capture_stdout=capture_stdout, capture_stderr=capture_stderr
)
assert str(excinfo.value) == 'ffmpeg error (see stderr output for detail)'
out = excinfo.value.stdout
err = excinfo.value.stderr
if capture_stdout:
assert out == ''.encode()
else:
assert out is None
if capture_stderr:
assert err.decode().startswith('ffmpeg version')
else:
assert err is None
def test__run__multi_output():
in_ = ffmpeg.input(TEST_INPUT_FILE1)
out1 = in_.output(TEST_OUTPUT_FILE1)
out2 = in_.output(TEST_OUTPUT_FILE2)
ffmpeg.run([out1, out2], overwrite_output=True)
def test__run__dummy_cmd():
stream = _get_complex_filter_example()
ffmpeg.run(stream, cmd='true')
def test__run__dummy_cmd_list():
stream = _get_complex_filter_example()
ffmpeg.run(stream, cmd=['true', 'ignored'])
def test__filter__custom():
stream = ffmpeg.input('dummy.mp4')
stream = ffmpeg.filter(stream, 'custom_filter', 'a', 'b', kwarg1='c')
stream = ffmpeg.output(stream, 'dummy2.mp4')
assert stream.get_args() == [
'-i',
'dummy.mp4',
'-filter_complex',
'[0]custom_filter=a:b:kwarg1=c[s0]',
'-map',
'[s0]',
'dummy2.mp4',
]
def test__filter__custom_fluent():
stream = (
ffmpeg.input('dummy.mp4')
.filter('custom_filter', 'a', 'b', kwarg1='c')
.output('dummy2.mp4')
)
assert stream.get_args() == [
'-i',
'dummy.mp4',
'-filter_complex',
'[0]custom_filter=a:b:kwarg1=c[s0]',
'-map',
'[s0]',
'dummy2.mp4',
]
def test__merge_outputs():
in_ = ffmpeg.input('in.mp4')
out1 = in_.output('out1.mp4')
out2 = in_.output('out2.mp4')
assert ffmpeg.merge_outputs(out1, out2).get_args() == [
'-i',
'in.mp4',
'out1.mp4',
'out2.mp4',
]
assert ffmpeg.get_args([out1, out2]) == ['-i', 'in.mp4', 'out2.mp4', 'out1.mp4']
def test__input__start_time():
assert ffmpeg.input('in', ss=10.5).output('out').get_args() == [
'-ss',
'10.5',
'-i',
'in',
'out',
]
assert ffmpeg.input('in', ss=0.0).output('out').get_args() == [
'-ss',
'0.0',
'-i',
'in',
'out',
]
def test_multi_passthrough():
out1 = ffmpeg.input('in1.mp4').output('out1.mp4')
out2 = ffmpeg.input('in2.mp4').output('out2.mp4')
out = ffmpeg.merge_outputs(out1, out2)
assert ffmpeg.get_args(out) == [
'-i',
'in1.mp4',
'-i',
'in2.mp4',
'out1.mp4',
'-map',
'1',
'out2.mp4',
]
assert ffmpeg.get_args([out1, out2]) == [
'-i',
'in2.mp4',
'-i',
'in1.mp4',
'out2.mp4',
'-map',
'1',
'out1.mp4',
]
def test_passthrough_selectors():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
args = ffmpeg.output(i1['1'], i1['2'], TEST_OUTPUT_FILE1).get_args()
assert args == [
'-i',
TEST_INPUT_FILE1,
'-map',
'0:1',
'-map',
'0:2',
TEST_OUTPUT_FILE1,
]
def test_mixed_passthrough_selectors():
i1 = ffmpeg.input(TEST_INPUT_FILE1)
args = ffmpeg.output(i1['1'].hflip(), i1['2'], TEST_OUTPUT_FILE1).get_args()
assert args == [
'-i',
TEST_INPUT_FILE1,
'-filter_complex',
'[0:1]hflip[s0]',
'-map',
'[s0]',
'-map',
'0:2',
TEST_OUTPUT_FILE1,
]
def test_pipe():
width = 32
height = 32
frame_size = width * height * 3 # 3 bytes for rgb24
frame_count = 10
start_frame = 2
out = (
ffmpeg.input(
'pipe:0',
format='rawvideo',
pixel_format='rgb24',
video_size=(width, height),
framerate=10,
)
.trim(start_frame=start_frame)
.output('pipe:1', format='rawvideo')
)
args = out.get_args()
assert args == [
'-f',
'rawvideo',
'-video_size',
'{}x{}'.format(width, height),
'-framerate',
'10',
'-pixel_format',
'rgb24',
'-i',
'pipe:0',
'-filter_complex',
'[0]trim=start_frame=2[s0]',
'-map',
'[s0]',
'-f',
'rawvideo',
'pipe:1',
]
cmd = ['ffmpeg'] + args
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
in_data = bytes(
bytearray([random.randint(0, 255) for _ in range(frame_size * frame_count)])
)
p.stdin.write(in_data) # note: this could block, in which case need to use threads
p.stdin.close()
out_data = p.stdout.read()
assert len(out_data) == frame_size * (frame_count - start_frame)
assert out_data == in_data[start_frame * frame_size :]
def test__probe():
data = ffmpeg.probe(TEST_INPUT_FILE1)
assert set(data.keys()) == {'format', 'streams'}
assert data['format']['duration'] == '7.036000'
@pytest.mark.skipif(sys.version_info < (3, 3), reason='requires python3.3 or higher')
def test__probe_timeout():
with pytest.raises(subprocess.TimeoutExpired) as excinfo:
ffmpeg.probe(TEST_INPUT_FILE1, timeout=0)
assert 'timed out after 0 seconds' in str(excinfo.value)
def test__probe__exception():
with pytest.raises(ffmpeg.Error) as excinfo:
ffmpeg.probe(BOGUS_INPUT_FILE)
assert str(excinfo.value) == 'ffprobe error (see stderr output for detail)'
assert 'No such file or directory'.encode() in excinfo.value.stderr
def test__probe__extra_args():
data = ffmpeg.probe(TEST_INPUT_FILE1, show_frames=None)
assert set(data.keys()) == {'format', 'streams', 'frames'}
def get_filter_complex_input(flt, name):
m = re.search(r'\[([^]]+)\]{}(?=[[;]|$)'.format(name), flt)
if m:
return m.group(1)
else:
return None
def get_filter_complex_outputs(flt, name):
m = re.search(r'(^|[];]){}((\[[^]]+\])+)(?=;|$)'.format(name), flt)
if m:
return m.group(2)[1:-1].split('][')
else:
return None
def test__get_filter_complex_input():
assert get_filter_complex_input('', 'scale') is None
assert get_filter_complex_input('scale', 'scale') is None
assert get_filter_complex_input('scale[s3][s4];etc', 'scale') is None
assert get_filter_complex_input('[s2]scale', 'scale') == 's2'
assert get_filter_complex_input('[s2]scale;etc', 'scale') == 's2'
assert get_filter_complex_input('[s2]scale[s3][s4];etc', 'scale') == 's2'
def test__get_filter_complex_outputs():
assert get_filter_complex_outputs('', 'scale') is None
assert get_filter_complex_outputs('scale', 'scale') is None
assert get_filter_complex_outputs('scalex[s0][s1]', 'scale') is None
assert get_filter_complex_outputs('scale[s0][s1]', 'scale') == ['s0', 's1']
assert get_filter_complex_outputs('[s5]scale[s0][s1]', 'scale') == ['s0', 's1']
assert get_filter_complex_outputs('[s5]scale[s1][s0]', 'scale') == ['s1', 's0']
assert get_filter_complex_outputs('[s5]scale[s1]', 'scale') == ['s1']
assert get_filter_complex_outputs('[s5]scale[s1];x', 'scale') == ['s1']
assert get_filter_complex_outputs('y;[s5]scale[s1];x', 'scale') == ['s1']
def test__multi_output_edge_label_order():
scale2ref = ffmpeg.filter_multi_output(
[ffmpeg.input('x'), ffmpeg.input('y')], 'scale2ref'
)
out = ffmpeg.merge_outputs(
scale2ref[1].filter('scale').output('a'),
scale2ref[10000].filter('hflip').output('b'),
)
args = out.get_args()
flt_cmpl = args[args.index('-filter_complex') + 1]
out1, out2 = get_filter_complex_outputs(flt_cmpl, 'scale2ref')
assert out1 == get_filter_complex_input(flt_cmpl, 'scale')
assert out2 == get_filter_complex_input(flt_cmpl, 'hflip')

View File

@@ -1,15 +0,0 @@
[tool.black]
skip-string-normalization = true
target_version = ['py27'] # TODO: drop Python 2 support (... "Soon").
include = '\.pyi?$'
exclude = '''
(
/(
\.eggs
| \.git
| \.tox
| \venv
| dist
)/
)
'''

View File

@@ -1,2 +0,0 @@
[pytest]
testpaths = ffmpeg/tests

View File

@@ -1,2 +0,0 @@
[aliases]
test=pytest

View File

@@ -1,100 +0,0 @@
from setuptools import setup
from textwrap import dedent
version = '0.2.0'
download_url = 'https://github.com/kkroening/ffmpeg-python/archive/v{}.zip'.format(
version
)
long_description = dedent(
'''\
ffmpeg-python: Python bindings for FFmpeg
=========================================
:Github: https://github.com/kkroening/ffmpeg-python
:API Reference: https://kkroening.github.io/ffmpeg-python/
'''
)
file_formats = [
'aac',
'ac3',
'avi',
'bmp',
'flac',
'gif',
'mov',
'mp3',
'mp4',
'png',
'raw',
'rawvideo',
'wav',
]
file_formats += ['.{}'.format(x) for x in file_formats]
misc_keywords = [
'-vf',
'a/v',
'audio',
'dsp',
'FFmpeg',
'ffmpeg',
'ffprobe',
'filtering',
'filter_complex',
'movie',
'render',
'signals',
'sound',
'streaming',
'streams',
'vf',
'video',
'wrapper',
]
keywords = misc_keywords + file_formats
setup(
name='ffmpeg-python',
packages=['ffmpeg'],
version=version,
description='Python bindings for FFmpeg - with complex filtering support',
author='Karl Kroening',
author_email='karlk@kralnet.us',
url='https://github.com/kkroening/ffmpeg-python',
download_url=download_url,
keywords=keywords,
long_description=long_description,
install_requires=['future'],
extras_require={
'dev': [
'future==0.17.1',
'numpy==1.16.4',
'pytest-mock==1.10.4',
'pytest==4.6.1',
'Sphinx==2.1.0',
'tox==3.12.1',
]
},
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Natural Language :: English',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
],
)

View File

@@ -1,24 +0,0 @@
# Tox (https://tox.readthedocs.io/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py27, py35, py36, py37, py38, py39, py310
[gh-actions]
python =
2.7: py27
3.5: py35
3.6: py36
3.7: py37
3.8: py38
3.9: py39
3.10: py310
[testenv]
commands = py.test -vv
deps =
future
pytest
pytest-mock

View File

@@ -9,19 +9,39 @@
import argparse import argparse
import bin.handler import bin.handler
import os
import time
ap = argparse.ArgumentParser( description='FSRImageVideoUpscaler - CLI' ) ap = argparse.ArgumentParser( description='FSRImageVideoUpscaler - CLI' )
ap.add_argument( 'inputfile', help='File path for the video / image to be upscaled' ) ap.add_argument( 'inputfile', help='File path for the video / image to be upscaled' )
ap.add_argument( 'outputfile', help='File path for the video / image that was upscaled' ) ap.add_argument( 'outputfile', help='File path for the video / image that was upscaled' )
ap.add_argument( '-s', '--scalefactor', help='Scale factor for the video / image' ) ap.add_argument( '-s', '--scalefactor', help='Scale factor for the video / image' )
ap.add_argument( '-T', '--threads', help='Thread count to use. Cannot exceed CPU thread count. Scaling non-linear (using 2 threads is not exactly 2x the speed of 1 thread)' )
args = ap.parse_args() args = ap.parse_args()
handler = bin.handler.Handler() handler = bin.handler.Handler()
if ( args.scalefactor ): go = True;
if ( args.scalefactor[ len(args.scalefactor) -1: ] == 'x' ):
handler.handler( 'bin/lib/FidelityFX_CLI.exe', args.inputfile, 'custom', args.scalefactor, args.outputfile ) if ( os.path.exists( args.outputfile ) ):
if ( input( 'File already exists. Do you want to replace it? (y/n) ' ).lower() == 'y' ):
go = True
os.remove( args.outputfile );
else: else:
raise NameError( 'Argument Scale does require to be of form 2x! (it has to end in x)' ) print( '\nRefusing to Upscale video. Please delete the file or specify another filepath!')
else: go = False
handler.handler( 'bin/lib/FidelityFX_CLI.exe', args.inputfile, 'custom', '2x', args.outputfile )
if ( go ):
if ( args.scalefactor ):
if ( args.scalefactor[ len(args.scalefactor) -1: ] == 'x' ):
if ( args.threads != None ):
handler.handler( 'bin/lib/FidelityFX_CLI.exe', args.inputfile, 'custom', args.scalefactor, args.outputfile, threads=int( args.threads ) );
else:
handler.handler( 'bin/lib/FidelityFX_CLI.exe', args.inputfile, 'custom', args.scalefactor, args.outputfile );
else:
raise NameError( 'Argument Scale does require to be of form 2x! (it has to end in x)' )
else:
if ( args.threads != None ):
handler.handler( 'bin/lib/FidelityFX_CLI.exe', args.inputfile, 'custom', '2x', args.outputfile, threads=int( args.threads ) );
else:
handler.handler( 'bin/lib/FidelityFX_CLI.exe', args.inputfile, 'custom', '2x', args.outputfile )