diff --git a/src/extract_rdcosts.py b/src/extract_rdcosts.py index 02145ede..825ba2f3 100755 --- a/src/extract_rdcosts.py +++ b/src/extract_rdcosts.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import glob +import gzip import os import subprocess import threading @@ -12,14 +13,52 @@ ofdir = os.path.join("/tmp", "rdcost", "data") n_threads = 8 home_rel = lambda path: os.path.join(os.environ["HOME"], path) -qps = range(12, 14) -sequences = ("/opt/test_seqs/hevc-B/*.yuv",) +dest_qps = tuple(range(51)) +base_qps = tuple(range(12, 43)) +#base_qps = tuple(range(12, 15)) +sequences = ("/opt/test_seqs/hevc-B/*.yuv", "/opt/test_seqs/custom_seqs/*/*.yuv") +#sequences = ("/opt/test_seqs/custom_seqs/*/*.yuv",) -kvzargs = [home_rel("kvazaar/src/kvazaar"), "--threads", "1", "--preset=ultrafast"] +kvzargs = [home_rel("kvazaar/src/kvazaar"), "--threads", "1", "--preset=ultrafast", "--fastrd-sampling", "--fast-residual-cost=0"] kvzenv = {"LD_LIBRARY_PATH": home_rel("kvazaar/src/.libs/")} gzargs = ["gzip"] +class MultiPipeGZOutManager: + pipe_fn_template = "%02i.txt" + gzout_fn_template = "%02i.txt.gz" + + def __init__(self, odpath, dest_qps): + self.odpath = odpath + self.dest_qps = dest_qps + + self.pipe_fns = [] + self.gzout_fns = [] + for qp in dest_qps: + pipe_fn = os.path.join(self.odpath, self.pipe_fn_template % qp) + gzout_fn = os.path.join(self.odpath, self.gzout_fn_template % qp) + + self.pipe_fns.append(pipe_fn) + self.gzout_fns.append(gzout_fn) + + def __enter__(self): + os.makedirs(self.odpath, exist_ok=True) + for pipe_fn in self.pipe_fns: + try: + os.unlink(pipe_fn) + except FileNotFoundError: + pass + os.mkfifo(pipe_fn) + return self + + def __exit__(self, *_): + for pipe_fn in self.pipe_fns: + os.unlink(pipe_fn) + + def items(self): + for pipe_fn, gzout_fn in zip(self.pipe_fns, self.gzout_fns): + yield (pipe_fn, gzout_fn) + class MTSafeIterable: def __init__(self, iterable): self.lock = threading.Lock() @@ -42,6 +81,26 @@ def chain(lol): for i in l: yield i +# Would've used Popen with gzip, but "gzip [fifo]" with an unconnected fifo +# will detect the situation and not block, but just consider it an empty +# file. Don't like it when tools outsmart their user.. +def do_gzip(in_fn, out_fn): + BLOCK_SZ = 65536 + PRINT_MULT = 1024 + with open(in_fn, "rb") as inf, gzip.open(out_fn, "wb") as outf: + num_read = 0 + print_next_thres = BLOCK_SZ * PRINT_MULT + while True: + block = inf.read(BLOCK_SZ) + num_read += len(block) + if (num_read >= print_next_thres): + print(" read %8i MB from %s" % (num_read / (1024 * 1024), in_fn)) + print_next_thres += BLOCK_SZ * PRINT_MULT + + if (len(block) == 0): + break + outf.write(block) + def run_job(job): ifpath, qp = job ifname = os.path.basename(ifpath) @@ -49,30 +108,45 @@ def run_job(job): jobname = "%s-qp%i" % (ifname, qp) hevcname = "%s.hevc" % jobname logname = "%s.log" % jobname - ofname = "%s.gz" % jobname + odname = jobname hevcpath = os.path.join("/tmp", hevcname) logpath = os.path.join(logdir, logname) - ofpath = os.path.join(ofdir, ofname) + odpath = os.path.join(ofdir, odname) - my_kvzargs = kvzargs + ["-i", ifpath, - "--qp", str(qp), - "-o", hevcpath] + my_kvzargs = kvzargs + ["-i", ifpath, + "--qp", str(qp), + "-o", hevcpath, + "--fastrd-outdir", odpath] with open(logpath, "w") as lf: - with open(ofpath, "wb") as of: - kvz = subprocess.Popen(my_kvzargs, env=kvzenv, stdout=subprocess.PIPE, stderr=lf) - gzip = subprocess.Popen(gzargs, stdin=kvz.stdout, stdout=of) + with MultiPipeGZOutManager(odpath, dest_qps) as pipes_and_outputs: + gzips = [] + gzip_threads = [] + #for pipe_fn, out_f in pipes_and_outputs.items(): + for pipe_fn, out_fn in pipes_and_outputs.items(): + #my_gzargs = gzargs + ["-c", pipe_fn] + #gzip = subprocess.Popen(my_gzargs, stdout=out_f) + #gzips.append(gzip) + #print(" launched gzip %s" % " ".join(my_gzargs)) + + + gzip_thread = threading.Thread(target=do_gzip, args=(pipe_fn, out_fn)) + gzip_thread.start() + gzip_threads.append(gzip_thread) + + kvz = subprocess.Popen(my_kvzargs, env=kvzenv, stderr=lf) - gzip.communicate() kvz.communicate() + for gzip in gzips: + gzip.communicate() def threadfunc(joblist): for job in joblist: run_job(job) def main(): - jobs = combinations(chain(map(glob.glob, sequences)), qps) + jobs = combinations(chain(map(glob.glob, sequences)), base_qps) joblist = MTSafeIterable(jobs) threads = [threading.Thread(target=threadfunc, args=(joblist,)) for _ in range(n_threads)]