Rizin
unix-like reverse engineering framework and cli tools
test-lz4-list.py
Go to the documentation of this file.
1 #! /usr/bin/env python3
import glob
import os
import subprocess
import sys
import tempfile
import time
import unittest
8 
# Sizes, in MiB, of the generated uncompressed inputs (see generate_files).
SIZES = [3, 11]  # Always 2 sizes
MIB = 1048576
# Look for the lz4 binary one directory up from this script first, then
# fall back to ../programs/lz4.
LZ4 = os.path.dirname(os.path.realpath(__file__)) + "/../lz4"
if not os.path.exists(LZ4):
    LZ4 = os.path.dirname(os.path.realpath(__file__)) + "/../programs/lz4"
# Directory in which every test_list* file is created and later deleted.
TEMP = tempfile.gettempdir()
15 
16 
class NVerboseFileInfo(object):
    """One parsed row of the non-verbose ``lz4 --list`` output.

    The row is split on whitespace into exactly seven columns, stored as the
    string attributes ``frames``, ``type``, ``block``, ``compressed``,
    ``uncompressed``, ``ratio`` and ``filename``.  The raw row is kept in
    ``line`` so assertions can show it on failure.

    ``exp_unc_size`` and ``exp_comp_size`` hold the expected sizes in bytes,
    read from the files in ``TEMP``.
    """

    def __init__(self, line_in):
        """Parse ``line_in``; exits via ``errout`` if it is not 7 columns."""
        self.line = line_in
        splitlines = line_in.split()
        if len(splitlines) != 7:
            errout("Unexpected line: {}".format(line_in))
        (self.frames, self.type, self.block, self.compressed,
         self.uncompressed, self.ratio, self.filename) = splitlines
        self.exp_unc_size = 0
        # Get real file sizes
        if "concat-all" in self.filename or "2f--content-size" in self.filename:
            # These files are concatenations, so the expected uncompressed
            # size is the sum of every generated input.
            for i in SIZES:
                self.exp_unc_size += os.path.getsize("{}/test_list_{}M".format(TEMP, i))
        else:
            # The uncompressed source file name is everything before the first "-".
            uncompressed_filename = self.filename.split("-")[0]
            self.exp_unc_size += os.path.getsize("{}/{}".format(TEMP, uncompressed_filename))
        self.exp_comp_size = os.path.getsize("{}/{}".format(TEMP, self.filename))
33 
34 
class TestNonVerbose(unittest.TestCase):
    """Checks every row of the non-verbose ``--list -m`` output.

    Expectations are derived from the file names (frame type, block mode,
    number of frames) and from the actual file sizes on disk.
    """

    @classmethod
    def setUpClass(cls):
        """Run ``lz4 --list -m`` once and parse every row after the first."""
        cls.nvinfo_list = []
        for i, line in enumerate(execute("{} --list -m {}/test_list_*.lz4".format(LZ4, TEMP), print_output=True)):
            # Row 0 is skipped; only the following rows are parsed.
            if i > 0:
                cls.nvinfo_list.append(NVerboseFileInfo(line))

    def test_frames(self):
        """Frame counts: 2 for the 2f file, 1 otherwise, and the sum for concat-all."""
        all_concat_frames = 0
        all_concat_index = None
        for i, nvinfo in enumerate(self.nvinfo_list):
            if "concat-all" in nvinfo.filename:
                all_concat_index = i
            elif "2f--content-size" in nvinfo.filename:
                self.assertEqual("2", nvinfo.frames, nvinfo.line)
                all_concat_frames += 2
            else:
                self.assertEqual("1", nvinfo.frames, nvinfo.line)
                all_concat_frames += 1
        self.assertNotEqual(None, all_concat_index, "Couldn't find concat-all file index.")
        concat_info = self.nvinfo_list[all_concat_index]
        self.assertEqual(concat_info.frames, str(all_concat_frames), concat_info.line)

    def test_frame_types(self):
        """The reported frame type matches the type tag in the file name."""
        for nvinfo in self.nvinfo_list:
            if "-lz4f-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "LZ4Frame", nvinfo.line)
            elif "-legc-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "LegacyFrame", nvinfo.line)
            elif "-skip-" in nvinfo.filename:
                self.assertEqual(nvinfo.type, "SkippableFrame", nvinfo.line)

    def test_block(self):
        """The block column has the B<n>D / B<n>I shape for --BD / --BI files."""
        for nvinfo in self.nvinfo_list:
            # if "-leg" in nvinfo.filename or "-skip" in nvinfo.filename:
            #     self.assertEqual(nvinfo.block, "-", nvinfo.line)
            if "--BD" in nvinfo.filename:
                self.assertRegex(nvinfo.block, "^B[0-9]+D$", nvinfo.line)
            elif "--BI" in nvinfo.filename:
                self.assertRegex(nvinfo.block, "^B[0-9]+I$", nvinfo.line)

    # NOTE(review): this method's header line was absent; the name is reconstructed.
    def test_compressed_size(self):
        """The compressed column equals the on-disk size in human-readable form."""
        for nvinfo in self.nvinfo_list:
            self.assertEqual(nvinfo.compressed, to_human(nvinfo.exp_comp_size), nvinfo.line)

    def test_ratio(self):
        """For --content-size files, ratio is compressed/uncompressed as a percentage."""
        for nvinfo in self.nvinfo_list:
            if "--content-size" in nvinfo.filename:
                expected = "{:.2f}%".format(float(nvinfo.exp_comp_size) / float(nvinfo.exp_unc_size) * 100)
                self.assertEqual(nvinfo.ratio, expected, nvinfo.line)

    # NOTE(review): this method's header line was absent; the name is reconstructed.
    def test_uncompressed_size(self):
        """For --content-size files, the uncompressed column matches the source size."""
        for nvinfo in self.nvinfo_list:
            if "--content-size" in nvinfo.filename:
                self.assertEqual(nvinfo.uncompressed, to_human(nvinfo.exp_unc_size), nvinfo.line)
89 
90 
class VerboseFileInfo(object):
    """Parsed verbose ``lz4 --list -v`` output for a single file.

    Attributes:
        filename: first line of the section, or None if ``lines`` is empty.
        frame_list: one dict per frame row with the string values "frame",
            "type", "block", "checksum", "compressed", "uncompressed" and
            "ratio" (as many as the row has columns), plus "line" holding
            the raw row.
        file_frame_map: starts empty; the verbose tests assign the list of
            source files, one entry per frame.
    """

    def __init__(self, lines):
        """Parse ``lines``: file name, header row, then one row per frame."""
        # Parse lines
        self.filename = None
        self.frame_list = []
        self.file_frame_map = []
        columns = ["frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio"]
        for i, line in enumerate(lines):
            if i == 0:
                self.filename = line
                continue
            elif i == 1:
                # Skip header
                continue
            frame_info = dict(zip(columns, line.split()))
            frame_info["line"] = line
            self.frame_list.append(frame_info)
106 
107 
class TestVerbose(unittest.TestCase):
    """Checks the per-frame rows of the verbose ``--list -m -v`` output.

    Most tests look only at the concat-all file: frame ``i`` of that file is
    compared against entry ``i`` of ``cvinfo.file_frame_map``, the list of
    source files it was built from.
    """

    @classmethod
    def setUpClass(cls):
        """List two files verbosely, split the output per file, build the frame map."""
        # Even though we're listing 2 files to test multiline working as expected,
        # we're only really interested in testing the output of the concat-all file.
        cls.vinfo_list = []
        start = end = 0
        output = execute("{} --list -m -v {}/test_list_concat-all.lz4 {}/test_list_*M-lz4f-2f--content-size.lz4".format(LZ4, TEMP, TEMP), print_output=True)
        # A section runs from a "test_list..." line to the last empty line
        # seen before the next such line.
        # NOTE(review): a first section starting at index 0 is not flushed by
        # the `start != 0` check below - confirm the output never begins
        # directly with a file line.
        for i, line in enumerate(output):
            if line.startswith("test_list"):
                if start != 0 and end != 0:
                    cls.vinfo_list.append(VerboseFileInfo(output[start:end]))
                start = i
            if not line:
                end = i
        cls.vinfo_list.append(VerboseFileInfo(output[start:end]))
        # Populate file_frame_map as a reference of the expected info.
        # Look in TEMP, where the files are generated, not a hard-coded /tmp.
        concat_file_list = glob.glob("{}/test_list_[!concat]*.lz4".format(TEMP))
        # One of the files has 2 frames so duplicate it in this list to map each frame 1 to a single file
        for i, filename in enumerate(concat_file_list):
            if "2f--content-size" in filename:
                concat_file_list.insert(i, filename)
                break
        cls.cvinfo = cls.vinfo_list[0]
        cls.cvinfo.file_frame_map = concat_file_list
        cls.cvinfo.compressed_size = os.path.getsize("{}/test_list_concat-all.lz4".format(TEMP))

    def test_filename(self):
        """Each section's first line matches the test_list_ name pattern with its index/total."""
        for i, vinfo in enumerate(self.vinfo_list):
            self.assertRegex(vinfo.filename, "^test_list_.*({}/{})".format(i + 1, len(self.vinfo_list)))

    def test_frame_number(self):
        """Frames are numbered consecutively from 1 within each file."""
        for vinfo in self.vinfo_list:
            for i, frame_info in enumerate(vinfo.frame_list):
                self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"])

    def test_frame_type(self):
        """Each frame's type matches the type tag in its source file name."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source:
                self.assertEqual(frame_info["type"], "LZ4Frame", frame_info["line"])
            elif "-legc-" in source:
                self.assertEqual(frame_info["type"], "LegacyFrame", frame_info["line"])
            elif "-skip-" in source:
                self.assertEqual(frame_info["type"], "SkippableFrame", frame_info["line"])

    def test_block(self):
        """The block column has the B<n>D / B<n>I shape for --BD / --BI sources."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "--BD" in source:
                self.assertRegex(frame_info["block"], "^B[0-9]+D$", frame_info["line"])
            elif "--BI" in source:
                # The expected value is a pattern, so match it as a regex
                # like the --BD branch does.
                self.assertRegex(frame_info["block"], "^B[0-9]+I$", frame_info["line"])

    def test_checksum(self):
        """LZ4 frames created without --no-frame-crc report an XXH32 checksum."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            if "-lz4f-" in source and "--no-frame-crc" not in source:
                self.assertEqual(frame_info["checksum"], "XXH32", frame_info["line"])

    def test_compressed(self):
        """Per-frame compressed sizes match the sources and sum to the file size."""
        total = 0
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            source = self.cvinfo.file_frame_map[i]
            # The 2-frame file is mapped to two frames, so its total size
            # cannot be compared against a single frame.
            if "-2f-" not in source:
                expected_size = os.path.getsize(source)
                self.assertEqual(frame_info["compressed"], str(expected_size), frame_info["line"])
            total += int(frame_info["compressed"])
        self.assertEqual(total, self.cvinfo.compressed_size, "Expected total sum ({}) to match {} filesize".format(total, self.cvinfo.filename))

    def test_uncompressed(self):
        """Single-frame --content-size sources report <N> MiB, with N taken from the name."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            ffm = self.cvinfo.file_frame_map[i]
            if "-2f-" not in ffm and "--content-size" in ffm:
                # Slice the base name only, so an "M" in the directory part
                # cannot shift the cut.
                base = os.path.basename(ffm)
                expected_size_unc = int(base[base.rindex("_") + 1:base.index("M")]) * MIB
                self.assertEqual(frame_info["uncompressed"], str(expected_size_unc), frame_info["line"])

    def test_ratio(self):
        """For --content-size sources, ratio is compressed/uncompressed as a percentage."""
        for i, frame_info in enumerate(self.cvinfo.frame_list):
            if "--content-size" in self.cvinfo.file_frame_map[i]:
                expected = "{:.2f}%".format(float(frame_info['compressed']) / float(frame_info['uncompressed']) * 100)
                self.assertEqual(frame_info['ratio'], expected, frame_info["line"])
187 
188 
def to_human(size):
    """Format a byte count with two decimals and a binary unit suffix.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit ('Y') is reached; values below 1024 get no suffix.

    >>> to_human(1536)
    '1.50K'
    """
    units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
    index = 0
    # Stop at the last unit: dividing once more would report a value that is
    # 1024 times too small for the 'Y' suffix.
    while size >= 1024.0 and index < len(units) - 1:
        size /= 1024.0
        index += 1
    return "{:.2f}{}".format(size, units[index])
195 
196 
def log(text):
    """Print ``text`` prefixed with the local time as ``YYYY/MM/DD HH:MM:SS - ``."""
    stamp = time.strftime("%Y/%m/%d %H:%M:%S")
    print("{} - {}".format(stamp, text))
199 
200 
def errout(text, err=1):
    """Log ``text`` and terminate the process with exit status ``err``.

    Raises:
        SystemExit: always.
    """
    log(text)
    # sys.exit is always available; the bare exit() helper is only injected
    # by the site module.
    sys.exit(err)
204 
205 
def execute(command, print_command=True, print_output=False, print_error=True, param_shell=True):
    """Run ``command`` and return its output as a list of lines.

    Args:
        command: command line to run; prefixed with ``$QEMU_SYS`` when that
            environment variable is set and non-empty.
        print_command: log the command before running it.
        print_output: print the captured stdout and stderr.
        print_error: on failure, print stderr unless ``print_output``
            already did.
        param_shell: passed to ``subprocess.Popen`` as ``shell``.

    Returns:
        The decoded stdout followed by the decoded stderr, split into lines.

    A non-zero exit status ends the script through ``errout``.
    """
    qemu_prefix = os.environ.get('QEMU_SYS')
    if qemu_prefix:
        command = "{} {}".format(qemu_prefix, command)
    if print_command:
        log("> " + command)
    popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=param_shell)
    stdout_raw, stderr_raw = popen.communicate()
    stderr_lines = stderr_raw.decode("utf-8")
    stdout_lines = stdout_raw.decode("utf-8")
    if print_output:
        if stdout_lines:
            print(stdout_lines)
        if stderr_lines:
            print(stderr_lines)
    if popen.returncode is not None and popen.returncode != 0:
        if stderr_lines and not print_output and print_error:
            print(stderr_lines)
        # Include the captured output in the message; the second placeholder
        # is what makes the second format() argument appear.
        errout("Failed to run: {}\n{}".format(command, stdout_lines + stderr_lines))
    return (stdout_lines + stderr_lines).splitlines()
225 
226 
def cleanup(silent=False):
    """Delete every ``test_list*`` entry in ``TEMP``.

    Args:
        silent: when true, do not log each deleted path.
    """
    for path in glob.glob("{}/test_list*".format(TEMP)):
        if not silent:
            log("Deleting {}".format(path))
        os.unlink(path)
232 
233 
def datagen(file_name, size):
    """Create ``file_name`` with exactly ``size`` bytes.

    The file starts with a hole of ``size - size // 2`` bytes, created by
    seeking past it, followed by ``size // 2`` random bytes.
    """
    non_sparse_size = size // 2
    sparse_size = size - non_sparse_size
    with open(file_name, "wb") as f:
        f.seek(sparse_size)
        f.write(os.urandom(non_sparse_size))
        # seek() alone does not grow the file: when the random part is empty
        # (size 1) nothing is written, so set the length explicitly.
        f.truncate(size)
240 
241 
def generate_files():
    """Create every input file the tests list, all under ``TEMP``.

    For each size in ``SIZES`` this writes the uncompressed data file, one
    LZ4 frame file per option set, one hand-built skippable frame and one
    legacy frame.  It then concatenates the --content-size files into a
    2-frame file and finally concatenates every .lz4 file into concat-all.
    """
    # file format  ~ test_list<frametype>-<no_frames>f<create-args>.lz4 ~
    # Generate LZ4Frames
    for i in SIZES:
        filename = "{}/test_list_{}M".format(TEMP, i)
        log("Generating {}".format(filename))
        datagen(filename, i * MIB)
        for j in ["--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"]:
            lz4file = "{}-lz4f-1f{}.lz4".format(filename, j)
            execute("{} {} {} {}".format(LZ4, j, filename, lz4file))
        # Generate skippable frames
        lz4file = "{}-skip-1f.lz4".format(filename)
        skipsize = i * 1024
        # 4 fixed leading bytes, then the payload length as an unsigned
        # 32-bit little-endian integer, then the random payload.
        skipbytes = bytes([80, 42, 77, 24]) + skipsize.to_bytes(4, byteorder='little', signed=False)
        with open(lz4file, 'wb') as f:
            f.write(skipbytes)
            f.write(os.urandom(skipsize))
        # Generate legacy frames
        lz4file = "{}-legc-1f.lz4".format(filename)
        execute("{} -l {} {}".format(LZ4, filename, lz4file))

    # Concatenate --content-size files
    file_list = glob.glob("{}/test_list_*-lz4f-1f--content-size.lz4".format(TEMP))
    with open("{}/test_list_{}M-lz4f-2f--content-size.lz4".format(TEMP, sum(SIZES)), 'ab') as outfile:
        for fname in file_list:
            with open(fname, 'rb') as infile:
                outfile.write(infile.read())

    # Concatenate all files
    # The glob runs before concat-all is created, so the output file is not
    # part of its own input.
    file_list = glob.glob("{}/test_list_*.lz4".format(TEMP))
    with open("{}/test_list_concat-all.lz4".format(TEMP), 'ab') as outfile:
        for fname in file_list:
            with open(fname, 'rb') as infile:
                outfile.write(infile.read())
276 
277 
if __name__ == '__main__':
    # Remove leftovers from a previous run, create fresh inputs, run the
    # tests without exiting (exit=False) so the final cleanup still happens.
    cleanup()
    generate_files()
    unittest.main(verbosity=2, exit=False)
    cleanup(silent=True)
size_t len
Definition: 6502dis.c:15
static ut8 bytes[32]
Definition: asm_arc.c:23
def __init__(self, line_in)
#define append(x, y)
Definition: cmd_print.c:1740
def log(text)
def to_human(size)
def execute(command, print_command=True, print_output=False, print_error=True, param_shell=True)
def generate_files()
def errout(text, err=1)
def datagen(file_name, size)
def cleanup(silent=False)
static int
Definition: sfsocketcall.h:114
Definition: gzlog.c:289
Definition: zipint.h:278