[go: nahoru, domu]

Make zip_helpers.merge_zips() detect conflicts

Bug: 1428082
Change-Id: Ib88529e1eb135978fb6aa10876620496b4a1dbea
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4429430
Reviewed-by: Sam Maier <smaier@chromium.org>
Commit-Queue: Andrew Grieve <agrieve@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1131268}
diff --git a/build/action_helpers_unittest.py b/build/action_helpers_unittest.py
index a377291b..6a9f9085 100755
--- a/build/action_helpers_unittest.py
+++ b/build/action_helpers_unittest.py
@@ -14,7 +14,7 @@
 import action_helpers
 
 
-class ActionHelperTest(unittest.TestCase):
+class ActionHelpersTest(unittest.TestCase):
   def test_atomic_output(self):
     tmp_file = pathlib.Path(tempfile.mktemp())
     tmp_file.write_text('test')
diff --git a/build/zip_helpers.py b/build/zip_helpers.py
index 4e36b36..b8ab9dd 100644
--- a/build/zip_helpers.py
+++ b/build/zip_helpers.py
@@ -184,12 +184,16 @@
     compress: Overrides compression setting from origin zip entries.
   """
   assert not isinstance(input_zips, str)  # Easy mistake to make.
-  out_zip = output
-  if not isinstance(output, zipfile.ZipFile):
+  if isinstance(output, zipfile.ZipFile):
+    out_zip = output
+    out_filename = output.filename
+  else:
+    assert isinstance(output, str), 'Was: ' + repr(output)
     out_zip = zipfile.ZipFile(output, 'w')
+    out_filename = output
 
   # Include paths in the existing zip here to avoid adding duplicate files.
-  added_names = set(out_zip.namelist())
+  crc_by_name = {i.filename: (out_filename, i.CRC) for i in out_zip.infolist()}
 
   try:
     for in_file in input_zips:
@@ -205,18 +209,30 @@
           else:
             dst_name = info.filename
 
-          # TODO(agrieve): Fail if duplicate entry is not identical.
-          already_added = dst_name in added_names
-          if not already_added:
-            if compress is not None:
-              compress_entry = compress
-            else:
-              compress_entry = info.compress_type != zipfile.ZIP_STORED
-            add_to_zip_hermetic(out_zip,
-                                dst_name,
-                                data=in_zip.read(info),
-                                compress=compress_entry)
-            added_names.add(dst_name)
+          data = in_zip.read(info)
+
+          # If there's a duplicate file, ensure contents is the same and skip
+          # adding it multiple times.
+          if dst_name in crc_by_name:
+            orig_filename, orig_crc = crc_by_name[dst_name]
+            new_crc = zipfile.crc32(data)
+            if new_crc == orig_crc:
+              continue
+            msg = f"""File appeared in multiple inputs with differing contents.
+File: {dst_name}
+Input1: {orig_filename}
+Input2: {in_file}"""
+            raise Exception(msg)
+
+          if compress is not None:
+            compress_entry = compress
+          else:
+            compress_entry = info.compress_type != zipfile.ZIP_STORED
+          add_to_zip_hermetic(out_zip,
+                              dst_name,
+                              data=data,
+                              compress=compress_entry)
+          crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
   finally:
     if output is not out_zip:
       out_zip.close()
diff --git a/build/zip_helpers_unittest.py b/build/zip_helpers_unittest.py
new file mode 100755
index 0000000..19000273
--- /dev/null
+++ b/build/zip_helpers_unittest.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python3
+# Copyright 2023 The Chromium Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import pathlib
+import shutil
+import sys
+import tempfile
+import unittest
+import zipfile
+
+import zip_helpers
+
+
+def _make_test_zips(tmp_dir, create_conflict=False):
+  """Writes A.zip and B.zip into |tmp_dir| and returns their paths.
+
+  Both archives contain an entry named 'file2'. Its contents match unless
+  |create_conflict| is True, in which case B.zip stores different bytes.
+  """
+  zip1 = os.path.join(tmp_dir, 'A.zip')
+  zip2 = os.path.join(tmp_dir, 'B.zip')
+  with zipfile.ZipFile(zip1, 'w') as z:
+    z.writestr('file1', 'AAAAA')
+    z.writestr('file2', 'BBBBB')
+  with zipfile.ZipFile(zip2, 'w') as z:
+    z.writestr('file2', 'ABABA' if create_conflict else 'BBBBB')
+    z.writestr('file3', 'CCCCC')
+  return zip1, zip2
+
+
+class ZipHelpersTest(unittest.TestCase):
+  """Tests for duplicate-entry handling in zip_helpers.merge_zips()."""
+
+  def test_merge_zips__identical_file(self):
+    with tempfile.TemporaryDirectory() as tmp_dir:
+      zip1, zip2 = _make_test_zips(tmp_dir)
+
+      merged_zip = os.path.join(tmp_dir, 'merged.zip')
+      zip_helpers.merge_zips(merged_zip, [zip1, zip2])
+
+      with zipfile.ZipFile(merged_zip) as z:
+        self.assertEqual(z.namelist(), ['file1', 'file2', 'file3'])
+
+  def test_merge_zips__conflict(self):
+    with tempfile.TemporaryDirectory() as tmp_dir:
+      zip1, zip2 = _make_test_zips(tmp_dir, create_conflict=True)
+
+      merged_zip = os.path.join(tmp_dir, 'merged.zip')
+      # Match on the message so unrelated failures do not satisfy the test.
+      with self.assertRaisesRegex(Exception, 'differing contents'):
+        zip_helpers.merge_zips(merged_zip, [zip1, zip2])
+
+  def test_merge_zips__conflict_with_append(self):
+    with tempfile.TemporaryDirectory() as tmp_dir:
+      zip1, zip2 = _make_test_zips(tmp_dir, create_conflict=True)
+
+      # Match on the message so unrelated failures do not satisfy the test.
+      with self.assertRaisesRegex(Exception, 'differing contents'):
+        with zipfile.ZipFile(zip1, 'a') as dst_zip:
+          zip_helpers.merge_zips(dst_zip, [zip2])
+
+
+if __name__ == '__main__':
+  unittest.main()