Source code for gcs_client.gcs_object

# -*- coding: utf-8 -*-
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

from __future__ import absolute_import

import collections
import json
import os

import requests
import six

from gcs_client import base
from gcs_client import common
from gcs_client import errors


__all__ = ('BLOCK_MULTIPLE', 'DEFAULT_BLOCK_SIZE', 'Object', 'GCSObjFile')


BLOCK_MULTIPLE = 256 * 1024
DEFAULT_BLOCK_SIZE = 4 * BLOCK_MULTIPLE


class Object(base.Fillable):
    """GCS stored Object representation.

    :ivar bucket: The name of the bucket containing this object.
    :vartype bucket: string
    :ivar contentType: Content-Type of the object data.
    :vartype contentType: string
    :ivar crc32c: CRC32c checksum, as described in RFC 4960, Appendix B;
                  encoded using base64 in big-endian byte order.
    :vartype crc32c: string
    :ivar etag: HTTP 1.1 Entity tag for the object.
    :vartype etag: string
    :ivar generation: The content generation of this object. Used for object
                      versioning.
    :vartype generation: long
    :ivar id: The ID of the object.
    :vartype id: string
    :ivar kind: The kind of item this is. For objects, this is always
                storage#object.
    :vartype kind: string
    :ivar md5Hash: MD5 hash of the data; encoded using base64.
    :vartype md5Hash: string
    :ivar mediaLink: Media download link.
    :vartype mediaLink: string
    :ivar metadata: User-provided metadata, in key/value pairs.
    :vartype metadata: dict
    :ivar metageneration: The version of the metadata for this object at this
                          generation. Used for preconditions and for detecting
                          changes in metadata. A metageneration number is only
                          meaningful in the context of a particular generation
                          of a particular object.
    :vartype metageneration: long
    :ivar name: The name of this object.
    :vartype name: string
    :ivar owner: The owner of the object. This will always be the uploader of
                 the object. Contains entity and entityId keys.
    :vartype owner: dict
    :ivar selfLink: The link to this object.
    :vartype selfLink: string
    :ivar size: Content-Length of the data in bytes.
    :vartype size: unsigned long
    :ivar storageClass: Storage class of the object.
    :vartype storageClass: string
    :ivar timeCreated: The creation time of the object in RFC 3339 format.
    :vartype timeCreated: string
    :ivar timeDeleted: The deletion time of the object in RFC 3339 format.
                       Will be None if this version of the object has not been
                       deleted.
    :vartype timeDeleted: string
    :ivar updated: The modification time of the object metadata in RFC 3339
                   format.
    :vartype updated: string
    """

    kind = 'storage#objects'
    timeDeleted = None
    metadata = {}

    _required_attributes = base.GCS._required_attributes + ['bucket', 'name']

    _URL = base.Fillable._URL + '/{bucket}/o/{name}'

    def __init__(self, bucket=None, name=None, generation=None,
                 credentials=None, retry_params=None, chunksize=None):
        """Initialize an Object object.

        :param bucket: Name of the bucket to use.
        :type bucket: String
        :param name: Name of the object.
        :type name: String
        :param generation: If present, selects a specific revision of this
                           object (as opposed to the latest version, the
                           default).
        :type generation: long
        :param credentials: A credentials object to authorize the connection.
        :type credentials: Credentials
        :param retry_params: Retry configuration used for communications with
                             GCS. If None is passed default retries will be
                             used.
        :type retry_params: RetryParams or NoneType
        :param chunksize: Size in bytes of the payload to send/receive to/from
                          GCS. Default is gcs_client.DEFAULT_BLOCK_SIZE.
        :type chunksize: int
        """
        super(Object, self).__init__(credentials, retry_params)
        self.name = name
        self.bucket = bucket
        self.generation = generation
        self._chunksize = chunksize

    @common.retry
    def _get_data(self):
        r = self._request(parse=True, generation=self.generation)
        return r.json()

    @common.is_complete
    @common.retry
    def delete(self, generation=None, if_generation_match=None,
               if_generation_not_match=None, if_metageneration_match=None,
               if_metageneration_not_match=None):
        """Deletes an object and its metadata.

        Deletions are permanent if versioning is not enabled for the bucket,
        or if the generation parameter is used. The authenticated user in the
        credentials must have WRITER permissions on the bucket.

        :param generation: If present, permanently deletes a specific revision
                           of this object (as opposed to the latest version,
                           the default).
        :type generation: long
        :param if_generation_match: Makes the operation conditional on whether
                                    the object's current generation matches
                                    the given value.
        :type if_generation_match: long
        :param if_generation_not_match: Makes the operation conditional on
                                        whether the object's current
                                        generation does not match the given
                                        value.
        :type if_generation_not_match: long
        :param if_metageneration_match: Makes the operation conditional on
                                        whether the object's current
                                        metageneration matches the given
                                        value.
        :type if_metageneration_match: long
        :param if_metageneration_not_match: Makes the operation conditional on
                                            whether the object's current
                                            metageneration does not match the
                                            given value.
        :type if_metageneration_not_match: long
        :returns: None
        """
        self._request(op='DELETE', ok=(requests.codes.no_content,),
                      generation=generation or self.generation,
                      ifGenerationMatch=if_generation_match,
                      ifGenerationNotMatch=if_generation_not_match,
                      ifMetagenerationMatch=if_metageneration_match,
                      ifMetagenerationNotMatch=if_metageneration_not_match)

    @common.is_complete
    def open(self, mode='r', chunksize=None):
        """Open this object.

        :param mode: Mode to open the file with; 'r' for reading and 'w' for
                     writing are the only supported modes. Default is 'r' if
                     this argument is not provided.
        :type mode: String
        :param chunksize: Size in bytes of the payload to send/receive to/from
                          GCS. Default chunksize is the one defined on the
                          object's initialization.
        :type chunksize: int
        """
        return GCSObjFile(self.bucket, self.name, self._credentials, mode,
                          chunksize or self._chunksize, self.retry_params,
                          self.generation)

    def __str__(self):
        return '%s/%s' % (self.bucket, self.name)

    def __repr__(self):
        return ("%s.%s('%s', '%s', '%s') #etag: %s" %
                (self.__module__, self.__class__.__name__, self.bucket,
                 self.name, self.generation, getattr(self, 'etag', '?')))
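

# A minimal usage sketch, not part of the original module: it exercises the
# Object API above assuming the caller supplies a valid gcs_client credentials
# object; the bucket and object names are placeholders.
def _example_object_usage(credentials, generation=None):
    obj = Object('my-bucket', 'notes.txt', credentials=credentials)

    # open() returns a GCSObjFile, so the object can be read like a file.
    with obj.open('r') as obj_file:
        data = obj_file.read()

    # Deletions are permanent when a specific generation is targeted (or when
    # versioning is disabled on the bucket).
    obj.delete(generation=generation)
    return data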


class GCSObjFile(object):
    """Reader/Writer for GCS Objects.

    Supports basic functionality:
        - Read
        - Write
        - Close
        - Seek
        - Tell

    Instances support context manager behavior.
    """

    _URL = base.Fillable._URL + '/%s/o/%s'
    _URL_UPLOAD = base.Fillable._URL_UPLOAD + '/%s/o'

    def __init__(self, bucket, name, credentials, mode='r', chunksize=None,
                 retry_params=None, generation=None):
        """Initialize reader/writer of a GCS object.

        On initialization the connection to GCS will be tested. For reading
        it'll confirm the existence of the object in the bucket and for
        writing it'll create the object (it won't send any content).

        :param bucket: Name of the bucket to use.
        :type bucket: String
        :param name: Name of the object.
        :type name: String
        :param credentials: A credentials object to authorize the connection.
        :type credentials: gcs_client.Credentials
        :param mode: Mode to open the file with; 'r' for reading and 'w' for
                     writing are the only supported modes. Default is 'r' if
                     this argument is not provided.
        :type mode: String
        :param chunksize: Size in bytes of the payload to send/receive to/from
                          GCS. Default is gcs_client.DEFAULT_BLOCK_SIZE.
        :type chunksize: int
        :param retry_params: Retry configuration used for communications with
                             GCS. If None is passed default retries will be
                             used.
        :type retry_params: RetryParams or NoneType
        :param generation: If present, selects a specific revision of this
                           object (as opposed to the latest version, the
                           default).
        :type generation: long
        """
        if mode not in ('r', 'w'):
            raise IOError('Only r or w modes supported')
        self.mode = mode

        self._chunksize = chunksize or DEFAULT_BLOCK_SIZE
        assert self._chunksize % BLOCK_MULTIPLE == 0, \
            'chunksize must be multiple of %s' % BLOCK_MULTIPLE

        self.name = name
        self.bucket = bucket
        self._offset = 0
        self._eof = False
        self._gcs_offset = 0

        self._credentials = credentials
        self._buffer = _Buffer()
        self._retry_params = retry_params
        self._generation = generation

        self.closed = True
        try:
            self._open()
        except errors.NotFound:
            raise IOError('Object %s does not exist in bucket %s' %
                          (name, bucket))

    def _is_readable(self):
        return self.mode == 'r'

    def _is_writable(self):
        return self.mode == 'w'

    def _check_is_writable(self, action='write'):
        if self._is_readable():
            raise IOError('File open for reading, cannot %s' % action)

    def _check_is_readable(self, action='read'):
        if self._is_writable():
            raise IOError('File open for writing, cannot %s' % action)

    def _check_is_open(self):
        if self.closed:
            raise IOError('File is closed')

    @common.retry
    def _open(self):
        safe_bucket = requests.utils.quote(self.bucket, safe='')
        safe_name = requests.utils.quote(self.name, safe='')
        if self._is_readable():
            self._location = self._URL % (safe_bucket, safe_name)
            params = {'fields': 'size', 'generation': self._generation}
            headers = {'Authorization': self._credentials.authorization}
            r = requests.get(self._location, params=params, headers=headers)
            if r.status_code == requests.codes.ok:
                try:
                    self.size = int(json.loads(r.content)['size'])
                except Exception as exc:
                    raise errors.Error('Bad data returned by GCS %s' % exc)
        else:
            self.size = 0
            initial_url = self._URL_UPLOAD % safe_bucket
            params = {'uploadType': 'resumable', 'name': self.name}
            headers = {'x-goog-resumable': 'start',
                       'Authorization': self._credentials.authorization,
                       'Content-type': 'application/octet-stream'}
            r = requests.post(initial_url, params=params, headers=headers)
            if r.status_code == requests.codes.ok:
                self._location = r.headers['Location']

        if r.status_code != requests.codes.ok:
            raise errors.create_http_exception(
                r.status_code,
                'Error opening object %s in bucket %s: %s-%s' %
                (self.name, self.bucket, r.status_code, r.content))

        self.closed = False

    def tell(self):
        """Return file's current position from the beginning of the file."""
        self._check_is_open()

        return self._offset

    def seek(self, offset, whence=os.SEEK_SET):
        """Set the file's current position, like stdio's fseek().

        Note that only files open for reading are seekable.

        :param offset: Offset to move the file cursor.
        :type offset: int
        :param whence: How to interpret the offset, defaults to os.SEEK_SET
                       (0) -absolute file positioning- other values are
                       os.SEEK_CUR (1) -seek relative to the current position-
                       and os.SEEK_END (2) -seek relative to the file's end-.
        :type whence: int
        :returns: None
        """
        self._check_is_open()
        self._check_is_readable('seek')

        if whence == os.SEEK_SET:
            position = offset
        elif whence == os.SEEK_CUR:
            position = self._offset + offset
        elif whence == os.SEEK_END:
            position = self.size + offset
        else:
            raise ValueError('whence value %s is invalid.' % whence)

        position = min(position, self.size)
        position = max(position, 0)
        # TODO: This could be optimized to not discard all buffer for small
        # movements.
        self._offset = self._gcs_offset = position
        self._buffer.clear()

    def write(self, data):
        """Write a string to the file.

        Due to buffering, the string may not actually show up in the file
        until we close the file or enough data to send another chunk has been
        buffered.

        :param data: Data to write to the object.
        :type data: String
        :returns: None
        """
        self._check_is_open()
        self._check_is_writable()

        self.size += len(data)
        self._buffer.write(data)
        while len(self._buffer) >= self._chunksize:
            data = self._buffer.read(self._chunksize)
            self._send_data(data, self._gcs_offset)
            self._gcs_offset += len(data)

    @common.retry
    def _send_data(self, data, begin=0, finalize=False):
        if not (data or finalize):
            return

        if not data:
            size = self.size
            data_range = 'bytes */%s' % size
        else:
            end = begin + len(data) - 1
            size = self.size if finalize else '*'
            data_range = 'bytes %s-%s/%s' % (begin, end, size)

        headers = {'Authorization': self._credentials.authorization,
                   'Content-Range': data_range}
        r = requests.put(self._location, data=data, headers=headers)

        if size == '*':
            expected = requests.codes.resume_incomplete
        else:
            expected = requests.codes.ok

        if r.status_code != expected:
            raise errors.create_http_exception(
                r.status_code,
                'Error writing to object %s in bucket %s: %s-%s' %
                (self.name, self.bucket, r.status_code, r.content))

    def close(self):
        """Close the file.

        A closed file cannot be read or written any more. Any operation which
        requires that the file be open will raise an error after the file has
        been closed. Calling close() more than once is allowed.
        """
        if not self.closed:
            if self._is_writable():
                self._send_data(self._buffer.read(), self._gcs_offset,
                                finalize=True)
            self.closed = True

    def read(self, size=None):
        """Read data from the file.

        Read at most size bytes from the file (less if the read hits EOF
        before obtaining size bytes). If the size argument is None, read all
        data until EOF is reached. The bytes are returned as a bytes object.
        An empty bytes object is returned when EOF is encountered immediately.

        Note that this method may make multiple requests to the GCS service
        in an effort to acquire as close to size bytes as possible.

        :param size: Number of bytes to read.
        :type size: int
        :returns: Bytes with read data from GCS.
        :rtype: bytes
        """
        self._check_is_open()
        self._check_is_readable()

        if size == 0 or self._eof:
            return b''

        while not self._eof and (not size or len(self._buffer) < size):
            data, self._eof = self._get_data(self._chunksize,
                                             self._gcs_offset)
            self._gcs_offset += len(data)
            self._buffer.write(data)

        data = self._buffer.read(size)
        self._offset += len(data)
        return data.tobytes()

    @common.retry
    def _get_data(self, size, begin=0):
        if not size:
            return (b'', False)

        end = begin + size - 1
        headers = {'Authorization': self._credentials.authorization,
                   'Range': 'bytes=%d-%d' % (begin, end)}
        params = {'alt': 'media'}
        r = requests.get(self._location, params=params, headers=headers)
        expected = (requests.codes.ok, requests.codes.partial_content,
                    requests.codes.requested_range_not_satisfiable)
        if r.status_code not in expected:
            raise errors.create_http_exception(
                r.status_code,
                'Error reading object %s in bucket %s: %s-%s' %
                (self.name, self.bucket, r.status_code, r.content))

        if r.status_code == requests.codes.requested_range_not_satisfiable:
            return (b'', True)

        content_range = r.headers.get('Content-Range')
        total_size = None
        if content_range:
            try:
                total_size = int(content_range.split('/')[-1])
                eof = total_size <= begin + len(r.content)
                self.size = total_size
            except Exception:
                eof = len(r.content) < size

        if total_size is None:
            eof = len(r.content) < size

        return (r.content, eof)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
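

# A minimal round-trip sketch, not part of the original module: it assumes a
# valid credentials object, and the bucket/object names are placeholders.
# Writes are buffered and pushed to the resumable-upload session in chunksize
# pieces (each a multiple of BLOCK_MULTIPLE); close() sends the final,
# size-qualified Content-Range. Reads issue Range requests of chunksize bytes.
def _example_file_round_trip(credentials):
    with GCSObjFile('my-bucket', 'notes.txt', credentials, 'w') as f:
        f.write(b'hello ' * 200000)  # > 1 MiB, so one chunk is sent early

    with GCSObjFile('my-bucket', 'notes.txt', credentials, 'r') as f:
        f.seek(-5, os.SEEK_END)  # seeking is only allowed in read mode
        tail = f.read()  # reads the last five bytes
    return tail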


class _Buffer(object):
    """FIFO buffer of memoryview chunks that supports partial reads."""

    def __init__(self):
        self._queue = collections.deque()
        self._size = 0

    def __len__(self):
        return self._size

    def clear(self):
        self._queue.clear()
        self._size = 0

    def write(self, data):
        if data:
            if six.PY3 and isinstance(data, six.string_types):
                data = data.encode()
            self._queue.append(memoryview(data))
            self._size += len(data)

    def read(self, size=None):
        if size is None or size > self._size:
            size = self._size

        result = bytearray(size)
        written = 0
        remaining = size
        while remaining:
            data = self._queue.popleft()
            # If the read ends inside this chunk, split it and push the
            # unread tail back onto the queue.
            if len(data) > remaining:
                data_view = memoryview(data)
                self._queue.appendleft(data_view[remaining:])
                data = data_view[:remaining]
            result[written: written + len(data)] = data
            written += len(data)
            remaining -= len(data)

        self._size -= size
        return memoryview(result)
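

# A small sketch, not part of the original module, of the _Buffer semantics
# above: chunks are queued as memoryviews, and read() assembles exactly the
# requested number of bytes, splitting a chunk when a read ends inside it.
def _example_buffer_usage():
    buf = _Buffer()
    buf.write(b'0123456789')
    buf.write(b'abcdef')
    first = buf.read(12)  # spans both chunks
    rest = buf.read()  # no size given: drain whatever remains
    assert first.tobytes() == b'0123456789ab'
    assert rest.tobytes() == b'cdef'
    assert len(buf) == 0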