11 LZ4 = os.path.dirname(os.path.realpath(__file__)) +
"/../lz4"
12 if not os.path.exists(LZ4):
13 LZ4 = os.path.dirname(os.path.realpath(__file__)) +
"/../programs/lz4"
14 TEMP = tempfile.gettempdir()
20 splitlines = line_in.split()
21 if len(splitlines) != 7:
22 errout(
"Unexpected line: {}".format(line_in))
23 self.frames, self.type, self.block, self.compressed, self.uncompressed, self.ratio, self.
filenamefilename = splitlines
26 if "concat-all" in self.
filenamefilename
or "2f--content-size" in self.
filenamefilename:
28 self.
exp_unc_sizeexp_unc_size += os.path.getsize(
"{}/test_list_{}M".format(TEMP, i))
30 uncompressed_filename = self.
filenamefilename.split(
"-")[0]
31 self.
exp_unc_sizeexp_unc_size += os.path.getsize(
"{}/{}".format(TEMP, uncompressed_filename))
39 for i, line
in enumerate(
execute(
"{} --list -m {}/test_list_*.lz4".format(LZ4, TEMP), print_output=
True)):
45 all_concat_index =
None
46 for i, nvinfo
in enumerate(self.
nvinfo_listnvinfo_list):
47 if "concat-all" in nvinfo.filename:
49 elif "2f--content-size" in nvinfo.filename:
50 self.assertEqual(
"2", nvinfo.frames, nvinfo.line)
51 all_concat_frames += 2
53 self.assertEqual(
"1", nvinfo.frames, nvinfo.line)
54 all_concat_frames += 1
55 self.assertNotEqual(
None, all_concat_index,
"Couldn't find concat-all file index.")
56 self.assertEqual(self.
nvinfo_listnvinfo_list[all_concat_index].frames,
str(all_concat_frames), self.
nvinfo_listnvinfo_list[all_concat_index].line)
60 if "-lz4f-" in nvinfo.filename:
61 self.assertEqual(nvinfo.type,
"LZ4Frame", nvinfo.line)
62 elif "-legc-" in nvinfo.filename:
63 self.assertEqual(nvinfo.type,
"LegacyFrame", nvinfo.line)
64 elif "-skip-" in nvinfo.filename:
65 self.assertEqual(nvinfo.type,
"SkippableFrame", nvinfo.line)
71 if "--BD" in nvinfo.filename:
72 self.assertRegex(nvinfo.block,
"^B[0-9]+D$", nvinfo.line)
73 elif "--BI" in nvinfo.filename:
74 self.assertRegex(nvinfo.block,
"^B[0-9]+I$", nvinfo.line)
78 self.assertEqual(nvinfo.compressed,
to_human(nvinfo.exp_comp_size), nvinfo.line)
82 if "--content-size" in nvinfo.filename:
83 self.assertEqual(nvinfo.ratio,
"{:.2f}%".format(
float(nvinfo.exp_comp_size) /
float(nvinfo.exp_unc_size) * 100), nvinfo.line)
87 if "--content-size" in nvinfo.filename:
88 self.assertEqual(nvinfo.uncompressed,
to_human(nvinfo.exp_unc_size), nvinfo.line)
96 for i, line
in enumerate(lines):
103 frame_info = dict(
zip([
"frame",
"type",
"block",
"checksum",
"compressed",
"uncompressed",
"ratio"], line.split()))
104 frame_info[
"line"] = line
115 output =
execute(
"{} --list -m -v {}/test_list_concat-all.lz4 {}/test_list_*M-lz4f-2f--content-size.lz4".format(LZ4, TEMP, TEMP), print_output=
True)
116 for i, line
in enumerate(output):
117 if line.startswith(
"test_list"):
118 if start != 0
and end != 0:
125 concat_file_list = glob.glob(
"/tmp/test_list_[!concat]*.lz4")
127 for i, filename
in enumerate(concat_file_list):
128 if "2f--content-size" in filename:
129 concat_file_list.insert(i, filename)
132 self.
cvinfocvinfo.file_frame_map = concat_file_list
133 self.
cvinfocvinfo.compressed_size = os.path.getsize(
"{}/test_list_concat-all.lz4".format(TEMP))
136 for i, vinfo
in enumerate(self.
vinfo_listvinfo_list):
137 self.assertRegex(vinfo.filename,
"^test_list_.*({}/{})".format(i + 1,
len(self.
vinfo_listvinfo_list)))
141 for i, frame_info
in enumerate(vinfo.frame_list):
142 self.assertEqual(frame_info[
"frame"],
str(i + 1), frame_info[
"line"])
145 for i, frame_info
in enumerate(self.
cvinfocvinfo.frame_list):
146 if "-lz4f-" in self.
cvinfocvinfo.file_frame_map[i]:
147 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
"type"],
"LZ4Frame", self.
cvinfocvinfo.frame_list[i][
"line"])
148 elif "-legc-" in self.
cvinfocvinfo.file_frame_map[i]:
149 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
"type"],
"LegacyFrame", self.
cvinfocvinfo.frame_list[i][
"line"])
150 elif "-skip-" in self.
cvinfocvinfo.file_frame_map[i]:
151 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
"type"],
"SkippableFrame", self.
cvinfocvinfo.frame_list[i][
"line"])
154 for i, frame_info
in enumerate(self.
cvinfocvinfo.frame_list):
155 if "--BD" in self.
cvinfocvinfo.file_frame_map[i]:
156 self.assertRegex(self.
cvinfocvinfo.frame_list[i][
"block"],
"^B[0-9]+D$", self.
cvinfocvinfo.frame_list[i][
"line"])
157 elif "--BI" in self.
cvinfocvinfo.file_frame_map[i]:
158 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
"block"],
"^B[0-9]+I$", self.
cvinfocvinfo.frame_list[i][
"line"])
161 for i, frame_info
in enumerate(self.
cvinfocvinfo.frame_list):
162 if "-lz4f-" in self.
cvinfocvinfo.file_frame_map[i]
and "--no-frame-crc" not in self.
cvinfocvinfo.file_frame_map[i]:
163 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
"checksum"],
"XXH32", self.
cvinfocvinfo.frame_list[i][
"line"])
167 for i, frame_info
in enumerate(self.
cvinfocvinfo.frame_list):
168 if "-2f-" not in self.
cvinfocvinfo.file_frame_map[i]:
169 expected_size = os.path.getsize(self.
cvinfocvinfo.file_frame_map[i])
170 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
"compressed"],
str(expected_size), self.
cvinfocvinfo.frame_list[i][
"line"])
171 total +=
int(self.
cvinfocvinfo.frame_list[i][
"compressed"])
172 self.assertEqual(total, self.
cvinfocvinfo.compressed_size,
"Expected total sum ({}) to match {} filesize".format(total, self.
cvinfocvinfo.filename))
175 for i, frame_info
in enumerate(self.
cvinfocvinfo.frame_list):
176 ffm = self.
cvinfocvinfo.file_frame_map[i]
177 if "-2f-" not in ffm
and "--content-size" in ffm:
178 expected_size_unc =
int(ffm[ffm.rindex(
"_") + 1:ffm.index(
"M")]) * 1048576
179 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
"uncompressed"],
str(expected_size_unc), self.
cvinfocvinfo.frame_list[i][
"line"])
182 for i, frame_info
in enumerate(self.
cvinfocvinfo.frame_list):
183 if "--content-size" in self.
cvinfocvinfo.file_frame_map[i]:
184 self.assertEqual(self.
cvinfocvinfo.frame_list[i][
'ratio'],
185 "{:.2f}%".format(
float(self.
cvinfocvinfo.frame_list[i][
'compressed']) /
float(self.
cvinfocvinfo.frame_list[i][
'uncompressed']) * 100),
186 self.
cvinfocvinfo.frame_list[i][
"line"])
190 for unit
in [
'',
'K',
'M',
'G',
'T',
'P',
'E',
'Z',
'Y']:
194 return "{:.2f}{}".format(size, unit)
198 print(time.strftime(
"%Y/%m/%d %H:%M:%S") +
' - ' + text)
206 def execute(command, print_command=True, print_output=False, print_error=True, param_shell=True):
207 if os.environ.get(
'QEMU_SYS'):
208 command =
"{} {}".format(os.environ[
'QEMU_SYS'], command)
211 popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=param_shell)
212 stdout_lines, stderr_lines = popen.communicate()
213 stderr_lines = stderr_lines.decode(
"utf-8")
214 stdout_lines = stdout_lines.decode(
"utf-8")
220 if popen.returncode
is not None and popen.returncode != 0:
221 if stderr_lines
and not print_output
and print_error:
223 errout(
"Failed to run: {}\n".format(command, stdout_lines + stderr_lines))
224 return (stdout_lines + stderr_lines).splitlines()
228 for f
in glob.glob(
"{}/test_list*".format(TEMP)):
230 log(
"Deleting {}".format(f))
235 non_sparse_size = size // 2
236 sparse_size = size - non_sparse_size
237 with open(file_name,
"wb")
as f:
239 f.write(os.urandom(non_sparse_size))
246 filename =
"{}/test_list_{}M".format(TEMP, i)
247 log(
"Generating {}".format(filename))
249 for j
in [
"--content-size",
"-BI",
"-BD",
"-BX",
"--no-frame-crc"]:
250 lz4file =
"{}-lz4f-1f{}.lz4".format(filename, j)
251 execute(
"{} {} {} {}".format(LZ4, j, filename, lz4file))
253 lz4file =
"{}-skip-1f.lz4".format(filename)
255 skipbytes =
bytes([80, 42, 77, 24]) + skipsize.to_bytes(4, byteorder=
'little', signed=
False)
256 with open(lz4file,
'wb')
as f:
258 f.write(os.urandom(skipsize))
260 lz4file =
"{}-legc-1f.lz4".format(filename)
261 execute(
"{} -l {} {}".format(LZ4, filename, lz4file))
264 file_list = glob.glob(
"{}/test_list_*-lz4f-1f--content-size.lz4".format(TEMP))
265 with open(
"{}/test_list_{}M-lz4f-2f--content-size.lz4".format(TEMP, sum(SIZES)),
'ab')
as outfile:
266 for fname
in file_list:
267 with open(fname,
'rb')
as infile:
268 outfile.write(infile.read())
271 file_list = glob.glob(
"{}/test_list_*.lz4".format(TEMP))
272 with open(
"{}/test_list_concat-all.lz4".format(TEMP),
'ab')
as outfile:
273 for fname
in file_list:
274 with open(fname,
'rb')
as infile:
275 outfile.write(infile.read())
278 if __name__ ==
'__main__':
281 unittest.main(verbosity=2, exit=
False)
def __init__(self, line_in)
def test_compressed_size(self)
def test_frame_types(self)
def test_uncompressed_size(self)
def test_compressed(self)
def test_uncompressed(self)
def test_frame_type(self)
def test_frame_number(self)
def __init__(self, lines)
def execute(command, print_command=True, print_output=False, print_error=True, param_shell=True)
def datagen(file_name, size)
def cleanup(silent=False)