Source code for sharrow.dataset

import base64
import hashlib
import logging
import pickle
import re
from typing import Any, Hashable, Mapping, Sequence

import dask
import dask.array as da
import numpy as np
import pandas as pd
import pyarrow as pa
import xarray as xr

from .aster import extract_all_name_tokens
from .shared_memory import (
    create_shared_list,
    create_shared_memory_array,
    delete_shared_memory_files,
    get_shared_list_nbytes,
    open_shared_memory_array,
    read_shared_list,
    release_shared_memory,
)
from .table import Table

logger = logging.getLogger("sharrow")

well_known_names = {
    "nb",
    "np",
    "pd",
    "xr",
    "pa",
    "log",
    "exp",
    "log1p",
    "expm1",
    "max",
    "min",
    "piece",
    "hard_sigmoid",
    "transpose_leading",
    "clip",
}


def one_based(n):
    return pd.RangeIndex(1, n + 1)


def zero_based(n):
    return pd.RangeIndex(0, n)


def clean(s):
    """
    Convert any string into a similar python identifier.

    If any modification of the string is made, or if the string
    is longer than 120 characters, it is truncated and a hash of the
    original string is added to the end, to ensure every
    string maps to a unique cleaned name.

    Parameters
    ----------
    s : str

    Returns
    -------
    cleaned : str
    """
    if not isinstance(s, str):
        s = f"{type(s)}-{s}"
    cleaned = re.sub(r"\W|^(?=\d)", "_", s)
    if cleaned != s or len(cleaned) > 120:
        # digest size 15 creates a 24 character base32 string
        h = base64.b32encode(
            hashlib.blake2b(s.encode(), digest_size=15).digest()
        ).decode()
        cleaned = f"{cleaned[:90]}_{h}"
    return cleaned


def coerce_to_range_index(idx):
    if isinstance(idx, pd.RangeIndex):
        return idx
    if isinstance(idx, (pd.Int64Index, pd.Float64Index, pd.UInt64Index)):
        if idx.is_monotonic_increasing and idx[-1] - idx[0] == idx.size - 1:
            return pd.RangeIndex(idx[0], idx[0] + idx.size)
    return idx


def is_dict_like(value: Any) -> bool:
    return hasattr(value, "keys") and hasattr(value, "__getitem__")


class _LocIndexer:
    __slots__ = ("dataset",)

    def __init__(self, dataset: "Dataset"):
        self.dataset = dataset

    def __getitem__(self, key: Mapping[Hashable, Any]) -> "Dataset":
        if not is_dict_like(key):
            if len(self.dataset.dims) == 1:
                dim_name = self.dataset.dims.__iter__().__next__()
                key = {dim_name: key}
            else:
                raise TypeError(
                    "can only lookup dictionaries from Dataset.loc, "
                    "unless there is only one dimension"
                )
        return self.dataset.sel(key)


class _iLocIndexer:
    __slots__ = ("dataset",)

    def __init__(self, dataset: "Dataset"):
        self.dataset = dataset

    def __getitem__(self, key: Mapping[Hashable, Any]) -> "Dataset":
        if not is_dict_like(key):
            if len(self.dataset.dims) == 1:
                dim_name = self.dataset.dims.__iter__().__next__()
                key = {dim_name: key}
            else:
                raise TypeError(
                    "can only lookup dictionaries from Dataset.iloc, "
                    "unless there is only one dimension"
                )
        return self.dataset.isel(key)


class Dataset(xr.Dataset):
    """
    A multi-dimensional, in memory, array database.

    A dataset consists of variables, coordinates and attributes which
    together form a self describing dataset.

    Dataset implements the mapping interface with keys given by variable
    names and values given by DataArray objects for each variable name.

    One dimensional variables with name equal to their dimension are
    index coordinates used for label based indexing.

    Parameters
    ----------
    data_vars : dict-like, optional
        A mapping from variable names to :py:class:`~xarray.DataArray`
        objects, :py:class:`~xarray.Variable` objects or to tuples of
        the form ``(dims, data[, attrs])`` which can be used as
        arguments to create a new ``Variable``. Each dimension must
        have the same length in all variables in which it appears.

        The following notations are accepted:

        - mapping {var name: DataArray}
        - mapping {var name: Variable}
        - mapping {var name: (dimension name, array-like)}
        - mapping {var name: (tuple of dimension names, array-like)}
        - mapping {dimension name: array-like}
          (it will be automatically moved to coords, see below)

        Each dimension must have the same length in all variables in
        which it appears.
    coords : dict-like, optional
        Another mapping in similar form as the `data_vars` argument,
        except the each item is saved on the dataset as a "coordinate".
        These variables have an associated meaning: they describe
        constant/fixed/independent quantities, unlike the
        varying/measured/dependent quantities that belong in
        `variables`. Coordinates values may be given by 1-dimensional
        arrays or scalars, in which case `dims` do not need to be
        supplied: 1D arrays will be assumed to give index values along
        the dimension with the same name.

        The following notations are accepted:

        - mapping {coord name: DataArray}
        - mapping {coord name: Variable}
        - mapping {coord name: (dimension name, array-like)}
        - mapping {coord name: (tuple of dimension names, array-like)}
        - mapping {dimension name: array-like}
          (the dimension name is implicitly set to be the same as the
          coord name)

        The last notation implies that the coord name is the same as
        the dimension name.

    attrs : dict-like, optional
        Global attributes to save on this dataset.
    """

    __slots__ = (
        "_shared_memory_key_",
        "_shared_memory_objs_",
        "_shared_memory_owned_",
        "__global_shared_memory_pool",
    )

    def __init__(self, *args, **kwargs):
        super(Dataset, self).__init__(*args, **kwargs)
        if len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], xr.Dataset):
            self.attrs = args[0].attrs

    @classmethod
    def construct(cls, source):
        """
        A generic constructor for creating Datasets from various similar objects.

        Parameters
        ----------
        source : pandas.DataFrame, pyarrow.Table, xarray.Dataset, or Sequence[str]
            The source from which to create a Dataset.  DataFrames and Tables
            are converted to Datasets that have one dimension (the rows) and
            seperate variables for each of the columns.  A list of strings
            creates a dataset with those named empty variables.

        Returns
        -------
        Dataset
        """
        if isinstance(source, pd.DataFrame):
            source = cls.from_dataframe(source)
            # source = cls.from_dataframe_fast(source) # older xarray was slow
        elif isinstance(source, (Table, pa.Table)):
            source = cls.from_table(source)
        elif isinstance(source, (pa.Table)):
            source = cls.from_table(source)
        elif isinstance(source, cls):
            pass  # don't do the superclass things
        elif isinstance(source, xr.Dataset):
            source = cls(source)
        elif isinstance(source, Sequence) and all(isinstance(i, str) for i in source):
            source = cls.from_table(pa.table({i: [] for i in source}))
        else:
            raise TypeError(f"source cannot be type {type(source)}")
        return source

    def update(self, other):
        super().update(other)
        if isinstance(other, Dataset):
            match_names = self.match_names
            match_names.update(other.match_names)
            self.match_names = match_names
        return self  # deprecated return for consistency until xarray 0.19

[docs] @classmethod def from_table( cls, tbl, index_name="index", index=None, ): """ Convert a pyarrow.Table into an xarray.Dataset Parameters ---------- tbl : Table Table from which to use data and indices. index_name : str, default 'index' This name will be given to the default dimension index, if none is given. Ignored if `index` is given explicitly. index : Index-like, optional Use this index instead of a default RangeIndex. Returns ------- New Dataset. """ if len(set(tbl.column_names)) != len(tbl.column_names): raise ValueError("cannot convert Table with non-unique columns") if index is None: index = pd.RangeIndex(len(tbl), name=index_name) else: if len(index) != len(tbl): raise ValueError( f"length of index ({len(index)}) does not match length of table ({len(tbl)})" ) if isinstance(index, pd.MultiIndex) and not index.is_unique: raise ValueError( "cannot attach a non-unique MultiIndex and convert into xarray" ) arrays = [ (tbl.column_names[n], np.asarray(tbl.column(n))) for n in range(len(tbl.column_names)) ] result = cls() if isinstance(index, pd.MultiIndex): dims = tuple( name if name is not None else "level_%i" % n for n, name in enumerate(index.names) ) for dim, lev in zip(dims, index.levels): result[dim] = (dim, lev) else: index_name = index.name if index.name is not None else "index" dims = (index_name,) result[index_name] = (dims, index) result._set_numpy_data_from_dataframe(index, arrays, dims) return result
[docs] @classmethod def from_omx( cls, omx, index_names=("otaz", "dtaz"), indexes="one-based", renames=None, ): """ Create a Dataset from an OMX file. Parameters ---------- omx : openmatrix.File or larch.OMX An OMX-format file, opened for reading. index_names : tuple, default ("otaz", "dtaz", "time_period") Should be a tuple of length 3, giving the names of the three dimensions. The first two names are the native dimensions from the open matrix file, the last is the name of the implicit dimension that is created by parsing array names. indexes : str, optional The name of a 'lookup' in the OMX file, which will be used to populate the coordinates for the two native dimensions. Or, specify "one-based" or "zero-based" to assume sequential and consecutive numbering starting with 1 or 0 respectively. renames : Mapping or Collection, optional Limit the import only to these data elements. If given as a mapping, the keys will be the names of variables in the resulting dataset, and the values give the names of data matrix tables in the OMX file. If given as a list or other non-mapping collection, elements are not renamed but only elements in the collection are included. Returns ------- Dataset """ # handle both larch.OMX and openmatrix.open_file versions if "larch" in type(omx).__module__: omx_data = omx.data omx_shape = omx.shape else: omx_data = omx.root["data"] omx_shape = omx.shape() arrays = {} if renames is None: for k in omx_data._v_children: arrays[k] = omx_data[k][:] elif isinstance(renames, dict): for new_k, old_k in renames.items(): arrays[new_k] = omx_data[old_k][:] else: for k in renames: arrays[k] = omx_data[k][:] d = { "dims": index_names, "data_vars": {k: {"dims": index_names, "data": arrays[k]} for k in arrays}, } if indexes == "one-based": indexes = { index_names[0]: one_based(omx_shape[0]), index_names[1]: one_based(omx_shape[1]), } elif indexes == "zero-based": indexes = { index_names[0]: zero_based(omx_shape[0]), index_names[1]: zero_based(omx_shape[1]), } if indexes is not None: d["coords"] = { index_name: {"dims": index_name, "data": index} for index_name, index in indexes.items() } return cls.from_dict(d)
@classmethod def from_amx( cls, amx, index_names=("otaz", "dtaz"), indexes="one-based", renames=None, ): arrays = {} if renames is None: for k in amx.list_matrices(): arrays[k] = amx[k][:] elif isinstance(renames, dict): for new_k, old_k in renames.items(): arrays[new_k] = amx[old_k] else: for k in renames: arrays[k] = amx[k] d = { "dims": index_names, "data_vars": {k: {"dims": index_names, "data": arrays[k]} for k in arrays}, } if indexes == "one-based": indexes = {index_names[i]: "1" for i in range(len(index_names))} elif indexes == "zero-based": indexes = {index_names[i]: "0" for i in range(len(index_names))} if isinstance(indexes, (list, tuple)): indexes = dict(zip(index_names, indexes)) if isinstance(indexes, dict): for n, i in enumerate(index_names): if indexes.get(i) == "1": indexes[i] = one_based(amx.shape[n]) elif indexes.get(i) == "0": indexes[i] = zero_based(amx.shape[n]) if indexes is not None: d["coords"] = { index_name: {"dims": index_name, "data": index} for index_name, index in indexes.items() } return cls.from_dict(d)
[docs] @classmethod def from_zarr(cls, store, *args, **kwargs): """ Load and decode a dataset from a Zarr store. The `store` object should be a valid store for a Zarr group. `store` variables must contain dimension metadata encoded in the `_ARRAY_DIMENSIONS` attribute. Parameters ---------- store : MutableMapping or str A MutableMapping where a Zarr Group has been stored or a path to a directory in file system where a Zarr DirectoryStore has been stored. synchronizer : object, optional Array synchronizer provided to zarr group : str, optional Group path. (a.k.a. `path` in zarr terminology.) chunks : int or dict or tuple or {None, 'auto'}, optional Chunk sizes along each dimension, e.g., ``5`` or ``{'x': 5, 'y': 5}``. If `chunks='auto'`, dask chunks are created based on the variable's zarr chunks. If `chunks=None`, zarr array data will lazily convert to numpy arrays upon access. This accepts all the chunk specifications as Dask does. overwrite_encoded_chunks : bool, optional Whether to drop the zarr chunks encoded for each variable when a dataset is loaded with specified chunk sizes (default: False) decode_cf : bool, optional Whether to decode these variables, assuming they were saved according to CF conventions. mask_and_scale : bool, optional If True, replace array values equal to `_FillValue` with NA and scale values according to the formula `original_values * scale_factor + add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are taken from variable attributes (if they exist). If the `_FillValue` or `missing_value` attribute contains multiple values a warning will be issued and all array values matching one of the multiple values will be replaced by NA. decode_times : bool, optional If True, decode times encoded in the standard NetCDF datetime format into datetime objects. Otherwise, leave them encoded as numbers. concat_characters : bool, optional If True, concatenate along the last dimension of character arrays to form string arrays. Dimensions will only be concatenated over (and removed) if they have no corresponding variable and if they are only used as the last dimension of character arrays. decode_coords : bool, optional If True, decode the 'coordinates' attribute to identify coordinates in the resulting dataset. drop_variables : str or iterable, optional A variable or list of variables to exclude from being parsed from the dataset. This may be useful to drop variables with problems or inconsistent values. consolidated : bool, optional Whether to open the store using zarr's consolidated metadata capability. Only works for stores that have already been consolidated. By default (`consolidate=None`), attempts to read consolidated metadata, falling back to read non-consolidated metadata if that fails. chunk_store : MutableMapping, optional A separate Zarr store only for chunk data. storage_options : dict, optional Any additional parameters for the storage backend (ignored for local paths). decode_timedelta : bool, optional If True, decode variables and coordinates with time units in {'days', 'hours', 'minutes', 'seconds', 'milliseconds', 'microseconds'} into timedelta objects. If False, leave them encoded as numbers. If None (default), assume the same value of decode_time. use_cftime : bool, optional Only relevant if encoded dates come from a standard calendar (e.g. "gregorian", "proleptic_gregorian", "standard", or not specified). If None (default), attempt to decode times to ``np.datetime64[ns]`` objects; if this is not possible, decode times to ``cftime.datetime`` objects. If True, always decode times to ``cftime.datetime`` objects, regardless of whether or not they can be represented using ``np.datetime64[ns]`` objects. If False, always decode times to ``np.datetime64[ns]`` objects; if this is not possible raise an error. Returns ------- dataset : Dataset The newly created dataset. References ---------- http://zarr.readthedocs.io/ """ return cls(xr.open_zarr(store, *args, **kwargs))
def to_zarr(self, *args, **kwargs): """ Write dataset contents to a zarr group. Parameters ---------- store : MutableMapping, str or Path, optional Store or path to directory in file system. If given with a ".zarr.zip" extension, and keyword arguments limited to 'mode' and 'compression', then a ZipStore will be created, populated, and then immediately closed. chunk_store : MutableMapping, str or Path, optional Store or path to directory in file system only for Zarr array chunks. Requires zarr-python v2.4.0 or later. mode : {"w", "w-", "a", None}, optional Persistence mode: "w" means create (overwrite if exists); "w-" means create (fail if exists); "a" means override existing variables (create if does not exist). If ``append_dim`` is set, ``mode`` can be omitted as it is internally set to ``"a"``. Otherwise, ``mode`` will default to `w-` if not set. synchronizer : object, optional Zarr array synchronizer. group : str, optional Group path. (a.k.a. `path` in zarr terminology.) encoding : dict, optional Nested dictionary with variable names as keys and dictionaries of variable specific encodings as values, e.g., ``{"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}`` compute : bool, optional If True write array data immediately, otherwise return a ``dask.delayed.Delayed`` object that can be computed to write array data later. Metadata is always updated eagerly. consolidated : bool, optional If True, apply zarr's `consolidate_metadata` function to the store after writing metadata. append_dim : hashable, optional If set, the dimension along which the data will be appended. All other dimensions on overriden variables must remain the same size. region : dict, optional Optional mapping from dimension names to integer slices along dataset dimensions to indicate the region of existing zarr array(s) in which to write this dataset's data. For example, ``{'x': slice(0, 1000), 'y': slice(10000, 11000)}`` would indicate that values should be written to the region ``0:1000`` along ``x`` and ``10000:11000`` along ``y``. Two restrictions apply to the use of ``region``: - If ``region`` is set, _all_ variables in a dataset must have at least one dimension in common with the region. Other variables should be written in a separate call to ``to_zarr()``. - Dimensions cannot be included in both ``region`` and ``append_dim`` at the same time. To create empty arrays to fill in with ``region``, use a separate call to ``to_zarr()`` with ``compute=False``. See "Appending to existing Zarr stores" in the reference documentation for full details. compression : int, optional Only used for ".zarr.zip" files. By default zarr uses blosc compression for chunks, so adding another layer of compression here is typically redundant. References ---------- https://zarr.readthedocs.io/ Notes ----- Zarr chunking behavior: If chunks are found in the encoding argument or attribute corresponding to any DataArray, those chunks are used. If a DataArray is a dask array, it is written with those chunks. If not other chunks are found, Zarr uses its own heuristics to choose automatic chunk sizes. """ if ( len(args) == 1 and isinstance(args[0], str) and args[0].endswith(".zarr.zip") ): if {"compression", "mode"}.issuperset(kwargs.keys()): import zarr with zarr.ZipStore(args[0], **kwargs) as store: self.to_zarr(store) return return super().to_zarr(*args, **kwargs) def iat(self, *, _names=None, _load=False, _index_name=None, **idxs): """ Multi-dimensional fancy indexing by position. Provide the dataset dimensions to index up as keywords, each with a value giving an array (one dimensional) of positions to extract. All other arguments are keyword-only arguments beginning with an underscore. Parameters ---------- _names : Collection[str], optional Only include these variables of this Dataset. _load : bool, default False Call `load` on the result, which will trigger a compute operation if the data underlying this Dataset is in dask, otherwise this does nothing. _index_name, str, default "index" The name to use for the resulting dataset's dimension. **idxs : Mapping[str, Any] Positions to extract. Returns ------- Dataset """ loaders = {} if _index_name is None: _index_name = "index" for k, v in idxs.items(): loaders[k] = xr.DataArray(v, dims=[_index_name]) if _names: ds = self[_names] else: ds = self if _load: ds = ds.load() return ds.isel(**loaders) def at(self, *, _names=None, _load=False, _index_name=None, **idxs): """ Multi-dimensional fancy indexing by label. Provide the dataset dimensions to index up as keywords, each with a value giving an array (one dimensional) of labels to extract. All other arguments are keyword-only arguments beginning with an underscore. Parameters ---------- _names : Collection[str], optional Only include these variables of this Dataset. _load : bool, default False Call `load` on the result, which will trigger a compute operation if the data underlying this Dataset is in dask, otherwise this does nothing. _index_name, str, default "index" The name to use for the resulting dataset's dimension. **idxs : Mapping[str, Any] Labels to extract. Returns ------- Dataset """ loaders = {} if _index_name is None: _index_name = "index" for k, v in idxs.items(): loaders[k] = xr.DataArray(v, dims=[_index_name]) if _names: ds = self[_names] else: ds = self if _load: ds = ds.load() return ds.sel(**loaders) def at_df(self, df): """ Extract values by label on the coordinates indicated by columns of a DataFrame. Parameters ---------- df : pandas.DataFrame or Mapping[str, array-like] The columns (or keys) of `df` should match the named dimensions of this Dataset. The resulting extracted DataFrame will have one row per row of `df`, columns matching the data variables in this dataset, and each value is looked up by the labels. Returns ------- pandas.DataFrame """ result = self.at(**df).reset_coords(drop=True).to_dataframe() if isinstance(df, pd.DataFrame): result.index = df.index return result def iat_df(self, df): """ Extract values by position on the coordinates indicated by columns of a DataFrame. Parameters ---------- df : pandas.DataFrame or Mapping[str, array-like] The columns (or keys) of `df` should match the named dimensions of this Dataset. The resulting extracted DataFrame will have one row per row of `df`, columns matching the data variables in this dataset, and each value is looked up by the positions. Returns ------- pandas.DataFrame """ result = self.iat(**df).reset_coords(drop=True).to_dataframe() if isinstance(df, pd.DataFrame): result.index = df.index return result def select_and_rename(self, name_dict=None, **names): """ Select and rename variables from this Dataset Parameters ---------- name_dict, **names: dict The keys or keyword arguments give the current names of the variables that will be selected out of this Dataset. The values give the new names of the same variables in the resulting Dataset. Returns ------- Dataset """ if name_dict is None: name_dict = names else: name_dict.update(names) return self[list(name_dict.keys())].rename(name_dict) # def squash_index(self, indexes_dict=None, *, set_match_names=True, **indexes): # if indexes_dict is None: # indexes_dict = indexes # else: # indexes_dict.update(indexes) # ds = self.reset_index(list(indexes_dict.keys()), drop=True) # ds = ds.rename(**indexes_dict) # if set_match_names: # ds = ds.set_match_names({v: v for v in indexes_dict.values()}) # return ds def _repr_html_(self): html = super()._repr_html_() html = html.replace("xarray.Dataset", "sharrow.Dataset") return html def __repr__(self): r = super().__repr__() r = r.replace("xarray.Dataset", "sharrow.Dataset") return r @property def match_names(self): """ Mapping[str,str] The keys of this mapping give the named dimensions of this Dataset, and the values of this mapping give either a named index or column in the shared data. If the value is a plain string, the target must be an exact match with no processing of the matching data. If a match_name target begins with an '@', the match is a dynamic match, where the particular index-position values are created based on data in the main or other source[s]. This allows for match columns that do not exist yet, including columns where the key column exists but is a label-based or offset-based match that needs to be processed into index-position values. """ result = {} for k in self.attrs.keys(): if k.startswith("match_names_"): result[k[12:]] = self.attrs.get(k) for k in self.indexes.keys(): if k not in result: result[k] = None return result @match_names.setter def match_names(self, names): if names is None: existing_match_name_keys = list(self.match_names.keys()) for k in existing_match_name_keys: del self.attrs[k] return if isinstance(names, str): dims = list(self.dims.keys()) assert len(dims) == 1 names = {dims[0]: names} for k in names.keys(): if k not in self.dims: raise ValueError(f"'{k}' not in dims") for k, v in names.items(): if v is not None: self.attrs[f"match_names_{k}"] = v elif f"match_names_{k}" in self.attrs: del self.attrs[f"match_names_{k}"] def set_match_names(self, names): """ Create a copy of this dataset with the given match_names for flowing. Parameters ---------- names : str or Mapping[str,str] Returns ------- Dataset """ result = self.copy() result.match_names = names return result def keep_dims(self, keep_dims, *, errors="raise"): """ Keep only certain dimensions and associated variables from this dataset. Parameters ---------- keep_dims : hashable or iterable of hashable Dimension or dimensions to keep. errors : {"raise", "ignore"}, optional If 'raise' (default), raises a ValueError error if any of the dimensions passed are not in the dataset. If 'ignore', any given dimensions that are in the dataset are dropped and no error is raised. Returns ------- obj : Dataset The dataset without the given dimensions (or any variables containing those dimensions) """ if isinstance(keep_dims, str): keep_dims = {keep_dims} else: keep_dims = set(keep_dims) all_dims = set(self.dims) if errors == "raise": missing_dims = keep_dims - all_dims if missing_dims: raise ValueError( "Dataset does not contain the dimensions: %s" % missing_dims ) return self.drop_dims([i for i in all_dims if i not in keep_dims]) @property def loc(self): """ Attribute for location based indexing. Only supports __getitem__, and only when the key is a dict of the form {dim: labels}, or when there is only one dimension. """ return _LocIndexer(self) @property def iloc(self): """ Attribute for position based indexing. Only supports __getitem__, and only when there is only one dimension. """ return _iLocIndexer(self) @classmethod def from_dataframe_fast( cls, dataframe: pd.DataFrame, sparse: bool = False ) -> "Dataset": """Convert a pandas.DataFrame into an xarray.Dataset Each column will be converted into an independent variable in the Dataset. If the dataframe's index is a MultiIndex, it will be expanded into a tensor product of one-dimensional indices (filling in missing values with NaN). This method will produce a Dataset very similar to that on which the 'to_dataframe' method was called, except with possibly redundant dimensions (since all dataset variables will have the same dimensionality) Parameters ---------- dataframe : DataFrame DataFrame from which to copy data and indices. sparse : bool, default: False If true, create a sparse arrays instead of dense numpy arrays. This can potentially save a large amount of memory if the DataFrame has a MultiIndex. Requires the sparse package (sparse.pydata.org). Returns ------- New Dataset. See Also -------- xarray.DataArray.from_series pandas.DataFrame.to_xarray """ # this is much faster than the default xarray version when not # using a MultiIndex. if isinstance(dataframe.index, pd.MultiIndex) or sparse: return super().from_dataframe(dataframe, sparse) if not dataframe.columns.is_unique: raise ValueError("cannot convert DataFrame with non-unique columns") if isinstance(dataframe.index, pd.CategoricalIndex): idx = dataframe.index.remove_unused_categories() else: idx = dataframe.index index_name = idx.name if idx.name is not None else "index" dims = (index_name,) # Cast to a NumPy array first, in case the Series is a pandas Extension # array (which doesn't have a valid NumPy dtype) # TODO: allow users to control how this casting happens, e.g., by # forwarding arguments to pandas.Series.to_numpy? arrays = { k: xr.DataArray(np.asarray(v), dims=dims) for k, v in dataframe.items() } return cls(arrays).assign_coords({index_name: dataframe.index}) def match_names_on(self, key): dims = self[key].dims match_names = self.match_names result = [] for dim in dims: next_dim = match_names.get(dim, None) result.append(_dyno(dim, next_dim)) return tuple(result) def squash_index(self, indexes_dict=None, *, set_match_names=True, **indexes): if indexes_dict is None: indexes_dict = indexes else: indexes_dict.update(indexes) ds = super().squash_index( indexes_dict, set_match_names=set_match_names, ) if set_match_names: ds = ds.set_match_names({v: v for v in indexes_dict.values()}) return ds def release_shared_memory(self): """ Release shared memory allocated to this Dataset. """ release_shared_memory(self._shared_memory_key_) @classmethod def delete_shared_memory_files(cls, key): delete_shared_memory_files(key) def to_shared_memory(self, key=None, mode="r+"): """ Load this Dataset into shared memory. The returned Dataset object references the shared memory and is the "owner" of this data. When this object is destroyed, the data backing it may also be freed, which can result in a segfault or other unfortunate condition if that memory is still accessed from elsewhere. Parameters ---------- key : str An identifying key for this shared memory. Use the same key in `from_shared_memory` to recreate this Dataset elsewhere. mode : {‘r+’, ‘r’, ‘w+’, ‘c’}, optional This methid returns a copy of the Dataset in shared memory. If memmapped, that copy can be opened in various modes. See numpy.memmap() for details. Returns ------- Dataset """ logger.info(f"sharrow.Dataset.to_shared_memory({key})") if key is None: import random key = random.randbytes(4).hex() self._shared_memory_key_ = key self._shared_memory_owned_ = False self._shared_memory_objs_ = [] wrappers = [] sizes = [] names = [] position = 0 def emit(k, a, is_coord): nonlocal names, wrappers, sizes, position wrappers.append( { "dims": a.dims, "name": a.name, "attrs": a.attrs, "dtype": a.dtype, "shape": a.shape, "coord": is_coord, "nbytes": a.nbytes, "position": position, } ) sizes.append(a.nbytes) names.append(k) position += a.nbytes for k, a in self.coords.items(): emit(k, a, True) for k in self.variables: if k in names: continue a = self[k] emit(k, a, False) mem = create_shared_memory_array(key, size=position) if key.startswith("memmap:"): buffer = memoryview(mem) else: buffer = mem.buf # @dask.delayed # def read_chunk(key_, size_, pos_, arr): # mem_ = open_shared_memory_array(key_, mode='r+') # if key_.startswith("memmap:"): # buffer_ = memoryview(mem_) # else: # buffer_ = mem_.buf # mem_arr_ = np.ndarray(shape=arr.shape, dtype=arr.dtype, buffer=buffer_[pos_:pos_ + size_]) # da.store(arr, mem_arr_, lock=False, compute=True) tasks = [] for w in wrappers: _size = w["nbytes"] _name = w["name"] _pos = w["position"] a = self[_name] mem_arr = np.ndarray( shape=a.shape, dtype=a.dtype, buffer=buffer[_pos : _pos + _size] ) if isinstance(a, xr.DataArray) and isinstance(a.data, da.Array): tasks.append(da.store(a.data, mem_arr, lock=False, compute=False)) # tasks.append(read_chunk(key, _size, _pos, a.data)) else: mem_arr[:] = a[:] if tasks: dask.compute(tasks, scheduler="threads") if key.startswith("memmap:"): mem.flush() create_shared_list([pickle.dumps(i) for i in wrappers], key) return type(self).from_shared_memory(key, own_data=True, mode=mode) @property def shared_memory_key(self): try: return self._shared_memory_key_ except AttributeError: raise ValueError("this dataset is not in shared memory") @classmethod def from_shared_memory(cls, key, own_data=False, mode="r+"): """ Connect to an existing Dataset in shared memory. Parameters ---------- key : str An identifying key for this shared memory. Use the same key in `from_shared_memory` to recreate this Dataset elsewhere. own_data : bool, default False The returned Dataset object references the shared memory but is not the "owner" of this data unless this flag is set. Returns ------- Dataset """ import pickle from xarray import DataArray _shared_memory_objs_ = [] shr_list = read_shared_list(key) try: _shared_memory_objs_.append(shr_list.shm) except AttributeError: # for memmap, list is loaded from pickle, not shared ram pass mem = open_shared_memory_array(key, mode=mode) _shared_memory_objs_.append(mem) if key.startswith("memmap:"): buffer = memoryview(mem) else: buffer = mem.buf content = {} for w in shr_list: t = pickle.loads(w) shape = t.pop("shape") dtype = t.pop("dtype") name = t.pop("name") coord = t.pop("coord", False) # noqa: F841 position = t.pop("position") nbytes = t.pop("nbytes") mem_arr = np.ndarray( shape, dtype=dtype, buffer=buffer[position : position + nbytes] ) content[name] = DataArray(mem_arr, **t) self = cls(content) self._shared_memory_key_ = key self._shared_memory_owned_ = own_data self._shared_memory_objs_ = _shared_memory_objs_ return self @property def shared_memory_size(self): try: return sum(i.size for i in self._shared_memory_objs_) except AttributeError: raise ValueError("this dataset is not in shared memory") @property def is_shared_memory(self): try: return sum(i.size for i in self._shared_memory_objs_) > 0 except AttributeError: return False @classmethod def preload_shared_memory_size(cls, key): """ Compute the size in bytes of a shared Dataset without actually loading it. Parameters ---------- key : str The identifying key for this shared memory. Returns ------- int """ memsize = 0 try: n = get_shared_list_nbytes(key) except FileNotFoundError: pass else: memsize += n try: mem = open_shared_memory_array(key, mode="r") except FileNotFoundError: pass else: memsize += mem.size return memsize
[docs] @classmethod def from_omx_3d( cls, omx, index_names=("otaz", "dtaz", "time_period"), indexes=None, *, time_periods=None, time_period_sep="__", max_float_precision=32, ): """ Create a Dataset from an OMX file with an implicit third dimension. Parameters ---------- omx : openmatrix.File or larch.OMX An OMX-format file, opened for reading. index_names : tuple, default ("otaz", "dtaz", "time_period") Should be a tuple of length 3, giving the names of the three dimensions. The first two names are the native dimensions from the open matrix file, the last is the name of the implicit dimension that is created by parsing array names. indexes : str, optional The name of a 'lookup' in the OMX file, which will be used to populate the coordinates for the two native dimensions. Or, specify "one-based" or "zero-based" to assume sequential and consecutive numbering starting with 1 or 0 respectively. time_periods : list-like, required keyword argument A list of index values from which the third dimension is constructed for all variables with a third dimension. time_period_sep : str, default "__" (double underscore) The presence of this separator within the name of any table in the OMX file indicates that table is to be considered a page in a three dimensional variable. The portion of the name preceding the first instance of this separator is the name of the resulting variable, and the portion of the name after the first instance of this separator is the label of the position for this page, which should appear in `time_periods`. max_float_precision : int, default 32 When loading, reduce all floats in the OMX file to this level of precision, generally to save memory if they were stored as double precision but that level of detail is unneeded in the present application. Returns ------- Dataset """ if not isinstance(omx, (list, tuple)): omx = [omx] # handle both larch.OMX and openmatrix.open_file versions if "larch" in type(omx[0]).__module__: omx_shape = omx[0].shape omx_lookup = omx[0].lookup else: omx_shape = omx[0].shape() omx_lookup = omx[0].root["lookup"] omx_data = [] omx_data_map = {} for n, i in enumerate(omx): if "larch" in type(i).__module__: omx_data.append(i.data) for k in i.data._v_children: omx_data_map[k] = n else: omx_data.append(i.root["data"]) for k in i.root["data"]._v_children: omx_data_map[k] = n import dask.array data_names = list(omx_data_map.keys()) n1, n2 = omx_shape if indexes is None: # default reads mapping if only one lookup is included, otherwise one-based if len(omx_lookup._v_children) == 1: ranger = None indexes = list(omx_lookup._v_children)[0] else: ranger = one_based elif indexes == "one-based": ranger = one_based elif indexes == "zero-based": ranger = zero_based elif indexes in set(omx_lookup._v_children): ranger = None else: raise NotImplementedError( "only one-based, zero-based, and named indexes are implemented" ) if ranger is not None: r1 = ranger(n1) r2 = ranger(n2) else: r1 = r2 = pd.Index(omx_lookup[indexes]) if time_periods is None: raise ValueError("must give time periods explicitly") time_periods_map = {t: n for n, t in enumerate(time_periods)} pending_3d = {} content = {} for k in data_names: if time_period_sep in k: base_k, time_k = k.split(time_period_sep, 1) if base_k not in pending_3d: pending_3d[base_k] = [None] * len(time_periods) pending_3d[base_k][time_periods_map[time_k]] = dask.array.from_array( omx_data[omx_data_map[k]][k] ) else: content[k] = xr.DataArray( dask.array.from_array(omx_data[omx_data_map[k]][k]), dims=index_names[:2], coords={ index_names[0]: r1, index_names[1]: r2, }, ) for base_k, darrs in pending_3d.items(): # find a prototype array prototype = None for i in darrs: prototype = i if prototype is not None: break if prototype is None: raise ValueError("no prototype") darrs_ = [ (i if i is not None else dask.array.zeros_like(prototype)) for i in darrs ] content[base_k] = xr.DataArray( dask.array.stack(darrs_, axis=-1), dims=index_names, coords={ index_names[0]: r1, index_names[1]: r2, index_names[2]: time_periods, }, ) for i in content: if np.issubdtype(content[i].dtype, np.floating): if content[i].dtype.itemsize > max_float_precision / 8: content[i] = content[i].astype(f"float{max_float_precision}") return cls(content)
def max_float_precision(self, p=32): """ Set the maximum precision for floating point values. This modifies the Dataset in-place. Parameters ---------- p : {64, 32, 16} The max precision to set. Returns ------- self """ for i in self: if np.issubdtype(self[i].dtype, np.floating): if self[i].dtype.itemsize > p / 8: self[i] = self[i].astype(f"float{p}") return self @property def digital_encodings(self): """ dict: All digital_encoding attributes from Dataset variables. """ result = {} for k in self.variables: k_attrs = self._variables[k].attrs if "digital_encoding" in k_attrs: result[k] = k_attrs["digital_encoding"] return result def set_digital_encoding(self, name, *args, **kwargs): logger.info(f"set_digital_encoding({name})") from .digital_encoding import array_encode result = self.copy() result[name] = array_encode(self[name], *args, **kwargs) return result def interchange_dims(self, dim1, dim2): """ Rename a pair of dimensions by swapping their names. Parameters ---------- dim1, dim2 : str The names of the two dimensions to swap. Returns ------- Dataset """ p21 = "PLACEHOLD21" p12 = "PLACEHOLD12" s1 = {dim1: p12, dim2: p21} s2 = {p12: dim2, p21: dim1} rv = {} vr = {} if dim1 in self.variables: rv[dim1] = p12 vr[p12] = dim2 if dim2 in self.variables: rv[dim2] = p21 vr[p21] = dim1 return self.rename_dims(s1).rename_vars(rv).rename_dims(s2).rename_vars(vr) def rename_dims_and_coords(self, dims_dict=None, **dims_kwargs): from xarray.core.utils import either_dict_or_kwargs dims_dict = either_dict_or_kwargs( dims_dict, dims_kwargs, "rename_dims_and_coords" ) out = self.rename_dims(dims_dict) coords_dict = {} for k in out.coords: if k in dims_dict: coords_dict[k] = dims_dict[k] return out.rename_vars(coords_dict) def rename_or_ignore(self, dims_dict=None, **dims_kwargs): from xarray.core.utils import either_dict_or_kwargs dims_dict = either_dict_or_kwargs( dims_dict, dims_kwargs, "rename_dims_and_coords" ) dims_dict = { k: v for (k, v) in dims_dict.items() if (k in self.dims or k in self._variables) } return self.rename(dims_dict) def explode(self): dims = self.dims out = self.rename_dims({f"{k}": f"{k}_" for k in dims}) out = out.reset_coords() out = out.broadcast_like(out) return out
[docs] @classmethod def from_named_objects(cls, *args): """ Create a Dataset by populating it with named objects. A mapping of names to values is first created, and then that mapping is used in the standard constructor to initialize a Dataset. Parameters ---------- *args : Any A collection of objects, each exposing a `name` attribute. Returns ------- Dataset """ objs = {} for n, a in enumerate(args): try: name = a.name except AttributeError: raise ValueError(f"argument {n} has no name") if name is None: raise ValueError(f"the name for argument {n} is None") objs[name] = a return cls(objs)
def ensure_integer(self, names, bitwidth=32, inplace=False): """ Convert dataset variables to integers, if they are not already integers. Parameters ---------- names : Iterable[str] Variable names in this dataset to convert. bitwidth : int, default 32 Bit width of integers that are created when a conversion is made. Note that variables that are already integer are not modified, even if their bit width differs from this. inplace : bool, default False Whether to make the conversion in-place on this Dataset, or return a copy. Returns ------- Dataset """ if inplace: result = self else: result = self.copy() for name in names: if name not in result: continue if not np.issubdtype(result[name].dtype, np.integer): result[name] = result[name].astype(f"int{bitwidth}") if not inplace: return result def filter_name_tokens(expr, matchable_names=None): name_tokens = extract_all_name_tokens(expr) name_tokens -= {"_args", "_inputs", "_outputs", "np"} name_tokens -= well_known_names if matchable_names: name_tokens &= matchable_names return name_tokens def _dyno(k, v): if isinstance(v, str) and v[0] == "@": return f"__dynamic_{k}{v}" elif v is None: return f"__dynamic_{k}" else: return v def _flip_flop_def(v): if "# sharrow:" in v: return v.split("# sharrow:", 1)[1].strip() else: return v