Source code for drtsans.path
from mantid.api import AnalysisDataService, FileFinder
from mantid.kernel import ConfigService, amend_config
from drtsans.instruments import (
instrument_label,
extract_run_number,
instrument_filesystem_name,
InstrumentEnumName,
)
import os
import stat
import pathlib
__all__ = ["abspath", "abspaths", "exists", "registered_workspace", "allow_overwrite"]
[docs]
def allow_overwrite(folder):
r"""
Changes permissions for all the files and folders in the path
to allow write by anyone. It is not recursive. It will do that
only for the files/folders the user has permissions to do that
Parameters
----------
path: str
string to the folder in which the file permissions are to be changed
Returns
-------
None
"""
# loop over the files in the directory
for path in pathlib.Path(folder).iterdir():
try:
if path.is_dir() or path.is_symlink() or not path.is_file():
continue # don't fix directories or symbolic links
# update permissions if necessary
permissions = path.stat().st_mode
permissions_new = permissions | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
if permissions != permissions_new:
path.chmod(permissions_new)
except PermissionError:
pass # ignore all permission errors
[docs]
def abspath(path: str, instrument="", ipts="", directory=None, search_archive=True):
r"""
Returns an absolute path
In addition to fully supporting what os.path.abspath does,
this also supports path strings in such as ``EQSANS_106026`` and
``EQSANS106026``. It will search your data search path and the
data archive using ONCat.
This looks in ``/instrument/proposal/nexus/instrument_runnumber.nxs.h5`` then
falls back to use :py:obj:`mantid.api.FileFinder`.
Parameters
----------
path
Either a run number, a file name (e.g. CG2_1235), or an absolute file path,
directory: str, list
One or more directory paths where to look for the data file
"""
# don't use network for first check
if os.path.exists(path):
return os.path.abspath(path)
# directories is None or a list
directories = [directory] if isinstance(directory, str) else directory
# try using the supplied directory
if directories is not None:
for directory in directories:
option = os.path.join(directory, str(path))
if os.path.exists(option):
return option
# Elucidate the string identifying the instrument in the filesystem
if not instrument:
try:
instrument = instrument_label(path)
except RuntimeError:
pass # failed to extract instrument/runnumber
if instrument:
instrument = instrument_filesystem_name(instrument) # from GPSANS to CG2, since we need /HFIR/CG2/IPTS-....
if instrument == InstrumentEnumName.UNDEFINED:
raise RuntimeError("Failed to extract instrument name")
# runnumber may be used later
try:
runnumber = extract_run_number(path)
except ValueError as e:
raise RuntimeError("Could not extract runnumber") from e
# try again using the supplied directories
if directories is not None:
for directory in directories:
option_base = os.path.join(directory, "{}_{}".format(instrument, runnumber))
for suffix in (".nxs.h5", ".nxs", ""):
option = option_base + suffix
if os.path.exists(option):
return option
# guess the path from existing information
if ipts:
if instrument: # only try if instrument is known
try:
runnumber = int(runnumber) # make sure it doesn't have extra characters
instrument_config = ConfigService.getInstrument(instrument) # to object
facility = str(instrument_config.facility())
if not instrument:
instrument = str(instrument_config)
option = f"/{facility}/{instrument}/IPTS-{ipts}/nexus/{instrument}_{runnumber}.nxs.h5"
print('Seeing if "{}" exists'.format(option))
if os.path.exists(option):
return option
except RuntimeError:
pass # facility not found
except ValueError:
pass # could not convert run number to an integer
# get a full path from `datasearch.directories`
option = FileFinder.getFullPath(str(path))
if option and os.path.exists(option):
return option
# try again putting things together
with amend_config(data_dir=directories):
option = FileFinder.getFullPath("{}_{}".format(instrument, runnumber))
if option and os.path.exists(option):
return option
# get all of the options from FileFinder and convert them to an absolute
# path in case any weren't already
try:
config = {
"datasearch.searcharchive": "hfir, sns",
}
if bool(search_archive) is False:
config["datasearch.searcharchive"] = "Off"
with amend_config(instrument=instrument, data_dir=directories, **config):
options = [os.path.abspath(item) for item in FileFinder.findRuns(str(runnumber))]
found = "nothing" if len(options) == 0 else f"{options}"
message_archive = f"FileFinder.findRuns({runnumber}) found {found} with {config}"
except RuntimeError as e:
options = [] # marks things as broken
message_archive = f"{e} with {config}"
if not options: # empty result
raise RuntimeError(f'{message_archive}\nFailed to find location of file from hint "{path}"')
for option in options:
if os.path.exists(option):
return option
raise RuntimeError("None of the locations suggested by ONCat contain " 'existing files for "{}"'.format(path))
[docs]
def abspaths(runnumbers, instrument="", ipts="", directory=None, search_archive=True):
"""
Parameters
----------
runnumbers: str
Comma separated list of run numbers
instrument: str
Name of the instrument
ipts: str
Proposal number the run is expected to be in
directory: str, list
One or more directory paths where to look for the data file
Returns
-------
str
Comma separated list of all of the full paths
"""
# this could be written differently to call ONCAT directly with all of the missing run numbers
# once guessing the path didn't work
filenames = []
for runnumber in runnumbers.split(","):
filenames.append(
abspath(
str(runnumber).strip(),
instrument=instrument,
ipts=ipts,
directory=directory,
search_archive=search_archive,
)
)
return ",".join(filenames)
[docs]
def exists(path):
r"""
Test whether a path exists. Returns False for broken symbolic links
In addition to fully supporting what os.path.exists does,
this also supports path strings in such as ``EQSANS_106026`` and
``EQSANS106026``. It will search your data search path and the
data archive using ONCat.
This uses mantid.api.FileFinder.
"""
# quickest way is to assume it is a regular file
if os.path.exists(path):
return True
# check in `datasearch.directories`
if os.path.exists(FileFinder.getFullPath(path)):
return True
# check via locations provided by ONCat
try:
for option in FileFinder.findRuns(path):
if os.path.exists(option):
return True
except RuntimeError:
return False # no suggestions found
return False
[docs]
def registered_workspace(source):
r"""
Find out if the source is a workspace registered in the Analysis Data
Service.
Parameters
----------
source: str, Workspace
Returns
-------
bool
"""
return AnalysisDataService.doesExist(str(source))