Benchmarking Performance of History vs. Timeseries Files with ecgtools, Intake-ESM, and Dask#

In this example, we will look at how long it takes to read data from the Community Earth System Model (CESM), apply calculations, and visualize the output using the following packages:

  • ecgtools

  • Intake-ESM

  • Dask (with distributed and ncar-jobqueue)

  • pandas

  • hvPlot

We are going to investigate whether it is faster to do these operations on the history files output by the model or on time series files that have been generated from the history files. Our hypothesis is that performance should be substantially better when reading from timeseries files, but let’s take a look…

We use CESM data on the GLADE filesystem, from a case which includes both history and timeseries files on disk.

Imports#

Installing packages via conda-forge#

As of this week, ecgtools is available via conda-forge, which is very exciting! You can install the packages used here using the following:

conda install -c conda-forge ecgtools ncar-jobqueue distributed intake-esm pandas

We will also install hvPlot to help with visualization, this time from the pyviz channel!

conda install -c pyviz hvplot
import ast
import time
import warnings

warnings.filterwarnings("ignore")

import holoviews as hv
import hvplot
import hvplot.pandas
import intake
import pandas as pd
from dask.distributed import performance_report
from distributed import Client
from ecgtools import Builder
from ecgtools.parsers.cesm import parse_cesm_history, parse_cesm_timeseries
from IPython.core.display import HTML
from ncar_jobqueue import NCARCluster

hv.extension('bokeh')

Spin up a Dask Cluster#

In this example, we jump directly into the computation! If you are interested in building the catalogs used in the example, check out the Appendix.

The cluster here uses a fixed number of workers (19); other tests in the future could investigate scalability by modifying the computational resources used or by trying different cluster configurations.

cluster = NCARCluster(memory='25 GB', cores=1, walltime='06:00:00')
cluster.scale(19)
client = Client(cluster)
cluster
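The exact cluster configuration is a judgment call. As one hedged alternative (not used in this benchmark), the cluster could be scaled adaptively rather than pinned at 19 workers; NCARCluster wraps a dask-jobqueue cluster, so the standard adapt method should be available:

cluster = NCARCluster(memory='25 GB', cores=1, walltime='06:00:00')

# Let Dask grow and shrink the worker pool between 1 and 19 jobs based on load,
# instead of requesting a fixed 19 workers up front
cluster.adapt(minimum=1, maximum=19)
client = Client(cluster)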

Read the Catalogs Using Intake-ESM#

Here, we read the catalogs built in the Appendix! Remember, there are numerous variables in a single file (row) for the history files, so we use ast.literal_eval to parse the variables column in those rows!

history_catalog = intake.open_esm_datastore(
    'cesm-hist-smyle-fosi.json',
    csv_kwargs={"converters": {"variables": ast.literal_eval}},
    sep="/",
)
history_catalog

None catalog with 5 dataset(s) from 6015 asset(s):

unique
component 2
stream 5
date 5235
case 1
member_id 1
frequency 3
variables 620
path 6015
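To see why the converters argument is needed: in the history catalog CSV, each row's variables cell is stored as a list-like string, and ast.literal_eval turns it back into a real Python list when the catalog is read. A minimal illustration (the example string below is hypothetical, but matches the format of the variables column shown in the Appendix):

import ast

# The CSV stores each file's variable list as a string; literal_eval recovers the list
row_value = "['SST', 'SSS', 'FG_CO2']"  # hypothetical value of one 'variables' cell
variables = ast.literal_eval(row_value)
print(type(variables), variables)  # <class 'list'> ['SST', 'SSS', 'FG_CO2']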

The timeseries catalog does not require any additional arguments 👌

timeseries_catalog = intake.open_esm_datastore('cesm-tseries-smyle-fosi.json')
timeseries_catalog

None catalog with 4 dataset(s) from 579 asset(s):

unique
component 2
stream 4
case 1
member_id 1
variable 579
start_time 3
end_time 3
time_range 3
long_name 570
units 67
vertical_levels 1
frequency 3
path 579

Search for Just Monthly Ocean Output#

We are only interested in monthly (frequency='month_1') ocean (component='ocn') output in this case.

monthly_ocean_timeseries = timeseries_catalog.search(component='ocn', frequency='month_1')
monthly_ocean_timeseries

None catalog with 1 dataset(s) from 410 asset(s):

unique
component 1
stream 1
case 1
member_id 1
variable 410
start_time 1
end_time 1
time_range 1
long_name 409
units 55
vertical_levels 1
frequency 1
path 410
monthly_ocean_history = history_catalog.search(component='ocn', frequency='month_1')
monthly_ocean_history

None catalog with 1 dataset(s) from 756 asset(s):

unique
component 1
stream 1
date 756
case 1
member_id 1
frequency 1
variables 446
path 756
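Because the history catalog has one row per monthly file, the benchmark functions below subset it by date through the catalog's underlying pandas DataFrame, exposed as the .df attribute. As a quick sketch, selecting the first 12 dates narrows the catalog to the first simulated year:

# The first num_years * 12 dates correspond to the first num_years of monthly
# history files; here we grab just the first year as an example
first_year = monthly_ocean_history.search(date=monthly_ocean_history.df.date[:12])
first_year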

Test File Access Speeds#

Here, we test the time it takes to read in the following time ranges:

  • 1 year

  • 5 years

  • 10 years

  • 20 years

  • 40 years

  • 60 years

A useful tool we will use here is the Dask performance report, which captures the Dask dashboard for a computation in a standalone HTML file that can be shared with others. This provides a means of going back to a computation on your cluster after it has run to investigate how the tasks were scheduled, how busy the workers were, and so on.
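As a minimal sketch of the pattern used below (with a toy Dask array computation standing in for the real workload), wrapping any Dask work in the performance_report context manager writes the dashboard for that computation to a standalone HTML file:

import dask.array as da
from dask.distributed import performance_report

# Everything computed while the context manager is active is recorded to
# example-report.html, which can be opened in a browser or shared with others
with performance_report(filename="example-report.html"):
    da.random.random((10_000, 10_000), chunks=(1_000, 1_000)).mean().compute()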

Test out the Timeseries Files#

def test_timeseries_file(
    num_years, chunk_strategy='time', long_term_average=False, monthly_average=False
):
    chunk_dict = {'time': {'time': 240}, 'spatial': {'nlon': 160, 'nlat': 192}}
    subset = monthly_ocean_timeseries.search(variable='FG_CO2')

    # Start the timing when we read in the dataset using .to_dataset_dict()
    start = time.time()
    dsets = subset.to_dataset_dict(
        'FG_CO2',
        cdf_kwargs={'use_cftime': True, 'chunks': chunk_dict[chunk_strategy]},
        progressbar=False,
    )
    keys = list(dsets.keys())
    ds = dsets[keys[0]]

    # Subset to the requested number of years (12 monthly time steps per year)
    ds = ds.isel(time=slice(0, num_years * 12))

    if long_term_average:
        ds.mean(dim='time').compute()

    if monthly_average:
        ds.groupby('time.month').mean(dim='time').compute()

    end = time.time()
    return end - start
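The chunk_strategy argument selects between the two chunking schemes defined in chunk_dict: chunks along time (the default used throughout the benchmark) or spatial chunks along nlat/nlon. A hypothetical one-off call using the spatial strategy would look like this:

# One-off timing run (not part of the benchmark loop below): spatial chunking
# with a monthly climatology, returning the wall time in seconds
elapsed = test_timeseries_file(10, chunk_strategy='spatial', monthly_average=True)
print(f'Spatial chunking + monthly climatology: {elapsed:.1f} s')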

Apply the Computation with Timeseries#

We also include the Dask performance report, which can be viewed following the computation cell.

years = [1, 5, 10, 20, 30, 40, 50, 60]

df = pd.DataFrame()

with performance_report(filename="timeseries-computation.html"):
    for year in years:
        print(f'Starting year {year}')
        # Time the file access only, without any additional computation
        wall_time = test_timeseries_file(year)
        df = df.append(
            {
                'catalog': 'timeseries',
                'computation': 'file_access',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Compute with long-term mean computation
        wall_time = test_timeseries_file(year, long_term_average=True)
        df = df.append(
            {
                'catalog': 'timeseries',
                'computation': 'long_term_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Compute with monthly climatology
        wall_time = test_timeseries_file(year, monthly_average=True)
        df = df.append(
            {
                'catalog': 'timeseries',
                'computation': 'monthly_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )
df_timeseries = df
Starting year 1
Starting year 5
Starting year 10
Starting year 20
Starting year 30
Starting year 40
Starting year 50
Starting year 60
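A side note on the DataFrame-building pattern: DataFrame.append creates a new DataFrame on every call (and has been removed in recent pandas releases), so an equivalent sketch that collects plain dictionaries and builds the DataFrame once would look like the following (shown only for the file-access case):

# Equivalent pattern: accumulate rows as dicts, then build the DataFrame once
records = []
for year in years:
    records.append(
        {
            'catalog': 'timeseries',
            'computation': 'file_access',
            'num_years': year,
            'wall_time': test_timeseries_file(year),
        }
    )
df_timeseries_alt = pd.DataFrame(records)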
HTML('timeseries-computation.html')
Dask Performance Report
file_read_in = df.loc[df.computation == 'file_access'].hvplot(
    x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='No computation'
)
long_term_mean = df.loc[df.computation == 'long_term_average'].hvplot(
    x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='Long term mean'
)
monthly_average = df.loc[df.computation == 'monthly_average'].hvplot(
    x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='Monthly climatology'
)

# We combine all the plots, and specify we want a single column
(file_read_in + long_term_mean + monthly_average).cols(1)

Test out the History Files#

Next, we investigate the time it takes to read the datasets, compute a temporal average over the entire time period, and compute a monthly climatology using CESM history file output.

def test_history_file(
    num_years, chunk_strategy='time', long_term_average=False, monthly_average=False
):
    chunk_dict = {'time': {'time': 240}, 'spatial': {'nlon': 160, 'nlat': 192}}
    subset = monthly_ocean_history.search(
        date=monthly_ocean_history.df.date[: num_years * 12], variables='FG_CO2'
    )
    # Start the timing when we read in the dataset using .to_dataset_dict()
    start = time.time()
    dsets = subset.to_dataset_dict(
        'FG_CO2',
        cdf_kwargs={'use_cftime': True, 'chunks': chunk_dict[chunk_strategy]},
        progressbar=False,
    )
    keys = list(dsets.keys())
    ds = dsets[keys[0]]

    if long_term_average:
        ds.mean(dim='time').compute()

    if monthly_average:
        ds.groupby('time.month').mean(dim='time').compute()

    end = time.time()
    return end - start

Apply the Computation with History Files#

We also include the Dask performance report, which can be viewed following the computation cell.

df = pd.DataFrame()
with performance_report(filename="history-computation.html"):
    for year in years:
        print(f'Starting year {year}')

        # Time the file access only, without any additional computation
        wall_time = test_history_file(year)
        df = df.append(
            {
                'catalog': 'history',
                'computation': 'file_access',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Compute with long-term mean computation
        wall_time = test_history_file(year, long_term_average=True)
        df = df.append(
            {
                'catalog': 'history',
                'computation': 'long_term_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Compute with monthly climatology
        wall_time = test_history_file(year, monthly_average=True)
        df = df.append(
            {
                'catalog': 'history',
                'computation': 'monthly_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

df_history = df
Starting year 1
Starting year 5
Starting year 10
Starting year 20
Starting year 30
Starting year 40
Starting year 50
Starting year 60
HTML('history-computation.html')
Dask Performance Report

Plot a Comparison of Wall time for Different Computations#

file_read_in = df.loc[df.computation == 'file_access'].hvplot(
    x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='No computation'
)
long_term_mean = df.loc[df.computation == 'long_term_average'].hvplot(
    x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='Long term mean'
)
monthly_average = df.loc[df.computation == 'monthly_average'].hvplot(
    x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='Monthly climatology'
)

(file_read_in + long_term_mean + monthly_average).cols(1)

Visualize the Comparisons#

Using additional options in hvPlot, we can investigate these comparisons further!

Intercomparison of Timeseries#

(
    df_timeseries.hvplot(
        x='num_years',
        y='wall_time',
        by='computation',
        xlabel='Number of Years',
        ylabel='Wall Time (s)',
        width=550,
    )
    + df_timeseries.hvplot.table(width=420)
).cols(1)

Intercomparison of History#

(
    df_history.hvplot(
        x='num_years',
        y='wall_time',
        by='computation',
        xlabel='Number of Years',
        ylabel='Wall Time (s)',
        width=550,
    )
    + df_history.hvplot.table(width=420)
).cols(1)

Bringing These Two Together#

df = pd.concat([df_timeseries, df_history])
(
    df.hvplot(
        x='num_years',
        y='wall_time',
        by=['catalog', 'computation'],
        xlabel='Number of Years',
        ylabel='Wall Time (s)',
        width=800,
    )
    + df.hvplot.table(width=420)
).cols(1)

Conclusions#

We see something unsurprising here: performance is substantially better when working with timeseries files, with the greatest difference showing up in file access. With the history files, the data read-in step takes the longest, with relatively small differences in the computation time for the various averages. Once the datasets are read in, performance is similar.

One item to note here is that we were able to read in 60 years' worth of history files in under a minute, which is impressive! If you are plotting quick comparisons from model output, you may not necessarily need to convert to timeseries right away; in the long run, however, converting will save on both space and computational expense.

Appendix#

Build the Catalogs#

Something to keep in mind here is that the ecgtools Builder parallelizes across the number of cores available to it; so ideally, for this section of the notebook, you will want to use more than a single core, up to however many you see fit.

Build the History File Catalog#

cesm_history_builder = Builder(
    "/glade/campaign/cesm/development/espwg/SMYLE/initial_conditions/SMYLE-FOSI",
    exclude_patterns=["*/rest/*", "*/logs/*", "*/proc/*"],
)
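The joblib output further down shows 40 concurrent workers because the Builder defaults to njobs=-1 (use every available core), as seen in its repr. If you want to cap the parallelism, a hedged sketch (assuming njobs, the field shown in the Builder repr, is accepted as a keyword argument in this version of ecgtools):

# Hypothetical: restrict the catalog build to 8 parallel jobs instead of all cores
cesm_history_builder = Builder(
    "/glade/campaign/cesm/development/espwg/SMYLE/initial_conditions/SMYLE-FOSI",
    exclude_patterns=["*/rest/*", "*/logs/*", "*/proc/*"],
    njobs=8,
)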

Build the catalog using the parse_cesm_history parser#

cesm_history_builder.build(parse_cesm_history)
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 40 concurrent workers.
[Parallel(n_jobs=-1)]: Done   2 out of   4 | elapsed:    1.2s remaining:    1.2s
[Parallel(n_jobs=-1)]: Done   4 out of   4 | elapsed:    1.3s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   4 out of   4 | elapsed:    1.3s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 40 concurrent workers.
[Parallel(n_jobs=-1)]: Done  82 tasks      | elapsed:    5.4s
[Parallel(n_jobs=-1)]: Done 208 tasks      | elapsed:    5.8s
[Parallel(n_jobs=-1)]: Done 370 tasks      | elapsed:    6.4s
[Parallel(n_jobs=-1)]: Done 568 tasks      | elapsed:    7.1s
[Parallel(n_jobs=-1)]: Done 802 tasks      | elapsed:    8.1s
[Parallel(n_jobs=-1)]: Done 1072 tasks      | elapsed:    9.3s
[Parallel(n_jobs=-1)]: Done 1378 tasks      | elapsed:   10.7s
[Parallel(n_jobs=-1)]: Done 1720 tasks      | elapsed:   12.4s
[Parallel(n_jobs=-1)]: Done 2098 tasks      | elapsed:   13.8s
[Parallel(n_jobs=-1)]: Done 2512 tasks      | elapsed:   15.4s
[Parallel(n_jobs=-1)]: Done 2962 tasks      | elapsed:   17.0s
[Parallel(n_jobs=-1)]: Done 3448 tasks      | elapsed:   18.9s
[Parallel(n_jobs=-1)]: Done 3970 tasks      | elapsed:   21.1s
[Parallel(n_jobs=-1)]: Done 4528 tasks      | elapsed:   24.9s
[Parallel(n_jobs=-1)]: Done 5122 tasks      | elapsed:   35.4s
[Parallel(n_jobs=-1)]: Done 5752 tasks      | elapsed:   53.8s
[Parallel(n_jobs=-1)]: Done 6015 out of 6015 | elapsed:   59.1s finished
Builder(root_path=PosixPath('/glade/campaign/cesm/development/espwg/SMYLE/initial_conditions/SMYLE-FOSI'), extension='.nc', depth=0, exclude_patterns=['*/rest/*', '*/logs/*', '*/proc/*'], njobs=-1)
cesm_history_builder.df
component stream date case member_id frequency variables path
0 ice cice.h 0001-01 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 month_1 [hi, hs, snowfrac, Tsfc, aice, uvel, vvel, uat... /glade/campaign/cesm/development/espwg/SMYLE/i...
1 ice cice.h 0001-02 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 month_1 [hi, hs, snowfrac, Tsfc, aice, uvel, vvel, uat... /glade/campaign/cesm/development/espwg/SMYLE/i...
2 ice cice.h 0001-03 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 month_1 [hi, hs, snowfrac, Tsfc, aice, uvel, vvel, uat... /glade/campaign/cesm/development/espwg/SMYLE/i...
3 ice cice.h 0001-04 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 month_1 [hi, hs, snowfrac, Tsfc, aice, uvel, vvel, uat... /glade/campaign/cesm/development/espwg/SMYLE/i...
4 ice cice.h 0001-05 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 month_1 [hi, hs, snowfrac, Tsfc, aice, uvel, vvel, uat... /glade/campaign/cesm/development/espwg/SMYLE/i...
... ... ... ... ... ... ... ... ...
6010 ocn pop.h.nday1 0368-08-01 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 day_1 [SST, SST2, SSS, HMXL_DR_2, XMXL_2] /glade/campaign/cesm/development/espwg/SMYLE/i...
6011 ocn pop.h.nday1 0368-09-01 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 day_1 [SST, SST2, SSS, HMXL_DR_2, XMXL_2] /glade/campaign/cesm/development/espwg/SMYLE/i...
6012 ocn pop.h.nday1 0368-10-01 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 day_1 [SST, SST2, SSS, HMXL_DR_2, XMXL_2] /glade/campaign/cesm/development/espwg/SMYLE/i...
6013 ocn pop.h.nday1 0368-11-01 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 day_1 [SST, SST2, SSS, HMXL_DR_2, XMXL_2] /glade/campaign/cesm/development/espwg/SMYLE/i...
6014 ocn pop.h.nday1 0368-12-01 g.e22.GOMIPECOIAF_JRA-1p4-2018.TL319_g17.SMYLE... 005 day_1 [SST, SST2, SSS, HMXL_DR_2, XMXL_2] /glade/campaign/cesm/development/espwg/SMYLE/i...

6015 rows × 8 columns

Save the catalog locally#

cesm_history_builder.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    "cesm-hist-smyle-fosi.csv",
    # Column name containing the file path
    path_column_name='path',
    # Column name containing the variables
    variable_column_name='variables',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {
            "type": "join_existing",
            "attribute_name": "date",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        }
    ],
)
Saved catalog location: cesm-hist-smyle-fosi.json and cesm-hist-smyle-fosi.csv

Build the Timeseries Catalog#

cesm_timeseries_builder = Builder(
    "/glade/campaign/cesm/development/espwg/SMYLE/initial_conditions/SMYLE-FOSI",
    exclude_patterns=["*/rest/*", "*/logs/*", "*/hist/*"],
)

Build the catalog using the parse_cesm_timeseries parser#

cesm_timeseries_builder.build(parse_cesm_timeseries)
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 40 concurrent workers.
[Parallel(n_jobs=-1)]: Done   2 out of   4 | elapsed:    0.1s remaining:    0.1s
[Parallel(n_jobs=-1)]: Done   4 out of   4 | elapsed:    0.1s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   4 out of   4 | elapsed:    0.1s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 40 concurrent workers.
[Parallel(n_jobs=-1)]: Done  84 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 336 tasks      | elapsed:    1.4s
[Parallel(n_jobs=-1)]: Done 545 out of 624 | elapsed:    2.2s remaining:    0.3s
[Parallel(n_jobs=-1)]: Done 624 out of 624 | elapsed:    2.4s finished
/glade/work/mgrover/git_repos/ecgtools/ecgtools/builder.py:179: UserWarning: Unable to parse 45 assets/files. A list of these assets can be found in `.invalid_assets` attribute.
  self.get_directories().get_filelist()._parse(
Builder(root_path=PosixPath('/glade/campaign/cesm/development/espwg/SMYLE/initial_conditions/SMYLE-FOSI'), extension='.nc', depth=0, exclude_patterns=['*/rest/*', '*/logs/*', '*/hist/*'], njobs=-1)

Save the catalog locally#

cesm_timeseries_builder.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    "cesm-tseries-smyle-fosi.csv",
    # Column name containing the file path
    path_column_name='path',
    # Column name containing the variables
    variable_column_name='variable',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {"type": "union", "attribute_name": "variable"},
        {
            "type": "join_existing",
            "attribute_name": "time_range",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        },
    ],
)
Saved catalog location: cesm-tseries-smyle-fosi.json and cesm-tseries-smyle-fosi.csv
/glade/scratch/mgrover/ipykernel_197849/2723049127.py:1: UserWarning: Unable to parse 45 assets/files. A list of these assets can be found in invalid_assets_cesm-tseries-smyle-fosi.csv.
  cesm_timeseries_builder.save(