Source code for ska_sdp_exec_swiftly.generate_hdf5

#!/usr/bin/env python
# coding: utf-8
# pylint: disable=unused-argument, too-many-arguments, too-many-locals
"""
Small script for generating hdf5 test files, including FG, G
"""
import argparse
import logging
import os
import sys

import h5py
import numpy
from distributed import Lock

from .dask_wrapper import dask_wrapper, set_up_dask, tear_down_dask
from .fourier_transform import make_subgrid_from_sources

log = logging.getLogger("fourier-data-generater-logger")
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler(sys.stdout))


[docs] @dask_wrapper def direct_ft_chunk_work( G_2_path, chunk_slice, sources, chunksize, N, use_dask=False, **kwargs ): """ Calculate the value of a chunk of direct fourier transform and write to hdf5 :param G_2_path: the hdf5 file path of G :param chunk_slice: slice of hdf5 chunk :param sources: sources array :param chunksize: size of chunk :param N: whole data size """ offs = [s.start - N // 2 + chunksize // 2 for s in chunk_slice] chunk_G = make_subgrid_from_sources(sources, N, chunksize, offs) # lock if use_dask: lock = Lock(G_2_path) lock.acquire() with h5py.File(G_2_path, "r+") as f: dataset = f["G_data"] dataset[chunk_slice[0], chunk_slice[1]] = chunk_G / (N * N) if use_dask: lock.release()
[docs] def generate_data_hdf5( npixel, G_2_path, FG_2_path, chunksize_G, chunksize_FG, client=None ): """ Generate standard data G and FG with hdf5 :param sparse_ft_class: StreamingDistributedFFT class object :param G_2_path: the hdf5 file path of G :param FG_2_path: the hdf5 file path of FG :param chunksize: size of chunk :param client: dask client :returns: G_2_path, FG_2_path """ if not os.path.exists(FG_2_path): source_count = 10 sources = numpy.array( [ ( numpy.random.rand() * npixel * npixel / numpy.sqrt(source_count) / 2, numpy.random.randint(-npixel // 2, npixel // 2 - 1), numpy.random.randint(-npixel // 2, npixel // 2 - 1), ) for _ in range(source_count) ] ) f = h5py.File(FG_2_path, "w") FG_dataset = f.create_dataset( "FG_data", (npixel, npixel), dtype="complex128", chunks=(chunksize_FG, chunksize_FG), ) # write data point by point for i, y, x in sources: FG_dataset[int(y) + npixel // 2, int(x) + npixel // 2] += ( i / npixel / npixel ) f.close() if client is None: use_dask = False else: use_dask = True if not os.path.exists(G_2_path): # create a empty hdf5 file f = h5py.File(G_2_path, "w") G_dataset = f.create_dataset( "G_data", (npixel, npixel), dtype="complex128", chunks=(chunksize_G, chunksize_G), ) chunk_list = [] for chunk_slice in G_dataset.iter_chunks(): chunk_list.append( direct_ft_chunk_work( G_2_path, chunk_slice, sources, chunksize_G, npixel, use_dask=use_dask, nout=1, ) ) f.close() if use_dask: chunk_list = client.compute(chunk_list, sync=True) return G_2_path, FG_2_path
[docs] def cli_parser(): """ Parse command line arguments :return: argparse """ parser = argparse.ArgumentParser( description="generate G and FG hdf5 file for test", fromfile_prefix_chars="@", ) parser.add_argument( "--N", type=int, default=1024, help="hdf5 chunksize for G" ) parser.add_argument( "--hdf5_chunksize_G", type=int, default=256, help="hdf5 chunksize for G", ) parser.add_argument( "--hdf5_chunksize_FG", type=int, default=256, help="hdf5 chunksize for FG", ) parser.add_argument( "--hdf5_prefix", type=str, default="./", help="hdf5 path prefix" ) return parser
[docs] def main(args): """ Main function to generate G and FG hdf5 file The hdf5 file naming follows the format of G/FG_N_chunksize.h5 """ # Fixing seed of numpy random numpy.random.seed(123456789) scheduler = os.environ.get("DASK_SCHEDULER", None) log.info("Scheduler: %s", scheduler) dask_client = set_up_dask(scheduler_address=scheduler) generate_data_hdf5( npixel=args.N, G_2_path=f"{args.hdf5_prefix}/\ G_{args.N}_{args.hdf5_chunksize_G}.h5", FG_2_path=f"{args.hdf5_prefix}/\ FG_{args.N}_{args.hdf5_chunksize_FG}.h5", chunksize_G=args.hdf5_chunksize_G, chunksize_FG=args.hdf5_chunksize_FG, client=dask_client, ) tear_down_dask(dask_client)
if __name__ == "__main__": parser_args = cli_parser().parse_args() main(parser_args)