DataFrameWrapper¶
- class sds_data_model.dataframe.DataFrameWrapper(name, data, metadata, lookup, graph)¶
This is a thin wrapper around a Spark DataFrame.
This class stores a Spark DataFrame alongside other objects which both enhance the richness of information associated with the data, and allow for increased flexibility in transforming it.
- name¶
The name of the dataset.
- Type
str
- data¶
Tabular data, optionally containing a geometry column.
- Type
Union[SparkDataFrame, GroupedData]
- metadata¶
Object of class Metadata containing descriptive information relating to the dataset represented in data.
- Type
Metadata, optional
- lookup¶
Dictionary of {column: value-map, …} for columns in the data. Not applied to the data. Calling categorize writes the lookups used by the method to this attribute, and this attribute is written to the output of to_zarr.
- Type
Dict[str, Dict[Any, float]], optional
- graph¶
Object of class Digraph containing nodes and edges relating to the source data and transformations that have taken place.
- Type
Digraph, optional
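As an illustration of the shape of the lookup attribute (the column name and values here are hypothetical, not taken from a real dataset), each categorical column maps to a {value: float} dictionary:

```python
# Hypothetical lookup for a single categorical column, matching the
# Dict[str, Dict[Any, float]] type of the lookup attribute.
lookup = {
    "Main_Habit": {
        "Deciduous woodland": 0.0,
        "Lowland meadows": 1.0,
        "Coastal saltmarsh": 2.0,
    }
}

# Each inner dictionary maps raw column values to the numeric codes
# used when the column is rasterised by to_zarr.
code = lookup["Main_Habit"]["Lowland meadows"]
```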
- from_files()¶
Reads in data and converts it to a SparkDataFrame.
- Parameters
data_path (str) –
metadata_path (Optional[str]) –
metadata_kwargs (Optional[Dict[str, Any]]) –
lookup (Optional[Dict[str, Dict[Any, float]]]) –
name (Optional[str]) –
read_file_kwargs (Optional[Dict[str, Any]]) –
spark (Optional[pyspark.sql.session.SparkSession]) –
- Return type
sds_data_model.dataframe._DataFrameWrapper
- call_method()¶
Calls the Spark method specified by the user on the SparkDataFrame in the wrapper.
- Parameters
self (sds_data_model.dataframe._DataFrameWrapper) –
method_name (str) –
args (Optional[Union[str, Sequence[str]]]) –
kwargs (Optional[Union[str, Dict[str, Any], pyspark.sql.dataframe.DataFrame, pyspark.sql.group.GroupedData]]) –
- Return type
sds_data_model.dataframe._DataFrameWrapper
- categorize()¶
Maps an auto-generated or given dictionary onto provided columns.
- Parameters
self (sds_data_model.dataframe._DataFrameWrapper) –
columns (Sequence[str]) –
lookup (Optional[Dict[str, Dict[Any, float]]]) –
spark (Optional[pyspark.sql.session.SparkSession]) –
- Return type
sds_data_model.dataframe._DataFrameWrapper
- index()¶
Adds a spatial index to data in a Spark DataFrame.
- Parameters
self (sds_data_model.dataframe._DataFrameWrapper) –
resolution (int) –
how (str) –
index_column_name (str) –
bounds_column_name (str) –
geometry_column_name (str) –
exploded (bool) –
- Return type
sds_data_model.dataframe._DataFrameWrapper
- to_zarr()¶
Rasterises columns of self.data and writes them to zarr.
- Parameters
self (sds_data_model.dataframe._DataFrameWrapper) –
path (str) –
columns (Optional[List[str]]) –
nodata (Optional[Dict[str, float]]) –
index_column_name (str) –
geometry_column_name (str) –
overwrite (bool) –
cell_size (int) –
bng_xmin (int) –
bng_xmax (int) –
bng_ymax (int) –
out_shape (Tuple[int, int]) –
- Return type
None
- Parameters
name (str) –
data (Union[pyspark.sql.dataframe.DataFrame, pyspark.sql.group.GroupedData]) –
metadata (Optional[sds_data_model.metadata.Metadata]) –
lookup (Optional[Dict[str, Dict[Any, float]]]) –
graph (Optional[graphviz.graphs.Digraph]) –
- __init__(name, data, metadata, lookup, graph)¶
- Parameters
name (str) –
data (Union[pyspark.sql.dataframe.DataFrame, pyspark.sql.group.GroupedData]) –
metadata (Optional[sds_data_model.metadata.Metadata]) –
lookup (Optional[Dict[str, Dict[Any, float]]]) –
graph (Optional[graphviz.graphs.Digraph]) –
- Return type
None
Methods
__init__(name, data, metadata, lookup, graph)
call_method(method_name, /, *args, **kwargs): Calls Spark method specified by user on Spark DataFrame in wrapper.
categorize(columns[, lookup, spark]): Maps an auto-generated or given dictionary onto provided columns.
from_files(data_path[, metadata_path, ...]): Reads in data and converts it to a SparkDataFrame.
index([resolution, how, index_column_name, ...]): Adds a spatial index to data in a Spark DataFrame.
to_zarr(path[, columns, nodata, ...]): Rasterises columns of self.data and writes them to zarr.
Attributes
- call_method(method_name, /, *args, **kwargs)¶
Calls Spark method specified by user on Spark DataFrame in wrapper.
The function does the following:
1. Assigns the method call to an attribute object, to examine it before implementing anything.
2. Checks whether method_name is a method.
3. Gets the signature of the method (what arguments it takes, return type, etc.).
4. Binds the provided arguments to the function arguments.
5. Creates a string to pass through the graph generator.
6. Returns the value, as the relevant information has been checked and the method can now be called.
7. If method_name is not a method, assumes it is a property (e.g. CRS).
8. Checks whether method_name returns a DataFrame; if so, updates the self.data attribute with that new data.
9. If a DataFrame is not returned (e.g. display was called), returns the original data.
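The introspection in steps 1–4 and the dataframe check in steps 8–9 can be sketched in plain Python. The DummyFrame class below is a stand-in for a Spark DataFrame, not the real API, and this call_method is a simplified illustration of the dispatch logic, not the wrapper's actual implementation:

```python
import inspect

class DummyFrame:
    """Stand-in for a Spark DataFrame, used only to illustrate dispatch."""
    def __init__(self, rows):
        self.rows = rows
    def limit(self, num):
        return DummyFrame(self.rows[:num])
    @property
    def columns(self):
        return ["a", "b"]

def call_method(frame, method_name, *args, **kwargs):
    attribute = getattr(type(frame), method_name)        # step 1: examine first
    if callable(attribute):                              # step 2: is it a method?
        signature = inspect.signature(attribute)         # step 3: get signature
        bound = signature.bind(frame, *args, **kwargs)   # step 4: bind arguments
        result = attribute(*bound.args, **bound.kwargs)  # step 6: call it
    else:
        result = getattr(frame, method_name)             # step 7: assume property
    # steps 8-9: only replace the stored data if a dataframe came back
    return result if isinstance(result, DummyFrame) else frame

small = call_method(DummyFrame([1, 2, 3, 4]), "limit", num=2)
```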
Examples
>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped = DataFrameWrapper.from_files(
...     name="National parks",
...     data_path="/dbfs/mnt/base/unrestricted/source_defra_data_services_platform/dataset_traditional_orchards/format_SHP_traditional_orchards/LATEST_traditional_orchards/",
...     read_file_kwargs={'suffix': '.shp'},
... )
Limit number of rows to 5:
>>> wrapped_small = wrapped.call_method('limit', num=5)
Look at data:
>>> wrapped_small.call_method("show")
- Parameters
method_name (str) – Name of method or property called by user.
*args (Optional[Union[List[int], int]]) – Additional non-keyword arguments provided by user.
**kwargs (Optional[Dict[str, int]]) – Additional keyword arguments provided by user.
- Returns
The SparkDataFrameWrapper, updated if necessary
- Return type
_DataFrameWrapper
- categorize(columns, lookup=None, spark=None)¶
Maps an auto-generated or given dictionary onto provided columns.
This method is used to create a lookup (stored within the DataFrameWrapper) that is then used during the rasterisation process.
Note: do not use categorize more than once in a given session, as each run of the method updates values in place. If a new categorization is required, re-create the DataFrameWrapper first.
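The auto-generation of a lookup can be pictured as mapping each distinct value in a column to a float code. This is a plain-Python sketch (the build_lookup helper and its ordering rule are assumptions for illustration; the real method operates on a Spark DataFrame and writes the result to self.lookup):

```python
from typing import Any, Dict, List

def build_lookup(column: str, values: List[Any]) -> Dict[str, Dict[Any, float]]:
    """Map each distinct value, in first-seen order, to a float code."""
    distinct = list(dict.fromkeys(values))  # preserves order, drops duplicates
    return {column: {value: float(code) for code, value in enumerate(distinct)}}

lookup = build_lookup("Main_Habit", ["Meadow", "Woodland", "Meadow", "Heath"])
# lookup == {"Main_Habit": {"Meadow": 0.0, "Woodland": 1.0, "Heath": 2.0}}
```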
Examples
>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped = DataFrameWrapper.from_files(
...     name="priority_habitats",
...     data_path='/dbfs/mnt/base/unrestricted/source_defra_data_services_platform',
...     metadata_path='https://ckan.publishing.service.gov.uk/harvest/object/85e03bf0-4e95-4739-a5fa-21d60cf7f069',
...     read_file_kwargs={'pattern': 'dataset_priority_habitat_inventory_*/format_SHP_priority_habitat_inventory_*/LATEST_priority_habitat_inventory_*/PHI_v2_3_*', 'suffix': '.shp'},
... )
Categorize by main habitat type:
>>> wrapped.categorize(['Main_Habit'])
Look at lookup:
>>> wrapped.lookup
- Parameters
columns (Sequence[str]) – Columns to map on.
lookup (Optional[Dict[str, Dict[Any, float]]]) – {column: value-map} dictionary to map. Defaults to {}.
spark (Optional[SparkSession]) – spark session.
- Returns
SparkDataFrameWrapper
- Return type
_DataFrameWrapper
- classmethod from_files(data_path, metadata_path=None, metadata_kwargs=None, lookup=None, name=None, read_file_kwargs=None, spark=None)¶
Reads in data and converts it to a SparkDataFrame.
A wide range of data can be read in with from_files. This includes vector data supported by GDAL drivers, multiple spreadsheet formats read with pandas.read_excel, and CSV, JSON, and Parquet files, which are handled by Spark.
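The routing between readers can be pictured as dispatch on file suffix. This is a simplified sketch, not the actual from_files implementation; the routing tables and pick_reader helper below are assumptions for illustration (the real function also uses read_file_kwargs and may inspect directories rather than single files):

```python
from pathlib import Path

# Hypothetical suffix-to-reader routing tables; the real implementation
# also covers the full range of GDAL vector drivers and Excel formats.
SPARK_SUFFIXES = {".csv", ".json", ".parquet"}
EXCEL_SUFFIXES = {".xlsx", ".xls"}

def pick_reader(data_path: str) -> str:
    """Choose a reader family from the file suffix."""
    suffix = Path(data_path).suffix.lower()
    if suffix in SPARK_SUFFIXES:
        return "spark"
    if suffix in EXCEL_SUFFIXES:
        return "pandas.read_excel"
    return "gdal"  # vector formats such as .shp, .gpkg, .geojson

reader = pick_reader("indicators.csv")
```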
Examples
>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped_shp = DataFrameWrapper.from_files(
...     name="National parks",
...     data_path="/dbfs/mnt/base/unrestricted/source_defra_data_services_platform/dataset_national_parks/format_SHP_national_parks/LATEST_national_parks/",
...     read_file_kwargs={'suffix': '.shp'},
...     metadata_path="https://ckan.publishing.service.gov.uk/harvest/object/656c07d1-67b3-4bdb-8ab3-75e118a7cf14",
... )
>>> wrapped_csv = DataFrameWrapper.from_files(
...     name="indicator_5__species_in_the_wider_countryside__farmland_1970_to_2020",
...     data_path="dbfs:/mnt/lab/unrestricted/source_isr/dataset_england_biodiversity_indicators/format_CSV_england_biodiversity_indicators/LATEST_england_biodiversity_indicators/indicator_5__species_in_the_wider_countryside__farmland_1970_to_2020.csv",
...     read_file_kwargs={'header': True},
... )
- Parameters
data_path (str) – Path to data.
metadata_path (Optional[str], optional) – Path to metadata supplied by user. Defaults to None.
metadata_kwargs (Optional[Dict[str, Any]]) – Optional kwargs for metadata.
lookup (Optional[Dict[str, Dict[Any, float]]]) – Dictionary of {column: value-map, …} for columns in the data. Not applied to the data. Defaults to None.
name (Optional[str], optional) – Name for data, either supplied by caller or obtained from metadata title. Defaults to None.
read_file_kwargs (Optional[Dict[str,Any]], optional) – Additional kwargs supplied by the caller, dependent on the function called. Defaults to None.
spark (Optional[SparkSession]) – Optional spark session.
- Returns
SparkDataFrameWrapper
- Return type
_DataFrameWrapper
- index(resolution=100000, how='intersects', index_column_name='bng_index', bounds_column_name='bounds', geometry_column_name='geometry', exploded=True)¶
Adds a spatial index to data in a Spark DataFrame.
Calculates the grid index or indices for the geometry provided in well-known binary format at a given resolution. An index is required for the rasterisation process executed in the to_zarr method. Executing this method will add relevant index columns to the dataframe stored within the DataFrameWrapper.
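The grid maths behind indexing can be illustrated by snapping a coordinate to the origin of its containing grid cell at a given resolution. This sketch uses a bare easting/northing pair for clarity; the real method works on well-known binary geometries and produces British National Grid references rather than raw origins:

```python
def grid_cell_origin(easting: float, northing: float, resolution: int = 100_000):
    """Return the south-west corner of the grid cell containing the point."""
    return (
        int(easting // resolution) * resolution,
        int(northing // resolution) * resolution,
    )

# A point at (423_456, 312_789) falls in the 100 km cell whose
# south-west corner is (400_000, 300_000).
origin = grid_cell_origin(423_456, 312_789)
```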
Examples
>>> from sds_data_model.dataframe import DataFrameWrapper
>>> wrapped = DataFrameWrapper.from_files(
...     name="priority_habitats",
...     data_path='/dbfs/mnt/base/unrestricted/source_defra_data_services_platform',
...     metadata_path='https://ckan.publishing.service.gov.uk/harvest/object/85e03bf0-4e95-4739-a5fa-21d60cf7f069',
...     read_file_kwargs={'pattern': 'dataset_priority_habitat_inventory_*/format_SHP_priority_habitat_inventory_*/LATEST_priority_habitat_inventory_*/PHI_v2_3_*', 'suffix': '.shp'},
... )
Index data:
>>> wrapped.index(resolution=100_000)
Look at additional columns added to the dataframe:
>>> wrapped.data.dtypes
- Parameters
resolution (int) – Resolution of British National Grid cell(s) to return. Defaults to 100_000.
how (str) – Indexing method of: bounding box, intersects (default), contains. Defaults to "intersects".
index_column_name (str) – Name of column in dataframe. Defaults to "bng_index".
bounds_column_name (str) – Name of column in dataframe. Defaults to "bounds".
geometry_column_name (str) – Name of column in dataframe. Defaults to "geometry".
exploded (bool) – ???. Defaults to True.
- Raises
ValueError – If self.data is an instance of pyspark.sql.GroupedData instead of pyspark.sql.DataFrame.
- Returns
An indexed DataFrameWrapper.
- Return type
_DataFrameWrapper
- to_zarr(path, columns=None, nodata=None, index_column_name='bng_index', geometry_column_name='geometry', overwrite=False, cell_size=10, bng_xmin=0, bng_xmax=700000, bng_ymax=1300000, out_shape=(10000, 10000))¶
Rasterises columns of self.data and writes them to zarr.
This function requires two additional columns:
* A "bng_index" column containing the BNG index of the geometry in each row.
* A "bounds" column containing the BNG bounds of the BNG index as a list in each row.
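The relationship between the BNG extent, cell_size, and out_shape defaults can be checked with simple arithmetic. Note that bng_ymin is not a parameter of to_zarr; a minimum y of 0 is assumed here, and the reading of out_shape as one 100 km BNG square per chunk is an inference from the defaults, not stated by the source:

```python
# Defaults taken from the to_zarr signature.
cell_size = 10
bng_xmin, bng_xmax = 0, 700_000
bng_ymin, bng_ymax = 0, 1_300_000  # bng_ymin assumed; only bng_ymax is a parameter

width = (bng_xmax - bng_xmin) // cell_size    # columns in the full grid
height = (bng_ymax - bng_ymin) // cell_size   # rows in the full grid

# out_shape=(10_000, 10_000) matches one 100 km BNG square at 10 m cells.
cells_per_square = 100_000 // cell_size
```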
Examples
>>> wrapper.to_zarr(
...     path="/path/to/file.zarr",
... )
- Parameters
path (str) – Path to save the zarr file including file name.
columns (Optional[List[str]]) – Columns to rasterize. If None, a geometry mask will be generated. Defaults to None.
nodata (Optional[Dict[str, float]]) – Dictionary of {column: nodata}. Manual assignment of the nodata/fill value. Defaults to None.
index_column_name (str) – Name of the BNG index column. Defaults to "bng_index".
geometry_column_name (str) – Name of the geometry column. Defaults to "geometry".
overwrite (bool) – Overwrite existing zarr? Defaults to False.
cell_size (int) – The resolution of the cells in the DataArray. Defaults to CELL_SIZE.
out_shape (Tuple[int, int]) – The shape (height, width) of the DataArray. Defaults to OUT_SHAPE.
bng_xmin (int) – The minimum x value of the DataArray. Defaults to BNG_XMIN, the minimum x value of the British National Grid.
bng_xmax (int) – The maximum x value of the DataArray. Defaults to BNG_XMAX, the maximum x value of the British National Grid.
bng_ymax (int) – The maximum y value of the DataArray. Defaults to BNG_YMAX, the maximum y value of the British National Grid.
- Raises
ValueError – If self.data is not an instance of pyspark.sql.DataFrame.
ValueError – If index_column_name isn’t in the dataframe.
ValueError – If geometry_column_name isn’t in the dataframe.
ValueError – If column of type string is in columns.
- Return type
None