Source code for stor.dx

import logging
import sys
import tempfile
import urllib.parse
import warnings

from cached_property import cached_property
from contextlib import contextmanager
import dxpy
from dxpy.exceptions import DXError
from dxpy.exceptions import DXSearchError

from stor import exceptions as stor_exceptions
from stor import Path
from stor import settings
from stor import utils
from stor.obs import OBSPath
from stor.obs import OBSUploadObject
from stor.posix import PosixPath
import stor.settings


logger = logging.getLogger(__name__)
progress_logger = logging.getLogger('%s.progress' % __name__)


[docs]class DNAnexusError(stor_exceptions.RemoteError): """Base class for all remote errors thrown by this DX module""" pass
[docs]class MultipleObjectsSameNameError(DNAnexusError): """Thrown when multiple objects exist with the same name Currently, we throw this when trying to get the canonical project from virtual path and two or more projects were found with same name """ pass
[docs]class ProjectNotFoundError(stor_exceptions.NotFoundError): """Thrown when no project exists with the given name Currently, we throw this when trying to get the canonical project from virtual path and no project was found with same name """ pass
[docs]class InconsistentUploadDownloadError(DNAnexusError): """Thrown during checksum mismatch or part length mismatch..""" pass
def _dx_error_to_descriptive_exception(client_exception): """Converts dxpy errors to more descriptive exceptions with transaction ID""" http_status = getattr(client_exception, 'code', None) if isinstance(client_exception, dxpy.DXAPIError): exc_str = '{} - {}'.format(client_exception.name or '', client_exception.msg or client_exception.error_message()) else: exc_str = str(client_exception) if http_status in [401, 403]: return stor_exceptions.UnauthorizedError( 'Either use `dx login --token {{your_dx_token}} --save` or set DX_AUTH_TOKEN ' 'environment variable. {}'.format(exc_str), client_exception ) status_to_exception = { 404: stor_exceptions.NotFoundError, 409: stor_exceptions.ConflictError } if http_status in status_to_exception: return status_to_exception[http_status](exc_str, client_exception) elif 'DXChecksumMismatchError' in exc_str or 'DXPartLengthMismatchError' in exc_str: return InconsistentUploadDownloadError(exc_str, client_exception) else: return DNAnexusError(exc_str, client_exception) @contextmanager def _wrap_dx_calls(): """Updates the dx_auth_token from settings for dxpy Bubbles all dxpy exceptions as `DNAnexusError` classes """ auth_token = settings.get()['dx']['auth_token'] if auth_token: # pragma: no cover dxpy.set_security_context({ 'auth_token_type': 'Bearer', 'auth_token': auth_token }) try: yield except DXError as e: raise _dx_error_to_descriptive_exception(e) from e
[docs]class DXPath(OBSPath): """ Provides the ability to manipulate and access resources on DNAnexus servers with stor interfaces. """ def __new__(cls, path): """Custom __new__ method so that the validation checks happen during creation This ensures invalid dx paths like DXPath('dx://') are never initialized """ return super(DXPath, cls).__new__(Path, path) drive = 'dx://' def _get_parts(self): """Returns the path parts (excluding the drive) as a list of strings. Project is always the first part returned here. """ colon_pieces = self[len(self.drive):].split(':', 1) project = colon_pieces[0] resource = (colon_pieces[1] if len(colon_pieces) == 2 else '').lstrip('/') parts = resource.split('/') parts.insert(0, project) return parts def _noop(attr_name): def wrapper(self): return type(self)(self) wrapper.__name__ = attr_name wrapper.__doc__ = 'No-op for %r' % attr_name return wrapper abspath = _noop('abspath') realpath = _noop('realpath') expanduser = _noop('expanduser')
[docs] def clear_cached_properties(self): """Clears all cached properties in DXPath objects. The canonical and virtual forms of DXPath objects are cached to not hit the server for every transformation call. However, after copy/remove/rename, the cached information is outdated and needs to be cleared. """ for prop in ('canonical_project', 'canonical_resource', 'virtual_path', 'virtual_project'): self.__dict__.pop(prop, None)
def virtual_project(self): raise NotImplementedError def virtual_resource(self): raise NotImplementedError def virtual_path(self): raise NotImplementedError def canonical_project(self): raise NotImplementedError def canonical_resource(self): raise NotImplementedError def canonical_path(self): raise NotImplementedError
[docs] def normpath(self): raise NotImplementedError
[docs] def exists(self): raise NotImplementedError
@property def project(self): """The project name from the path or None""" parts = self._get_parts() return parts[0] if len(parts) > 0 and parts[0] else None
[docs] def joinpath(self, *others): """Wrapper around base joinpath function which converts the first part to normpath before joining with others. """ return self.path_class(self.path_module.join(self.normpath(), *others))
[docs] def to_url(self): """For compatibility with OBS - returns ``temp_url()``""" return self.temp_url()
[docs] def temp_url(self, lifetime=300, filename=None): """Obtains a temporary URL to a DNAnexus data-object. If ``DX_FILE_PROXY_URL`` or ``[dx] file_proxy_url=`` is set, will use that to construct a path instead, e.g.:: >>> stor.Path('dx://proj:/folder/mypath.csv').temp_url() 'https://dl.dnanex.us/F/D/awe1323/mypath.csv' >>> with stor.settings.use({'dx': {'file_proxy_url': ... 'https://my-dnax-proxy.example.com/gateway'}): ... stor.Path('dx://proj:/folder/mypath.csv').temp_url() 'https://my-dnax-proxy.example.com/gateway/proj/folder/mypath.csv' The file proxy is assumed to be a service that, when given DX path and project, will proxy through to DNAnexus to render content. Args: lifetime (int): The time (in seconds) the temporary URL will be valid (only for temp URL generation) filename (str, optional): A urlencoded filename to use for attachment, otherwise defaults to object name (to use no filename at all, use ``filename=''``) Raises: ValueError: The path points to a project ValueError: ``file_proxy_url`` is set and ``filename`` does not match object name ValueError: ``file_proxy_url`` does not look like a valid http(s) path NotFoundError: The path could not be resolved to a file (when ``file_proxy_url`` unset) """ if not self.resource: raise ValueError('DX Projects cannot have a temporary download url') file_proxy_url = stor.settings.get()['dx']['file_proxy_url'] if file_proxy_url: if not file_proxy_url.startswith('http'): raise ValueError("if set, ``file_proxy_url`` must be an http(s) path") if filename and filename != self.name: raise ValueError( 'filename MUST match object name when file_proxy_url is set' ) return urllib.parse.urljoin( file_proxy_url, f'{self.virtual_project}/{self.virtual_resource}' ) with _wrap_dx_calls(): if filename is None: filename = self.virtual_path.name elif not filename: # e.g., set to empty string filename = None file_handler = dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project) return file_handler.get_download_url( duration=lifetime, preauthenticated=True, filename=filename, project=self.canonical_project )[0]
@property def resource(self): """The virtual or canonical path to the file within the project (as a POSIXPath). Examples: >>> Path('dx://project:dir/file').resource PosixPath('dir/file') >>> Path('dx://project-123:file-456').resource PosixPath('file-456') NOTE: to avoid making API requests, this operation only uses the local string """ parts = self._get_parts() joined_resource = '/'.join(parts[1:]) if len(parts) > 1 else None return self.parts_class(joined_resource) if joined_resource else None
[docs] def dirname(self): """Returns directory name of path. Returns self if path is a project. To avoid making API calls, canonical paths will return the project ID """ if not self.resource: return self else: parts = self._get_parts() if len(parts) == 2: # paths like ('dx://proj:file') need different logic parts[0] += ':' new_path = '/'.join(parts) return self.path_class(self.drive + self.path_module.dirname(new_path)) else: return super(DXPath, self).dirname()
@property def name(self): """File or folder name of the path. Empty string for projects or folders with trailing slash. Makes no API calls to server, canonical paths are treated normally, and the basename of the path is returned. """ parts = self._get_parts() if len(parts) == 2 and parts[1]: # paths like ('dx://proj:file') need different logic parts[0] += ':' new_path = '/'.join(parts) return self.parts_class(self.path_module.basename(new_path)) else: return super(DXPath, self).name
[docs] def remove(self): """Removes a single object from DX platform Raises: ValueError: The path is invalid. """ if not self.resource: raise ValueError('DXPath.remove() can only be called on single object') file_handler = dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project) with _wrap_dx_calls(): file_handler.remove() self.clear_cached_properties()
[docs] @_wrap_dx_calls() def rmtree(self): """ Removes a resource and all of its contents. The path should point to a project or directory. Raises: NotFoundError: The path points to a nonexistent directory """ proj_handler = dxpy.DXProject(self.canonical_project) if not self.resource: folders = self.listdir(only='folders') files = self.listdir(only='objects') for folder_p in folders: folder_p.rmtree() for file_p in files: file_p.remove() return try: proj_handler.remove_folder('/' + self.resource, recurse=True) except dxpy.exceptions.ResourceNotFound as e: raise stor_exceptions.NotFoundError('No folders were found with the given path ({})' .format(self), e) self.clear_cached_properties()
[docs] def makedirs_p(self, mode=0o777): """Make directories, including parents on DX from DX folder paths. Args: mode: unused, present for compatibility (access permissions are managed at project level) """ if not self.resource: if not self.exists(): raise ValueError('Cannot create a project via makedirs_p()') return proj_handler = dxpy.DXProject(self.canonical_project) with _wrap_dx_calls(): proj_handler.new_folder('/' + self.resource, parents=True)
[docs] def isdir(self): """Determine if path is directory-like (i.e., it's a project, or it's a folder that can be listed) Returns: bool: True if path is an existing folder path or project """ if not self.resource and self.exists(): # path could be a project return True # or path could be a folder try: self.listdir(only='folders') return True except stor_exceptions.NotFoundError: return False
[docs] def isfile(self): """Determine an object exists at the specified path Returns: bool: True if path points to an existing file """ if not self.resource or utils.has_trailing_slash(self): return False try: self.stat() return True except stor_exceptions.NotFoundError: return False
def _rename(self, new_name): """Rename a single data object on the DX Platform Args: new_name (str): New name of the object Raises: ValueError: When trying to rename a project NotFoundError: When path cannot be resolved to a file. """ if not self.resource: raise ValueError('Projects cannot be renamed') if new_name == self.name: return file_handler = dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project) with _wrap_dx_calls(): file_handler.rename(new_name) self.clear_cached_properties() def _prep_for_copy(self, dest): """Handles logic, for finalizing target destination, making parent folders and deleting existing target, common to _clone and _move""" dest_is_dir = dest.isdir() target_dest = dest if dest_is_dir or utils.has_trailing_slash(dest): target_dest = dest / self.name if not dest_is_dir and target_dest.parent.resource: target_dest.parent.makedirs_p() if target_dest.isfile(): target_dest.remove() should_rename = not dest_is_dir and not utils.has_trailing_slash(dest) return target_dest, should_rename def _clone(self, dest): """Clones the data object into the destination path. The original file is retained. Args: dest (Path): The destination file/folder path in a different project Raises: ValueError: If attempting to clone a project DNAnexusError: If cloning within same project """ if not self.resource: raise ValueError('Cannot clone project ({})'.format(self)) if dest.canonical_project == self.canonical_project: raise DNAnexusError('Cannot clone within same project') file_handler = dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project) target_dest, should_rename = self._prep_for_copy(dest) with _wrap_dx_calls(): new_file_h = file_handler.clone(project=dest.canonical_project, folder='/' + (target_dest.parent.resource or '')) # no need to rename if we changed destination to include original name if should_rename: new_file_h.rename(dest.name) def _move(self, dest): """Moves the data object to a different folder within project. Args: dest (Path): The destination file/folder path within same project Raises: ValueError: When attempting to move projects DNAnexusError: If attempting to move across projects """ if not self.resource: raise ValueError('Cannot move project ({})'.format(self)) if dest.canonical_project != self.canonical_project: # This can be implemented by clone and remove original raise DNAnexusError('Cannot move across different projects') if self == dest: return file_handler = dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project) target_dest, should_rename = self._prep_for_copy(dest) with _wrap_dx_calls(): file_handler.move('/' + (target_dest.parent.resource or '')) if should_rename: file_handler.rename(dest.name) self.clear_cached_properties()
[docs] def copy(self, dest, raise_if_same_project=False, **kwargs): """Copies data object to destination path. If dest already exists as a directory on the DX platform, the file is copied underneath dest directory with original name. If the target destination already exists as a file, it is first deleted before the copy is attempted. For example, assume the following file hierarchy:: dxProject/ - a/ - - 1.txt anotherDxProject/ Doing a copy of ``1.txt`` to a new destination of ``b.txt`` is performed with:: Path('dx://dxProject:/a/1.txt').copy('dx://anotherDxProject/b.txt') The end result for anotherDxProject looks like:: anotherDxProject/ - b.txt And, if the destination already exists as a directory, i.e. we have:: dxProject/ - a/ - - 1.txt anotherDxProject/ - b.txt/ Performing copy with following command:: Path('dx://dxProject:/a/1.txt').copy('dx://anotherDxProject/b.txt') Will yield the resulting structure to be:: anotherDxProject/ - b.txt/ - - 1.txt If the source file and destination belong to the same project, the files are moved instead of copied, if the raise_if_same_project flag is False; because the same underlying file cannot appear in two locations in the same project. If the final destination for the file already is an existing file, that file is deleted before the file is copied. Args: dest (Path|str): The destination file or directory. raise_if_same_project (bool, default False): Controls moving file within project instead of cloning. If True, raises an error to prevent this move. Only takes effect when both source and destination are within the same DX Project Raises: DNAnexusError: When copying within same project with raise_if_same_project=False NotFoundError: When the source file path doesn't exist """ dest = Path(dest) if utils.is_dx_path(dest): if self.isfile(): if dest.canonical_project == self.canonical_project: if not raise_if_same_project: self._move(dest) else: raise DNAnexusError('Source and destination are in same project. ' 'Set raise_if_same_project=False to allow this.') else: self._clone(dest) else: raise stor_exceptions.NotFoundError( 'No data object was found for the given path on DNAnexus') else: super(DXPath, self).copy(dest) # for other filesystems, delegate to utils.copy
[docs] def copytree(self, dest, raise_if_same_project=False, **kwargs): """Copies a source directory to a destination directory. This is not an atomic operation. If the destination path already exists as a directory, the source tree including the root folder is copied over as a subfolder of the destination. If the source and destination directories belong to the same project, the tree is moved instead of copied. Also, in such cases, the root folder of the project cannot be the source path. Please listdir the root folder and copy/copytree individual items if needed. For example, assume the following file hierarchy:: project1/ - b/ - - 1.txt project2/ Doing a copytree from ``project1:/b/`` to a new dx destination of ``project2:/c`` is performed with:: Path('dx://project1:/b').copytree('dx://project2:/c') The end result for project2 looks like:: project2/ - c/ - - 1.txt If the destination path directory already exists, the folder is copied as a subfolder of the destination. If this new destination also exists, a TargetExistsError is raised. If the source is a root folder, and is cloned to an existing destination directory or if the destination is also a root folder, the tree is moved under project name. Refer to ``dx`` docs for detailed information. Args: dest (Path|str): The directory to copy to. Must not exist if its a posix directory raise_if_same_project (bool, default False): Allows moving files within project instead of cloning. If True, raises an error to prevent moving the directory. Only takes effect when both source and destination directory are within the same DX Project Raises: DNAnexusError: Attempt to clone within same project and raise_if_same_project=True TargetExistsError: All possible destinations for source directory already exist NotFoundError: source directory path doesn't exist """ dest = Path(dest) if utils.is_dx_path(dest): if self.isdir(): if dest.canonical_project == self.canonical_project: if not raise_if_same_project: self._movetree(dest) else: raise DNAnexusError('Source and destination are in same project. ' 'Set raise_if_same_project=False to allow this.') else: self._clonetree(dest) else: raise stor_exceptions.NotFoundError( 'No project or directory was found at path ({})'.format(self)) else: super(DXPath, self).copytree(dest) # for other filesystems, delegate to utils.copytree
def _prep_for_copytree(self, dest): """Handles logic, for finalizing target destination, making parent folders and checking for clashes, common to _clonetree and _movetree""" source = utils.remove_trailing_slash(self) dest_is_dir = dest.isdir() should_rename = True target_dest = dest if dest_is_dir or utils.has_trailing_slash(dest): target_dest = dest / (source.name if source.resource else source.virtual_project) if target_dest.isdir(): raise stor_exceptions.TargetExistsError( 'Destination path ({}) already exists, will not cause ' 'duplicate folders to exist. Remove the original first' .format(target_dest) ) should_rename = False if not source.resource: target_dest.makedirs_p() elif not dest_is_dir and target_dest.parent.resource: # don't call makedirs_p on project target_dest.parent.makedirs_p() moved_folder_path = target_dest.parent / source.name return target_dest, should_rename, moved_folder_path def _clonetree(self, dest): """Clones the project or directory into the destination path. The original tree is retained. If the destination path already exists as a directory, the source tree including the root folder is copied over as a subfolder of the destination. If the source root folder is cloned to an existing destination directory or to root folder of destination, the tree is moved under project name. Args: dest (Path): The destination directory path in a different project Raises: TargetExistsError: When all possible destinations for source directory already exist DNAnexusError: When cloning within same project """ if dest.canonical_project == self.canonical_project: raise DNAnexusError('Cannot clonetree within same project') if dest == (self.drive+dest.project): # need to convert dx://proj to dx://proj: dest = dest + ':' target_dest, should_rename, moved_folder_path = self._prep_for_copytree(dest) project_handler = dxpy.DXProject(self.canonical_project) with _wrap_dx_calls(): project_handler.clone( container=dest.canonical_project, destination=('/' + (target_dest.parent.resource or '') ) if self.resource else '/' + target_dest.resource, folders=['/' + (self.resource or '')] ) if self.resource and should_rename: dxpy.api.project_rename_folder( dest.canonical_project, input_params={ 'folder': '/' + moved_folder_path.resource, 'name': target_dest.name } ) def _movetree(self, dest): """Moves the project or directory to a different folder within project. Like copytree, if the destination exists as a folder already, the source dir is moved inside that dest folder with its original name. The source cannot be the root directory. Refer to copytree or copytree for detailed information. Args: dest (Path): The destination directory path within same project Raises: TargetExistsError: When destination directory already exists DNAnexusError: When attempting to move across projects """ if dest.canonical_project != self.canonical_project: raise DNAnexusError('Cannot movetree across different projects') if not self.resource: raise DNAnexusError('Cannot move root folder within same project on DX') if self == dest: return if dest == (self.drive+dest.project): # need to convert dx://proj to dx://proj: dest = dest + ':' target_dest, should_rename, moved_folder_path = self._prep_for_copytree(dest) project_handler = dxpy.DXProject(self.canonical_project) with _wrap_dx_calls(): project_handler.move_folder( folder='/' + self.resource, destination='/' + (target_dest.parent.resource or '') ) if should_rename: dxpy.api.project_rename_folder( dest.canonical_project, input_params={ 'folder': '/' + moved_folder_path.resource, 'name': target_dest.name } ) self.clear_cached_properties()
[docs] @_wrap_dx_calls() def download_object(self, dest, **kwargs): """Download a single path or object to file. Args: dest (Path): The output file Raises: NotFoundError: When source path is not an existing file """ dxpy.download_dxfile( dxid=self.canonical_resource, filename=dest, project=self.canonical_project )
[docs] def download_objects(self, dest, objects): def is_parent_dir(poss_parent, poss_child): """Checks if poss_child is a sub-path of poss_parent""" if not poss_parent.resource: return poss_child.resource and poss_child.project == poss_parent.project return poss_child.startswith(utils.with_trailing_slash(poss_parent)) source = self if source == (self.drive + self.project): # need to convert dx://proj to dx://proj: source = DXPath(self + ':') for obj in objects: if utils.is_dx_path(obj) and not is_parent_dir(source, DXPath(obj)): raise ValueError( '"%s" must be child of download path "%s"' % (obj, self)) # Convert requested download objects to full object paths objs_to_download = { obj: DXPath(obj) if utils.is_dx_path(obj) else source / obj for obj in objects } results = {} for obj, dx_obj in objs_to_download.items(): dest_resource = dx_obj[len(source):].lstrip('/') dest_obj = PosixPath(dest) / dest_resource dx_obj.copy(dest_obj) results[obj] = dest_obj return results
[docs] @_wrap_dx_calls() def download(self, dest, **kwargs): """Download a directory. Args: dest (Path): The output directory Raises: NotFoundError: When source or dest path is not a directory """ dxpy.download_folder( project=self.canonical_project, destdir=dest, folder='/' + (self.resource or '') )
[docs] def upload(self, to_upload, **kwargs): """Upload a list of files and directories to a directory. This is not a batch level operation. If some file errors, the files uploaded before will remain present. Args: to_upload (List[Union[str, OBSUploadObject]]): A list of posix file names, directory names, or OBSUploadObject objects to upload. Raises: ValueError: When source path is not a directory TargetExistsError: When destination directory already exists """ dx_upload_objects = [ name for name in to_upload if isinstance(name, OBSUploadObject) ] all_files_to_upload = utils.walk_files_and_dirs([ name for name in to_upload if not isinstance(name, OBSUploadObject) ]) dx_upload_objects.extend([ OBSUploadObject(f, object_name=('/' + self.resource if self.resource else Path('')) / utils.file_name_to_object_name(f)) for f in all_files_to_upload ]) for upload_obj in dx_upload_objects: upload_obj.object_name = Path(upload_obj.object_name) upload_obj.source = Path(upload_obj.source) dest_file = Path('{drive}{project}:{path}'.format( drive=self.drive, project=self.canonical_project, path=upload_obj.object_name)) if upload_obj.source.isfile(): dest_is_file = dest_file.isfile() if dest_is_file: # only occurs if upload is called directly with existing objects logger.warning( 'Destination path ({}) already exists, will not cause ' 'duplicate file objects on the platform. Skipping...' .format(dest_file)) else: with _wrap_dx_calls(): dxpy.upload_local_file( filename=upload_obj.source, project=self.canonical_project, folder='/' + (dest_file.parent.resource or ''), parents=True, name=dest_file.name ) elif upload_obj.source.isdir(): dest_file.makedirs_p() else: raise stor_exceptions.NotFoundError( 'Source path ({}) does not exist. Please provide a valid source' .format(upload_obj.source))
[docs] def read_object(self): """Reads an individual object from DX. Note dxpy for Py3 automatically decodes the DXFile.read using utf-8. Returns: bytes: the raw bytes from the object on DX. """ if not self.resource: raise ValueError('Can only read_object() on a file path, not a project') file_handler = dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project) with _wrap_dx_calls(): result = file_handler.read() # TODO (akumar): allow other encoding after update of encoding in dxpy for Py3 result = result.encode('utf-8') # dxpy for py3 already decodes the data with 'utf-8' return result
[docs] def write_object(self, content, **kwargs): """Writes an individual object to DX. Note that this method writes the provided content to a temporary file before uploading. This allows us to reuse code from DXPath's uploader (multi part object support, etc.). Args: content (bytes): raw bytes to write to OBS **kwargs: Keyword arguments to pass to `DXPath.upload` """ if not self.resource: raise ValueError('Cannot write to project. Please provide a file path') if not isinstance(content, bytes): # pragma: no cover # bytes/unicode a little confused so allow it warnings.warn('A future version of stor will raise a' ' TypeError if content is not bytes') mode = 'wb' if type(content) == bytes else 'wt' if self.isfile(): self.remove() with tempfile.NamedTemporaryFile(mode=mode) as fp: fp.write(content) fp.flush() suo = OBSUploadObject(fp.name, object_name='/' + self.resource) return self.upload([suo], **kwargs)
[docs] def open(self, mode='r', encoding=None): """ Opens a OBSFile that can be read or written to and is uploaded to the remote service. For examples of reading and writing opened objects, view OBSFile. Args: mode (str): The mode of object IO. Currently supports reading ("r" or "rb") and writing ("w", "wb") encoding (str): text encoding to use. Defaults to ``locale.getpreferredencoding(False)`` Returns: OBSFile: The file object for Swift/S3/DX. Raises: ValueError: if attempting to write to project DNAnexusError: A dxpy client error occured. """ if encoding and encoding not in ('utf-8', 'utf8'): raise ValueError('For DNAnexus paths, encoding is always assumed to be ' 'utf-8. Please switch your encoding') if not self.resource: raise ValueError("Can only read or write on file paths not project paths") return super().open(mode=mode, encoding=encoding)
[docs] def list(self, canonicalize=False, starts_with=None, limit=None, classname=None, condition=None ): """List contents using the resource of the path as a prefix. This will only list the file resources (and not empty directories like other OBS). .. warning:: Prefer `list_iter()` to this method in production code. If there are many files (i.e., more than 1-2K) to list, this method may take a long time to return and use a lot of memory to construct all of the objects. Examples: >>> Path('dx://MyProject:/my/path/').list(canonicalize=False) [Path('dx://MyProject:/my/path/to/file.txt, ...] >>> Path('dx://MyProject:/my/path/').list(canonicalize=True) [Path('dx://project-123:file-123'), ...] Args: canonicalize (bool, default False): if True, return canonical paths starts_with (str): Allows for an additional search path to be appended to the resource of the dx path. Note that this resource path is treated as a directory limit (int): Limit the amount of results returned classname (str): Restricting class : One of 'record', 'file', 'gtable, 'applet', 'workflow' condition (function(results) -> bool): The method will only return when the results matches the condition. Returns: List[DXPath]: Iterates over listed files that match an optional pattern. """ results = list(self.walkfiles( canonicalize=canonicalize, starts_with=starts_with, limit=limit, classname=classname )) if not results or not results[0]: # when results == [[]] results = [] utils.validate_condition(condition) utils.check_condition(condition, results) return results
[docs] def list_iter(self, canonicalize=False, starts_with=None, limit=None, classname=None ): """Iterable that yields objects under prefix (especially useful when a folder may have many small files) Note that this is a wrapper function to walkfiles. Args: canonicalize (bool, default False): if True, return canonical paths starts_with (str): Allows for an additional search path to be appended to the resource of the dx path. Note that this resource path is treated as a directory limit (int): Limit the amount of results returned classname (str): Restricting class : One of 'record', 'file', 'gtable, 'applet', 'workflow' Returns: Iterable[DXPath]: Iterates over listed files that match an optional pattern. """ return self.walkfiles( canonicalize=canonicalize, starts_with=starts_with, limit=limit, classname=classname )
[docs] def listdir(self, only='all', canonicalize=False): """List the path as a dir, returning top-level directories and files. Args: canonicalize (bool, default False): if True, return canonical paths only (str): "objects" for only objects, "folders" for only folders, "all" for both Returns: List[DXPath]: Iterates over listed files directly within the resource Raises: NotFoundError: When resource folder is not present on DX platform """ proj_id = self.canonical_project proj_name = self.virtual_project ans_list = [] kwargs = { 'only': only, 'describe': {'fields': {'name': True, 'folder': True}}, 'folder': '/' + (self.resource or '') } with _wrap_dx_calls(): obj_dict = dxpy.DXProject(dxid=proj_id).list_folder(**kwargs) for key, values in obj_dict.items(): for entry in values: if canonicalize: ans_list.append(DXCanonicalPath('dx://{}:/{}'.format( proj_id, (entry.lstrip('/') if key == 'folders' else entry['id'])))) else: if key == 'folders': ans_list.append(DXVirtualPath('{drive}{proj_name}:{folder}'.format( drive=self.drive, proj_name=proj_name, folder=entry))) else: ans_list.append(DXVirtualPath('{drive}{proj_name}:{folder}/{name}'.format( drive=self.drive, proj_name=proj_name, folder=entry['describe']['folder'].rstrip('/'), name=entry['describe']['name'])) ) return ans_list
[docs] def listdir_iter(self, canonicalize=False): """Iterate the path as a dir, returning top-level directories and files. Args: canonicalize (bool, default False): if True, return canonical paths Returns: Iterable[DXPath]: Iterates over listed files directly within the resource """ folders = self.listdir(only='folders', canonicalize=canonicalize) for folder in folders: yield folder for data in self.walkfiles(canonicalize=canonicalize, recurse=False): yield data
[docs] def walkfiles(self, pattern=None, canonicalize=False, recurse=True, starts_with=None, limit=None, classname=None): """Iterates over listed files that match an optional pattern. Args: pattern (str): glob pattern to match the filenames against. canonicalize (bool, default False): if True, return canonical paths recurse (bool, default True): if True, look in subfolders of folder as well starts_with (str): Allows for an additional search path to be appended to the resource of the dx path. Note that this resource path is treated as a directory limit (int): Limit the amount of results returned classname (str): Restricting class : One of 'record', 'file', 'gtable, 'applet', 'workflow' Returns: Iter[DXPath]: Iterates over listed files that match an optional pattern. """ proj_id = self.canonical_project proj_name = self.virtual_project kwargs = { 'project': proj_id, 'name': pattern, 'name_mode': 'glob', # the query performance is similar w/wo describe field, # hence no need to customize query based on canonicalize flag 'describe': {'fields': {'name': True, 'folder': True}}, 'recurse': recurse, 'classname': classname, 'limit': limit, 'folder': ('/' + (self.resource or '')) + (starts_with or '') } with _wrap_dx_calls(): list_gen = dxpy.find_data_objects(**kwargs) for obj in list_gen: if canonicalize: yield DXCanonicalPath('dx://{}:/{}'.format(obj['project'], obj['id'])) else: yield DXVirtualPath('{drive}{proj_name}:{folder}/{name}'.format( drive=self.drive, proj_name=proj_name, folder=obj['describe']['folder'].rstrip('/'), name=obj['describe']['name']) )
[docs] def glob(self, pattern, condition=None, canonicalize=False): """ Glob for pattern relative to this directory.""" results = list(self.walkfiles( canonicalize=canonicalize, pattern=pattern )) if not results or not results[0]: # when results == [[]] results = [] utils.validate_condition(condition) utils.check_condition(condition, results) return results
[docs] def getsize(self): if not self.resource: return self.stat()['dataUsage']*1e9 else: return self.stat()['size']
[docs] @_wrap_dx_calls() def stat(self): """Performs a stat on the path. This method follows (slightly vague) behavior of dxpy's describe method. It works as expected for a virtual path. However, for a canonical path: Path('dx://project-123:/file-123') say project-123 exists and file-123 exists, but file-123 doesn't exist inside project-123, stat will still return the describe response on file-123 (with its default project). Use stor.exists to check if a canonical path actually exists. Raises: MultipleObjectsSameNameError: If project or resource is not unique NotFoundError: When the project or resource cannot be found ValueError: If path is folder path """ if not self.resource: return dxpy.DXProject(dxid=self.canonical_project).describe() return dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project).describe()
@property def content_type(self): """Get content type for DXObject. Returns empty string if not present or is project/""" return self.stat().get('media') or ''
[docs]class DXVirtualPath(DXPath): """Class Handler for DXPath of form 'dx://MyProject:/a/b/c' or 'dx://project-{uuid}:/b/c'""" @cached_property def virtual_project(self): """Returns the virtual name of the project associated with the DXVirtualPath""" if utils.is_valid_dxid(self.project, 'project'): with _wrap_dx_calls(): return dxpy.DXProject(dxid=self.project).name return self.project @property def virtual_resource(self): """Human-readable path to the object in its DNAnexus Project (as PosixPath)""" return self.resource @property def virtual_path(self): """Path as DXVirtualPath""" return self @cached_property def canonical_project(self): """The dxid of the unique project for the given project name. Only resolves project user has access to. Raises: MultipleObjectsSameNameError: If project name is not unique on DX platform NotFoundError: If project name doesn't exist on DNAnexus """ if utils.is_valid_dxid(self.project, 'project'): return self.project with _wrap_dx_calls(): try: proj_dict = dxpy.find_one_project( name=self.project, level='VIEW', zero_ok=True, more_ok=False) except DXSearchError as e: raise MultipleObjectsSameNameError('Found more than one project for given name: ' '{!r}'.format(self.project), e) if proj_dict is None: raise ProjectNotFoundError('Found no projects for name: {!r}' .format(self.project)) return proj_dict['id'] @cached_property def canonical_resource(self): """The dxid of the file at this path Raises: MultipleObjectsSameNameError: if filename is not unique NotFoundError: if resource is not found on DX platform ValueError: if path looks like a folder path (i.e., ends with trailing slash) """ if not self.resource: return None if utils.has_trailing_slash(self): raise ValueError('Invalid operation ({method}) on folder path ({path})' .format(path=self, method=sys._getframe(2).f_code.co_name)) objects = [{ 'name': self.name, 'folder': ('/' + self.resource).parent, 'project': self.canonical_project, 'batchsize': 2 }] with _wrap_dx_calls(): results = dxpy.resolve_data_objects(objects=objects)[0] if len(results) > 1: raise MultipleObjectsSameNameError('Multiple objects found at path ({}). ' 'Try using a canonical ID instead'.format(self)) elif len(results) == 1: return results[0]['id'] else: raise stor_exceptions.NotFoundError( 'No data object was found for the given path ({}) on DNAnexus'.format(self)) @property def canonical_path(self): """The unique file or project that matches the given path""" return DXCanonicalPath('{drive}{proj_id}:/{resource}'.format( drive=self.drive, proj_id=self.canonical_project, resource=(self.canonical_resource or '')))
[docs] def normpath(self): normed_resource = self.path_module.normpath('/' + (self.resource or ''))[1:] norm_pth = self.path_class(self.drive + self.project + ':/' + normed_resource) if isinstance(norm_pth, DXCanonicalPath): return norm_pth.normpath() return norm_pth
[docs] def splitpath(self): """Wrapper around base splitpath function which calls splitpath on the normpath of self """ path_to_split = self.normpath() parent, child = self.path_module.split(path_to_split) return self.path_class(parent), child
[docs] def exists(self): """Checks existence of the path. Returns: bool: True if the path exists, False otherwise. """ try: # first see if there is a specific corresponding object self.stat() return True except (stor_exceptions.NotFoundError, ValueError): pass # otherwise we could be a directory, so try to listdir folder # note: list doesn't error on non-existent folder and cannot be used here try: self.listdir(only='folders') return True except stor_exceptions.NotFoundError: return False
[docs]class DXCanonicalPath(DXPath): """Represents fully canonicalized DNAnexus paths: 'dx://project-{dxID}:/file-{dxID}' or 'dx://project-{dxID}:' """ @property def virtual_project(self): """The virtual (human-readable) name of the project associated with this path""" return self.virtual_path.project @property def virtual_resource(self): """The virtual (human-readable) path of the resource associated with this path""" return self.virtual_path.resource @cached_property @_wrap_dx_calls() def virtual_path(self): """The DXVirtualPath instance equivalent to the canonical path within the specified project """ proj = dxpy.DXProject(dxid=self.project) virtual_p = DXVirtualPath(self.drive + proj.name + ':/') if self.resource: file_h = dxpy.DXFile(dxid=self.canonical_resource, project=self.canonical_project) virtual_p = virtual_p / file_h.folder[1:] / file_h.name return virtual_p @property def canonical_project(self): """The canonical dxid for the project""" return self.project @property def canonical_resource(self): """The canonical dxID of the file resource""" return self.resource @property def canonical_path(self): """Get DXCanonicalPath instance for path""" return self
[docs] def normpath(self): return self.path_class(self.drive + self.project + ':' + (self.resource or ''))
[docs] def splitpath(self): """Wrapper around base splitpath function which calls splitpath on the normpath of self """ if self.resource: path_to_split = self.path_class(self.drive + self.project + ':/' + self.resource) else: path_to_split = self.path_class(self.drive + self.project + ':/') parent, child = self.path_module.split(path_to_split) return self.path_class(parent), child
[docs] def exists(self): """Checks existence of the path. Returns: bool: True if the path exists, False otherwise. """ if self.resource: # check that file exists AND is in the specified project # (list_projects() returns {} when file id doesn't exist) return self.canonical_project in dxpy.DXFile(self.canonical_resource).list_projects() else: try: self.stat() return True except stor_exceptions.NotFoundError: return False