Server Admin
Server Info
File Manager
Commander
MySQL Client
PHP Info
Editing: copy_helper.py
# -*- coding: utf-8 -*- # Copyright 2011 Google Inc. All Rights Reserved. # Copyright 2011, Nexenta Systems Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Helper functions for copy functionality.""" from __future__ import absolute_import from __future__ import print_function from __future__ import division from __future__ import unicode_literals import base64 from collections import namedtuple import csv import datetime import errno import gzip import json import logging import mimetypes from operator import attrgetter import os import pickle import pyu2f import random import re import shutil import six import stat import subprocess import tempfile import textwrap import time import traceback import six from six.moves import range from six.moves import range from apitools.base.protorpclite import protojson from boto import config import crcmod import gslib from gslib.cloud_api import AccessDeniedException from gslib.cloud_api import ArgumentException from gslib.cloud_api import CloudApi from gslib.cloud_api import EncryptionException from gslib.cloud_api import NotFoundException from gslib.cloud_api import PreconditionException from gslib.cloud_api import Preconditions from gslib.cloud_api import ResumableDownloadException from gslib.cloud_api import ResumableUploadAbortException from gslib.cloud_api import ResumableUploadException from gslib.cloud_api import ResumableUploadStartOverException from gslib.cloud_api import ServiceException from gslib.commands.compose import MAX_COMPOSE_ARITY from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD from gslib.commands.config import DEFAULT_GZIP_COMPRESSION_LEVEL from gslib.cs_api_map import ApiSelector from gslib.daisy_chain_wrapper import DaisyChainWrapper from gslib.exception import CommandException from gslib.exception import HashMismatchException from gslib.exception import InvalidUrlError from gslib.file_part import FilePart from gslib.parallel_tracker_file import GenerateComponentObjectPrefix from gslib.parallel_tracker_file import ReadParallelUploadTrackerFile from gslib.parallel_tracker_file import ValidateParallelCompositeTrackerData from gslib.parallel_tracker_file import WriteComponentToParallelUploadTrackerFile from gslib.parallel_tracker_file import WriteParallelUploadTrackerFile from gslib.progress_callback import FileProgressCallbackHandler from gslib.progress_callback import ProgressCallbackWithTimeout from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper from gslib import storage_url from gslib.storage_url import ContainsWildcard from gslib.storage_url import GenerationFromUrlAndString from gslib.storage_url import IsCloudSubdirPlaceholder from gslib.storage_url import StorageUrlFromString from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages from gslib.thread_message import FileMessage from gslib.thread_message import RetryableErrorMessage from gslib.tracker_file import DeleteDownloadTrackerFiles from gslib.tracker_file import DeleteTrackerFile from gslib.tracker_file import ENCRYPTION_UPLOAD_TRACKER_ENTRY from gslib.tracker_file import GetDownloadStartByte from gslib.tracker_file import GetTrackerFilePath from gslib.tracker_file import GetUploadTrackerData from gslib.tracker_file import RaiseUnwritableTrackerFileException from gslib.tracker_file import ReadOrCreateDownloadTrackerFile from gslib.tracker_file import SERIALIZATION_UPLOAD_TRACKER_ENTRY from gslib.tracker_file import TrackerFileType from gslib.tracker_file import WriteDownloadComponentTrackerFile from gslib.tracker_file import WriteJsonDataToTrackerFile from gslib.utils import parallelism_framework_util from gslib.utils import stet_util from gslib.utils import temporary_file_util from gslib.utils import text_util from gslib.utils.boto_util import GetJsonResumableChunkSize from gslib.utils.boto_util import GetMaxRetryDelay from gslib.utils.boto_util import GetNumRetries from gslib.utils.boto_util import ResumableThreshold from gslib.utils.boto_util import UsingCrcmodExtension from gslib.utils.cloud_api_helper import GetCloudApiInstance from gslib.utils.cloud_api_helper import GetDownloadSerializationData from gslib.utils.constants import DEFAULT_FILE_BUFFER_SIZE from gslib.utils.constants import MIN_SIZE_COMPUTE_LOGGING from gslib.utils.constants import UTF8 from gslib.utils.encryption_helper import CryptoKeyType from gslib.utils.encryption_helper import CryptoKeyWrapperFromKey from gslib.utils.encryption_helper import FindMatchingCSEKInBotoConfig from gslib.utils.encryption_helper import GetEncryptionKeyWrapper from gslib.utils.hashing_helper import Base64EncodeHash from gslib.utils.hashing_helper import CalculateB64EncodedMd5FromContents from gslib.utils.hashing_helper import CalculateHashesFromContents from gslib.utils.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL from gslib.utils.hashing_helper import CHECK_HASH_NEVER from gslib.utils.hashing_helper import ConcatCrc32c from gslib.utils.hashing_helper import GetDownloadHashAlgs from gslib.utils.hashing_helper import GetMd5 from gslib.utils.hashing_helper import GetUploadHashAlgs from gslib.utils.hashing_helper import HashingFileUploadWrapper from gslib.utils.metadata_util import ObjectIsGzipEncoded from gslib.utils.parallelism_framework_util import AtomicDict from gslib.utils.parallelism_framework_util import CheckMultiprocessingAvailableAndInit from gslib.utils.parallelism_framework_util import PutToQueueWithTimeout from gslib.utils.posix_util import ATIME_ATTR from gslib.utils.posix_util import ConvertDatetimeToPOSIX from gslib.utils.posix_util import GID_ATTR from gslib.utils.posix_util import MODE_ATTR from gslib.utils.posix_util import MTIME_ATTR from gslib.utils.posix_util import ParseAndSetPOSIXAttributes from gslib.utils.posix_util import UID_ATTR from gslib.utils.system_util import CheckFreeSpace from gslib.utils.system_util import GetFileSize from gslib.utils.system_util import GetStreamFromFileUrl from gslib.utils.system_util import IS_WINDOWS from gslib.utils.translation_helper import AddS3MarkerAclToObjectMetadata from gslib.utils.translation_helper import CopyObjectMetadata from gslib.utils.translation_helper import DEFAULT_CONTENT_TYPE from gslib.utils.translation_helper import ObjectMetadataFromHeaders from gslib.utils.translation_helper import PreconditionsFromHeaders from gslib.utils.translation_helper import S3MarkerAclFromObjectMetadata from gslib.utils.unit_util import DivideAndCeil from gslib.utils.unit_util import HumanReadableToBytes from gslib.utils.unit_util import MakeHumanReadable from gslib.utils.unit_util import SECONDS_PER_DAY from gslib.utils.unit_util import TEN_MIB from gslib.wildcard_iterator import CreateWildcardIterator if six.PY3: long = int # pylint: disable=g-import-not-at-top if IS_WINDOWS: import msvcrt # Declare copy_helper_opts as a global because namedtuple isn't aware of # assigning to a class member (which breaks pickling done by multiprocessing). # For details see # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly # pylint: disable=global-at-module-level global global_copy_helper_opts # In-memory map of local files that are currently opened for write. Used to # ensure that if we write to the same file twice (say, for example, because the # user specified two identical source URLs), the writes occur serially. global open_files_map, open_files_lock open_files_map = AtomicDict( manager=(parallelism_framework_util.top_level_manager if CheckMultiprocessingAvailableAndInit().is_available else None)) # We don't allow multiple processes on Windows, so using a process-safe lock # would be unnecessary. open_files_lock = parallelism_framework_util.CreateLock() # For debugging purposes; if True, files and objects that fail hash validation # will be saved with the below suffix appended. _RENAME_ON_HASH_MISMATCH = False _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' PARALLEL_UPLOAD_TEMP_NAMESPACE = ( '/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') PARALLEL_UPLOAD_STATIC_SALT = u""" PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS. The theory is that no user will have prepended this to the front of one of their object names and then done an MD5 hash of the name, and then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object name. Note that there will be no problems with object name length since we hash the original name. """ # When uploading a file, get the following fields in the response for # filling in command output and manifests. UPLOAD_RETURN_FIELDS = [ 'crc32c', 'customerEncryption', 'etag', 'generation', 'md5Hash', 'size', ] # This tuple is used only to encapsulate the arguments needed for # command.Apply() in the parallel composite upload case. # Note that content_type is used instead of a full apitools Object() because # apitools objects are not picklable. # filename: String name of file. # file_start: start byte of file (may be in the middle of a file for partitioned # files). # file_length: length of upload (may not be the entire length of a file for # partitioned files). # src_url: FileUrl describing the source file. # dst_url: CloudUrl describing the destination component file. # canned_acl: canned_acl to apply to the uploaded file/component. # content_type: content-type for final object, used for setting content-type # of components and final object. # tracker_file: tracker file for this component. # tracker_file_lock: tracker file lock for tracker file(s). # gzip_encoded: Whether to use gzip transport encoding for the upload. PerformParallelUploadFileToObjectArgs = namedtuple( 'PerformParallelUploadFileToObjectArgs', 'filename file_start file_length src_url dst_url canned_acl ' 'content_type storage_class tracker_file tracker_file_lock encryption_key_sha256 ' 'gzip_encoded') PerformSlicedDownloadObjectToFileArgs = namedtuple( 'PerformSlicedDownloadObjectToFileArgs', 'component_num src_url src_obj_metadata_json dst_url download_file_name ' 'start_byte end_byte decryption_key') # This tuple is used only to encapsulate the arguments returned by # _PerformSlicedDownloadObjectToFile. # component_num: Component number. # crc32c: CRC32C hash value (integer) of the downloaded bytes # bytes_transferred: The number of bytes transferred, potentially less # than the component size if the download was resumed. # component_total_size: The number of bytes corresponding to the whole # component size, potentially more than bytes_transferred # if the download was resumed. # server_encoding: Content-encoding string if it was detected that the server # sent encoded bytes during transfer, None otherwise. PerformSlicedDownloadReturnValues = namedtuple( 'PerformSlicedDownloadReturnValues', 'component_num crc32c bytes_transferred component_total_size ' 'server_encoding') # TODO: Refactor this file to be less cumbersome. In particular, some of the # different paths (e.g., uploading a file to an object vs. downloading an # object to a file) could be split into separate files. # Chunk size to use while zipping/unzipping gzip files. GZIP_CHUNK_SIZE = 8192 # Indicates that all files should be gzipped, in _UploadFileToObject GZIP_ALL_FILES = 'GZIP_ALL_FILES' # Number of bytes to wait before updating a sliced download component tracker # file. TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 # S3 requires special Multipart upload logic (that we currently don't implement) # for files > 5GiB in size. S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 # TODO: Create a message class that serializes posting this message once # through the UI's global status queue. global suggested_sliced_transfers, suggested_sliced_transfers_lock suggested_sliced_transfers = AtomicDict( manager=(parallelism_framework_util.top_level_manager if CheckMultiprocessingAvailableAndInit().is_available else None)) suggested_sliced_transfers_lock = parallelism_framework_util.CreateLock() COMMON_EXTENSION_RULES = { 'md': 'text/markdown', 'tgz': 'application/gzip', } class FileConcurrencySkipError(Exception): """Raised when skipping a file due to a concurrent, duplicate copy.""" def _RmExceptionHandler(cls, e): """Simple exception handler to allow post-completion status.""" cls.logger.error(str(e)) def _ParallelCopyExceptionHandler(cls, e): """Simple exception handler to allow post-completion status.""" cls.logger.error(str(e)) cls.op_failure_count += 1 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', traceback.format_exc()) def _PerformParallelUploadFileToObject(cls, args, thread_state=None): """Function argument to Apply for performing parallel composite uploads. Args: cls: Calling Command class. args: PerformParallelUploadFileToObjectArgs tuple describing the target. thread_state: gsutil Cloud API instance to use for the operation. Returns: StorageUrl representing a successfully uploaded component. """ fp = FilePart(args.filename, args.file_start, args.file_length) gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) with fp: # We take many precautions with the component names that make collisions # effectively impossible. Specifying preconditions will just allow us to # reach a state in which uploads will always fail on retries. preconditions = None # Fill in content type and storage class if one was provided. dst_object_metadata = apitools_messages.Object( name=args.dst_url.object_name, bucket=args.dst_url.bucket_name, contentType=args.content_type, storageClass=args.storage_class) orig_prefer_api = gsutil_api.prefer_api try: if global_copy_helper_opts.canned_acl: # No canned ACL support in JSON, force XML API to be used for # upload/copy operations. gsutil_api.prefer_api = ApiSelector.XML ret = _UploadFileToObject(args.src_url, fp, args.file_length, args.dst_url, dst_object_metadata, preconditions, gsutil_api, cls.logger, cls, _ParallelCopyExceptionHandler, gzip_exts=None, allow_splitting=False, is_component=True, gzip_encoded=args.gzip_encoded) finally: if global_copy_helper_opts.canned_acl: gsutil_api.prefer_api = orig_prefer_api component = ret[2] WriteComponentToParallelUploadTrackerFile( args.tracker_file, args.tracker_file_lock, component, cls.logger, encryption_key_sha256=args.encryption_key_sha256) return ret CopyHelperOpts = namedtuple('CopyHelperOpts', [ 'perform_mv', 'no_clobber', 'daisy_chain', 'read_args_from_stdin', 'print_ver', 'use_manifest', 'preserve_acl', 'canned_acl', 'skip_unsupported_objects', 'test_callback_file', 'dest_storage_class', ]) # pylint: disable=global-variable-undefined def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False, read_args_from_stdin=False, print_ver=False, use_manifest=False, preserve_acl=False, canned_acl=None, skip_unsupported_objects=False, test_callback_file=None, dest_storage_class=None): """Creates CopyHelperOpts for passing options to CopyHelper.""" # We create a tuple with union of options needed by CopyHelper and any # copy-related functionality in CpCommand, RsyncCommand, or Command class. global global_copy_helper_opts global_copy_helper_opts = CopyHelperOpts( perform_mv=perform_mv, no_clobber=no_clobber, daisy_chain=daisy_chain, read_args_from_stdin=read_args_from_stdin, print_ver=print_ver, use_manifest=use_manifest, preserve_acl=preserve_acl, canned_acl=canned_acl, skip_unsupported_objects=skip_unsupported_objects, test_callback_file=test_callback_file, dest_storage_class=dest_storage_class) return global_copy_helper_opts # pylint: disable=global-variable-undefined # pylint: disable=global-variable-not-assigned def GetCopyHelperOpts(): """Returns namedtuple holding CopyHelper options.""" global global_copy_helper_opts return global_copy_helper_opts def _SelectDownloadStrategy(dst_url): """Get download strategy based on the destination object. Args: dst_url: Destination StorageUrl. Returns: gsutil Cloud API DownloadStrategy. """ dst_is_special = False if dst_url.IsFileUrl(): # Check explicitly first because os.stat doesn't work on 'nul' in Windows. if dst_url.object_name == os.devnull: dst_is_special = True try: mode = os.stat(dst_url.object_name).st_mode if stat.S_ISCHR(mode): dst_is_special = True except OSError: pass if dst_is_special: return CloudApi.DownloadStrategy.ONE_SHOT else: return CloudApi.DownloadStrategy.RESUMABLE def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container, command_name): """Ensures the destination URL names a container. Acceptable containers include directory, bucket, bucket subdir, and non-existent bucket subdir. Args: exp_dst_url: Wildcard-expanded destination StorageUrl. have_existing_dst_container: bool indicator of whether exp_dst_url names a container (directory, bucket, or existing bucket subdir). command_name: Name of command making call. May not be the same as the calling class's self.command_name in the case of commands implemented atop other commands (like mv command). Raises: CommandException: if the URL being checked does not name a container. """ if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket() and not have_existing_dst_container)): raise CommandException('Destination URL must name a directory, bucket, ' 'or bucket\nsubdirectory for the multiple ' 'source form of the %s command.' % command_name) def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url, have_existing_dest_subdir, src_url_names_container, recursion_requested): """Checks whether dst_url should be treated as a bucket "sub-directory". The decision about whether something constitutes a bucket "sub-directory" depends on whether there are multiple sources in this request and whether there is an existing bucket subdirectory. For example, when running the command: gsutil cp file gs://bucket/abc if there's no existing gs://bucket/abc bucket subdirectory we should copy file to the object gs://bucket/abc. In contrast, if there's an existing gs://bucket/abc bucket subdirectory we should copy file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc exists, when running the command: gsutil cp file1 file2 gs://bucket/abc we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). Finally, for recursive copies, if the source is a container then we should copy to a container as the target. For example, when running the command: gsutil cp -r dir1 gs://bucket/dir2 we should copy the subtree of dir1 to gs://bucket/dir2. Note that we don't disallow naming a bucket "sub-directory" where there's already an object at that URL. For example it's legitimate (albeit confusing) to have an object called gs://bucket/dir and then run the command gsutil cp file1 file2 gs://bucket/dir Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, and gs://bucket/dir/file2. Args: have_multiple_srcs: Bool indicator of whether this is a multi-source operation. dst_url: StorageUrl to check. have_existing_dest_subdir: bool indicator whether dest is an existing subdirectory. src_url_names_container: bool indicator of whether the source URL is a container. recursion_requested: True if a recursive operation has been requested. Returns: bool indicator. """ if have_existing_dest_subdir: return True if dst_url.IsCloudUrl(): return (have_multiple_srcs or (src_url_names_container and recursion_requested)) def _ShouldTreatDstUrlAsSingleton(src_url_names_container, have_multiple_srcs, have_existing_dest_subdir, dst_url, recursion_requested): """Checks that dst_url names a single file/object after wildcard expansion. It is possible that an object path might name a bucket sub-directory. Args: src_url_names_container: Bool indicator of whether the source for the operation is a container (bucket, bucket subdir, or directory). have_multiple_srcs: Bool indicator of whether this is a multi-source operation. have_existing_dest_subdir: bool indicator whether dest is an existing subdirectory. dst_url: StorageUrl to check. recursion_requested: True if a recursive operation has been requested. Returns: bool indicator. """ if recursion_requested and src_url_names_container: return False if dst_url.IsFileUrl(): return not dst_url.IsDirectory() else: # dst_url.IsCloudUrl() return (not have_multiple_srcs and not have_existing_dest_subdir and dst_url.IsObject()) def _IsUrlValidParentDir(url): """Returns True if not FileUrl ending in relative path symbols. A URL is invalid if it is a FileUrl and the parent directory of the file is a relative path symbol. Unix will not allow a file itself to be named with a relative path symbol, but one can be the parent. Notably, "../obj" can lead to unexpected behavior at the copy destination. We examine the pre-recursion "url", which might point to "..", to see if the parent is valid. If the user does a recursive copy from a URL, it may not end up the final parent of the copied object. For example, see: "dir/nested_dir/obj". If you ran "cp -r dir gs://bucket" from the parent of "dir", then the "url" arg would be "dir", but "nested_dir" would be the parent of "obj". This actually doesn't matter since recursion won't add relative path symbols to the path. However, we still return if "url" is valid because there are cases where we need to copy every parent directory up to "dir" to prevent file name conflicts. Args: url: StorageUrl before recursion. Returns: Boolean indicating if the "url" is valid as a parent directory. """ if not url.IsFileUrl(): return True url_string_minus_trailing_delimiter = url.versionless_url_string.rstrip( url.delim) _, _, after_last_delimiter = (url_string_minus_trailing_delimiter.rpartition( url.delim)) return after_last_delimiter not in storage_url.RELATIVE_PATH_SYMBOLS and ( after_last_delimiter not in [ url.scheme + '://' + symbol for symbol in storage_url.RELATIVE_PATH_SYMBOLS ]) def ConstructDstUrl(src_url, exp_src_url, src_url_names_container, have_multiple_srcs, has_multiple_top_level_srcs, exp_dst_url, have_existing_dest_subdir, recursion_requested, preserve_posix=False): """Constructs the destination URL for a given exp_src_url/exp_dst_url pair. Uses context-dependent naming rules that mimic Linux cp and mv behavior. Args: src_url: Source StorageUrl to be copied. exp_src_url: Single StorageUrl from wildcard expansion of src_url. src_url_names_container: True if src_url names a container (including the case of a wildcard-named bucket subdir (like gs://bucket/abc, where gs://bucket/abc/* matched some objects). have_multiple_srcs: True if this is a multi-source request. This can be true if src_url wildcard-expanded to multiple URLs or if there were multiple source URLs in the request. has_multiple_top_level_srcs: Same as have_multiple_srcs but measured before recursion. exp_dst_url: the expanded StorageUrl requested for the cp destination. Final written path is constructed from this plus a context-dependent variant of src_url. have_existing_dest_subdir: bool indicator whether dest is an existing subdirectory. recursion_requested: True if a recursive operation has been requested. preserve_posix: True if preservation of posix attributes has been requested. Returns: StorageUrl to use for copy. Raises: CommandException if destination object name not specified for source and source is a stream. """ if (exp_dst_url.IsFileUrl() and exp_dst_url.IsStream() and preserve_posix): raise CommandException('Cannot preserve POSIX attributes with a stream.') if _ShouldTreatDstUrlAsSingleton(src_url_names_container, have_multiple_srcs, have_existing_dest_subdir, exp_dst_url, recursion_requested): # We're copying one file or object to one file or object. return exp_dst_url if exp_src_url.IsFileUrl() and (exp_src_url.IsStream() or exp_src_url.IsFifo()): if have_existing_dest_subdir: type_text = 'stream' if exp_src_url.IsStream() else 'named pipe' raise CommandException('Destination object name needed when ' 'source is a %s' % type_text) return exp_dst_url if not recursion_requested and not have_multiple_srcs: # We're copying one file or object to a subdirectory. Append final comp # of exp_src_url to exp_dst_url. src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1] return StorageUrlFromString('%s%s%s' % (exp_dst_url.url_string.rstrip( exp_dst_url.delim), exp_dst_url.delim, src_final_comp)) # Else we're copying multiple sources to a directory, bucket, or a bucket # "sub-directory". # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or # a copy to a directory. (The check for copying to a directory needs # special-case handling so that the command: # gsutil cp gs://bucket/obj dir # will turn into file://dir/ instead of file://dir -- the latter would cause # the file "dirobj" to be created.) # Note: need to check have_multiple_srcs or src_url.names_container() # because src_url could be a bucket containing a single object, named # as gs://bucket. if ((have_multiple_srcs or src_url_names_container or (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory())) and not exp_dst_url.url_string.endswith(exp_dst_url.delim)): exp_dst_url = StorageUrlFromString( '%s%s' % (exp_dst_url.url_string, exp_dst_url.delim)) src_url_is_valid_parent = _IsUrlValidParentDir(src_url) if not src_url_is_valid_parent and has_multiple_top_level_srcs: # To avoid top-level name conflicts, we need to copy the parent dir. # However, that cannot be done because the parent dir has an invalid name. raise InvalidUrlError( 'Presence of multiple top-level sources and invalid expanded URL' ' make file name conflicts possible for URL: {}'.format(src_url)) # Making naming behavior match how things work with local Linux cp and mv # operations depends on many factors, including whether the destination is a # container, and the plurality of the source(s). # 1. Recursively copying from directories, buckets, or bucket subdirs should # result in objects/files mirroring the source hierarchy. For example: # gsutil cp -r dir1/dir2 gs://bucket # should create the object gs://bucket/dir2/file2, assuming dir1/dir2 # contains file2). # # To be consistent with Linux cp behavior, there's one more wrinkle when # working with subdirs: The resulting object names depend on whether the # destination subdirectory exists. For example, if gs://bucket/subdir # exists, the command: # gsutil cp -r dir1/dir2 gs://bucket/subdir # should create objects named like gs://bucket/subdir/dir2/a/b/c. In # contrast, if gs://bucket/subdir does not exist, this same command # should create objects named like gs://bucket/subdir/a/b/c. # # If there are multiple top-level source items, preserve source parent # dirs. This is similar to when the destination dir already exists and # avoids conflicts such as "dir1/f.txt" and "dir2/f.txt" both getting # copied to "gs://bucket/f.txt". Linux normally errors on these conflicts, # but we cannot do that because we need to give users the ability to create # dirs as they copy to the cloud. # # Note: "mv" is similar to running "cp -r" followed by source deletion. # # 2. Copying individual files or objects to dirs, buckets or bucket subdirs # should result in objects/files named by the final source file name # component. Example: # gsutil cp dir1/*.txt gs://bucket # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt, # assuming dir1 contains f1.txt and f2.txt. # Ignore the "multiple top-level sources" rule if using double wildcard ** # because that treats all files as top-level, in which case the user doesn't # want to preserve directories. preserve_src_top_level_dirs = ('**' not in src_url.versionless_url_string and src_url_is_valid_parent and (has_multiple_top_level_srcs or have_existing_dest_subdir)) if preserve_src_top_level_dirs or (src_url_names_container and (exp_dst_url.IsCloudUrl() or exp_dst_url.IsDirectory())): # Case 1. Container copy to a destination other than a file. # Build dst_key_name from subpath of exp_src_url past # where src_url ends. For example, for src_url=gs://bucket/ and # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be # src_subdir/obj. src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url, exp_src_url) dst_key_name = exp_src_url.versionless_url_string[ len(src_url_path_sans_final_dir):].lstrip(src_url.delim) if not preserve_src_top_level_dirs: # Only copy file name, not parent dir. dst_key_name = dst_key_name.partition(src_url.delim)[-1] else: # Case 2. dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1] if (exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir( have_multiple_srcs, exp_dst_url, have_existing_dest_subdir, src_url_names_container, recursion_requested)): if exp_dst_url.object_name and exp_dst_url.object_name.endswith( exp_dst_url.delim): dst_key_name = '%s%s%s' % (exp_dst_url.object_name.rstrip( exp_dst_url.delim), exp_dst_url.delim, dst_key_name) else: delim = exp_dst_url.delim if exp_dst_url.object_name else '' dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '', delim, dst_key_name) new_exp_dst_url = exp_dst_url.Clone() new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim, exp_dst_url.delim) return new_exp_dst_url def _CreateDigestsFromDigesters(digesters): digests = {} if digesters: for alg in digesters: digests[alg] = base64.b64encode( digesters[alg].digest()).rstrip(b'\n').decode('ascii') return digests def _CreateDigestsFromLocalFile(status_queue, algs, file_name, src_url, src_obj_metadata): """Creates a base64 CRC32C and/or MD5 digest from file_name. Args: status_queue: Queue for posting progress messages for UI/Analytics. algs: List of algorithms to compute. file_name: File to digest. src_url: StorageUrl for local object. Used to track progress. src_obj_metadata: Metadata of source object. Returns: Dict of algorithm name : base 64 encoded digest """ hash_dict = {} if 'md5' in algs: hash_dict['md5'] = GetMd5() if 'crc32c' in algs: hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') with open(file_name, 'rb') as fp: CalculateHashesFromContents(fp, hash_dict, callback_processor=ProgressCallbackWithTimeout( src_obj_metadata.size, FileProgressCallbackHandler( status_queue, src_url=src_url, operation_name='Hashing').call)) digests = {} for alg_name, digest in six.iteritems(hash_dict): digests[alg_name] = Base64EncodeHash(digest.hexdigest()) return digests def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, dst_obj_metadata): """Validates integrity of two cloud objects copied via daisy-chain. Args: logger: for outputting log messages. src_url: CloudUrl for source cloud object. dst_url: CloudUrl for destination cloud object. src_obj_metadata: Cloud Object metadata for object being downloaded from. dst_obj_metadata: Cloud Object metadata for object being uploaded to. Raises: CommandException: if cloud digests don't match local digests. """ # See hack comment in _CheckHashes. # Sometimes (e.g. when kms is enabled for s3) the values we check below are # not actually content hashes. The early exit here provides users a workaround # for this case and any others we've missed. check_hashes_config = config.get('GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) if check_hashes_config == CHECK_HASH_NEVER: return checked_one = False download_hashes = {} upload_hashes = {} if src_obj_metadata.md5Hash: src_md5hash = six.ensure_binary(src_obj_metadata.md5Hash) download_hashes['md5'] = src_md5hash if src_obj_metadata.crc32c: src_crc32c_hash = six.ensure_binary(src_obj_metadata.crc32c) download_hashes['crc32c'] = src_crc32c_hash if dst_obj_metadata.md5Hash: dst_md5hash = six.ensure_binary(dst_obj_metadata.md5Hash) upload_hashes['md5'] = dst_md5hash if dst_obj_metadata.crc32c: dst_crc32c_hash = six.ensure_binary(dst_obj_metadata.crc32c) upload_hashes['crc32c'] = dst_crc32c_hash for alg, upload_b64_digest in six.iteritems(upload_hashes): if alg not in download_hashes: continue download_b64_digest = download_hashes[alg] if six.PY3 and isinstance(download_b64_digest, str): download_b64_digest = download_b64_digest.encode('ascii') logger.debug('Comparing source vs destination %s-checksum for %s. (%s/%s)', alg, dst_url, download_b64_digest, upload_b64_digest) if download_b64_digest != upload_b64_digest: raise HashMismatchException( '%s signature for source object (%s) doesn\'t match ' 'destination object digest (%s). Object (%s) will be deleted.' % (alg, download_b64_digest, upload_b64_digest, dst_url)) checked_one = True if not checked_one: # One known way this can currently happen is when downloading objects larger # than 5 GiB from S3 (for which the etag is not an MD5). logger.warn( 'WARNING: Found no hashes to validate object downloaded from %s and ' 'uploaded to %s. Integrity cannot be assured without hashes.', src_url, dst_url) def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, is_upload=False): """Validates integrity by comparing cloud digest to local digest. Args: logger: for outputting log messages. obj_url: CloudUrl for cloud object. obj_metadata: Cloud Object being downloaded from or uploaded to. file_name: Local file name on disk being downloaded to or uploaded from (used only for logging). digests: Computed Digests for the object. is_upload: If true, comparing for an uploaded object (controls logging). Raises: CommandException: if cloud digests don't match local digests. """ # Hack below. # I cannot track down all of the code paths that get here, so I finally # gave up and opted to convert all of the hashes to str. I know that they # *should* be bytes, but the path of least resistance led to str. # Not a nice thing, but for now it makes tests pass... # Update: Since the _CheckCloudHashes function above needs to be changed # as well, I am going to make the executive decision that hashes are # bytes - here as well. It's what the hash and base64 PY3 libs return, # and should be the native format for these things. local_hashes = digests cloud_hashes = {} if obj_metadata.md5Hash: md5_b64_digest = six.ensure_binary(obj_metadata.md5Hash) cloud_hashes['md5'] = md5_b64_digest.rstrip(b'\n') if obj_metadata.crc32c: crc32c_b64_hash = six.ensure_binary(obj_metadata.crc32c) cloud_hashes['crc32c'] = crc32c_b64_hash.rstrip(b'\n') checked_one = False for alg in local_hashes: if alg not in cloud_hashes: continue local_b64_digest = six.ensure_binary(local_hashes[alg]) cloud_b64_digest = cloud_hashes[alg] logger.debug('Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name, local_b64_digest, cloud_b64_digest) if local_b64_digest != cloud_b64_digest: raise HashMismatchException( '%s signature computed for local file (%s) doesn\'t match ' 'cloud-supplied digest (%s). %s (%s) will be deleted.' % (alg, local_b64_digest, cloud_b64_digest, 'Cloud object' if is_upload else 'Local file', obj_url if is_upload else file_name)) checked_one = True if not checked_one: if is_upload: logger.warn( 'WARNING: Found no hashes to validate object uploaded to %s. ' 'Integrity cannot be assured without hashes.', obj_url) else: # One known way this can currently happen is when downloading objects larger # than 5 GB from S3 (for which the etag is not an MD5). logger.warn( 'WARNING: Found no hashes to validate object downloaded to %s. ' 'Integrity cannot be assured without hashes.', file_name) def IsNoClobberServerException(e): """Checks to see if the server attempted to clobber a file. In this case we specified via a precondition that we didn't want the file clobbered. Args: e: The Exception that was generated by a failed copy operation Returns: bool indicator - True indicates that the server did attempt to clobber an existing file. """ return ((isinstance(e, PreconditionException)) or (isinstance(e, ResumableUploadException) and '412' in e.message)) def CheckForDirFileConflict(exp_src_url, dst_url): """Checks whether copying exp_src_url into dst_url is not possible. This happens if a directory exists in local file system where a file needs to go or vice versa. In that case we print an error message and exits. Example: if the file "./x" exists and you try to do: gsutil cp gs://mybucket/x/y . the request can't succeed because it requires a directory where the file x exists. Note that we don't enforce any corresponding restrictions for buckets, because the flat namespace semantics for buckets doesn't prohibit such cases the way hierarchical file systems do. For example, if a bucket contains an object called gs://bucket/dir and then you run the command: gsutil cp file1 file2 gs://bucket/dir you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and gs://bucket/dir/file2. Args: exp_src_url: Expanded source StorageUrl. dst_url: Destination StorageUrl. Raises: CommandException: if errors encountered. """ if dst_url.IsCloudUrl(): # The problem can only happen for file destination URLs. return dst_path = dst_url.object_name final_dir = os.path.dirname(dst_path) if os.path.isfile(final_dir): raise CommandException('Cannot retrieve %s because a file exists ' 'where a directory needs to be created (%s).' % (exp_src_url.url_string, final_dir)) if os.path.isdir(dst_path): raise CommandException('Cannot retrieve %s because a directory exists ' '(%s) where the file needs to be created.' % (exp_src_url.url_string, dst_path)) def _PartitionFile(canned_acl, content_type, dst_bucket_url, file_size, fp, random_prefix, src_url, storage_class, tracker_file, tracker_file_lock, encryption_key_sha256=None, gzip_encoded=False): """Partitions a file into FilePart objects to be uploaded and later composed. These objects, when composed, will match the original file. This entails splitting the file into parts, naming and forming a destination URL for each part, and also providing the PerformParallelUploadFileToObjectArgs corresponding to each part. Args: canned_acl: The user-provided canned_acl, if applicable. content_type: content type for the component and final objects. dst_bucket_url: CloudUrl for the destination bucket. file_size: The size of fp, in bytes. fp: The file object to be partitioned. random_prefix: The randomly-generated prefix used to prevent collisions among the temporary component names. src_url: Source FileUrl from the original command. storage_class: storage class for the component and final objects. tracker_file: The path to the parallel composite upload tracker file. tracker_file_lock: The lock protecting access to the tracker file. encryption_key_sha256: Encryption key SHA256 for use in this upload, if any. gzip_encoded: Whether to use gzip transport encoding for the upload. Returns: dst_args: The destination URIs for the temporary component objects. """ parallel_composite_upload_component_size = HumanReadableToBytes( config.get('GSUtil', 'parallel_composite_upload_component_size', DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE)) (num_components, component_size) = _GetPartitionInfo( file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size) dst_args = {} # Arguments to create commands and pass to subprocesses. file_names = [] # Used for the 2-step process of forming dst_args. for i in range(num_components): # "Salt" the object name with something a user is very unlikely to have # used in an object name, then hash the extended name to make sure # we don't run into problems with name length. Using a deterministic # naming scheme for the temporary components allows users to take # advantage of resumable uploads for each component. encoded_name = six.ensure_binary(PARALLEL_UPLOAD_STATIC_SALT + fp.name) content_md5 = GetMd5() content_md5.update(encoded_name) digest = content_md5.hexdigest() temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE + digest + '_' + str(i)) tmp_dst_url = dst_bucket_url.Clone() tmp_dst_url.object_name = temp_file_name if i < (num_components - 1): # Every component except possibly the last is the same size. file_part_length = component_size else: # The last component just gets all of the remaining bytes. file_part_length = (file_size - ((num_components - 1) * component_size)) offset = i * component_size func_args = PerformParallelUploadFileToObjectArgs( fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl, content_type, storage_class, tracker_file, tracker_file_lock, encryption_key_sha256, gzip_encoded) file_names.append(temp_file_name) dst_args[temp_file_name] = func_args return dst_args def _GetComponentNumber(component): """Gets component number from component CloudUrl. Used during parallel composite upload. Args: component: CloudUrl representing component. Returns: component number """ return int(component.object_name[component.object_name.rfind('_') + 1:]) def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata, canned_acl, file_size, preconditions, gsutil_api, command_obj, copy_exception_handler, logger, gzip_encoded=False): """Uploads a local file to a cloud object using parallel composite upload. The file is partitioned into parts, and then the parts are uploaded in parallel, composed to form the original destination object, and deleted. Args: fp: The file object to be uploaded. src_url: FileUrl representing the local file. dst_url: CloudUrl representing the destination file. dst_obj_metadata: apitools Object describing the destination object. canned_acl: The canned acl to apply to the object, if any. file_size: The size of the source file in bytes. preconditions: Cloud API Preconditions for the final object. gsutil_api: gsutil Cloud API instance to use. command_obj: Command object (for calling Apply). copy_exception_handler: Copy exception handler (for use in Apply). logger: logging.Logger for outputting log messages. gzip_encoded: Whether to use gzip transport encoding for the upload. Returns: Elapsed upload time, uploaded Object with generation, crc32c, and size fields populated. """ start_time = time.time() dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string) api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme) encryption_keywrapper = GetEncryptionKeyWrapper(config) encryption_key_sha256 = (encryption_keywrapper.crypto_key_sha256 if encryption_keywrapper else None) # Determine which components, if any, have already been successfully # uploaded. tracker_file_name = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD, api_selector, src_url) (existing_enc_key_sha256, existing_prefix, existing_components) = (ReadParallelUploadTrackerFile( tracker_file_name, logger)) # Ensure that the tracker data is still valid (encryption keys match) and # perform any necessary cleanup. (existing_prefix, existing_components) = ValidateParallelCompositeTrackerData( tracker_file_name, existing_enc_key_sha256, existing_prefix, existing_components, encryption_key_sha256, dst_bucket_url, command_obj, logger, _DeleteTempComponentObjectFn, _RmExceptionHandler) encryption_key_sha256 = (encryption_key_sha256.decode('ascii') if encryption_key_sha256 is not None else None) random_prefix = (existing_prefix if existing_prefix is not None else GenerateComponentObjectPrefix( encryption_key_sha256=encryption_key_sha256)) # Create (or overwrite) the tracker file for the upload. WriteParallelUploadTrackerFile(tracker_file_name, random_prefix, existing_components, encryption_key_sha256=encryption_key_sha256) # Protect the tracker file within calls to Apply. tracker_file_lock = parallelism_framework_util.CreateLock() # Dict to track component info so we may align FileMessage values # before and after the operation. components_info = {} # Get the set of all components that should be uploaded. dst_args = _PartitionFile(canned_acl, dst_obj_metadata.contentType, dst_bucket_url, file_size, fp, random_prefix, src_url, dst_obj_metadata.storageClass, tracker_file_name, tracker_file_lock, encryption_key_sha256=encryption_key_sha256, gzip_encoded=gzip_encoded) (components_to_upload, existing_components, existing_objects_to_delete) = (FilterExistingComponents( dst_args, existing_components, dst_bucket_url, gsutil_api)) # Assign a start message to each different component type for component in components_to_upload: components_info[component.dst_url.url_string] = ( FileMessage.COMPONENT_TO_UPLOAD, component.file_length) PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(component.src_url, component.dst_url, time.time(), size=component.file_length, finished=False, component_num=_GetComponentNumber(component.dst_url), message_type=FileMessage.COMPONENT_TO_UPLOAD)) for component in existing_components: component_str = component[0].versionless_url_string components_info[component_str] = (FileMessage.EXISTING_COMPONENT, component[1]) PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, component[0], time.time(), finished=False, size=component[1], component_num=_GetComponentNumber(component[0]), message_type=FileMessage.EXISTING_COMPONENT)) for component in existing_objects_to_delete: PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, component, time.time(), finished=False, message_type=FileMessage.EXISTING_OBJECT_TO_DELETE)) # In parallel, copy all of the file parts that haven't already been # uploaded to temporary objects. cp_results = command_obj.Apply( _PerformParallelUploadFileToObject, components_to_upload, copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'), arg_checker=gslib.command.DummyArgChecker, parallel_operations_override=command_obj.ParallelOverrideReason.SLICE, should_return_results=True) uploaded_components = [] for cp_result in cp_results: uploaded_components.append(cp_result[2]) components = uploaded_components + [i[0] for i in existing_components] if len(components) == len(dst_args): # Only try to compose if all of the components were uploaded successfully. # Sort the components so that they will be composed in the correct order. components = sorted(components, key=_GetComponentNumber) request_components = [] for component_url in components: src_obj_metadata = ( apitools_messages.ComposeRequest.SourceObjectsValueListEntry( name=component_url.object_name)) if component_url.HasGeneration(): src_obj_metadata.generation = long(component_url.generation) request_components.append(src_obj_metadata) composed_object = gsutil_api.ComposeObject( request_components, dst_obj_metadata, preconditions=preconditions, provider=dst_url.scheme, fields=['crc32c', 'generation', 'size'], encryption_tuple=encryption_keywrapper) try: # Make sure only to delete things that we know were successfully # uploaded (as opposed to all of the objects that we attempted to # create) so that we don't delete any preexisting objects, except for # those that were uploaded by a previous, failed run and have since # changed (but still have an old generation lying around). objects_to_delete = components + existing_objects_to_delete command_obj.Apply( _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler, arg_checker=gslib.command.DummyArgChecker, parallel_operations_override=command_obj.ParallelOverrideReason.SLICE) # Assign an end message to each different component type for component in components: component_str = component.versionless_url_string try: PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, component, time.time(), finished=True, component_num=_GetComponentNumber(component), size=components_info[component_str][1], message_type=components_info[component_str][0])) except: # pylint: disable=bare-except PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, component, time.time(), finished=True)) for component in existing_objects_to_delete: PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, component, time.time(), finished=True, message_type=FileMessage.EXISTING_OBJECT_TO_DELETE)) except Exception: # pylint: disable=broad-except # If some of the delete calls fail, don't cause the whole command to # fail. The copy was successful iff the compose call succeeded, so # reduce this to a warning. logger.warn( 'Failed to delete some of the following temporary objects:\n' + '\n'.join(dst_args.keys())) finally: with tracker_file_lock: DeleteTrackerFile(tracker_file_name) else: # Some of the components failed to upload. In this case, we want to exit # without deleting the objects. raise CommandException( 'Some temporary components were not uploaded successfully. ' 'Please retry this upload.') elapsed_time = time.time() - start_time return elapsed_time, composed_object def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url, file_size, gsutil_api, canned_acl=None): """Determines whether parallel composite upload strategy should be used. Args: logger: for outputting log messages. allow_splitting: If false, then this function returns false. src_url: FileUrl corresponding to a local file. dst_url: CloudUrl corresponding to destination cloud object. file_size: The size of the source file, in bytes. gsutil_api: CloudApi that may be used to check if the destination bucket has any metadata attributes set that would discourage us from using parallel composite uploads. canned_acl: Canned ACL to apply to destination object, if any. Returns: True iff a parallel upload should be performed on the source file. """ global suggested_sliced_transfers, suggested_sliced_transfers_lock parallel_composite_upload_threshold = HumanReadableToBytes( config.get('GSUtil', 'parallel_composite_upload_threshold', DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) all_factors_but_size = ( allow_splitting # Don't split the pieces multiple times. and not src_url.IsStream() # We can't partition streams. and not src_url.IsFifo() # We can't partition fifos. and dst_url.scheme == 'gs' # Compose is only for gs. and not canned_acl) # TODO: Implement canned ACL support for compose. # Since parallel composite uploads are disabled by default, make user aware of # them. # TODO: Once compiled crcmod is being distributed by major Linux distributions # remove this check. if (all_factors_but_size and parallel_composite_upload_threshold == 0 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD): with suggested_sliced_transfers_lock: if not suggested_sliced_transfers.get('suggested'): logger.info('\n'.join( textwrap.wrap( '==> NOTE: You are uploading one or more large file(s), which ' 'would run significantly faster if you enable parallel composite ' 'uploads. This feature can be enabled by editing the ' '"parallel_composite_upload_threshold" value in your .boto ' 'configuration file. However, note that if you do this large files ' 'will be uploaded as ' '`composite objects <https://cloud.google.com/storage/docs/composite-objects>`_,' # pylint: disable=line-too-long 'which means that any user who downloads such objects will need to ' 'have a compiled crcmod installed (see "gsutil help crcmod"). This ' 'is because without a compiled crcmod, computing checksums on ' 'composite objects is so slow that gsutil disables downloads of ' 'composite objects.')) + '\n') suggested_sliced_transfers['suggested'] = True return (all_factors_but_size and parallel_composite_upload_threshold > 0 and file_size >= parallel_composite_upload_threshold) def ExpandUrlToSingleBlr(url_str, gsutil_api, project_id, treat_nonexistent_object_as_subdir=False, logger=None): """Expands wildcard if present in url_str. Args: url_str: String representation of requested url. gsutil_api: gsutil Cloud API instance to use. project_id: project ID to use (for iterators). treat_nonexistent_object_as_subdir: indicates if should treat a non-existent object as a subdir. logger: logging.Logger instance to use for output. If None, the root Logger will be used. Returns: (exp_url, have_existing_dst_container) where exp_url is a StorageUrl and have_existing_dst_container is a bool indicating whether exp_url names an existing directory, bucket, or bucket subdirectory. In the case where we match a subdirectory AND an object, the object is returned. Raises: CommandException: if url_str matched more than 1 URL. """ logger = logger or logging.Logger() # Handle wildcarded url case. if ContainsWildcard(url_str): blr_expansion = list( CreateWildcardIterator(url_str, gsutil_api, project_id=project_id, logger=logger)) if len(blr_expansion) != 1: raise CommandException('Destination (%s) must match exactly 1 URL' % url_str) blr = blr_expansion[0] # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent # directories. return (StorageUrlFromString(blr.url_string), not blr.IsObject()) storage_url = StorageUrlFromString(url_str) # Handle non-wildcarded URL. if storage_url.IsFileUrl(): return (storage_url, storage_url.IsDirectory()) # At this point we have a cloud URL. if storage_url.IsBucket(): return (storage_url, True) # For object/prefix URLs, there are four cases that indicate the destination # is a cloud subdirectory; these are always considered to be an existing # container. Checking each case allows gsutil to provide Unix-like # destination folder semantics, but requires up to three HTTP calls, noted # below. # Case 1: If a placeholder object ending with '/' exists. if IsCloudSubdirPlaceholder(storage_url): return (storage_url, True) # Get version of object name without trailing slash for matching prefixes prefix = storage_url.object_name.rstrip('/') # HTTP call to make an eventually consistent check for a matching prefix, # _$folder$, or empty listing. list_iterator = gsutil_api.ListObjects(storage_url.bucket_name, prefix=prefix, delimiter='/', provider=storage_url.scheme, fields=['prefixes', 'items/name']) for obj_or_prefix in list_iterator: # To conserve HTTP calls for the common case, we make a single listing # that covers prefixes and object names. Listing object names covers the # _$folder$ case and the nonexistent-object-as-subdir case. However, if # there are many existing objects for which the target URL is an exact # prefix, this listing could be paginated and span multiple HTTP calls. # If this case becomes common, we could heurestically abort the # listing operation after the first page of results and just query for the # _$folder$ object directly using GetObjectMetadata. # TODO: currently the ListObjects iterator yields objects before prefixes, # because ls depends on this iteration order for proper display. We could # save up to 1ms in determining that a destination is a prefix if we had a # way to yield prefixes first, but this would require poking a major hole # through the abstraction to control this iteration order. if (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX and obj_or_prefix.data == prefix + '/'): # Case 2: If there is a matching prefix when listing the destination URL. return (storage_url, True) elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and obj_or_prefix.data.name == storage_url.object_name + '_$folder$'): # Case 3: If a placeholder object matching destination + _$folder$ # exists. return (storage_url, True) elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and obj_or_prefix.data.name == storage_url.object_name): # The object exists but it is not a container return (storage_url, False) # Case 4: If no objects/prefixes matched, and nonexistent objects should be # treated as subdirectories. return (storage_url, treat_nonexistent_object_as_subdir) def TriggerReauthForDestinationProviderIfNecessary(destination_url, gsutil_api, worker_count): """Makes a request to the destination API provider to trigger reauth. Addresses https://github.com/GoogleCloudPlatform/gsutil/issues/1639. If an API call occurs in a child process, the library that handles reauth will fail. We need to make at least one API call in the main process to allow a user to reauthorize. For cloud source URLs this already happens because the plurality of the source name expansion iterator is checked in the main thread. For cloud destination URLs, only some situations result in a similar API call. In these situations, this function exits without performing an API call. In others, this function performs an API call to trigger reauth. Args: destination_url (StorageUrl): The destination of the transfer. gsutil_api (CloudApiDelegator): API to use for the GetBucket call. worker_count (int): If greater than 1, assume that parallel execution is used. Technically, reauth challenges can be answered in the main process, but they may be triggered multiple times if multithreading is used. Returns: None, but performs an API call if necessary. """ # Only perform this check if the user has opted in. if not config.getbool( 'GSUtil', 'trigger_reauth_challenge_for_parallel_operations', False): return # Reauth is not necessary for non-cloud destinations. if not destination_url.IsCloudUrl(): return # Destination wildcards are expanded by an API call in the main process. if ContainsWildcard(destination_url.url_string): return # If gsutil executes sequentially, all calls will occur in the main process. if worker_count == 1: return try: # The specific API call is not important, but one must occur. gsutil_api.GetBucket( destination_url.bucket_name, fields=['location'], # Single field to limit XML API calls. provider=destination_url.scheme) except Exception as e: if isinstance(e, pyu2f.errors.PluginError): raise # Other exceptions can be ignored. The purpose was just to trigger # a reauth challenge. def FixWindowsNaming(src_url, dst_url): """Translates Windows pathnames to cloud pathnames. Rewrites the destination URL built by ConstructDstUrl(). Args: src_url: Source StorageUrl to be copied. dst_url: The destination StorageUrl built by ConstructDstUrl(). Returns: StorageUrl to use for copy. """ if (src_url.IsFileUrl() and src_url.delim == '\\' and dst_url.IsCloudUrl()): trans_url_str = re.sub(r'\\', '/', dst_url.url_string) dst_url = StorageUrlFromString(trans_url_str) return dst_url def SrcDstSame(src_url, dst_url): """Checks if src_url and dst_url represent the same object or file. We don't handle anything about hard or symbolic links. Args: src_url: Source StorageUrl. dst_url: Destination StorageUrl. Returns: Bool indicator. """ if src_url.IsFileUrl() and dst_url.IsFileUrl(): # Translate a/b/./c to a/b/c, so src=dst comparison below works. new_src_path = os.path.normpath(src_url.object_name) new_dst_path = os.path.normpath(dst_url.object_name) return new_src_path == new_dst_path else: return (src_url.url_string == dst_url.url_string and src_url.generation == dst_url.generation) def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata): """Logs copy operation, including Content-Type if appropriate. Args: logger: logger instance to use for output. src_url: Source StorageUrl. dst_url: Destination StorageUrl. dst_obj_metadata: Object-specific metadata that should be overidden during the copy. """ if (dst_url.IsCloudUrl() and dst_obj_metadata and dst_obj_metadata.contentType): content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType else: content_type_msg = '' if src_url.IsFileUrl() and (src_url.IsStream() or src_url.IsFifo()): src_text = '<STDIN>' if src_url.IsStream() else 'named pipe' logger.info('Copying from %s%s...', src_text, content_type_msg) else: logger.info('Copying %s%s...', src_url.url_string, content_type_msg) # pylint: disable=undefined-variable def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, dst_obj_metadata, preconditions, gsutil_api, decryption_key=None): """Performs copy-in-the cloud from specified src to dest object. Args: src_url: Source CloudUrl. src_obj_metadata: Metadata for source object; must include etag and size. dst_url: Destination CloudUrl. dst_obj_metadata: Object-specific metadata that should be overidden during the copy. preconditions: Preconditions to use for the copy. gsutil_api: gsutil Cloud API instance to use for the copy. decryption_key: Base64-encoded decryption key for the source object, if any. Returns: (elapsed_time, bytes_transferred, dst_url with generation, md5 hash of destination) excluding overhead like initial GET. Raises: CommandException: if errors encountered. """ decryption_keywrapper = CryptoKeyWrapperFromKey(decryption_key) encryption_keywrapper = GetEncryptionKeyWrapper(config) start_time = time.time() progress_callback = FileProgressCallbackHandler(gsutil_api.status_queue, src_url=src_url, dst_url=dst_url, operation_name='Copying').call if global_copy_helper_opts.test_callback_file: with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: progress_callback = pickle.loads(test_fp.read()).call dst_obj = gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation, canned_acl=global_copy_helper_opts.canned_acl, preconditions=preconditions, progress_callback=progress_callback, provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS, decryption_tuple=decryption_keywrapper, encryption_tuple=encryption_keywrapper) end_time = time.time() result_url = dst_url.Clone() result_url.generation = GenerationFromUrlAndString(result_url, dst_obj.generation) PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, dst_url, end_time, message_type=FileMessage.FILE_CLOUD_COPY, size=src_obj_metadata.size, finished=True)) return (end_time - start_time, src_obj_metadata.size, result_url, dst_obj.md5Hash) def _SetContentTypeFromFile(src_url, dst_obj_metadata): """Detects and sets Content-Type if src_url names a local file. Args: src_url: Source StorageUrl. dst_obj_metadata: Object-specific metadata that should be overidden during the copy. """ # contentType == '' if user requested default type. if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() and not src_url.IsStream() and not src_url.IsFifo()): # Only do content type recognition if src_url is a file. Object-to-object # copies with no -h Content-Type specified re-use the content type of the # source object. object_name = src_url.object_name content_type = None # Streams (denoted by '-') are expected to be 'application/octet-stream' # and 'file' would partially consume them. if object_name != '-': real_file_path = os.path.realpath(object_name) if config.getbool('GSUtil', 'use_magicfile', False) and not IS_WINDOWS: try: p = subprocess.Popen(['file', '-b', '--mime', real_file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = p.communicate() p.stdout.close() p.stderr.close() if p.returncode != 0 or error: raise CommandException( 'Encountered error running "file -b --mime %s" ' '(returncode=%d).\n%s' % (real_file_path, p.returncode, error)) # Parse output by removing line delimiter content_type = output.rstrip() content_type = six.ensure_str(content_type) except OSError as e: # 'file' executable may not always be present. raise CommandException( 'Encountered OSError running "file -b --mime %s"\n%s' % (real_file_path, e)) else: _, _, extension = real_file_path.rpartition('.') if extension in COMMON_EXTENSION_RULES: content_type = COMMON_EXTENSION_RULES[extension] else: content_type, _ = mimetypes.guess_type(real_file_path) if not content_type: content_type = DEFAULT_CONTENT_TYPE dst_obj_metadata.contentType = content_type # pylint: disable=undefined-variable def _UploadFileToObjectNonResumable(src_url, src_obj_filestream, src_obj_size, dst_url, dst_obj_metadata, preconditions, gsutil_api, gzip_encoded=False): """Uploads the file using a non-resumable strategy. This function does not support component transfers. Args: src_url: Source StorageUrl to upload. src_obj_filestream: File pointer to uploadable bytes. src_obj_size (int or None): Size of the source object. dst_url: Destination StorageUrl for the upload. dst_obj_metadata: Metadata for the target object. preconditions: Preconditions for the upload, if any. gsutil_api: gsutil Cloud API instance to use for the upload. gzip_encoded: Whether to use gzip transport encoding for the upload. Returns: Elapsed upload time, uploaded Object with generation, md5, and size fields populated. """ progress_callback = FileProgressCallbackHandler( gsutil_api.status_queue, src_url=src_url, dst_url=dst_url, operation_name='Uploading').call if global_copy_helper_opts.test_callback_file: with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: progress_callback = pickle.loads(test_fp.read()).call start_time = time.time() encryption_keywrapper = GetEncryptionKeyWrapper(config) if src_url.IsStream() or src_url.IsFifo(): # TODO: gsutil-beta: Provide progress callbacks for streaming uploads. uploaded_object = gsutil_api.UploadObjectStreaming( src_obj_filestream, object_metadata=dst_obj_metadata, canned_acl=global_copy_helper_opts.canned_acl, preconditions=preconditions, progress_callback=progress_callback, encryption_tuple=encryption_keywrapper, provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS, gzip_encoded=gzip_encoded) else: uploaded_object = gsutil_api.UploadObject( src_obj_filestream, object_metadata=dst_obj_metadata, canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size, preconditions=preconditions, progress_callback=progress_callback, encryption_tuple=encryption_keywrapper, provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS, gzip_encoded=gzip_encoded) end_time = time.time() elapsed_time = end_time - start_time return elapsed_time, uploaded_object # pylint: disable=undefined-variable def _UploadFileToObjectResumable(src_url, src_obj_filestream, src_obj_size, dst_url, dst_obj_metadata, preconditions, gsutil_api, logger, is_component=False, gzip_encoded=False): """Uploads the file using a resumable strategy. Args: src_url: Source FileUrl to upload. Must not be a stream. src_obj_filestream: File pointer to uploadable bytes. src_obj_size (int or None): Size of the source object. dst_url: Destination StorageUrl for the upload. dst_obj_metadata: Metadata for the target object. preconditions: Preconditions for the upload, if any. gsutil_api: gsutil Cloud API instance to use for the upload. logger: for outputting log messages. is_component: indicates whether this is a single component or whole file. gzip_encoded: Whether to use gzip transport encoding for the upload. Returns: Elapsed upload time, uploaded Object with generation, md5, and size fields populated. """ tracker_file_name = GetTrackerFilePath( dst_url, TrackerFileType.UPLOAD, gsutil_api.GetApiSelector(provider=dst_url.scheme)) encryption_keywrapper = GetEncryptionKeyWrapper(config) encryption_key_sha256 = ( encryption_keywrapper.crypto_key_sha256.decode('ascii') if encryption_keywrapper and encryption_keywrapper.crypto_key_sha256 else None) def _UploadTrackerCallback(serialization_data): """Creates a new tracker file for starting an upload from scratch. This function is called by the gsutil Cloud API implementation and the the serialization data is implementation-specific. Args: serialization_data: Serialization data used in resuming the upload. """ data = { ENCRYPTION_UPLOAD_TRACKER_ENTRY: encryption_key_sha256, SERIALIZATION_UPLOAD_TRACKER_ENTRY: str(serialization_data) } WriteJsonDataToTrackerFile(tracker_file_name, data) # This contains the upload URL, which will uniquely identify the # destination object. tracker_data = GetUploadTrackerData( tracker_file_name, logger, encryption_key_sha256=encryption_key_sha256) if tracker_data: logger.info('Resuming upload for %s', src_url.url_string) retryable = True component_num = _GetComponentNumber(dst_url) if is_component else None progress_callback = FileProgressCallbackHandler( gsutil_api.status_queue, src_url=src_url, component_num=component_num, dst_url=dst_url, operation_name='Uploading').call if global_copy_helper_opts.test_callback_file: with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: progress_callback = pickle.loads(test_fp.read()).call start_time = time.time() num_startover_attempts = 0 # This loop causes us to retry when the resumable upload failed in a way that # requires starting over with a new upload ID. Retries within a single upload # ID within the current process are handled in # gsutil_api.UploadObjectResumable, and retries within a single upload ID # spanning processes happens if an exception occurs not caught below (which # will leave the tracker file in place, and cause the upload ID to be reused # the next time the user runs gsutil and attempts the same upload). while retryable: try: uploaded_object = gsutil_api.UploadObjectResumable( src_obj_filestream, object_metadata=dst_obj_metadata, canned_acl=global_copy_helper_opts.canned_acl, preconditions=preconditions, provider=dst_url.scheme, size=src_obj_size, serialization_data=tracker_data, encryption_tuple=encryption_keywrapper, fields=UPLOAD_RETURN_FIELDS, tracker_callback=_UploadTrackerCallback, progress_callback=progress_callback, gzip_encoded=gzip_encoded) retryable = False except ResumableUploadStartOverException as e: logger.info('Caught ResumableUploadStartOverException for upload of %s.' % src_url.url_string) # This can happen, for example, if the server sends a 410 response code. # In that case the current resumable upload ID can't be reused, so delete # the tracker file and try again up to max retries. num_startover_attempts += 1 retryable = (num_startover_attempts < GetNumRetries()) if not retryable: raise # If the server sends a 404 response code, then the upload should only # be restarted if it was the object (and not the bucket) that was missing. try: logger.info('Checking that bucket %s exists before retrying upload...' % dst_obj_metadata.bucket) gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme) except AccessDeniedException: # Proceed with deleting the tracker file in the event that the bucket # exists, but the user does not have permission to view its metadata. pass except NotFoundException: raise finally: DeleteTrackerFile(tracker_file_name) logger.info('Deleted tracker file %s for resumable upload of %s before ' 'retrying.' % (tracker_file_name, src_url.url_string)) logger.info( 'Restarting upload of %s from scratch (retry #%d) after exception ' 'indicating we need to start over with a new resumable upload ID: %s' % (src_url.url_string, num_startover_attempts, e)) tracker_data = None src_obj_filestream.seek(0) # Reset the progress callback handler. component_num = _GetComponentNumber(dst_url) if is_component else None progress_callback = FileProgressCallbackHandler( gsutil_api.status_queue, src_url=src_url, component_num=component_num, dst_url=dst_url, operation_name='Uploading').call # Report the retryable error to the global status queue. PutToQueueWithTimeout( gsutil_api.status_queue, RetryableErrorMessage(e, time.time(), num_retries=num_startover_attempts)) time.sleep( min(random.random() * (2**num_startover_attempts), GetMaxRetryDelay())) except ResumableUploadAbortException: retryable = False raise finally: if not retryable: DeleteTrackerFile(tracker_file_name) end_time = time.time() elapsed_time = end_time - start_time return (elapsed_time, uploaded_object) def _SelectUploadCompressionStrategy(object_name, is_component=False, gzip_exts=False, gzip_encoded=False): """Selects how an upload should be compressed. This is a helper function for _UploadFileToObject. Args: object_name: The object name of the source FileUrl. is_component: indicates whether this is a single component or whole file. gzip_exts: List of file extensions to gzip prior to upload, if any. If gzip_exts is GZIP_ALL_FILES, gzip all files. gzip_encoded: Whether to use gzip transport encoding for the upload. Used in conjunction with gzip_exts for selecting which files will be encoded. Streaming files compressed is only supported on the JSON GCS API. Returns: A tuple: (If the file should be gzipped locally, if the file should be gzip transport encoded). """ zipped_file = False gzip_encoded_file = False fname_parts = object_name.split('.') # If gzip_encoded and is_component are marked as true, the file was already # filtered through the original gzip_exts filter and we must compress the # component via gzip transport encoding. if gzip_encoded and is_component: gzip_encoded_file = True elif (gzip_exts == GZIP_ALL_FILES or (gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts)): zipped_file = not gzip_encoded gzip_encoded_file = gzip_encoded return zipped_file, gzip_encoded_file def _ApplyZippedUploadCompression(src_url, src_obj_filestream, src_obj_size, logger): """Compresses a to-be-uploaded local file to save bandwidth. This is a helper function for _UploadFileToObject. Args: src_url: Source FileUrl. src_obj_filestream: Read stream of the source file - will be consumed and closed. src_obj_size (int or None): Size of the source file. logger: for outputting log messages. Returns: StorageUrl path to compressed file, read stream of the compressed file, compressed file size. """ # TODO: Compress using a streaming model as opposed to all at once here. if src_obj_size is not None and src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: logger.info('Compressing %s (to tmp)...', src_url) (gzip_fh, gzip_path) = tempfile.mkstemp() gzip_fp = None try: # Check for temp space. Assume the compressed object is at most 2x # the size of the object (normally should compress to smaller than # the object) if src_url.IsStream() or src_url.IsFifo(): # TODO: Support streaming gzip uploads. # https://github.com/GoogleCloudPlatform/gsutil/issues/364 raise CommandException( 'gzip compression is not currently supported on streaming uploads. ' 'Remove the compression flag or save the streamed output ' 'temporarily to a file before uploading.') if src_obj_size is not None and (CheckFreeSpace(gzip_path) < 2 * int(src_obj_size)): raise CommandException('Inadequate temp space available to compress ' '%s. See the CHANGING TEMP DIRECTORIES section ' 'of "gsutil help cp" for more info.' % src_url) compression_level = config.getint('GSUtil', 'gzip_compression_level', DEFAULT_GZIP_COMPRESSION_LEVEL) gzip_fp = gzip.open(gzip_path, 'wb', compresslevel=compression_level) data = src_obj_filestream.read(GZIP_CHUNK_SIZE) while data: gzip_fp.write(data) data = src_obj_filestream.read(GZIP_CHUNK_SIZE) finally: if gzip_fp: gzip_fp.close() os.close(gzip_fh) src_obj_filestream.close() gzip_size = os.path.getsize(gzip_path) compressed_filestream = open(gzip_path, 'rb') return StorageUrlFromString(gzip_path), compressed_filestream, gzip_size def _DelegateUploadFileToObject(upload_delegate, upload_url, upload_stream, zipped_file, gzip_encoded_file, parallel_composite_upload, logger): """Handles setup and tear down logic for uploads. This is a helper function for _UploadFileToObject. Args: upload_delegate: Function that handles uploading the file. upload_url: StorageURL path to the file. upload_stream: Read stream of the file being uploaded. This will be closed after the upload. zipped_file: Flag for if the file is locally compressed prior to calling this function. If true, the local temporary file is deleted after the upload. gzip_encoded_file: Flag for if the file will be uploaded with the gzip transport encoding. If true, a lock is used to limit resource usage. parallel_composite_upload: Set to true if this upload represents a top-level parallel composite upload (not an upload of a component). If true, resource locking is skipped. logger: For outputting log messages. Returns: The elapsed upload time, the uploaded object. """ elapsed_time = None uploaded_object = None try: # Parallel transport compressed uploads use a signifcant amount of memory. # The number of threads that may run concurrently are restricted as a # result. Parallel composite upload's don't actually upload data, but # instead fork for each component and calling _UploadFileToObject # individually. The parallel_composite_upload flag is false for the actual # upload invocation. if gzip_encoded_file and not parallel_composite_upload: with gslib.command.concurrent_compressed_upload_lock: elapsed_time, uploaded_object = upload_delegate() else: elapsed_time, uploaded_object = upload_delegate() finally: # In the zipped_file case, this is the gzip stream. When the gzip stream is # created, the original source stream is closed in # _ApplyZippedUploadCompression. This means that we do not have to # explicitly close the source stream here in the zipped_file case. upload_stream.close() if zipped_file: try: os.unlink(upload_url.object_name) # Windows sometimes complains the temp file is locked when you try to # delete it. except Exception: # pylint: disable=broad-except logger.warning( 'Could not delete %s. This can occur in Windows because the ' 'temporary file is still locked.', upload_url.object_name) return elapsed_time, uploaded_object def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size, dst_url, dst_obj_metadata, preconditions, gsutil_api, logger, command_obj, copy_exception_handler, gzip_exts=None, allow_splitting=True, is_component=False, gzip_encoded=False): """Uploads a local file to an object. Args: src_url: Source FileUrl. src_obj_filestream: Read stream of the source file to be read and closed. src_obj_size (int or None): Size of the source file. dst_url: Destination CloudUrl. dst_obj_metadata: Metadata to be applied to the destination object. preconditions: Preconditions to use for the copy. gsutil_api: gsutil Cloud API to use for the copy. logger: for outputting log messages. command_obj: command object for use in Apply in parallel composite uploads. copy_exception_handler: For handling copy exceptions during Apply. gzip_exts: List of file extensions to gzip prior to upload, if any. If gzip_exts is GZIP_ALL_FILES, gzip all files. allow_splitting: Whether to allow the file to be split into component pieces for an parallel composite upload. is_component: indicates whether this is a single component or whole file. gzip_encoded: Whether to use gzip transport encoding for the upload. Used in conjunction with gzip_exts for selecting which files will be encoded. Streaming files compressed is only supported on the JSON GCS API. Returns: (elapsed_time, bytes_transferred, dst_url with generation, md5 hash of destination) excluding overhead like initial GET. Raises: CommandException: if errors encountered. """ if not dst_obj_metadata or not dst_obj_metadata.contentLanguage: content_language = config.get_value('GSUtil', 'content_language') if content_language: dst_obj_metadata.contentLanguage = content_language upload_url = src_url upload_stream = src_obj_filestream upload_size = src_obj_size zipped_file, gzip_encoded_file = _SelectUploadCompressionStrategy( src_url.object_name, is_component, gzip_exts, gzip_encoded) # The component's parent already printed this debug message. if gzip_encoded_file and not is_component: logger.debug('Using compressed transport encoding for %s.', src_url) elif zipped_file: upload_url, upload_stream, upload_size = _ApplyZippedUploadCompression( src_url, src_obj_filestream, src_obj_size, logger) dst_obj_metadata.contentEncoding = 'gzip' # If we're sending an object with gzip encoding, it's possible it also # has an incompressible content type. Google Cloud Storage will remove # the top layer of compression when serving the object, which would cause # the served content not to match the CRC32C/MD5 hashes stored and make # integrity checking impossible. Therefore we set cache control to # no-transform to ensure it is served in its original form. The caveat is # that to read this object, other clients must then support # accept-encoding:gzip. if not dst_obj_metadata.cacheControl: dst_obj_metadata.cacheControl = 'no-transform' elif 'no-transform' not in dst_obj_metadata.cacheControl.lower(): dst_obj_metadata.cacheControl += ',no-transform' if not is_component: PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(upload_url, dst_url, time.time(), message_type=FileMessage.FILE_UPLOAD, size=upload_size, finished=False)) elapsed_time = None uploaded_object = None hash_algs = GetUploadHashAlgs() digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) parallel_composite_upload = _ShouldDoParallelCompositeUpload( logger, allow_splitting, upload_url, dst_url, src_obj_size, gsutil_api, canned_acl=global_copy_helper_opts.canned_acl) non_resumable_upload = ((0 if upload_size is None else upload_size) < ResumableThreshold() or src_url.IsStream() or src_url.IsFifo()) if ((src_url.IsStream() or src_url.IsFifo()) and gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON): orig_stream = upload_stream # Add limited seekable properties to the stream via buffering. upload_stream = ResumableStreamingJsonUploadWrapper( orig_stream, GetJsonResumableChunkSize()) if not parallel_composite_upload and len(hash_algs): # Parallel composite uploads calculate hashes per-component in subsequent # calls to this function, but the composition of the final object is a # cloud-only operation. wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters, hash_algs, upload_url, logger) else: wrapped_filestream = upload_stream def CallParallelCompositeUpload(): return _DoParallelCompositeUpload(upload_stream, upload_url, dst_url, dst_obj_metadata, global_copy_helper_opts.canned_acl, upload_size, preconditions, gsutil_api, command_obj, copy_exception_handler, logger, gzip_encoded=gzip_encoded_file) def CallNonResumableUpload(): return _UploadFileToObjectNonResumable(upload_url, wrapped_filestream, upload_size, dst_url, dst_obj_metadata, preconditions, gsutil_api, gzip_encoded=gzip_encoded_file) def CallResumableUpload(): return _UploadFileToObjectResumable(upload_url, wrapped_filestream, upload_size, dst_url, dst_obj_metadata, preconditions, gsutil_api, logger, is_component=is_component, gzip_encoded=gzip_encoded_file) if parallel_composite_upload: delegate = CallParallelCompositeUpload elif non_resumable_upload: delegate = CallNonResumableUpload else: delegate = CallResumableUpload elapsed_time, uploaded_object = _DelegateUploadFileToObject( delegate, upload_url, upload_stream, zipped_file, gzip_encoded_file, parallel_composite_upload, logger) if not parallel_composite_upload: try: digests = _CreateDigestsFromDigesters(digesters) _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name, digests, is_upload=True) except HashMismatchException: if _RENAME_ON_HASH_MISMATCH: corrupted_obj_metadata = apitools_messages.Object( name=dst_obj_metadata.name, bucket=dst_obj_metadata.bucket, etag=uploaded_object.etag) dst_obj_metadata.name = (dst_url.object_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) gsutil_api.CopyObject(corrupted_obj_metadata, dst_obj_metadata, provider=dst_url.scheme) # If the digest doesn't match, delete the object. gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, generation=uploaded_object.generation, provider=dst_url.scheme) raise result_url = dst_url.Clone() result_url.generation = uploaded_object.generation result_url.generation = GenerationFromUrlAndString(result_url, uploaded_object.generation) if not is_component: PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(upload_url, dst_url, time.time(), message_type=FileMessage.FILE_UPLOAD, size=upload_size, finished=True)) return (elapsed_time, uploaded_object.size, result_url, uploaded_object.md5Hash) def _GetDownloadFile(dst_url, src_obj_metadata, logger): """Creates a new download file, and deletes the file that will be replaced. Names and creates a temporary file for this download. Also, if there is an existing file at the path where this file will be placed after the download is completed, that file will be deleted. Args: dst_url: Destination FileUrl. src_obj_metadata: Metadata from the source object. logger: for outputting log messages. Returns: (download_file_name, need_to_unzip) download_file_name: The name of the temporary file to which the object will be downloaded. need_to_unzip: If true, a temporary zip file was used and must be uncompressed as part of validation. """ dir_name = os.path.dirname(dst_url.object_name) if dir_name and not os.path.exists(dir_name): # Do dir creation in try block so can ignore case where dir already # exists. This is needed to avoid a race condition when running gsutil # -m cp. try: os.makedirs(dir_name) except OSError as e: if e.errno != errno.EEXIST: raise need_to_unzip = False # For gzipped objects download to a temp file and unzip. For the XML API, # this represents the result of a HEAD request. For the JSON API, this is # the stored encoding which the service may not respect. However, if the # server sends decompressed bytes for a file that is stored compressed # (double compressed case), there is no way we can validate the hash and # we will fail our hash check for the object. if ObjectIsGzipEncoded(src_obj_metadata): need_to_unzip = True download_file_name = temporary_file_util.GetTempZipFileName(dst_url) logger.info('Downloading to temp gzip filename %s', download_file_name) else: download_file_name = temporary_file_util.GetTempFileName(dst_url) # If a file exists at the permanent destination (where the file will be moved # after the download is completed), delete it here to reduce disk space # requirements. if os.path.exists(dst_url.object_name): os.unlink(dst_url.object_name) # Downloads open the temporary download file in r+b mode, which requires it # to already exist, so we create it here if it doesn't exist already. if not os.path.exists(download_file_name): fp = open(download_file_name, 'w') fp.close() return download_file_name, need_to_unzip def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata, allow_splitting, logger): """Determines whether the sliced download strategy should be used. Args: download_strategy: CloudApi download strategy. src_obj_metadata: Metadata from the source object. allow_splitting: If false, then this function returns false. logger: logging.Logger for log message output. Returns: True iff a sliced download should be performed on the source file. """ sliced_object_download_threshold = HumanReadableToBytes( config.get('GSUtil', 'sliced_object_download_threshold', DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) max_components = config.getint('GSUtil', 'sliced_object_download_max_components', DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) # Don't use sliced download if it will prevent us from performing an # integrity check. check_hashes_config = config.get('GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension() hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER use_slice = (allow_splitting and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT and max_components > 1 and hashing_okay and sliced_object_download_threshold > 0 and src_obj_metadata.size >= sliced_object_download_threshold) if (not use_slice and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD and not UsingCrcmodExtension() and check_hashes_config != CHECK_HASH_NEVER): with suggested_sliced_transfers_lock: if not suggested_sliced_transfers.get('suggested'): logger.info('\n'.join( textwrap.wrap( '==> NOTE: You are downloading one or more large file(s), which ' 'would run significantly faster if you enabled sliced object ' 'downloads. This feature is enabled by default but requires that ' 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n') suggested_sliced_transfers['suggested'] = True return use_slice def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None): """Function argument to Apply for performing sliced downloads. Args: cls: Calling Command class. args: PerformSlicedDownloadObjectToFileArgs tuple describing the target. thread_state: gsutil Cloud API instance to use for the operation. Returns: PerformSlicedDownloadReturnValues named-tuple filled with: component_num: The component number for this download. crc32c: CRC32C hash value (integer) of the downloaded bytes. bytes_transferred: The number of bytes transferred, potentially less than the component size if the download was resumed. component_total_size: The number of bytes corresponding to the whole component size, potentially more than bytes_transferred if the download was resumed. """ gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) # Deserialize the picklable object metadata. src_obj_metadata = protojson.decode_message(apitools_messages.Object, args.src_obj_metadata_json) hash_algs = GetDownloadHashAlgs(cls.logger, consider_crc32c=src_obj_metadata.crc32c) digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) (bytes_transferred, server_encoding) = (_DownloadObjectToFileResumable( args.src_url, src_obj_metadata, args.dst_url, args.download_file_name, gsutil_api, cls.logger, digesters, component_num=args.component_num, start_byte=args.start_byte, end_byte=args.end_byte, decryption_key=args.decryption_key)) crc32c_val = None if 'crc32c' in digesters: crc32c_val = digesters['crc32c'].crcValue return PerformSlicedDownloadReturnValues(args.component_num, crc32c_val, bytes_transferred, args.end_byte - args.start_byte + 1, server_encoding) def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, download_file_name, logger, api_selector, num_components): """Maintains sliced download tracker files in order to permit resumability. Reads or creates a sliced download tracker file representing this object download. Upon an attempt at cross-process resumption, the contents of the sliced download tracker file are verified to make sure a resumption is possible and appropriate. In the case that a resumption should not be attempted, existing component tracker files are deleted (to prevent child processes from attempting resumption), and a new sliced download tracker file is created. Args: src_obj_metadata: Metadata from the source object. Must include etag and generation. dst_url: Destination FileUrl. download_file_name: Temporary file name to be used for the download. logger: for outputting log messages. api_selector: The Cloud API implementation used. num_components: The number of components to perform this download with. """ assert src_obj_metadata.etag tracker_file = None # Only can happen if the resumable threshold is set higher than the # parallel transfer threshold. if src_obj_metadata.size < ResumableThreshold(): return tracker_file_name = GetTrackerFilePath(dst_url, TrackerFileType.SLICED_DOWNLOAD, api_selector) fp = None # Check to see if we should attempt resuming the download. try: fp = open(download_file_name, 'rb') existing_file_size = GetFileSize(fp) # A parallel resumption should be attempted only if the destination file # size is exactly the same as the source size and the tracker file matches. if existing_file_size == src_obj_metadata.size: tracker_file = open(tracker_file_name, 'r') tracker_file_data = json.load(tracker_file) if (tracker_file_data['etag'] == src_obj_metadata.etag and tracker_file_data['generation'] == src_obj_metadata.generation and tracker_file_data['num_components'] == num_components): return else: tracker_file.close() logger.warn('Sliced download tracker file doesn\'t match for ' 'download of %s. Restarting download from scratch.' % dst_url.object_name) except (IOError, ValueError) as e: # Ignore non-existent file (happens first time a download # is attempted on an object), but warn user for other errors. if isinstance(e, ValueError) or e.errno != errno.ENOENT: logger.warn('Couldn\'t read sliced download tracker file (%s): %s. ' 'Restarting download from scratch.' % (tracker_file_name, str(e))) finally: if fp: fp.close() if tracker_file: tracker_file.close() # Delete component tracker files to guarantee download starts from scratch. DeleteDownloadTrackerFiles(dst_url, api_selector) # Create a new sliced download tracker file to represent this download. try: with open(tracker_file_name, 'w') as tracker_file: tracker_file_data = { 'etag': src_obj_metadata.etag, 'generation': src_obj_metadata.generation, 'num_components': num_components, } tracker_file.write(json.dumps(tracker_file_data)) except IOError as e: RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) class SlicedDownloadFileWrapper(object): """Wraps a file object to be used in GetObjectMedia for sliced downloads. In order to allow resumability, the file object used by each thread in a sliced object download should be wrapped using SlicedDownloadFileWrapper. Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the download component tracker file for this component to be updated periodically, while the downloaded bytes are normally written to file. """ def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte, end_byte): """Initializes the SlicedDownloadFileWrapper. Args: fp: The already-open file object to be used for writing in GetObjectMedia. Data will be written to file starting at the current seek position. tracker_file_name: The name of the tracker file for this component. src_obj_metadata: Metadata from the source object. Must include etag and generation. start_byte: The first byte to be downloaded for this parallel component. end_byte: The last byte to be downloaded for this parallel component. """ self._orig_fp = fp self._tracker_file_name = tracker_file_name self._src_obj_metadata = src_obj_metadata self._last_tracker_file_byte = None self._start_byte = start_byte self._end_byte = end_byte @property def mode(self): """Returns the mode of the underlying file descriptor, or None.""" return getattr(self._orig_fp, 'mode', None) def write(self, data): # pylint: disable=invalid-name current_file_pos = self._orig_fp.tell() assert (self._start_byte <= current_file_pos and current_file_pos + len(data) <= self._end_byte + 1) text_util.write_to_fd(self._orig_fp, data) current_file_pos = self._orig_fp.tell() threshold = TRACKERFILE_UPDATE_THRESHOLD if (self._last_tracker_file_byte is None or current_file_pos - self._last_tracker_file_byte > threshold or current_file_pos == self._end_byte + 1): WriteDownloadComponentTrackerFile(self._tracker_file_name, self._src_obj_metadata, current_file_pos) self._last_tracker_file_byte = current_file_pos def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name if whence == os.SEEK_END: self._orig_fp.seek(offset + self._end_byte + 1) else: self._orig_fp.seek(offset, whence) assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1 def tell(self): # pylint: disable=invalid-name return self._orig_fp.tell() def flush(self): # pylint: disable=invalid-name self._orig_fp.flush() def close(self): # pylint: disable=invalid-name if self._orig_fp: self._orig_fp.close() def _PartitionObject(src_url, src_obj_metadata, dst_url, download_file_name, decryption_key=None): """Partitions an object into components to be downloaded. Each component is a byte range of the object. The byte ranges of the returned components are mutually exclusive and collectively exhaustive. The byte ranges are inclusive at both end points. Args: src_url: Source CloudUrl. src_obj_metadata: Metadata from the source object with non-pickleable fields removed. dst_url: Destination FileUrl. download_file_name: Temporary file name to be used for the download. decryption_key: Base64-encoded decryption key for the source object, if any. Returns: components_to_download: A list of PerformSlicedDownloadObjectToFileArgs to be used in Apply for the sliced download. """ sliced_download_component_size = HumanReadableToBytes( config.get('GSUtil', 'sliced_object_download_component_size', DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE)) max_components = config.getint('GSUtil', 'sliced_object_download_max_components', DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) num_components, component_size = _GetPartitionInfo( src_obj_metadata.size, max_components, sliced_download_component_size) components_to_download = [] component_lengths = [] for i in range(num_components): start_byte = i * component_size end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1) component_lengths.append(end_byte - start_byte + 1) # We need to serialize src_obj_metadata for pickling since it can # contain nested classes such as custom metadata. src_obj_metadata_json = protojson.encode_message(src_obj_metadata) components_to_download.append( PerformSlicedDownloadObjectToFileArgs(i, src_url, src_obj_metadata_json, dst_url, download_file_name, start_byte, end_byte, decryption_key)) return components_to_download, component_lengths def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name, command_obj, logger, copy_exception_handler, api_selector, decryption_key=None, status_queue=None): """Downloads a cloud object to a local file using sliced download. Byte ranges are decided for each thread/process, and then the parts are downloaded in parallel. Args: src_url: Source CloudUrl. src_obj_metadata: Metadata from the source object. dst_url: Destination FileUrl. download_file_name: Temporary file name to be used for download. command_obj: command object for use in Apply in parallel composite uploads. logger: for outputting log messages. copy_exception_handler: For handling copy exceptions during Apply. api_selector: The Cloud API implementation used. decryption_key: Base64-encoded decryption key for the source object, if any. status_queue: Queue for posting file messages for UI/Analytics. Returns: (bytes_transferred, crc32c) bytes_transferred: Number of bytes transferred from server this call. crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if crc32c hashing wasn't performed. """ # CustomerEncryptionValue is a subclass and thus not pickleable for # multiprocessing, but at this point we already have the matching key, # so just discard the metadata. src_obj_metadata.customerEncryption = None components_to_download, component_lengths = _PartitionObject( src_url, src_obj_metadata, dst_url, download_file_name, decryption_key) num_components = len(components_to_download) _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, download_file_name, logger, api_selector, num_components) # Resize the download file so each child process can seek to its start byte. with open(download_file_name, 'r+b') as fp: fp.truncate(src_obj_metadata.size) # Assign a start FileMessage to each component for (i, component) in enumerate(components_to_download): size = component.end_byte - component.start_byte + 1 download_start_byte = GetDownloadStartByte(src_obj_metadata, dst_url, api_selector, component.start_byte, size, i) bytes_already_downloaded = download_start_byte - component.start_byte PutToQueueWithTimeout( status_queue, FileMessage(src_url, dst_url, time.time(), size=size, finished=False, component_num=i, message_type=FileMessage.COMPONENT_TO_DOWNLOAD, bytes_already_downloaded=bytes_already_downloaded)) cp_results = command_obj.Apply( _PerformSlicedDownloadObjectToFile, components_to_download, copy_exception_handler, arg_checker=gslib.command.DummyArgChecker, parallel_operations_override=command_obj.ParallelOverrideReason.SLICE, should_return_results=True) if len(cp_results) < num_components: raise CommandException( 'Some components of %s were not downloaded successfully. ' 'Please retry this download.' % dst_url.object_name) # Crc32c hashes have to be concatenated in the correct order. cp_results = sorted(cp_results, key=attrgetter('component_num')) crc32c = cp_results[0].crc32c if crc32c is not None: for i in range(1, num_components): crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c, component_lengths[i]) bytes_transferred = 0 expect_gzip = ObjectIsGzipEncoded(src_obj_metadata) # Assign an end FileMessage to each component for cp_result in cp_results: PutToQueueWithTimeout( status_queue, FileMessage(src_url, dst_url, time.time(), size=cp_result.component_total_size, finished=True, component_num=cp_result.component_num, message_type=FileMessage.COMPONENT_TO_DOWNLOAD)) bytes_transferred += cp_result.bytes_transferred server_gzip = (cp_result.server_encoding and cp_result.server_encoding.lower().endswith('gzip')) # If the server gzipped any components on the fly, we will have no chance of # properly reconstructing the file. if server_gzip and not expect_gzip: raise CommandException( 'Download of %s failed because the server sent back data with an ' 'unexpected encoding.' % dst_url.object_name) return bytes_transferred, crc32c def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, download_file_name, gsutil_api, logger, digesters, component_num=None, start_byte=0, end_byte=None, decryption_key=None): """Downloads an object to a local file using the resumable strategy. Args: src_url: Source CloudUrl. src_obj_metadata: Metadata from the source object. dst_url: Destination FileUrl. download_file_name: Temporary file name to be used for download. gsutil_api: gsutil Cloud API instance to use for the download. logger: for outputting log messages. digesters: Digesters corresponding to the hash algorithms that will be used for validation. component_num: Which component of a sliced download this call is for, or None if this is not a sliced download. start_byte: The first byte of a byte range for a sliced download. end_byte: The last byte of a byte range for a sliced download. decryption_key: Base64-encoded decryption key for the source object, if any. Returns: (bytes_transferred, server_encoding) bytes_transferred: Number of bytes transferred from server this call. server_encoding: Content-encoding string if it was detected that the server sent encoded bytes during transfer, None otherwise. """ if end_byte is None: end_byte = src_obj_metadata.size - 1 download_size = end_byte - start_byte + 1 is_sliced = component_num is not None api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) server_encoding = None # Used for logging download_name = dst_url.object_name if is_sliced: download_name += ' component %d' % component_num fp = None try: fp = open(download_file_name, 'r+b') fp.seek(start_byte) api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) existing_file_size = GetFileSize(fp) tracker_file_name, download_start_byte = ReadOrCreateDownloadTrackerFile( src_obj_metadata, dst_url, logger, api_selector, start_byte, existing_file_size, component_num, ) if download_start_byte < start_byte or download_start_byte > end_byte + 1: DeleteTrackerFile(tracker_file_name) raise CommandException( 'Resumable download start point for %s is not in the correct byte ' 'range. Deleting tracker file, so if you re-try this download it ' 'will start from scratch' % download_name) download_complete = (download_start_byte == start_byte + download_size) resuming = (download_start_byte != start_byte) and not download_complete if resuming: logger.info('Resuming download for %s', download_name) elif download_complete: logger.info( 'Download already complete for %s, skipping download but ' 'will run integrity checks.', download_name) # This is used for resuming downloads, but also for passing the mediaLink # and size into the download for new downloads so that we can avoid # making an extra HTTP call. serialization_data = GetDownloadSerializationData( src_obj_metadata, progress=download_start_byte, user_project=gsutil_api.user_project) if resuming or download_complete: # Catch up our digester with the hash data. bytes_digested = 0 total_bytes_to_digest = download_start_byte - start_byte hash_callback = ProgressCallbackWithTimeout( total_bytes_to_digest, FileProgressCallbackHandler(gsutil_api.status_queue, component_num=component_num, src_url=src_url, dst_url=dst_url, operation_name='Hashing').call) while bytes_digested < total_bytes_to_digest: bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE, total_bytes_to_digest - bytes_digested) data = fp.read(bytes_to_read) bytes_digested += bytes_to_read for alg_name in digesters: digesters[alg_name].update(six.ensure_binary(data)) hash_callback.Progress(len(data)) elif not is_sliced: # Delete file contents and start entire object download from scratch. fp.truncate(0) existing_file_size = 0 progress_callback = FileProgressCallbackHandler( gsutil_api.status_queue, start_byte=start_byte, override_total_size=download_size, src_url=src_url, dst_url=dst_url, component_num=component_num, operation_name='Downloading').call if global_copy_helper_opts.test_callback_file: with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: progress_callback = pickle.loads(test_fp.read()).call if is_sliced and src_obj_metadata.size >= ResumableThreshold(): fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata, start_byte, end_byte) compressed_encoding = ObjectIsGzipEncoded(src_obj_metadata) # TODO: With gzip encoding (which may occur on-the-fly and not be part of # the object's metadata), when we request a range to resume, it's possible # that the server will just resend the entire object, which means our # caught-up hash will be incorrect. We recalculate the hash on # the local file in the case of a failed gzip hash anyway, but it would # be better if we actively detected this case. if not download_complete: fp.seek(download_start_byte) server_encoding = gsutil_api.GetObjectMedia( src_url.bucket_name, src_url.object_name, fp, start_byte=download_start_byte, end_byte=end_byte, compressed_encoding=compressed_encoding, generation=src_url.generation, object_size=src_obj_metadata.size, download_strategy=CloudApi.DownloadStrategy.RESUMABLE, provider=src_url.scheme, serialization_data=serialization_data, digesters=digesters, progress_callback=progress_callback, decryption_tuple=CryptoKeyWrapperFromKey(decryption_key)) except ResumableDownloadException as e: logger.warning('Caught ResumableDownloadException (%s) for download of %s.', e.reason, download_name) raise finally: if fp: fp.close() bytes_transferred = end_byte - download_start_byte + 1 return bytes_transferred, server_encoding def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, download_file_name, gsutil_api, digesters, decryption_key=None): """Downloads an object to a local file using the non-resumable strategy. This function does not support component transfers. Args: src_url: Source CloudUrl. src_obj_metadata: Metadata from the source object. dst_url: Destination FileUrl. download_file_name: Temporary file name to be used for download. gsutil_api: gsutil Cloud API instance to use for the download. digesters: Digesters corresponding to the hash algorithms that will be used for validation. decryption_key: Base64-encoded decryption key for the source object, if any. Returns: (bytes_transferred, server_encoding) bytes_transferred: Number of bytes transferred from server this call. server_encoding: Content-encoding string if it was detected that the server sent encoded bytes during transfer, None otherwise. """ fp = None try: fp = open(download_file_name, 'w') # This is used to pass the mediaLink and the size into the download so that # we can avoid making an extra HTTP call. serialization_data = GetDownloadSerializationData( src_obj_metadata, 0, user_project=gsutil_api.user_project) progress_callback = FileProgressCallbackHandler( gsutil_api.status_queue, src_url=src_url, dst_url=dst_url, operation_name='Downloading').call if global_copy_helper_opts.test_callback_file: with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: progress_callback = pickle.loads(test_fp.read()).call server_encoding = gsutil_api.GetObjectMedia( src_url.bucket_name, src_url.object_name, fp, generation=src_url.generation, object_size=src_obj_metadata.size, download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, provider=src_url.scheme, serialization_data=serialization_data, digesters=digesters, progress_callback=progress_callback, decryption_tuple=CryptoKeyWrapperFromKey(decryption_key)) finally: if fp: fp.close() return src_obj_metadata.size, server_encoding def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, gsutil_api, logger, command_obj, copy_exception_handler, allow_splitting=True, decryption_key=None, is_rsync=False, preserve_posix=False, use_stet=False): """Downloads an object to a local file. Args: src_url: Source CloudUrl. src_obj_metadata: Metadata from the source object. dst_url: Destination FileUrl. gsutil_api: gsutil Cloud API instance to use for the download. logger: for outputting log messages. command_obj: command object for use in Apply in sliced downloads. copy_exception_handler: For handling copy exceptions during Apply. allow_splitting: Whether or not to allow sliced download. decryption_key: Base64-encoded decryption key for the source object, if any. is_rsync: Whether or not the caller is the rsync command. preserve_posix: Whether or not to preserve POSIX attributes. use_stet: Decrypt downloaded file with STET binary if available on system. Returns: (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed excludes initial GET. Raises: FileConcurrencySkipError: if this download is already in progress. CommandException: if other errors encountered. """ global open_files_map, open_files_lock if dst_url.object_name.endswith(dst_url.delim): logger.warn('\n'.join( textwrap.wrap( 'Skipping attempt to download to filename ending with slash (%s). This ' 'typically happens when using gsutil to download from a subdirectory ' 'created by the Cloud Console (https://cloud.google.com/console)' % dst_url.object_name))) # The warning above is needed because errors might get ignored # for parallel processing. raise InvalidUrlError('Invalid destination path: %s' % dst_url.object_name) api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) download_strategy = _SelectDownloadStrategy(dst_url) sliced_download = _ShouldDoSlicedDownload(download_strategy, src_obj_metadata, allow_splitting, logger) download_file_name, need_to_unzip = _GetDownloadFile(dst_url, src_obj_metadata, logger) # Ensure another process/thread is not already writing to this file. with open_files_lock: if open_files_map.get(download_file_name, False): raise FileConcurrencySkipError open_files_map[download_file_name] = True # Set up hash digesters. consider_md5 = src_obj_metadata.md5Hash and not sliced_download hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5, consider_crc32c=src_obj_metadata.crc32c) digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) # Tracks whether the server used a gzip encoding. server_encoding = None download_complete = (src_obj_metadata.size == 0) bytes_transferred = 0 start_time = time.time() if not download_complete: if sliced_download: (bytes_transferred, crc32c) = (_DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name, command_obj, logger, copy_exception_handler, api_selector, decryption_key=decryption_key, status_queue=gsutil_api.status_queue)) if 'crc32c' in digesters: digesters['crc32c'].crcValue = crc32c elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: bytes_transferred, server_encoding = _DownloadObjectToFileNonResumable( src_url, src_obj_metadata, dst_url, download_file_name, gsutil_api, digesters, decryption_key=decryption_key, ) elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: bytes_transferred, server_encoding = _DownloadObjectToFileResumable( src_url, src_obj_metadata, dst_url, download_file_name, gsutil_api, logger, digesters, decryption_key=decryption_key, ) else: raise CommandException('Invalid download strategy %s chosen for' 'file %s' % (download_strategy, download_file_name)) end_time = time.time() server_gzip = server_encoding and server_encoding.lower().endswith('gzip') local_md5 = _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip, digesters, hash_algs, download_file_name, api_selector, bytes_transferred, gsutil_api, is_rsync=is_rsync, preserve_posix=preserve_posix, use_stet=use_stet) with open_files_lock: open_files_map.delete(download_file_name) PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, dst_url, message_time=end_time, message_type=FileMessage.FILE_DOWNLOAD, size=src_obj_metadata.size, finished=True)) return (end_time - start_time, bytes_transferred, dst_url, local_md5) def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip, digesters, hash_algs, temporary_file_name, api_selector, bytes_transferred, gsutil_api, is_rsync=False, preserve_posix=False, use_stet=False): """Validates and performs necessary operations on a downloaded file. Validates the integrity of the downloaded file using hash_algs. If the file was compressed (temporarily), the file will be decompressed. Then, if the integrity of the file was successfully validated, the file will be moved from its temporary download location to its permanent location on disk. Args: logger: For outputting log messages. src_url: StorageUrl for the source object. src_obj_metadata: Metadata for the source object, potentially containing hash values. dst_url: StorageUrl describing the destination file. need_to_unzip: If true, a temporary zip file was used and must be uncompressed as part of validation. server_gzip: If true, the server gzipped the bytes (regardless of whether the object metadata claimed it was gzipped). digesters: dict of {string, hash digester} that contains up-to-date digests computed during the download. If a digester for a particular algorithm is None, an up-to-date digest is not available and the hash must be recomputed from the local file. hash_algs: dict of {string, hash algorithm} that can be used if digesters don't have up-to-date digests. temporary_file_name: Temporary file name that was used for download. api_selector: The Cloud API implementation used (used tracker file naming). bytes_transferred: Number of bytes downloaded (used for logging). gsutil_api: Cloud API to use for service and status. is_rsync: Whether or not the caller is the rsync function. Used to determine if timeCreated should be used. preserve_posix: Whether or not to preserve the posix attributes. use_stet: If True, attempt to decrypt downloaded files with the STET binary if it's present on the system. Returns: An MD5 of the local file, if one was calculated as part of the integrity check. """ final_file_name = dst_url.object_name digesters_succeeded = True for alg in digesters: # If we get a digester with a None algorithm, the underlying # implementation failed to calculate a digest, so we will need to # calculate one from scratch. if not digesters[alg]: digesters_succeeded = False break if digesters_succeeded: local_hashes = _CreateDigestsFromDigesters(digesters) else: local_hashes = _CreateDigestsFromLocalFile(gsutil_api.status_queue, hash_algs, temporary_file_name, src_url, src_obj_metadata) digest_verified = True hash_invalid_exception = None try: _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, local_hashes) DeleteDownloadTrackerFiles(dst_url, api_selector) except HashMismatchException as e: # If an non-gzipped object gets sent with gzip content encoding, the hash # we calculate will match the gzipped bytes, not the original object. Thus, # we'll need to calculate and check it after unzipping. if server_gzip: logger.debug('Hash did not match but server gzipped the content, will ' 'recalculate.') digest_verified = False elif api_selector == ApiSelector.XML: logger.debug( 'Hash did not match but server may have gzipped the content, will ' 'recalculate.') # Save off the exception in case this isn't a gzipped file. hash_invalid_exception = e digest_verified = False else: DeleteDownloadTrackerFiles(dst_url, api_selector) if _RENAME_ON_HASH_MISMATCH: os.rename(temporary_file_name, final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) else: os.unlink(temporary_file_name) raise if not (need_to_unzip or server_gzip): unzipped_temporary_file_name = temporary_file_name else: # This will not result in the same string as temporary_file_name b/c # GetTempFileName returns ".gstmp" and gzipped temp files have ".gztmp". unzipped_temporary_file_name = temporary_file_util.GetTempFileName(dst_url) # Log that we're uncompressing if the file is big enough that # decompressing would make it look like the transfer "stalled" at the end. if bytes_transferred > TEN_MIB: logger.info('Uncompressing temporarily gzipped file to %s...', final_file_name) gzip_fp = None try: # Downloaded temporarily gzipped file, unzip to file without '_.gztmp' # suffix. gzip_fp = gzip.open(temporary_file_name, 'rb') with open(unzipped_temporary_file_name, 'wb') as f_out: data = gzip_fp.read(GZIP_CHUNK_SIZE) while data: f_out.write(data) data = gzip_fp.read(GZIP_CHUNK_SIZE) except IOError as e: # In the XML case where we don't know if the file was gzipped, raise # the original hash exception if we find that it wasn't. if 'Not a gzipped file' in str(e) and hash_invalid_exception: # Linter improperly thinks we're raising None despite the above check. # pylint: disable=raising-bad-type raise hash_invalid_exception finally: if gzip_fp: gzip_fp.close() os.unlink(temporary_file_name) if not digest_verified: try: # Recalculate hashes on the unzipped local file. local_hashes = _CreateDigestsFromLocalFile(gsutil_api.status_queue, hash_algs, unzipped_temporary_file_name, src_url, src_obj_metadata) _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, local_hashes) DeleteDownloadTrackerFiles(dst_url, api_selector) except HashMismatchException: DeleteDownloadTrackerFiles(dst_url, api_selector) if _RENAME_ON_HASH_MISMATCH: os.rename( unzipped_temporary_file_name, unzipped_temporary_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) else: os.unlink(unzipped_temporary_file_name) raise if use_stet: # Decrypt data using STET binary. stet_util.decrypt_download(src_url, dst_url, unzipped_temporary_file_name, logger) os.rename(unzipped_temporary_file_name, final_file_name) ParseAndSetPOSIXAttributes(final_file_name, src_obj_metadata, is_rsync=is_rsync, preserve_posix=preserve_posix) if 'md5' in local_hashes: return local_hashes['md5'] def _CopyFileToFile(src_url, dst_url, status_queue=None, src_obj_metadata=None): """Copies a local file to a local file. Args: src_url: Source FileUrl. dst_url: Destination FileUrl. status_queue: Queue for posting file messages for UI/Analytics. src_obj_metadata: An apitools Object that may contain file size, or None. Returns: (elapsed_time, bytes_transferred, dst_url, md5=None). Raises: CommandException: if errors encountered. """ src_fp = GetStreamFromFileUrl(src_url) dir_name = os.path.dirname(dst_url.object_name) if dir_name: try: os.makedirs(dir_name) except OSError as e: if e.errno != errno.EEXIST: raise with open(dst_url.object_name, 'wb') as dst_fp: start_time = time.time() shutil.copyfileobj(src_fp, dst_fp) if not src_url.IsStream(): src_fp.close() # Explicitly close the src fp - necessary if it is a fifo. end_time = time.time() PutToQueueWithTimeout( status_queue, FileMessage(src_url, dst_url, end_time, message_type=FileMessage.FILE_LOCAL_COPY, size=src_obj_metadata.size if src_obj_metadata else None, finished=True)) return (end_time - start_time, os.path.getsize(dst_url.object_name), dst_url, None) def _DummyTrackerCallback(_): pass # pylint: disable=undefined-variable def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url, dst_obj_metadata, preconditions, gsutil_api, logger, decryption_key=None): """Copies from src_url to dst_url in "daisy chain" mode. See -D OPTION documentation about what daisy chain mode is. Args: src_url: Source CloudUrl src_obj_metadata: Metadata from source object dst_url: Destination CloudUrl dst_obj_metadata: Object-specific metadata that should be overidden during the copy. preconditions: Preconditions to use for the copy. gsutil_api: gsutil Cloud API to use for the copy. logger: For outputting log messages. decryption_key: Base64-encoded decryption key for the source object, if any. Returns: (elapsed_time, bytes_transferred, dst_url with generation, md5 hash of destination) excluding overhead like initial GET. Raises: CommandException: if errors encountered. """ # We don't attempt to preserve ACLs across providers because # GCS and S3 support different ACLs and disjoint principals. if (global_copy_helper_opts.preserve_acl and src_url.scheme != dst_url.scheme): raise NotImplementedError('Cross-provider cp -p not supported') if not global_copy_helper_opts.preserve_acl: dst_obj_metadata.acl = [] # Don't use callbacks for downloads on the daisy chain wrapper because # upload callbacks will output progress, but respect test hooks if present. progress_callback = None if global_copy_helper_opts.test_callback_file: with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: progress_callback = pickle.loads(test_fp.read()).call compressed_encoding = ObjectIsGzipEncoded(src_obj_metadata) encryption_keywrapper = GetEncryptionKeyWrapper(config) start_time = time.time() upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api, compressed_encoding=compressed_encoding, progress_callback=progress_callback, decryption_key=decryption_key) uploaded_object = None if src_obj_metadata.size == 0: # Resumable uploads of size 0 are not supported. uploaded_object = gsutil_api.UploadObject( upload_fp, object_metadata=dst_obj_metadata, canned_acl=global_copy_helper_opts.canned_acl, preconditions=preconditions, provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size, encryption_tuple=encryption_keywrapper) else: # TODO: Support process-break resumes. This will resume across connection # breaks and server errors, but the tracker callback is a no-op so this # won't resume across gsutil runs. # TODO: Test retries via test_callback_file. uploaded_object = gsutil_api.UploadObjectResumable( upload_fp, object_metadata=dst_obj_metadata, canned_acl=global_copy_helper_opts.canned_acl, preconditions=preconditions, provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size, progress_callback=FileProgressCallbackHandler( gsutil_api.status_queue, src_url=src_url, dst_url=dst_url, operation_name='Uploading').call, tracker_callback=_DummyTrackerCallback, encryption_tuple=encryption_keywrapper) end_time = time.time() try: _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, uploaded_object) except HashMismatchException: if _RENAME_ON_HASH_MISMATCH: corrupted_obj_metadata = apitools_messages.Object( name=dst_obj_metadata.name, bucket=dst_obj_metadata.bucket, etag=uploaded_object.etag) dst_obj_metadata.name = (dst_url.object_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) decryption_keywrapper = CryptoKeyWrapperFromKey(decryption_key) gsutil_api.CopyObject(corrupted_obj_metadata, dst_obj_metadata, provider=dst_url.scheme, decryption_tuple=decryption_keywrapper, encryption_tuple=encryption_keywrapper) # If the digest doesn't match, delete the object. gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, generation=uploaded_object.generation, provider=dst_url.scheme) raise result_url = dst_url.Clone() result_url.generation = GenerationFromUrlAndString(result_url, uploaded_object.generation) PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, dst_url, end_time, message_type=FileMessage.FILE_DAISY_COPY, size=src_obj_metadata.size, finished=True)) return (end_time - start_time, src_obj_metadata.size, result_url, uploaded_object.md5Hash) def GetSourceFieldsNeededForCopy(dst_is_cloud, skip_unsupported_objects, preserve_acl, is_rsync=False, preserve_posix=False, delete_source=False, file_size_will_change=False): """Determines the metadata fields needed for a copy operation. This function returns the fields we will need to successfully copy any cloud objects that might be iterated. By determining this prior to iteration, the cp command can request this metadata directly from the iterator's get/list calls, avoiding the need for a separate get metadata HTTP call for each iterated result. As a trade-off, filtering objects at the leaf nodes of the iteration (based on a remaining wildcard) is more expensive. This is because more metadata will be requested when object name is all that is required for filtering. The rsync command favors fast listing and comparison, and makes the opposite trade-off, optimizing for the low-delta case by making per-object get metadata HTTP call so that listing can return minimal metadata. It uses this function to determine what is needed for get metadata HTTP calls. Args: dst_is_cloud: if true, destination is a Cloud URL. skip_unsupported_objects: if true, get metadata for skipping unsupported object types. preserve_acl: if true, get object ACL. is_rsync: if true, the calling function is rsync. Determines if metadata is needed to verify download. preserve_posix: if true, retrieves POSIX attributes into user metadata. delete_source: if true, source object will be deleted after the copy (mv command). file_size_will_change: if true, do not try to record file size. Returns: List of necessary field metadata field names. """ src_obj_fields_set = set() if dst_is_cloud: # For cloud or daisy chain copy, we need every copyable field. # If we're not modifying or overriding any of the fields, we can get # away without retrieving the object metadata because the copy # operation can succeed with just the destination bucket and object # name. But if we are sending any metadata, the JSON API will expect a # complete object resource. Since we want metadata like the object size for # our own tracking, we just get all of the metadata here. src_obj_fields_set.update([ 'cacheControl', 'componentCount', 'contentDisposition', 'contentEncoding', 'contentLanguage', 'contentType', 'crc32c', 'customerEncryption', 'etag', 'generation', 'md5Hash', 'mediaLink', 'metadata', 'metageneration', 'storageClass', 'timeCreated', ]) # We only need the ACL if we're going to preserve it. if preserve_acl: src_obj_fields_set.update(['acl']) if not file_size_will_change: src_obj_fields_set.update(['size']) else: # Just get the fields needed to perform and validate the download. src_obj_fields_set.update([ 'crc32c', 'contentEncoding', 'contentType', 'customerEncryption', 'etag', 'mediaLink', 'md5Hash', 'size', 'generation', ]) if is_rsync: src_obj_fields_set.update(['metadata/%s' % MTIME_ATTR, 'timeCreated']) if preserve_posix: posix_fields = [ 'metadata/%s' % ATIME_ATTR, 'metadata/%s' % MTIME_ATTR, 'metadata/%s' % GID_ATTR, 'metadata/%s' % MODE_ATTR, 'metadata/%s' % UID_ATTR, ] src_obj_fields_set.update(posix_fields) if delete_source: src_obj_fields_set.update([ 'storageClass', 'timeCreated', ]) if skip_unsupported_objects: src_obj_fields_set.update(['storageClass']) return list(src_obj_fields_set) # Map of (lowercase) storage classes with early deletion charges to their # minimum lifetime in seconds. EARLY_DELETION_MINIMUM_LIFETIME = { 'nearline': 30 * SECONDS_PER_DAY, 'coldline': 90 * SECONDS_PER_DAY, 'archive': 365 * SECONDS_PER_DAY } def WarnIfMvEarlyDeletionChargeApplies(src_url, src_obj_metadata, logger): """Warns when deleting a gs:// object could incur an early deletion charge. This function inspects metadata for Google Cloud Storage objects that are subject to early deletion charges (such as Nearline), and warns when performing operations like mv that would delete them. Args: src_url: CloudUrl for the source object. src_obj_metadata: source object metadata with necessary fields (per GetSourceFieldsNeededForCopy). logger: logging.Logger for outputting warning. """ if (src_url.scheme == 'gs' and src_obj_metadata and src_obj_metadata.timeCreated and src_obj_metadata.storageClass): object_storage_class = src_obj_metadata.storageClass.lower() early_deletion_cutoff_seconds = EARLY_DELETION_MINIMUM_LIFETIME.get( object_storage_class, None) if early_deletion_cutoff_seconds: minimum_delete_age = ( early_deletion_cutoff_seconds + ConvertDatetimeToPOSIX(src_obj_metadata.timeCreated)) if time.time() < minimum_delete_age: logger.warn( 'Warning: moving %s object %s may incur an early deletion ' 'charge, because the original object is less than %s ' 'days old according to the local system time.', object_storage_class, src_url.url_string, early_deletion_cutoff_seconds // SECONDS_PER_DAY) def MaybeSkipUnsupportedObject(src_url, src_obj_metadata): """Skips unsupported object types if requested. Args: src_url: CloudUrl for the source object. src_obj_metadata: source object metadata with storageClass field (per GetSourceFieldsNeededForCopy). Raises: SkipGlacierError: if skipping a s3 Glacier object. """ if (src_url.scheme == 's3' and global_copy_helper_opts.skip_unsupported_objects and src_obj_metadata.storageClass == 'GLACIER'): raise SkipGlacierError() def GetDecryptionCSEK(src_url, src_obj_metadata): """Ensures a matching decryption key is available for the source object. Args: src_url: CloudUrl for the source object. src_obj_metadata: source object metadata with optional customerEncryption field. Raises: EncryptionException if the object is encrypted and no matching key is found. Returns: Base64-encoded decryption key string if the object is encrypted and a matching key is found, or None if object is not encrypted. """ if src_obj_metadata.customerEncryption: decryption_key = FindMatchingCSEKInBotoConfig( src_obj_metadata.customerEncryption.keySha256, config) if not decryption_key: raise EncryptionException( 'Missing decryption key with SHA256 hash %s. No decryption key ' 'matches object %s' % (src_obj_metadata.customerEncryption.keySha256, src_url)) return decryption_key # pylint: disable=undefined-variable # pylint: disable=too-many-statements def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, copy_exception_handler, src_obj_metadata=None, allow_splitting=True, headers=None, manifest=None, gzip_exts=None, is_rsync=False, preserve_posix=False, gzip_encoded=False, use_stet=False): """Performs copy from src_url to dst_url, handling various special cases. Args: logger: for outputting log messages. src_url: Source StorageUrl. dst_url: Destination StorageUrl. gsutil_api: gsutil Cloud API instance to use for the copy. command_obj: command object for use in Apply in parallel composite uploads and sliced object downloads. copy_exception_handler: for handling copy exceptions during Apply. src_obj_metadata: If source URL is a cloud object, source object metadata with all necessary fields (per GetSourceFieldsNeededForCopy). Required for cloud source URLs. If source URL is a file, an apitools Object that may contain file size, or None. allow_splitting: Whether to allow the file to be split into component pieces for an parallel composite upload or download. headers: optional headers to use for the copy operation. manifest: optional manifest for tracking copy operations. gzip_exts: List of file extensions to gzip, if any. If gzip_exts is GZIP_ALL_FILES, gzip all files. is_rsync: Whether or not the caller is the rsync command. preserve_posix: Whether or not to preserve posix attributes. gzip_encoded: Whether to use gzip transport encoding for the upload. Used in conjunction with gzip_exts. Streaming files compressed is only supported on the JSON GCS API. use_stet: If True, will perform STET encryption or decryption using the binary specified in the boto config or PATH. Returns: (elapsed_time, bytes_transferred, version-specific dst_url) excluding overhead like initial GET. Raises: ItemExistsError: if no clobber flag is specified and the destination object already exists. SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified and the source is an unsupported type. CommandException: if other errors encountered. """ # TODO: Remove elapsed_time as it is currently unused by all callers. if headers: dst_obj_headers = headers.copy() else: dst_obj_headers = {} # Create a metadata instance for each destination object so metadata # such as content-type can be applied per-object. # Initialize metadata from any headers passed in via -h. dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers) if dst_url.IsCloudUrl() and dst_url.scheme == 'gs': preconditions = PreconditionsFromHeaders(dst_obj_headers) else: preconditions = Preconditions() src_obj_filestream = None decryption_key = None copy_in_the_cloud = False if src_url.IsCloudUrl(): if (dst_url.IsCloudUrl() and src_url.scheme == dst_url.scheme and not global_copy_helper_opts.daisy_chain): copy_in_the_cloud = True if global_copy_helper_opts.perform_mv: WarnIfMvEarlyDeletionChargeApplies(src_url, src_obj_metadata, logger) MaybeSkipUnsupportedObject(src_url, src_obj_metadata) decryption_key = GetDecryptionCSEK(src_url, src_obj_metadata) src_obj_size = src_obj_metadata.size dst_obj_metadata.contentType = src_obj_metadata.contentType if global_copy_helper_opts.preserve_acl and dst_url.IsCloudUrl(): if src_url.scheme == 'gs' and not src_obj_metadata.acl: raise CommandException( 'No OWNER permission found for object %s. OWNER permission is ' 'required for preserving ACLs.' % src_url) dst_obj_metadata.acl = src_obj_metadata.acl # Special case for S3-to-S3 copy URLs using # global_copy_helper_opts.preserve_acl. # dst_url will be verified in _CopyObjToObjDaisyChainMode if it # is not s3 (and thus differs from src_url). if src_url.scheme == 's3': acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) if acl_text: AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) else: # src_url.IsFileUrl() if use_stet: source_stream_url = stet_util.encrypt_upload(src_url, dst_url, logger) else: source_stream_url = src_url try: src_obj_filestream = GetStreamFromFileUrl(source_stream_url) except Exception as e: # pylint: disable=broad-except message = 'Error opening file "%s": %s.' % (src_url, str(e)) if command_obj.continue_on_error: command_obj.op_failure_count += 1 logger.error(message) return else: raise CommandException(message) if src_url.IsStream() or src_url.IsFifo(): src_obj_size = None elif src_obj_metadata and src_obj_metadata.size and not use_stet: # Iterator retrieved the file's size, no need to stat it again. # Unless STET changed the file size. src_obj_size = src_obj_metadata.size else: src_obj_size = os.path.getsize(source_stream_url.object_name) if global_copy_helper_opts.use_manifest: # Set the source size in the manifest. manifest.Set(src_url.url_string, 'size', src_obj_size) if (dst_url.scheme == 's3' and src_url != 's3' and src_obj_size is not None and # Can't compare int to None in py3 src_obj_size > S3_MAX_UPLOAD_SIZE): raise CommandException( '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 ' 'objects greater than %s in size require multipart uploads, which ' 'gsutil does not support.' % (src_url, MakeHumanReadable(S3_MAX_UPLOAD_SIZE))) # On Windows, stdin is opened as text mode instead of binary which causes # problems when piping a binary file, so this switches it to binary mode. if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream(): msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY) if global_copy_helper_opts.no_clobber: # There are two checks to prevent clobbering: # 1) The first check is to see if the URL # already exists at the destination and prevent the upload/download # from happening. This is done by the exists() call. # 2) The second check is only relevant if we are writing to gs. We can # enforce that the server only writes the object if it doesn't exist # by specifying the header below. This check only happens at the # server after the complete file has been uploaded. We specify this # header to prevent a race condition where a destination file may # be created after the first check and before the file is fully # uploaded. # In order to save on unnecessary uploads/downloads we perform both # checks. However, this may come at the cost of additional HTTP calls. if preconditions.gen_match: raise ArgumentException('Specifying x-goog-if-generation-match is ' 'not supported with cp -n') else: preconditions.gen_match = 0 if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name): raise ItemExistsError() elif dst_url.IsCloudUrl(): try: dst_object = gsutil_api.GetObjectMetadata(dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme) except NotFoundException: dst_object = None if dst_object: raise ItemExistsError() if dst_url.IsCloudUrl(): # Cloud storage API gets object and bucket name from metadata. dst_obj_metadata.name = dst_url.object_name dst_obj_metadata.bucket = dst_url.bucket_name if src_url.IsCloudUrl(): # Preserve relevant metadata from the source object if it's not already # provided from the headers. src_obj_metadata.name = src_url.object_name src_obj_metadata.bucket = src_url.bucket_name else: _SetContentTypeFromFile(src_url, dst_obj_metadata) # Only set KMS key name if destination provider is 'gs'. encryption_keywrapper = GetEncryptionKeyWrapper(config) if (encryption_keywrapper and encryption_keywrapper.crypto_type == CryptoKeyType.CMEK and dst_url.scheme == 'gs'): dst_obj_metadata.kmsKeyName = encryption_keywrapper.crypto_key if src_obj_metadata: # Note that CopyObjectMetadata only copies specific fields. We intentionally # do not copy storageClass, as the bucket's default storage class should be # used (when copying to a gs:// bucket) unless explicitly overridden. CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False) if global_copy_helper_opts.dest_storage_class: dst_obj_metadata.storageClass = global_copy_helper_opts.dest_storage_class if config.get('GSUtil', 'check_hashes') == CHECK_HASH_NEVER: # GCS server will perform MD5 validation if the md5 hash is present. # Remove md5_hash if check_hashes=never. dst_obj_metadata.md5Hash = None _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) if src_url.IsCloudUrl(): if dst_url.IsFileUrl(): PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, dst_url, time.time(), message_type=FileMessage.FILE_DOWNLOAD, size=src_obj_size, finished=False)) return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, gsutil_api, logger, command_obj, copy_exception_handler, allow_splitting=allow_splitting, decryption_key=decryption_key, is_rsync=is_rsync, preserve_posix=preserve_posix, use_stet=use_stet) elif copy_in_the_cloud: PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, dst_url, time.time(), message_type=FileMessage.FILE_CLOUD_COPY, size=src_obj_size, finished=False)) return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, dst_obj_metadata, preconditions, gsutil_api, decryption_key=decryption_key) else: PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, dst_url, time.time(), message_type=FileMessage.FILE_DAISY_COPY, size=src_obj_size, finished=False)) return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url, dst_obj_metadata, preconditions, gsutil_api, logger, decryption_key=decryption_key) else: # src_url.IsFileUrl() if dst_url.IsCloudUrl(): # The FileMessage for this upload object is inside _UploadFileToObject(). # This is such because the function may alter src_url, which would prevent # us from correctly tracking the new url. uploaded_metadata = _UploadFileToObject(src_url, src_obj_filestream, src_obj_size, dst_url, dst_obj_metadata, preconditions, gsutil_api, logger, command_obj, copy_exception_handler, gzip_exts=gzip_exts, allow_splitting=allow_splitting, gzip_encoded=gzip_encoded) if use_stet: # Delete temporary file. os.unlink(src_obj_filestream.name) return uploaded_metadata else: # dst_url.IsFileUrl() PutToQueueWithTimeout( gsutil_api.status_queue, FileMessage(src_url, dst_url, time.time(), message_type=FileMessage.FILE_LOCAL_COPY, size=src_obj_size, finished=False)) result = _CopyFileToFile(src_url, dst_url, status_queue=gsutil_api.status_queue, src_obj_metadata=src_obj_metadata) # Need to let _CopyFileToFile return before setting the POSIX attributes. if not src_url.IsStream() and not dst_url.IsStream(): ParseAndSetPOSIXAttributes(dst_url.object_name, src_obj_metadata, is_rsync=is_rsync, preserve_posix=preserve_posix) return result class Manifest(object): """Stores the manifest items for the CpCommand class.""" def __init__(self, path): # self.items contains a dictionary of rows self.items = {} self.manifest_filter = {} self.lock = parallelism_framework_util.CreateLock() self.manifest_path = os.path.expanduser(path) self._ParseManifest() self._CreateManifestFile() def _ParseManifest(self): """Load and parse a manifest file. This information will be used to skip any files that have a skip or OK status. """ try: if os.path.exists(self.manifest_path): # Note: we can't use io.open here or CSV reader will become upset # https://stackoverflow.com/a/18449496 with open(self.manifest_path, 'r') as f: first_row = True reader = csv.reader(f) for row in reader: if first_row: try: source_index = row.index('Source') result_index = row.index('Result') except ValueError: # No header and thus not a valid manifest file. raise CommandException('Missing headers in manifest file: %s' % self.manifest_path) first_row = False source = row[source_index] result = row[result_index] if result in ['OK', 'skip']: # We're always guaranteed to take the last result of a specific # source url. self.manifest_filter[source] = result except IOError: raise CommandException('Could not parse %s' % self.manifest_path) def WasSuccessful(self, src): """Returns whether the specified src url was marked as successful.""" return src in self.manifest_filter def _CreateManifestFile(self): """Opens the manifest file and assigns it to the file pointer.""" try: if ((not os.path.exists(self.manifest_path)) or (os.stat(self.manifest_path).st_size == 0)): # Add headers to the new file. if six.PY3: with open(self.manifest_path, 'w', newline='') as f: writer = csv.writer(f) writer.writerow([ 'Source', 'Destination', 'Start', 'End', 'Md5', 'UploadId', 'Source Size', 'Bytes Transferred', 'Result', 'Description', ]) else: with open(self.manifest_path, 'wb', 1) as f: writer = csv.writer(f) writer.writerow([ 'Source', 'Destination', 'Start', 'End', 'Md5', 'UploadId', 'Source Size', 'Bytes Transferred', 'Result', 'Description', ]) except IOError: raise CommandException('Could not create manifest file.') def Set(self, url, key, value): if value is None: # In case we don't have any information to set we bail out here. # This is so that we don't clobber existing information. # To zero information pass '' instead of None. return if url in self.items: self.items[url][key] = value else: self.items[url] = {key: value} def Initialize(self, source_url, destination_url): # Always use the source_url as the key for the item. This is unique. self.Set(source_url, 'source_uri', source_url) self.Set(source_url, 'destination_uri', destination_url) self.Set(source_url, 'start_time', datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)) def SetResult(self, source_url, bytes_transferred, result, description=''): self.Set(source_url, 'bytes', bytes_transferred) self.Set(source_url, 'result', result) self.Set(source_url, 'description', description) self.Set(source_url, 'end_time', datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)) self._WriteRowToManifestFile(source_url) self._RemoveItemFromManifest(source_url) def _WriteRowToManifestFile(self, url): """Writes a manifest entry to the manifest file for the url argument.""" row_item = self.items[url] data = [ row_item['source_uri'], row_item['destination_uri'], '%sZ' % row_item['start_time'].isoformat(), '%sZ' % row_item['end_time'].isoformat(), row_item['md5'] if 'md5' in row_item else '', row_item['upload_id'] if 'upload_id' in row_item else '', str(row_item['size']) if 'size' in row_item else '', str(row_item['bytes']) if 'bytes' in row_item else '', row_item['result'], row_item['description'], ] data = [six.ensure_str(value) for value in data] # Aquire a lock to prevent multiple threads writing to the same file at # the same time. This would cause a garbled mess in the manifest file. with self.lock: if IS_WINDOWS and six.PY3: f = open(self.manifest_path, 'a', 1, newline='') else: f = open(self.manifest_path, 'a', 1) # 1 == line buffered writer = csv.writer(f) writer.writerow(data) f.close() def _RemoveItemFromManifest(self, url): # Remove the item from the dictionary since we're done with it and # we don't want the dictionary to grow too large in memory for no good # reason. del self.items[url] class ItemExistsError(Exception): """Exception class for objects that are skipped because they already exist.""" pass class SkipUnsupportedObjectError(Exception): """Exception for objects skipped because they are an unsupported type.""" def __init__(self): super(SkipUnsupportedObjectError, self).__init__() self.unsupported_type = 'Unknown' class SkipGlacierError(SkipUnsupportedObjectError): """Exception for objects skipped because they are an unsupported type.""" def __init__(self): super(SkipGlacierError, self).__init__() self.unsupported_type = 'GLACIER' def GetPathBeforeFinalDir(url, exp_src_url): """Returns the path section before the final directory component of the URL. This handles cases for file system directories, bucket, and bucket subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket', and for file://dir we'll return file:// Args: url: StorageUrl representing a filesystem directory, cloud bucket or bucket subdir. exp_src_url: StorageUrl representing the fully expanded object to-be-copied; used for resolving cloud wildcards. Returns: String name of above-described path, sans final path separator. """ sep = url.delim if url.IsFileUrl(): past_scheme = url.url_string[len('file://'):] if past_scheme.find(sep) == -1: return 'file://' else: return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] if url.IsBucket(): return '%s://' % url.scheme # Else it names a bucket subdir. path_sans_final_dir = url.url_string.rstrip(sep).rpartition(sep)[0] return ResolveWildcardsInPathBeforeFinalDir(path_sans_final_dir, exp_src_url) def ResolveWildcardsInPathBeforeFinalDir(src_url_path_sans_final_dir, exp_src_url): """Returns the path section for a bucket subdir with wildcards resolved. This handles cases for bucket subdirectories where the initial source URL contains a wildcard. In this case, src_url must be wildcard-expanded before calculating the final directory. Example: A bucket containing: gs://bucket/dir1/subdir/foo gs://bucket/dir2/subdir/foo and source URL gs://bucket/*/subdir and src_url_path_sans_final dir gs://bucket/* should yield final path gs://bucket/dir1 or gs://bucket/dir2 according to the expanded source URL. Args: src_url_path_sans_final_dir: URL string with wildcards representing a bucket subdir as computed from GetPathBeforeFinalDir. exp_src_url: CloudUrl representing the fully expanded object to-be-copied. Returns: String name of above-described path, sans final path separator. """ if not ContainsWildcard(src_url_path_sans_final_dir): return src_url_path_sans_final_dir # Parse the expanded source URL, replacing wildcarded # portions of the path with what they actually expanded to. wildcarded_src_obj_path = StorageUrlFromString( src_url_path_sans_final_dir).object_name.split('/') expanded_src_obj_path = exp_src_url.object_name.split('/') for path_segment_index in range(len(wildcarded_src_obj_path)): if ContainsWildcard(wildcarded_src_obj_path[path_segment_index]): # The expanded path is guaranteed to be have at least as many path # segments as the wildcarded path. wildcarded_src_obj_path[path_segment_index] = ( expanded_src_obj_path[path_segment_index]) resolved_src_path = '/'.join(wildcarded_src_obj_path) final_path_url = exp_src_url.Clone() final_path_url.object_name = resolved_src_path return final_path_url.url_string def _GetPartitionInfo(file_size, max_components, default_component_size): """Gets info about a file partition for parallel file/object transfers. Args: file_size: The number of bytes in the file to be partitioned. max_components: The maximum number of components that can be composed. default_component_size: The size of a component, assuming that max_components is infinite. Returns: The number of components in the partitioned file, and the size of each component (except the last, which will have a different size iff file_size != 0 (mod num_components)). """ # num_components = ceil(file_size / default_component_size) num_components = DivideAndCeil(file_size, default_component_size) # num_components must be in the range [2, max_components] num_components = max(min(num_components, max_components), 2) # component_size = ceil(file_size / num_components) component_size = DivideAndCeil(file_size, num_components) return (num_components, component_size) def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None): """Wrapper func to be used with command.Apply to delete temporary objects.""" gsutil_api = GetCloudApiInstance(cls, thread_state) try: gsutil_api.DeleteObject(url_to_delete.bucket_name, url_to_delete.object_name, generation=url_to_delete.generation, provider=url_to_delete.scheme) except NotFoundException: # The temporary object could already be gone if a retry was # issued at a lower layer but the original request succeeded. # Barring other errors, the top-level command should still report success, # so don't raise here. pass def FilterExistingComponents(dst_args, existing_components, bucket_url, gsutil_api): """Determines course of action for component objects. Given the list of all target objects based on partitioning the file and the list of objects that have already been uploaded successfully, this function determines which objects should be uploaded, which existing components are still valid, and which existing components should be deleted. Args: dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs calculated by partitioning the file. existing_components: A list of ObjectFromTracker objects that have been uploaded in the past. bucket_url: CloudUrl of the bucket in which the components exist. gsutil_api: gsutil Cloud API instance to use for retrieving object metadata. Returns: components_to_upload: List of components that need to be uploaded. uploaded_components: List of components that have already been uploaded and are still valid. Each element of the list contains the dst_url for the uploaded component and its size. existing_objects_to_delete: List of components that have already been uploaded, but are no longer valid and are in a versioned bucket, and therefore should be deleted. """ components_to_upload = [] existing_component_names = [ component.object_name for component in existing_components ] for component_name in dst_args: if component_name not in existing_component_names: components_to_upload.append(dst_args[component_name]) objects_already_chosen = [] # Don't reuse any temporary components whose MD5 doesn't match the current # MD5 of the corresponding part of the file. If the bucket is versioned, # also make sure that we delete the existing temporary version. existing_objects_to_delete = [] uploaded_components = [] for tracker_object in existing_components: if (tracker_object.object_name not in dst_args.keys() or tracker_object.object_name in objects_already_chosen): # This could happen if the component size has changed. This also serves # to handle object names that get duplicated in the tracker file due # to people doing things they shouldn't (e.g., overwriting an existing # temporary component in a versioned bucket). url = bucket_url.Clone() url.object_name = tracker_object.object_name url.generation = tracker_object.generation existing_objects_to_delete.append(url) continue dst_arg = dst_args[tracker_object.object_name] file_part = FilePart(dst_arg.filename, dst_arg.file_start, dst_arg.file_length) # TODO: calculate MD5's in parallel when possible. content_md5 = CalculateB64EncodedMd5FromContents(file_part) try: # Get the MD5 of the currently-existing component. dst_url = dst_arg.dst_url dst_metadata = gsutil_api.GetObjectMetadata( dst_url.bucket_name, dst_url.object_name, generation=dst_url.generation, provider=dst_url.scheme, fields=['customerEncryption', 'etag', 'md5Hash']) cloud_md5 = dst_metadata.md5Hash except Exception: # pylint: disable=broad-except # We don't actually care what went wrong - we couldn't retrieve the # object to check the MD5, so just upload it again. cloud_md5 = None if cloud_md5 != content_md5: components_to_upload.append(dst_arg) objects_already_chosen.append(tracker_object.object_name) if tracker_object.generation: # If the old object doesn't have a generation (i.e., it isn't in a # versioned bucket), then we will just overwrite it anyway. invalid_component_with_generation = dst_arg.dst_url.Clone() invalid_component_with_generation.generation = tracker_object.generation existing_objects_to_delete.append(invalid_component_with_generation) else: url = dst_arg.dst_url.Clone() url.generation = tracker_object.generation uploaded_components.append((url, dst_arg.file_length)) objects_already_chosen.append(tracker_object.object_name) if uploaded_components: logging.info('Found %d existing temporary components to reuse.', len(uploaded_components)) return (components_to_upload, uploaded_components, existing_objects_to_delete)
Save Changes
Back to File Manager