Server Admin

Viewing: test_rpo.py

# -*- coding: utf-8 -*-
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for rpo command."""

from __future__ import absolute_import

import os
import textwrap

from gslib.commands.rpo import RpoCommand
from gslib.exception import CommandException
from gslib.gcs_json_api import GcsJsonApi
from gslib.storage_url import StorageUrlFromString
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForGS
from gslib.tests.testcase.integration_testcase import SkipForJSON
from gslib.tests.testcase.integration_testcase import SkipForXML
from gslib.tests.util import ObjectToURI as suri
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import SetEnvironmentForTest
from gslib.utils import shim_util

from six import add_move, MovedModule

add_move(MovedModule('mock', 'mock', 'unittest.mock'))
from six.moves import mock


class TestRpoUnit(testcase.GsUtilUnitTestCase):

  def test_get_for_multiple_bucket_calls_api(self):
    bucket_uri1 = self.CreateBucket(bucket_name='rpofoo')
    bucket_uri2 = self.CreateBucket(bucket_name='rpobar')
    stdout = self.RunCommand(
        'rpo',
        ['get', suri(bucket_uri1), suri(bucket_uri2)],
        return_stdout=True)
    expected_string = textwrap.dedent("""\
      gs://rpofoo: None
      gs://rpobar: None
      """)
    self.assertEqual(expected_string, stdout)

  def test_get_with_wildcard(self):
    self.CreateBucket(bucket_name='boo1')
    self.CreateBucket(bucket_name='boo2')
    stdout = self.RunCommand('rpo', ['get', 'gs://boo*'], return_stdout=True)
    actual = '\n'.join(sorted(stdout.strip().split('\n')))
    expected_string = textwrap.dedent("""\
      gs://boo1: None
      gs://boo2: None""")
    self.assertEqual(actual, expected_string)

  def test_get_with_wrong_url_raises_error(self):
    with self.assertRaisesRegex(CommandException, 'No URLs matched'):
      self.RunCommand('rpo', ['get', 'gs://invalid*'])

  def test_set_called_with_incorrect_value_raises_error(self):
    with self.assertRaisesRegex(
        CommandException,
        r'Invalid value for rpo set. Should be one of \(ASYNC_TURBO\|DEFAULT\)'
    ):
      self.RunCommand('rpo', ['set', 'random', 'gs://boo*'])

  def test_set_called_with_lower_case_value_raises_error(self):
    with self.assertRaisesRegex(
        CommandException,
        r'Invalid value for rpo set. Should be one of \(ASYNC_TURBO\|DEFAULT\)'
    ):
      self.RunCommand('rpo', ['set', 'async_turbo', 'gs://boo*'])

  def test_invalid_subcommand_raises_error(self):
    with self.assertRaisesRegex(
        CommandException, 'Invalid subcommand "blah", use get|set instead'):
      self.RunCommand('rpo', ['blah', 'DEFAULT', 'gs://boo*'])


class TestRpoUnitWithShim(testcase.ShimUnitTestBase):

  def test_shim_translates_recovery_point_objective_get_command(self):
    fake_cloudsdk_dir = 'fake_dir'
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True'),
                               ('GSUtil', 'hidden_shim_mode', 'dry_run')]):
      with SetEnvironmentForTest({
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': fake_cloudsdk_dir,
      }):
        self.CreateBucket(bucket_name='fake-bucket-get-rpo-1')
        mock_log_handler = self.RunCommand(
            'rpo',
            args=['get', 'gs://fake-bucket-get-rpo-1'],
            return_log_handler=True)

        info_lines = '\n'.join(mock_log_handler.messages['info'])
        self.assertIn(
            ('Gcloud Storage Command: {} storage'
             ' buckets list --format=value[separator=": "]'
             '(format("gs://{}", name),rpo.yesno(no="None"))'
             ' --raw').format(shim_util._get_gcloud_binary_path('fake_dir'),
                              r'{}'), info_lines)

  def test_shim_translates_recovery_point_objective_set_command(self):
    fake_cloudsdk_dir = 'fake_dir'
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True'),
                               ('GSUtil', 'hidden_shim_mode', 'dry_run')]):
      with SetEnvironmentForTest({
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': fake_cloudsdk_dir,
      }):
        self.CreateBucket(bucket_name='fake-bucket-set-rpo')
        mock_log_handler = self.RunCommand(
            'rpo',
            args=['set', 'DEFAULT', 'gs://fake-bucket-set-rpo'],
            return_log_handler=True)

        info_lines = '\n'.join(mock_log_handler.messages['info'])
        self.assertIn(
            ('Gcloud Storage Command: {} storage'
             ' buckets update --recovery-point-objective DEFAULT').format(
                 shim_util._get_gcloud_binary_path('fake_dir')), info_lines)


class TestRpoE2E(testcase.GsUtilIntegrationTestCase):
  """Integration tests for rpo command."""

  # TODO: Delete this method once rpo get results are consistent
  # from the backend, and replace this call with
  # VerifyCommandGet(bucket_uri, 'rpo', 'DEFAULT').
  # Currently, the rpo results are inconsistent
  # and None is a valid default value for all the buckets.
  # See b/197251750#comment19
  def _verify_get_returns_default_or_none(self, bucket_uri):
    """Checks if the rpo get command returns default."""
    try:
      self.VerifyCommandGet(bucket_uri, 'rpo', 'DEFAULT')
    except AssertionError:
      self.VerifyCommandGet(bucket_uri, 'rpo', 'None')

  @SkipForXML('RPO only runs on GCS JSON API.')
  def test_get_returns_default_for_dual_region_bucket(self):
    bucket_uri = self.CreateBucket(location='nam4')
    self._verify_get_returns_default_or_none(bucket_uri)

  @SkipForXML('RPO only runs on GCS JSON API.')
  def test_get_returns_none_for_regional_bucket(self):
    bucket_uri = self.CreateBucket(location='us-central1')
    self.VerifyCommandGet(bucket_uri, 'rpo', 'None')

  @SkipForXML('RPO only runs on GCS JSON API.')
  def test_set_and_get_async_turbo(self):
    bucket_uri = self.CreateBucket(location='nam4')
    self._verify_get_returns_default_or_none(bucket_uri)
    self.RunGsUtil(['rpo', 'set', 'ASYNC_TURBO', suri(bucket_uri)])
    self.VerifyCommandGet(bucket_uri, 'rpo', 'ASYNC_TURBO')

  @SkipForXML('RPO only runs on GCS JSON API.')
  def test_set_default(self):
    bucket_uri = self.CreateBucket(location='nam4')
    self.RunGsUtil(['rpo', 'set', 'ASYNC_TURBO', suri(bucket_uri)])
    self.VerifyCommandGet(bucket_uri, 'rpo', 'ASYNC_TURBO')
    self.RunGsUtil(['rpo', 'set', 'DEFAULT', suri(bucket_uri)])
    self._verify_get_returns_default_or_none(bucket_uri)

  @SkipForXML('RPO only runs on GCS JSON API.')
  def test_set_async_turbo_fails_for_regional_buckets(self):
    bucket_uri = self.CreateBucket(location='us-central1')
    stderr = self.RunGsUtil(['rpo', 'set', 'ASYNC_TURBO',
                             suri(bucket_uri)],
                            expected_status=1,
                            return_stderr=True)
    self.assertIn('ASYNC_TURBO cannot be enabled on REGION bucket', stderr)

  @SkipForJSON('Testing XML only behavior.')
  def test_xml_fails_for_set(self):
    # Use HMAC for force XML API.
    boto_config_hmac_auth_only = [
        # Overwrite other credential types.
        ('Credentials', 'gs_oauth2_refresh_token', None),
        ('Credentials', 'gs_service_client_id', None),
        ('Credentials', 'gs_service_key_file', None),
        ('Credentials', 'gs_service_key_file_password', None),
        # Add hmac credentials.
        ('Credentials', 'gs_access_key_id', 'dummykey'),
        ('Credentials', 'gs_secret_access_key', 'dummysecret'),
    ]
    with SetBotoConfigForTest(boto_config_hmac_auth_only):
      bucket_uri = 'gs://any-bucket-name'
      stderr = self.RunGsUtil(['rpo', 'set', 'default', bucket_uri],
                              return_stderr=True,
                              expected_status=1)
      self.assertIn('command can only be with the Cloud Storage JSON API',
                    stderr)

  @SkipForJSON('Testing XML only behavior.')
  def test_xml_fails_for_get(self):
    # Use HMAC for force XML API.
    boto_config_hmac_auth_only = [
        # Overwrite other credential types.
        ('Credentials', 'gs_oauth2_refresh_token', None),
        ('Credentials', 'gs_service_client_id', None),
        ('Credentials', 'gs_service_key_file', None),
        ('Credentials', 'gs_service_key_file_password', None),
        # Add HMAC credentials.
        ('Credentials', 'gs_access_key_id', 'dummykey'),
        ('Credentials', 'gs_secret_access_key', 'dummysecret'),
    ]
    with SetBotoConfigForTest(boto_config_hmac_auth_only):
      bucket_uri = 'gs://any-bucket-name'
      stderr = self.RunGsUtil(['rpo', 'get', bucket_uri],
                              return_stderr=True,
                              expected_status=1)
      self.assertIn('command can only be with the Cloud Storage JSON API',
                    stderr)

  @SkipForGS('Testing S3 only behavior.')
  def test_s3_fails_for_set(self):
    bucket_uri = self.CreateBucket()
    stderr = self.RunGsUtil(
        ['rpo', 'set', 'DEFAULT', suri(bucket_uri)],
        return_stderr=True,
        expected_status=1)

    if self._use_gcloud_storage:
      self.assertIn(
          'Features disallowed for S3: Setting Recovery Point Objective',
          stderr)
    else:
      self.assertIn('command can only be used for GCS buckets', stderr)

  @SkipForGS('Testing S3 only behavior.')
  def test_s3_fails_for_get(self):
    bucket_uri = self.CreateBucket()
    expected_status = 0 if self._use_gcloud_storage else 1
    stdout, stderr = self.RunGsUtil(
        ['rpo', 'get', suri(bucket_uri)],
        return_stderr=True,
        return_stdout=True,
        expected_status=expected_status)
    if self._use_gcloud_storage:
      # TODO(b/265304295)
      self.assertIn('gs://None: None', stdout)
    else:
      self.assertIn('command can only be used for GCS buckets', stderr)
Back to File Manager