Make zip_helpers.merge_zips() detect conflicts
Bug: 1428082
Change-Id: Ib88529e1eb135978fb6aa10876620496b4a1dbea
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4429430
Reviewed-by: Sam Maier <smaier@chromium.org>
Commit-Queue: Andrew Grieve <agrieve@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1131268}
diff --git a/build/action_helpers_unittest.py b/build/action_helpers_unittest.py
index a377291b..6a9f9085 100755
--- a/build/action_helpers_unittest.py
+++ b/build/action_helpers_unittest.py
@@ -14,7 +14,7 @@
import action_helpers
-class ActionHelperTest(unittest.TestCase):
+class ActionHelpersTest(unittest.TestCase):
def test_atomic_output(self):
tmp_file = pathlib.Path(tempfile.mktemp())
tmp_file.write_text('test')
diff --git a/build/zip_helpers.py b/build/zip_helpers.py
index 4e36b36..b8ab9dd 100644
--- a/build/zip_helpers.py
+++ b/build/zip_helpers.py
@@ -184,12 +184,16 @@
compress: Overrides compression setting from origin zip entries.
"""
assert not isinstance(input_zips, str) # Easy mistake to make.
- out_zip = output
- if not isinstance(output, zipfile.ZipFile):
+ if isinstance(output, zipfile.ZipFile):
+ out_zip = output
+ out_filename = output.filename
+ else:
+ assert isinstance(output, str), 'Was: ' + repr(output)
out_zip = zipfile.ZipFile(output, 'w')
+ out_filename = output
# Include paths in the existing zip here to avoid adding duplicate files.
- added_names = set(out_zip.namelist())
+ crc_by_name = {i.filename: (out_filename, i.CRC) for i in out_zip.infolist()}
try:
for in_file in input_zips:
@@ -205,18 +209,30 @@
else:
dst_name = info.filename
- # TODO(agrieve): Fail if duplicate entry is not identical.
- already_added = dst_name in added_names
- if not already_added:
- if compress is not None:
- compress_entry = compress
- else:
- compress_entry = info.compress_type != zipfile.ZIP_STORED
- add_to_zip_hermetic(out_zip,
- dst_name,
- data=in_zip.read(info),
- compress=compress_entry)
- added_names.add(dst_name)
+ data = in_zip.read(info)
+
+ # If there's a duplicate file, ensure contents are the same and skip
+ # adding it multiple times.
+ if dst_name in crc_by_name:
+ orig_filename, orig_crc = crc_by_name[dst_name]
+ new_crc = zipfile.crc32(data)
+ if new_crc == orig_crc:
+ continue
+ msg = f"""File appeared in multiple inputs with differing contents.
+File: {dst_name}
+Input1: {orig_filename}
+Input2: {in_file}"""
+ raise Exception(msg)
+
+ if compress is not None:
+ compress_entry = compress
+ else:
+ compress_entry = info.compress_type != zipfile.ZIP_STORED
+ add_to_zip_hermetic(out_zip,
+ dst_name,
+ data=data,
+ compress=compress_entry)
+ crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
finally:
if output is not out_zip:
out_zip.close()
diff --git a/build/zip_helpers_unittest.py b/build/zip_helpers_unittest.py
new file mode 100755
index 0000000..19000273
--- /dev/null
+++ b/build/zip_helpers_unittest.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+# Copyright 2023 The Chromium Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import pathlib
+import shutil
+import sys
+import tempfile
+import unittest
+import zipfile
+
+import zip_helpers
+
+
+def _make_test_zips(tmp_dir, create_conflct=False):
+ zip1 = os.path.join(tmp_dir, 'A.zip')
+ zip2 = os.path.join(tmp_dir, 'B.zip')
+ with zipfile.ZipFile(zip1, 'w') as z:
+ z.writestr('file1', 'AAAAA')
+ z.writestr('file2', 'BBBBB')
+ with zipfile.ZipFile(zip2, 'w') as z:
+ z.writestr('file2', 'ABABA' if create_conflct else 'BBBBB')
+ z.writestr('file3', 'CCCCC')
+ return zip1, zip2
+
+
+class ZipHelpersTest(unittest.TestCase):
+ def test_merge_zips__identical_file(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ zip1, zip2 = _make_test_zips(tmp_dir)
+
+ merged_zip = os.path.join(tmp_dir, 'merged.zip')
+ zip_helpers.merge_zips(merged_zip, [zip1, zip2])
+
+ with zipfile.ZipFile(merged_zip) as z:
+ self.assertEqual(z.namelist(), ['file1', 'file2', 'file3'])
+
+ def test_merge_zips__conflict(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ zip1, zip2 = _make_test_zips(tmp_dir, create_conflct=True)
+
+ merged_zip = os.path.join(tmp_dir, 'merged.zip')
+ with self.assertRaises(Exception):
+ zip_helpers.merge_zips(merged_zip, [zip1, zip2])
+
+ def test_merge_zips__conflict_with_append(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ zip1, zip2 = _make_test_zips(tmp_dir, create_conflct=True)
+
+ with self.assertRaises(Exception):
+ with zipfile.ZipFile(zip1, 'a') as dst_zip:
+ zip_helpers.merge_zips(dst_zip, [zip2])
+
+
+if __name__ == '__main__':
+ unittest.main()