DataFrameWrapper

class sds_data_model.dataframe.DataFrameWrapper(name, data, metadata, lookup, graph)

This is a thin wrapper around a Spark DataFrame.

This class stores a Spark DataFrame alongside other objects which both enhance the richness of information associated with the data, and allow for increased flexibility in transforming it.
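
A minimal construction sketch (hedged: instances are more typically created via the from_files classmethod documented below, and the example data here is invented):

>>> from pyspark.sql import SparkSession
>>> from sds_data_model.dataframe import DataFrameWrapper
>>> spark = SparkSession.builder.getOrCreate()
>>> sdf = spark.createDataFrame([("Dartmoor",), ("Exmoor",)], ["name"])
>>> wrapped = DataFrameWrapper(
...     name="example",
...     data=sdf,
...     metadata=None,
...     lookup=None,
...     graph=None,
... )
>>> wrapped.data.count()
2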

name

The name of the dataset.

Type

str

data

Tabular data, optionally containing a geometry column.

Type

Union[SparkDataFrame, GroupedData]

metadata

Object of class Metadata containing descriptive information relating to the dataset represented in data.

Type

Metadata, optional

lookup

Dictionary of {column: value-map, …} for columns in the data. Not applied to the data. Use of categorize will write the lookups used by the method to this attribute. This attribute is written to the output of to_zarr.

Type

Dict[str, Dict[Any, float]], optional
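
For illustration, a populated lookup covering one column might look like this (hedged: the column and category values here are invented):

>>> wrapped.lookup
{'Main_Habit': {'Deciduous woodland': 0.0, 'Lowland fens': 1.0}}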

graph

Object of class Digraph containing nodes and edges relating to the source data and transformations that have taken place.

Type

Digraph, optional
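
For illustration (hedged: assumes the graph attribute has been populated), the DOT source of the graph can be inspected through the standard graphviz API:

>>> print(wrapped.graph.source)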

from_files()

Reads in data and converts it to a SparkDataFrame.

Parameters
  • data_path (str) –

  • metadata_path (Optional[str]) –

  • metadata_kwargs (Optional[Dict[str, Any]]) –

  • lookup (Optional[Dict[str, Dict[Any, float]]]) –

  • name (Optional[str]) –

  • read_file_kwargs (Optional[Dict[str, Any]]) –

  • spark (Optional[pyspark.sql.session.SparkSession]) –

Return type

sds_data_model.dataframe._DataFrameWrapper

call_method()

Calls Spark method specified by user on Spark DataFrame in wrapper.

Parameters
  • self (sds_data_model.dataframe._DataFrameWrapper) –

  • method_name (str) –

  • args (Optional[Union[str, Sequence[str]]]) –

  • kwargs (Optional[Union[str, Dict[str, Any], pyspark.sql.dataframe.DataFrame, pyspark.sql.group.GroupedData]]) –

Return type

sds_data_model.dataframe._DataFrameWrapper

categorize()

Maps an auto-generated or given dictionary onto provided columns.

Parameters
  • self (sds_data_model.dataframe._DataFrameWrapper) –

  • columns (Sequence[str]) –

  • lookup (Optional[Dict[str, Dict[Any, float]]]) –

  • spark (Optional[pyspark.sql.session.SparkSession]) –

Return type

sds_data_model.dataframe._DataFrameWrapper

index()

Adds a spatial index to data in a Spark DataFrame.

Parameters
  • self (sds_data_model.dataframe._DataFrameWrapper) –

  • resolution (int) –

  • how (str) –

  • index_column_name (str) –

  • bounds_column_name (str) –

  • geometry_column_name (str) –

  • exploded (bool) –

Return type

sds_data_model.dataframe._DataFrameWrapper

to_zarr()

Rasterises columns of self.data and writes them to zarr.

Parameters
  • self (sds_data_model.dataframe._DataFrameWrapper) –

  • path (str) –

  • columns (Optional[List[str]]) –

  • nodata (Optional[Dict[str, float]]) –

  • index_column_name (str) –

  • geometry_column_name (str) –

  • overwrite (bool) –

  • cell_size (int) –

  • bng_xmin (int) –

  • bng_xmax (int) –

  • bng_ymax (int) –

  • out_shape (Tuple[int, int]) –

Return type

None

Returns

SparkDataFrameWrapper

Return type

_DataFrameWrapper

Parameters
  • name (str) –

  • data (Union[pyspark.sql.dataframe.DataFrame, pyspark.sql.group.GroupedData]) –

  • metadata (Optional[sds_data_model.metadata.Metadata]) –

  • lookup (Optional[Dict[str, Dict[Any, float]]]) –

  • graph (Optional[graphviz.graphs.Digraph]) –

__init__(name, data, metadata, lookup, graph)
Parameters
  • name (str) –

  • data (Union[pyspark.sql.dataframe.DataFrame, pyspark.sql.group.GroupedData]) –

  • metadata (Optional[sds_data_model.metadata.Metadata]) –

  • lookup (Optional[Dict[str, Dict[Any, float]]]) –

  • graph (Optional[graphviz.graphs.Digraph]) –

Return type

None

Methods

__init__(name, data, metadata, lookup, graph)

call_method(method_name, /, *args, **kwargs)

Calls Spark method specified by user on Spark DataFrame in wrapper.

categorize(columns[, lookup, spark])

Maps an auto-generated or given dictionary onto provided columns.

from_files(data_path[, metadata_path, ...])

Reads in data and converts it to a SparkDataFrame.

index([resolution, how, index_column_name, ...])

Adds a spatial index to data in a Spark DataFrame.

to_zarr(path[, columns, nodata, ...])

Rasterises columns of self.data and writes them to zarr.

Attributes

name

data

metadata

lookup

graph

call_method(method_name, /, *args, **kwargs)

Calls Spark method specified by user on Spark DataFrame in wrapper.

The function does the following:

  1. Assigns the method call to an attribute object, so it can be examined before anything is implemented.

  2. Checks whether method_name is a method.

  3. Gets the signature of the method (what arguments it takes, return type, etc.).

  4. Binds the arguments to the function arguments.

  5. Creates a string to pass through the graph generator.

  6. Returns the value, as the relevant information has now been checked and the method can be called.

  7. If method_name is not a method, assumes it is a property (e.g. crs).

  8. Checks whether method_name returns a dataframe; if so, updates the self.data attribute with that new data.

  9. If a dataframe is not returned (e.g. display was called), returns the original data.
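
A simplified sketch of this dispatch logic (hedged: an illustration of the pattern described above, not the actual implementation; the graph bookkeeping in step 5 is omitted):

from inspect import signature
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.group import GroupedData

def _call_method_sketch(wrapper, method_name, *args, **kwargs):
    # 1-2) Fetch the attribute and check whether it is callable.
    attribute = getattr(wrapper.data, method_name)
    if callable(attribute):
        # 3-4) Bind the user's arguments against the method's signature.
        bound = signature(attribute).bind(*args, **kwargs)
        bound.apply_defaults()
        # 6) Call the method with the validated arguments.
        result = attribute(*bound.args, **bound.kwargs)
    else:
        # 7) Not callable, so assume it is a property (e.g. crs).
        result = attribute
    # 8-9) Keep the result only if it is tabular data; otherwise
    # (e.g. show() returned None) retain the original data.
    if isinstance(result, (SparkDataFrame, GroupedData)):
        wrapper.data = result
    return wrapper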

Examples

>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped = DataFrameWrapper.from_files(name = "Traditional orchards",
            data_path="/dbfs/mnt/base/unrestricted/source_defra_data_services_platform/dataset_traditional_orchards/format_SHP_traditional_orchards/LATEST_traditional_orchards/",
            read_file_kwargs = {'suffix':'.shp'})

Limit the number of rows to 5:

>>> wrapped_small = wrapped.call_method("limit", num=5)

Look at the data:

>>> wrapped_small.call_method("show")

Parameters
  • method_name (str) – Name of method or property called by user.

  • *args (Optional[Union[str, Sequence[str]]]) – Additional non-keyword arguments provided by user.

  • **kwargs (Optional[Union[str, Dict[str, Any], pyspark.sql.dataframe.DataFrame, pyspark.sql.group.GroupedData]]) – Additional keyword arguments provided by user.

  • self (sds_data_model.dataframe._DataFrameWrapper) –

Returns

The SparkDataFrameWrapper, updated if necessary

Return type

_DataFrameWrapper

categorize(columns, lookup=None, spark=None)

Maps an auto-generated or given dictionary onto provided columns.

This method is used to create a lookup (stored within the DataFrameWrapper) that is then used during the rasterisation process.

Note: do not use categorize more than once in a given session, as each run of the method updates values in place. If a new categorization is required, re-create the DataFrameWrapper first.
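
A minimal sketch of how an auto-generated lookup for a single column can be derived from its distinct values (hedged: illustrative only; the real method also accepts a user-supplied lookup and updates the wrapper in place):

from typing import Any, Dict
from pyspark.sql import DataFrame as SparkDataFrame

def _auto_lookup(sdf: SparkDataFrame, column: str) -> Dict[Any, float]:
    # Enumerate the distinct values of the column, assigning each
    # category a float code suitable for rasterisation.
    distinct_values = [row[column] for row in sdf.select(column).distinct().collect()]
    return {value: float(code) for code, value in enumerate(distinct_values)}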

Examples

>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped = DataFrameWrapper.from_files(name = "priority_habitats",
data_path = '/dbfs/mnt/base/unrestricted/source_defra_data_services_platform',
metadata_path = 'https://ckan.publishing.service.gov.uk/harvest/object/85e03bf0-4e95-4739-a5fa-21d60cf7f069',
read_file_kwargs = {'pattern': 'dataset_priority_habitat_inventory_*/format_SHP_priority_habitat_inventory_*/LATEST_priority_habitat_inventory_*/PHI_v2_3_*',
'suffix': '.shp'})

Categorize by main habitat type:

>>> wrapped.categorize(['Main_Habit'])

Look at the lookup:

>>> wrapped.lookup

Parameters
  • columns (Sequence[str]) – Columns to map on.

  • lookup (Optional[Dict[str, Dict[Any, float]]]) – {column: value-map} dictionary to map. Defaults to None.

  • spark (Optional[SparkSession]) – Spark session. Defaults to None.

  • self (sds_data_model.dataframe._DataFrameWrapper) –

Returns

SparkDataFrameWrapper

Return type

_DataFrameWrapper

classmethod from_files(data_path, metadata_path=None, metadata_kwargs=None, lookup=None, name=None, read_file_kwargs=None, spark=None)

Reads in data and converts it to a SparkDataFrame.

A wide range of data can be read in with from_files. This includes vector data supported by GDAL drivers, spreadsheet formats read with pandas.read_excel, and CSV, JSON, and Parquet files handled by Spark.
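
A hedged sketch of the suffix-based routing this implies (illustrative only; the actual reader selection logic may differ):

def _pick_reader(suffix: str) -> str:
    # Spreadsheet formats go to pandas, Spark-native formats go to
    # Spark's own readers, and anything else is attempted as vector
    # data via GDAL drivers.
    if suffix in {".xlsx", ".xls", ".xlsm", ".ods"}:
        return "pandas.read_excel"
    if suffix in {".csv", ".json", ".parquet"}:
        return "spark.read"
    return "GDAL vector driver"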

Examples

>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped_shp = DataFrameWrapper.from_files(
    name = "National parks",
    data_path="/dbfs/mnt/base/unrestricted/source_defra_data_services_platform/dataset_national_parks/format_SHP_national_parks/LATEST_national_parks/",
    read_file_kwargs = {'suffix':'.shp'},
    metadata_path = "https://ckan.publishing.service.gov.uk/harvest/object/656c07d1-67b3-4bdb-8ab3-75e118a7cf14"
)
>>> wrapped_csv = DataFrameWrapper.from_files(
    name = "indicator_5__species_in_the_wider_countryside__farmland_1970_to_2020",
    data_path="dbfs:/mnt/lab/unrestricted/source_isr/dataset_england_biodiversity_indicators/format_CSV_england_biodiversity_indicators/LATEST_england_biodiversity_indicators/indicator_5__species_in_the_wider_countryside__farmland_1970_to_2020.csv",
    read_file_kwargs = {'header': True},
)

Parameters
  • data_path (str) – Path to data.

  • metadata_path (Optional[str], optional) – Path to metadata supplied by user. Defaults to None.

  • metadata_kwargs (Optional[Dict[str, Any]]) – Optional keyword arguments for reading metadata. Defaults to None.

  • lookup (Optional[Dict[str, Dict[Any, float]]]) – Dictionary of {column: value-map, …} for columns in the data. Not applied to the data. Defaults to None.

  • name (Optional[str], optional) – Name for data, either supplied by caller or obtained from metadata title. Defaults to None.

  • read_file_kwargs (Optional[Dict[str,Any]], optional) – Additional kwargs supplied by the caller, dependent on the function called. Defaults to None.

  • spark (Optional[SparkSession]) – Optional spark session.

Returns

SparkDataFrameWrapper

Return type

_DataFrameWrapper

index(resolution=100000, how='intersects', index_column_name='bng_index', bounds_column_name='bounds', geometry_column_name='geometry', exploded=True)

Adds a spatial index to data in a Spark DataFrame.

Calculates the grid index or indices for the geometry provided in well-known binary format at a given resolution. An index is required for the rasterisation process executed in the to_zarr method. Executing this method will add the relevant index columns to the dataframe stored within the DataFrameWrapper.
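
A hedged sketch of the bounding-box flavour of this calculation for a single geometry (illustrative: the real implementation reads well-known binary geometries and emits British National Grid references rather than raw coordinates):

from typing import List, Tuple

def _grid_cells_for_bounds(
    bounds: Tuple[float, float, float, float], resolution: int
) -> List[Tuple[int, int]]:
    # Return the south-west corner of every grid cell at the given
    # resolution whose extent intersects the geometry's bounding box.
    xmin, ymin, xmax, ymax = bounds
    x_start = int(xmin // resolution) * resolution
    y_start = int(ymin // resolution) * resolution
    return [
        (x, y)
        for x in range(x_start, int(xmax) + 1, resolution)
        for y in range(y_start, int(ymax) + 1, resolution)
    ]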

Examples

>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped = DataFrameWrapper.from_files(name = "priority_habitats",
data_path = '/dbfs/mnt/base/unrestricted/source_defra_data_services_platform',
metadata_path = 'https://ckan.publishing.service.gov.uk/harvest/object/85e03bf0-4e95-4739-a5fa-21d60cf7f069',
read_file_kwargs = {'pattern': 'dataset_priority_habitat_inventory_*/format_SHP_priority_habitat_inventory_*/LATEST_priority_habitat_inventory_*/PHI_v2_3_*',
'suffix': '.shp'})

Index the data:

>>> wrapped.index(resolution=100_000)

Look at the additional columns added to the dataframe:

>>> wrapped.data.dtypes

Parameters
  • resolution (int) – Resolution of British National Grid cell(s) to return. Defaults to 100_000.

  • how (str) – Indexing method: one of "bounding box", "intersects", or "contains". Defaults to "intersects".

  • index_column_name (str) – Name of the index column added to the dataframe. Defaults to "bng_index".

  • bounds_column_name (str) – Name of the bounds column added to the dataframe. Defaults to "bounds".

  • geometry_column_name (str) – Name of the geometry column in the dataframe. Defaults to "geometry".

  • exploded (bool) – If True, rows are exploded so that each grid index for a geometry appears in its own row. Defaults to True.

  • self (sds_data_model.dataframe._DataFrameWrapper) –

Raises

ValueError – If self.data is an instance of pyspark.sql.GroupedData instead of pyspark.sql.DataFrame.

Returns

An indexed DataFrameWrapper.

Return type

_DataFrameWrapper

to_zarr(path, columns=None, nodata=None, index_column_name='bng_index', geometry_column_name='geometry', overwrite=False, cell_size=10, bng_xmin=0, bng_xmax=700000, bng_ymax=1300000, out_shape=(10000, 10000))

Rasterises columns of self.data and writes them to zarr.

This method requires two additional columns:

  • A "bng_index" column containing the BNG index of the geometry in each row.

  • A "bounds" column containing the BNG bounds of the BNG index as a list in each row.
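
Both columns are produced by the index method under its default column names, so a typical pipeline indexes before writing (hedged usage sketch; the path is illustrative):

>>> wrapped.index()
>>> wrapped.to_zarr(path="/path/to/file.zarr")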

Examples

>>> wrapper.to_zarr(
    path = "/path/to/file.zarr",
)

Parameters
  • path (str) – Path to save the zarr file including file name.

  • columns (Optional[List[str]]) – Columns to rasterize. If None, a geometry mask will be generated. Defaults to None.

  • nodata (Optional[Dict[str, float]]) – Dictionary of {column: nodata}. Manual assignment of the nodata/fill value. Defaults to None.

  • index_column_name (str) – Name of the BNG index column. Defaults to “bng_index”.

  • geometry_column_name (str) – Name of the geometry column. Defaults to “geometry”.

  • overwrite (bool) – Overwrite existing zarr? Defaults to False.

  • cell_size (int) – The resolution of the cells in the DataArray. Defaults to CELL_SIZE (10).

  • out_shape (Tuple[int, int]) – The shape (height, width) of the DataArray. Defaults to OUT_SHAPE ((10000, 10000)); see the note after this parameter list.

  • bng_xmin (int) – The minimum x value of the DataArray. Defaults to BNG_XMIN (0), the minimum x value of the British National Grid.

  • bng_xmax (int) – The maximum x value of the DataArray. Defaults to BNG_XMAX (700000), the maximum x value of the British National Grid.

  • bng_ymax (int) – The maximum y value of the DataArray. Defaults to BNG_YMAX (1300000), the maximum y value of the British National Grid.

  • self (sds_data_model.dataframe._DataFrameWrapper) –
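
Note on the defaults (hedged: this relationship is inferred from the default values, not stated elsewhere in this reference): one 100 km British National Grid tile rasterised at the default 10 m cell size yields exactly the default out_shape of (10000, 10000):

>>> 100_000 // 10  # one 100 km BNG tile at a 10 m cell size
10000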

Raises
  • ValueError – If self.data is not an instance of pyspark.sql.DataFrame.

  • ValueError – If index_column_name isn’t in the dataframe.

  • ValueError – If geometry_column_name isn’t in the dataframe.

  • ValueError – If a column of type string is in columns.

Returns

None.

Return type

None