# Viewing: test_mv.py
# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration tests for mv command."""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
import os
from gslib.cs_api_map import ApiSelector
from gslib.tests.test_cp import TestCpMvPOSIXBucketToLocalErrors
from gslib.tests.test_cp import TestCpMvPOSIXBucketToLocalNoErrors
from gslib.tests.test_cp import TestCpMvPOSIXLocalToBucketNoErrors
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.util import ObjectToURI as suri
from gslib.tests.util import SequentialAndParallelTransfer
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import SetEnvironmentForTest
from gslib.tests.util import unittest
from gslib.utils.boto_util import UsingCrcmodExtension
from gslib.utils.retry_util import Retry
from gslib.utils.system_util import IS_WINDOWS
from gslib.utils import shim_util
class TestMvUnitTests(testcase.GsUtilUnitTestCase):
    """Unit tests for mv command."""

    def test_move_bucket_objects_with_duplicate_names_inter_bucket(self):
        """Tests moving same-named objects under two prefixes between buckets.

        Moves ``<bucket1>/*`` to ``<bucket2>`` and asserts that the
        destination listing is exactly ``dir1/file.txt`` and
        ``dir2/file.txt``.
        """
        src_bucket = self.CreateBucket()
        for object_name in ('dir1/file.txt', 'dir2/file.txt'):
            self.CreateObject(bucket_uri=src_bucket,
                              object_name=object_name,
                              contents=b'data')
        dst_bucket = self.CreateBucket()

        self.RunCommand('mv', [suri(src_bucket, '*'), suri(dst_bucket)])

        listing = self._test_wildcard_iterator(suri(dst_bucket, '**'))
        found = {
            str(uri)
            for uri in listing.IterAll(expand_top_level_buckets=True)
        }
        wanted = {
            suri(dst_bucket, 'dir1', 'file.txt'),
            suri(dst_bucket, 'dir2', 'file.txt'),
        }
        self.assertEqual(found, wanted)

    def test_move_bucket_objects_with_duplicate_names_to_bucket_subdir(self):
        """Tests moving same-named objects into a prefix of another bucket.

        Moves ``<bucket1>/*`` to ``<bucket2>/dir`` and asserts that the
        destination listing is exactly ``dir/dir1/file.txt`` and
        ``dir/dir2/file.txt``.
        """
        src_bucket = self.CreateBucket()
        for object_name in ('dir1/file.txt', 'dir2/file.txt'):
            self.CreateObject(bucket_uri=src_bucket,
                              object_name=object_name,
                              contents=b'data')
        dst_bucket = self.CreateBucket()

        self.RunCommand(
            'mv', [suri(src_bucket, '*'), suri(dst_bucket, 'dir')])

        listing = self._test_wildcard_iterator(suri(dst_bucket, '**'))
        found = {
            str(uri)
            for uri in listing.IterAll(expand_top_level_buckets=True)
        }
        wanted = {
            suri(dst_bucket, 'dir', 'dir1', 'file.txt'),
            suri(dst_bucket, 'dir', 'dir2', 'file.txt'),
        }
        self.assertEqual(found, wanted)

    def test_move_dirs_with_duplicate_file_names_to_bucket(self):
        """Tests moving local dirs holding same-named files to a bucket.

        Moves ``<tempdir>/*`` (containing ``dir1/file.txt`` and
        ``dir2/file.txt``) to a bucket and asserts that the bucket listing
        is exactly those two objects.
        """
        dst_bucket = self.CreateBucket()
        local_root = self.CreateTempDir(test_files=[
            ('dir1', 'file.txt'),
            ('dir2', 'file.txt'),
        ])

        self.RunCommand('mv', [local_root + '/*', suri(dst_bucket)])

        listing = self._test_wildcard_iterator(suri(dst_bucket, '**'))
        found = {
            str(uri)
            for uri in listing.IterAll(expand_top_level_buckets=True)
        }
        wanted = {
            suri(dst_bucket, 'dir1', 'file.txt'),
            suri(dst_bucket, 'dir2', 'file.txt'),
        }
        self.assertEqual(found, wanted)
class TestMvUnitTestsWithShim(testcase.ShimUnitTestBase):
    """Unit tests for mv command with shim."""

    def test_shim_translates_flags(self):
        """Tests the gcloud storage command logged for ``mv -a public-read``.

        Runs ``mv`` with the boto config set to use gcloud storage in
        ``dry_run`` shim mode and asserts that the info log contains the
        expected ``storage mv --predefined-acl publicRead`` command line.
        """
        bucket_uri = self.CreateBucket()
        fpath = self.CreateTempFile(contents=b'abcd')

        boto_overrides = [
            ('GSUtil', 'use_gcloud_storage', 'True'),
            ('GSUtil', 'hidden_shim_mode', 'dry_run'),
        ]
        env_overrides = {
            'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
            'CLOUDSDK_ROOT_DIR': 'fake_dir',
        }

        with SetBotoConfigForTest(boto_overrides):
            with SetEnvironmentForTest(env_overrides):
                log_handler = self.RunCommand(
                    'mv',
                    ['-a', 'public-read', fpath, suri(bucket_uri)],
                    return_log_handler=True)
                logged_info = '\n'.join(log_handler.messages['info'])
                expected_command = (
                    'Gcloud Storage Command: {} storage mv'
                    ' --predefined-acl publicRead {} {}'.format(
                        shim_util._get_gcloud_binary_path('fake_dir'),
                        fpath, suri(bucket_uri)))
                self.assertIn(expected_command, logged_info)
class TestMvE2ETests(testcase.GsUtilIntegrationTestCase):
    """Integration tests for mv command."""

    def test_moving(self):
        """Tests moving objects between two buckets and back again.

        Starts with one bucket holding 2 objects and one empty bucket, moves
        both objects across, removes one of them, then moves the remaining
        object back, checking stderr and object counts at each step.
        """
        bucket1_uri = self.CreateBucket(test_objects=2)
        self.AssertNObjectsInBucket(bucket1_uri, 2)
        bucket2_uri = self.CreateBucket()
        self.AssertNObjectsInBucket(bucket2_uri, 0)

        # Move two objects from bucket1 to bucket2.
        objs = [
            self.StorageUriCloneReplaceKey(bucket1_uri, key).versionless_uri
            for key in bucket1_uri.list_bucket()
        ]
        cmd = ['-m', 'mv'] + objs + [suri(bucket2_uri)]
        stderr = self.RunGsUtil(cmd, return_stderr=True)
        # Rewrite API may output an additional 'Copying' progress notification.
        copying_count = stderr.count('Copying')
        self.assertGreaterEqual(
            copying_count, 2,
            'stderr did not contain >= 2 "Copying" lines:\n%s' % stderr)
        self.assertLessEqual(
            copying_count, 4,
            'stderr did not contain <= 4 "Copying" lines:\n%s' % stderr)
        self.assertEqual(
            copying_count % 2, 0,
            'stderr did not contain even number of "Copying" lines:\n%s' %
            stderr)
        self.assertEqual(
            stderr.count('Removing'), 2,
            'stderr did not contain 2 "Removing" lines:\n%s' % stderr)

        self.AssertNObjectsInBucket(bucket1_uri, 0)
        self.AssertNObjectsInBucket(bucket2_uri, 2)

        # Remove one of the objects.
        objs = [
            self.StorageUriCloneReplaceKey(bucket2_uri, key).versionless_uri
            for key in bucket2_uri.list_bucket()
        ]
        obj1 = objs[0]
        self.RunGsUtil(['rm', obj1])

        self.AssertNObjectsInBucket(bucket1_uri, 0)
        self.AssertNObjectsInBucket(bucket2_uri, 1)

        # Move the 1 remaining object back.
        objs = [
            suri(self.StorageUriCloneReplaceKey(bucket2_uri, key))
            for key in bucket2_uri.list_bucket()
        ]
        cmd = ['-m', 'mv'] + objs + [suri(bucket1_uri)]
        stderr = self.RunGsUtil(cmd, return_stderr=True)
        # Rewrite API may output an additional 'Copying' progress notification.
        copying_count = stderr.count('Copying')
        self.assertGreaterEqual(
            copying_count, 1,
            'stderr did not contain >= 1 "Copying" lines:\n%s' % stderr)
        self.assertLessEqual(
            copying_count, 2,
            'stderr did not contain <= 2 "Copying" lines:\n%s' % stderr)
        self.assertEqual(stderr.count('Removing'), 1)

        self.AssertNObjectsInBucket(bucket1_uri, 1)
        self.AssertNObjectsInBucket(bucket2_uri, 0)

    def test_move_bucket_to_dir(self):
        """Tests moving the objects in a bucket to a local directory."""
        bucket_uri = self.CreateBucket(test_objects=2)
        self.AssertNObjectsInBucket(bucket_uri, 2)
        tmpdir = self.CreateTempDir()
        self.RunGsUtil(['mv', suri(bucket_uri, '*'), tmpdir])
        # Collect every file found anywhere under the destination directory.
        dir_list = []
        for dirname, _, filenames in os.walk(tmpdir):
            for filename in filenames:
                dir_list.append(os.path.join(dirname, filename))
        self.assertEqual(len(dir_list), 2)
        self.AssertNObjectsInBucket(bucket_uri, 0)

    def test_move_dir_to_bucket(self):
        """Tests moving a local directory to a bucket."""
        bucket_uri = self.CreateBucket()
        dir_to_move = self.CreateTempDir(test_files=2)
        self.RunGsUtil(['mv', dir_to_move, suri(bucket_uri)])
        self.AssertNObjectsInBucket(bucket_uri, 2)

    @SequentialAndParallelTransfer
    def test_stdin_args(self):
        """Tests mv with the -I option (source paths read from stdin)."""
        tmpdir = self.CreateTempDir()
        fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data1')
        fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents=b'data2')
        bucket_uri = self.CreateBucket()
        self.RunGsUtil(['mv', '-I', suri(bucket_uri)],
                       stdin='\n'.join((fpath1, fpath2)))

        # Use @Retry as hedge against bucket listing eventual consistency.
        @Retry(AssertionError, tries=3, timeout_secs=1)
        def _Check1():
            stdout = self.RunGsUtil(['ls', suri(bucket_uri)],
                                    return_stdout=True)
            self.assertIn(os.path.basename(fpath1), stdout)
            self.assertIn(os.path.basename(fpath2), stdout)
            self.assertNumLines(stdout, 2)

        _Check1()

    def test_mv_no_clobber(self):
        """Tests mv with the -n option."""
        fpath1 = self.CreateTempFile(contents=b'data1')
        bucket_uri = self.CreateBucket()
        object_uri = self.CreateObject(bucket_uri=bucket_uri,
                                       contents=b'data2')
        stderr = self.RunGsUtil(
            ['mv', '-n', fpath1, suri(object_uri)], return_stderr=True)
        # Copy should be skipped and source file should not be removed.
        if self._use_gcloud_storage:
            self.assertIn(
                'Skipping existing destination item (no-clobber): %s' %
                suri(object_uri), stderr)
        else:
            self.assertIn('Skipping existing item: %s' % suri(object_uri),
                          stderr)
        self.assertNotIn('Removing %s' % suri(fpath1), stderr)
        # Object content should be unchanged.
        contents = self.RunGsUtil(['cat', suri(object_uri)],
                                  return_stdout=True)
        self.assertEqual(contents, 'data2')

    @unittest.skipIf(IS_WINDOWS,
                     'POSIX attributes not available on Windows.')
    @unittest.skipUnless(UsingCrcmodExtension(),
                         'Test requires fast crcmod.')
    def test_mv_preserve_posix_bucket_to_dir_no_errors(self):
        """Tests use of the -P flag with mv from a bucket to a local dir.

        Specifically tests combinations of POSIX attributes in metadata that
        will pass validation.
        """
        bucket_uri = self.CreateBucket()
        tmpdir = self.CreateTempDir()
        TestCpMvPOSIXBucketToLocalNoErrors(self,
                                           bucket_uri,
                                           tmpdir,
                                           is_cp=False)

    @unittest.skipIf(IS_WINDOWS,
                     'POSIX attributes not available on Windows.')
    def test_mv_preserve_posix_bucket_to_dir_errors(self):
        """Tests use of the -P flag with mv from a bucket to a local dir.

        Specifically, combinations of POSIX attributes in metadata that will
        fail validation.
        """
        bucket_uri = self.CreateBucket()
        tmpdir = self.CreateTempDir()
        obj = self.CreateObject(bucket_uri=bucket_uri,
                                object_name='obj',
                                contents=b'obj')
        TestCpMvPOSIXBucketToLocalErrors(self,
                                         bucket_uri,
                                         obj,
                                         tmpdir,
                                         is_cp=False)

    # NOTE: 'preseve' is a misspelling of 'preserve'; the name is left as-is
    # so that existing test ids/selectors keep working.
    @unittest.skipIf(IS_WINDOWS,
                     'POSIX attributes not available on Windows.')
    def test_mv_preseve_posix_dir_to_bucket_no_errors(self):
        """Tests use of the -P flag with mv from a local dir to a bucket."""
        bucket_uri = self.CreateBucket()
        TestCpMvPOSIXLocalToBucketNoErrors(self, bucket_uri, is_cp=False)

    @SkipForS3('Test is only relevant for gs storage classes.')
    def test_mv_early_deletion_warning(self):
        """Tests that mv of a recent nearline object warns of early deletion."""
        if self.test_api == ApiSelector.XML:
            # skipTest() raises SkipTest so the run is reported as skipped.
            # Returning unittest.skip(...) would only build an unused
            # decorator and let the test count as passed.
            self.skipTest('boto does not return object storage class')

        bucket_uri = self.CreateBucket(storage_class='NEARLINE')
        object_uri = self.CreateObject(bucket_uri=bucket_uri, contents=b'obj')
        stderr = self.RunGsUtil(
            ['mv', suri(object_uri), suri(bucket_uri, 'foo')],
            return_stderr=True)
        self.assertIn(
            'Warning: moving nearline object %s may incur an early deletion '
            'charge, because the original object is less than 30 days old '
            'according to the local system time.' % suri(object_uri), stderr)

    def test_move_bucket_objects_with_duplicate_names_to_dir(self):
        """Tests moving same-named bucket objects to a local directory.

        Objects ``dir1/file.txt`` and ``dir2/file.txt`` are moved with a
        top-level wildcard; both must land under the destination directory
        in their own subdirectories and the bucket must end up empty.
        """
        bucket_uri = self.CreateBucket()
        self.CreateObject(bucket_uri=bucket_uri,
                          object_name='dir1/file.txt',
                          contents=b'data')
        self.CreateObject(bucket_uri=bucket_uri,
                          object_name='dir2/file.txt',
                          contents=b'data')
        self.AssertNObjectsInBucket(bucket_uri, 2)

        tmpdir = self.CreateTempDir()
        self.RunGsUtil(['mv', suri(bucket_uri, '*'), tmpdir])

        file_list = []
        for dirname, _, filenames in os.walk(tmpdir):
            for filename in filenames:
                file_list.append(os.path.join(dirname, filename))
        self.assertEqual(len(file_list), 2)
        # Build the expected paths the same way file_list entries are built.
        self.assertIn(os.path.join(tmpdir, 'dir1', 'file.txt'), file_list)
        self.assertIn(os.path.join(tmpdir, 'dir2', 'file.txt'), file_list)
        self.AssertNObjectsInBucket(bucket_uri, 0)
# Back to File Manager