Viewing: test_util.py
# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""Tests for gsutil utility functions."""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
from gslib.utils import boto_util
from gslib.utils import constants
from gslib.utils import system_util
from gslib.utils import ls_helper
from gslib.utils import retry_util
from gslib.utils import text_util
from gslib.utils import unit_util
import gslib.tests.testcase as testcase
from gslib.tests.util import SetEnvironmentForTest
from gslib.tests.util import TestParams
from gslib.utils.text_util import CompareVersions
from gslib.utils.unit_util import DecimalShort
from gslib.utils.unit_util import HumanReadableWithDecimalPlaces
from gslib.utils.unit_util import PrettyTime
import httplib2
import os
import six
from six import add_move, MovedModule
add_move(MovedModule('mock', 'mock', 'unittest.mock'))
from six.moves import mock
# TODO: Split tests for <foo>_util methods out into their own test classes.
class TestUtil(testcase.GsUtilUnitTestCase):
"""Tests for utility functions."""
def testMakeHumanReadable(self):
"""Tests converting byte counts to human-readable strings."""
self.assertEqual(unit_util.MakeHumanReadable(0), '0 B')
self.assertEqual(unit_util.MakeHumanReadable(1023), '1023 B')
self.assertEqual(unit_util.MakeHumanReadable(1024), '1 KiB')
self.assertEqual(unit_util.MakeHumanReadable(1024**2), '1 MiB')
self.assertEqual(unit_util.MakeHumanReadable(1024**3), '1 GiB')
self.assertEqual(unit_util.MakeHumanReadable(1024**3 * 5.3), '5.3 GiB')
self.assertEqual(unit_util.MakeHumanReadable(1024**4 * 2.7), '2.7 TiB')
self.assertEqual(unit_util.MakeHumanReadable(1024**5), '1 PiB')
self.assertEqual(unit_util.MakeHumanReadable(1024**6), '1 EiB')
def testMakeBitsHumanReadable(self):
"""Tests converting bit counts to human-readable strings."""
self.assertEqual(unit_util.MakeBitsHumanReadable(0), '0 bit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1023), '1023 bit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1024), '1 Kibit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1024**2), '1 Mibit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1024**3), '1 Gibit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1024**3 * 5.3),
'5.3 Gibit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1024**4 * 2.7),
'2.7 Tibit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1024**5), '1 Pibit')
self.assertEqual(unit_util.MakeBitsHumanReadable(1024**6), '1 Eibit')
def testHumanReadableToBytes(self):
"""Tests converting human-readable strings to byte counts."""
self.assertEqual(unit_util.HumanReadableToBytes('1'), 1)
self.assertEqual(unit_util.HumanReadableToBytes('15'), 15)
self.assertEqual(unit_util.HumanReadableToBytes('15.3'), 15)
self.assertEqual(unit_util.HumanReadableToBytes('15.7'), 16)
self.assertEqual(unit_util.HumanReadableToBytes('1023'), 1023)
self.assertEqual(unit_util.HumanReadableToBytes('1k'), 1024)
self.assertEqual(unit_util.HumanReadableToBytes('2048'), 2048)
self.assertEqual(unit_util.HumanReadableToBytes('1 k'), 1024)
self.assertEqual(unit_util.HumanReadableToBytes('1 K'), 1024)
self.assertEqual(unit_util.HumanReadableToBytes('1 KB'), 1024)
self.assertEqual(unit_util.HumanReadableToBytes('1 KiB'), 1024)
self.assertEqual(unit_util.HumanReadableToBytes('1 m'), 1024**2)
self.assertEqual(unit_util.HumanReadableToBytes('1 M'), 1024**2)
self.assertEqual(unit_util.HumanReadableToBytes('1 MB'), 1024**2)
self.assertEqual(unit_util.HumanReadableToBytes('1 MiB'), 1024**2)
self.assertEqual(unit_util.HumanReadableToBytes('1 g'), 1024**3)
self.assertEqual(unit_util.HumanReadableToBytes('1 G'), 1024**3)
self.assertEqual(unit_util.HumanReadableToBytes('1 GB'), 1024**3)
self.assertEqual(unit_util.HumanReadableToBytes('1 GiB'), 1024**3)
self.assertEqual(unit_util.HumanReadableToBytes('1t'), 1024**4)
self.assertEqual(unit_util.HumanReadableToBytes('1T'), 1024**4)
self.assertEqual(unit_util.HumanReadableToBytes('1TB'), 1024**4)
self.assertEqual(unit_util.HumanReadableToBytes('1TiB'), 1024**4)
self.assertEqual(unit_util.HumanReadableToBytes('1\t p'), 1024**5)
self.assertEqual(unit_util.HumanReadableToBytes('1\t P'), 1024**5)
self.assertEqual(unit_util.HumanReadableToBytes('1\t PB'), 1024**5)
self.assertEqual(unit_util.HumanReadableToBytes('1\t PiB'), 1024**5)
self.assertEqual(unit_util.HumanReadableToBytes('1e'), 1024**6)
self.assertEqual(unit_util.HumanReadableToBytes('1E'), 1024**6)
self.assertEqual(unit_util.HumanReadableToBytes('1EB'), 1024**6)
self.assertEqual(unit_util.HumanReadableToBytes('1EiB'), 1024**6)
def testCompareVersions(self):
"""Tests CompareVersions for various use cases."""
# CompareVersions(first, second) returns (g, m), where
# g is True if first known to be greater than second, else False.
# m is True if first known to be greater by at least 1 major version,
(g, m) = CompareVersions('3.37', '3.2')
self.assertTrue(g)
self.assertFalse(m)
(g, m) = CompareVersions('7', '2')
self.assertTrue(g)
self.assertTrue(m)
(g, m) = CompareVersions('3.32', '3.32pre')
self.assertTrue(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.32pre', '3.31')
self.assertTrue(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.4pre', '3.3pree')
self.assertTrue(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.2', '3.37')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('2', '7')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.32pre', '3.32')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.31', '3.32pre')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.3pre', '3.3pre')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('foobar', 'baz')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.32', 'baz')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.4', '3.3')
self.assertTrue(g)
self.assertFalse(m)
(g, m) = CompareVersions('3.3', '3.4')
self.assertFalse(g)
self.assertFalse(m)
(g, m) = CompareVersions('4.1', '3.33')
self.assertTrue(g)
self.assertTrue(m)
(g, m) = CompareVersions('3.10', '3.1')
self.assertTrue(g)
self.assertFalse(m)
def _AssertProxyInfosEqual(self, pi1, pi2):
self.assertEqual(pi1.proxy_type, pi2.proxy_type)
self.assertEqual(pi1.proxy_host, pi2.proxy_host)
self.assertEqual(pi1.proxy_port, pi2.proxy_port)
self.assertEqual(pi1.proxy_rdns, pi2.proxy_rdns)
self.assertEqual(pi1.proxy_user, pi2.proxy_user)
self.assertEqual(pi1.proxy_pass, pi2.proxy_pass)
def testMakeMetadataLine(self):
test_params = (TestParams(args=('AFairlyShortKey', 'Value'),
expected=' AFairlyShortKey: Value'),
TestParams(args=('', 'Value'),
expected=' : Value'),
TestParams(args=('AnotherKey', 'Value'),
kwargs={'indent': 2},
expected=' AnotherKey: Value'),
TestParams(args=('AKeyMuchLongerThanTheLast', 'Value'),
expected=(' AKeyMuchLongerThanTheLast:Value')))
for params in test_params:
line = ls_helper.MakeMetadataLine(*(params.args), **(params.kwargs))
self.assertEqual(line, params.expected)
def testSetProxyInfo(self):
"""Tests SetProxyInfo for various proxy use cases in boto file."""
valid_proxy_types = ['socks4', 'socks5', 'http']
valid_proxy_host = ['hostname', '1.2.3.4', None]
valid_proxy_port = [8888, 0]
valid_proxy_user = ['foo', None]
valid_proxy_pass = ['Bar', None]
valid_proxy_rdns = [True, False, None]
proxy_type_spec = {
'socks4': httplib2.socks.PROXY_TYPE_SOCKS4,
'socks5': httplib2.socks.PROXY_TYPE_SOCKS5,
'http': httplib2.socks.PROXY_TYPE_HTTP,
'https': httplib2.socks.PROXY_TYPE_HTTP
}
#Generate all input combination values
boto_proxy_config_test_values = [{
'proxy_host': p_h,
'proxy_type': p_t,
'proxy_port': p_p,
'proxy_user': p_u,
'proxy_pass': p_s,
'proxy_rdns': p_d
}
for p_h in valid_proxy_host
for p_s in valid_proxy_pass
for p_p in valid_proxy_port
for p_u in valid_proxy_user
for p_t in valid_proxy_types
for p_d in valid_proxy_rdns]
#Test all input combination values
with SetEnvironmentForTest({'http_proxy': 'http://host:50'}):
for test_values in boto_proxy_config_test_values:
proxy_type = proxy_type_spec.get(test_values.get('proxy_type'))
proxy_host = test_values.get('proxy_host')
proxy_port = test_values.get('proxy_port')
proxy_user = test_values.get('proxy_user')
proxy_pass = test_values.get('proxy_pass')
proxy_rdns = bool(test_values.get('proxy_rdns'))
# Added to force socks proxies not to use rdns as in SetProxyInfo()
if not (proxy_type == proxy_type_spec['http']):
proxy_rdns = False
expected = httplib2.ProxyInfo(proxy_host=proxy_host,
proxy_type=proxy_type,
proxy_port=proxy_port,
proxy_user=proxy_user,
proxy_pass=proxy_pass,
proxy_rdns=proxy_rdns)
# Checks to make sure environment variable fallbacks are working
if not (expected.proxy_host and expected.proxy_port):
expected = httplib2.ProxyInfo(proxy_type_spec['http'], 'host', 50)
# Assume proxy_rnds is True if a proxy environment variable exists.
if test_values.get('proxy_rdns') == None:
expected.proxy_rdns = True
self._AssertProxyInfosEqual(boto_util.SetProxyInfo(test_values),
expected)
def testProxyInfoFromEnvironmentVar(self):
"""Tests ProxyInfoFromEnvironmentVar for various cases."""
valid_variables = ['http_proxy', 'https_proxy']
if not system_util.IS_WINDOWS:
# Dynamically set Windows environment variables are case-insensitive.
valid_variables.append('HTTPS_PROXY')
# Clear any existing environment variables for the duration of the test.
clear_dict = {}
for key in valid_variables:
clear_dict[key] = None
with SetEnvironmentForTest(clear_dict):
for env_var in valid_variables:
for url_string in ['hostname', 'http://hostname', 'https://hostname']:
with SetEnvironmentForTest({env_var: url_string}):
self._AssertProxyInfosEqual(
boto_util.ProxyInfoFromEnvironmentVar(env_var),
httplib2.ProxyInfo(
httplib2.socks.PROXY_TYPE_HTTP, 'hostname',
443 if env_var.lower().startswith('https') else 80))
# Shouldn't populate info for other variables
for other_env_var in valid_variables:
if other_env_var == env_var:
continue
self._AssertProxyInfosEqual(
boto_util.ProxyInfoFromEnvironmentVar(other_env_var),
httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, None, 0))
for url_string in [
'1.2.3.4:50', 'http://1.2.3.4:50', 'https://1.2.3.4:50'
]:
with SetEnvironmentForTest({env_var: url_string}):
self._AssertProxyInfosEqual(
boto_util.ProxyInfoFromEnvironmentVar(env_var),
httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, '1.2.3.4',
50))
for url_string in [
'foo:bar@1.2.3.4:50', 'http://foo:bar@1.2.3.4:50',
'https://foo:bar@1.2.3.4:50'
]:
with SetEnvironmentForTest({env_var: url_string}):
self._AssertProxyInfosEqual(
boto_util.ProxyInfoFromEnvironmentVar(env_var),
httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP,
'1.2.3.4',
50,
proxy_user='foo',
proxy_pass='bar'))
for url_string in [
'bar@1.2.3.4:50', 'http://bar@1.2.3.4:50', 'https://bar@1.2.3.4:50'
]:
with SetEnvironmentForTest({env_var: url_string}):
self._AssertProxyInfosEqual(
boto_util.ProxyInfoFromEnvironmentVar(env_var),
httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP,
'1.2.3.4',
50,
proxy_user='bar'))
for env_var in ['proxy', 'noproxy', 'garbage']:
for url_string in ['1.2.3.4:50', 'http://1.2.3.4:50']:
with SetEnvironmentForTest({env_var: url_string}):
self._AssertProxyInfosEqual(
boto_util.ProxyInfoFromEnvironmentVar(env_var),
httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, None, 0))
# We want to make sure the wrapped function is called without executing it.
@mock.patch.object(retry_util.http_wrapper,
'HandleExceptionsAndRebuildHttpConnections')
@mock.patch.object(retry_util.logging, 'info')
def testWarnAfterManyRetriesHandler(self, mock_log_info_fn, mock_wrapped_fn):
# The only ExceptionRetryArgs attributes that the function cares about are
# num_retries and total_wait_sec; we can pass None for the other values.
retry_args_over_threshold = retry_util.http_wrapper.ExceptionRetryArgs(
None, None, None, 3, None, constants.LONG_RETRY_WARN_SEC + 1)
retry_args_under_threshold = retry_util.http_wrapper.ExceptionRetryArgs(
None, None, None, 2, None, constants.LONG_RETRY_WARN_SEC - 1)
retry_util.LogAndHandleRetries()(retry_args_under_threshold)
self.assertTrue(mock_wrapped_fn.called)
# Check that we didn't emit a message.
self.assertFalse(mock_log_info_fn.called)
retry_util.LogAndHandleRetries()(retry_args_over_threshold)
self.assertEqual(mock_wrapped_fn.call_count, 2)
# Check that we did emit a message.
self.assertTrue(mock_log_info_fn.called)
def testUIDecimalShort(self):
"""Tests DecimalShort for UI."""
self.assertEqual('12.3b', DecimalShort(12345678910))
self.assertEqual('123.5m', DecimalShort(123456789))
self.assertEqual('1.2k', DecimalShort(1234))
self.assertEqual('1.0k', DecimalShort(1000))
self.assertEqual('432', DecimalShort(432))
self.assertEqual('43.2t', DecimalShort(43.25 * 10**12))
self.assertEqual('43.2q', DecimalShort(43.25 * 10**15))
self.assertEqual('43250.0q', DecimalShort(43.25 * 10**18))
def testUIPrettyTime(self):
"""Tests PrettyTime for UI."""
self.assertEqual('25:02:10', PrettyTime(90130))
self.assertEqual('01:00:00', PrettyTime(3600))
self.assertEqual('00:59:59', PrettyTime(3599))
self.assertEqual('100+ hrs', PrettyTime(3600 * 100))
self.assertEqual('999+ hrs', PrettyTime(3600 * 10000))
def testUIHumanReadableWithDecimalPlaces(self):
"""Tests HumanReadableWithDecimalPlaces for UI."""
self.assertEqual('1.0 GiB',
HumanReadableWithDecimalPlaces(1024**3 + 1024**2 * 10, 1))
self.assertEqual('1.0 GiB', HumanReadableWithDecimalPlaces(1024**3), 1)
self.assertEqual('1.01 GiB',
HumanReadableWithDecimalPlaces(1024**3 + 1024**2 * 10, 2))
self.assertEqual('1.000 GiB',
HumanReadableWithDecimalPlaces(1024**3 + 1024**2 * 5, 3))
self.assertEqual('1.10 GiB',
HumanReadableWithDecimalPlaces(1024**3 + 1024**2 * 100, 2))
self.assertEqual('1.100 GiB',
HumanReadableWithDecimalPlaces(1024**3 + 1024**2 * 100, 3))
self.assertEqual('10.00 MiB',
HumanReadableWithDecimalPlaces(1024**2 * 10, 2))
# The test below is good for rounding.
self.assertEqual('2.01 GiB', HumanReadableWithDecimalPlaces(2157969408, 2))
self.assertEqual('2.0 GiB', HumanReadableWithDecimalPlaces(2157969408, 1))
self.assertEqual('0 B', HumanReadableWithDecimalPlaces(0, 0))
self.assertEqual('0.00 B', HumanReadableWithDecimalPlaces(0, 2))
self.assertEqual('0.00000 B', HumanReadableWithDecimalPlaces(0, 5))
def testAmzGenerationTypeConversions(self):
amz_gen_as_str = six.ensure_str('9PpsRjBGjBh90IvIS96dgRc_UL6NyGqD')
amz_gen_as_long = 25923956239092482442895228561437790190304192615858167521375267910356975448388
self.assertEqual(text_util.DecodeLongAsString(amz_gen_as_long),
amz_gen_as_str)
self.assertEqual(text_util.EncodeStringAsLong(amz_gen_as_str),
amz_gen_as_long)
def DoTestAddQueryParamToUrl(self, url, param_name, param_val, expected_url):
new_url = text_util.AddQueryParamToUrl(url, param_name, param_val)
self.assertEqual(new_url, expected_url)
def testAddQueryParamToUrlWorksForASCIIValues(self):
# Note that the params here contain empty values and duplicates.
old_url = 'http://foo.bar/path/endpoint?a=1&a=2&b=3&c='
param_name = 'newparam'
param_val = 'nevalue'
expected_url = '{}&{}={}'.format(old_url, param_name, param_val)
self.DoTestAddQueryParamToUrl(old_url, param_name, param_val, expected_url)
def testAddQueryParamToUrlWorksForUTF8Values(self):
old_url = 'http://foo.bar/path/êndpoint?Â=1&a=2&ß=3&c='
param_name = 'nêwparam'
param_val = 'nêwvalue'
# Expected return value is a UTF-8 encoded `str`.
expected_url = '{}&{}={}'.format(old_url, param_name, param_val)
self.DoTestAddQueryParamToUrl(old_url, param_name, param_val, expected_url)
def testAddQueryParamToUrlWorksForRawUnicodeValues(self):
old_url = 'http://foo.bar/path/êndpoint?Â=1&a=2&ß=3&c='
param_name = 'nêwparam'
param_val = 'nêwvalue'
# Since the original URL was a `unicode`, the returned URL should also be.
expected_url = '{}&{}={}'.format(old_url, param_name, param_val)
self.DoTestAddQueryParamToUrl(old_url, param_name, param_val, expected_url)
@mock.patch.object(boto_util, 'GetMaxUploadCompressionBufferSize')
def testGetMaxConcurrentCompressedUploadsMinimum(self, mock_config):
"""Test GetMaxConcurrentCompressedUploads returns at least 1."""
mock_config.return_value = 0
self.assertEqual(boto_util.GetMaxConcurrentCompressedUploads(), 1)
mock_config.return_value = -1
self.assertEqual(boto_util.GetMaxConcurrentCompressedUploads(), 1)
Back to File Manager