import io
import logging
import numpy as np
import os
import pandas as pd
import re
from datetime import datetime as datetime
from jinja2 import Template
from numpy import array
from pathlib import Path
from tabulate import tabulate
from tqdm import tqdm
class DATAOUTPUT():
    """Provide different output methods for climate dataframes.

    The supported output types are handled by :meth:`generate_output`:
    "stdout", "met" (APSIM MET files), "wth" (DSSAT WTH files), "dataframe"
    and "csv".

    Args:
        data_source (str): The climate database where the values are being
            extracted from, e.g. "silo" or "nasapower"

    Attributes:
        logger (logging.Logger): the 'POPBEAST.DATAOUTPUT' logger
        data_source (str): the value passed to the constructor
        tqdm_enabled (bool): despite its name, this flag is passed verbatim as
            tqdm's ``disable`` argument, i.e. progress bars are hidden when it
            is True (which happens when this module lives in a 'bestiapop'
            package)
    """

    # Raw climate column names -> short names used by every output format
    _COLUMN_RENAMES = {
        "days": "day",
        "daily_rain": "rain",
        'min_temp': 'mint',
        'max_temp': 'maxt',
        'radiation': 'radn',
    }
    # Columns identifying one daily record, and the climate values kept for it
    _KEY_COLUMNS = ['lon', 'lat', 'year', 'day']
    _VALUE_COLUMNS = ['radn', 'maxt', 'mint', 'rain']

    # Human readable descriptions written into the MET header
    _DATA_SOURCE_DESCRIPTIONS = {
        "silo": "SILO (Scientific Information for Land Owners) (https://www.longpaddock.qld.gov.au/silo/)",
        "nasapower": "NASA POWER (https://power.larc.nasa.gov/)",
    }

    # Jinja2 templates. They are assembled line by line so that the rendered
    # text never depends on how this class is indented.
    _MET_TEMPLATE = "\n".join((
        "[weather.met.weather]",
        "!station number={{ lat }}-{{ lon }}",
        "!This climate file was created by BestiaPop on {{ current_date }} - Taming the Climate Beast",
        "!Check our docs in https://bestiapop.readthedocs.io/en/latest/",
        "!Source: {{ data_source }}",
        "!Date period from: {{ year_from }} to {{ year_to }}",
        "Latitude={{ lat }}",
        "Longitude={{ lon }}",
        "tav={{ tav }}",
        "amp={{ amp }}",
        "year day radn maxt mint rain",
        "() () (MJ^m2) (oC) (oC) (mm)",
        "{{ vardata }}",
        "",
    ))
    _WTH_TEMPLATE = "\n".join((
        "*WEATHER DATA : {{ lat }}-{{ lon }}",
        "{{ wth_header }}",
        "{{ vardata }}",
        "",
    ))

    def __init__(self, data_source):
        # Setup logging
        logger = logging.getLogger('POPBEAST.DATAOUTPUT')
        running_as_package = 'bestiapop' in __name__
        try:
            # coloredlogs is optional: fall back to plain logging when missing
            import coloredlogs
            coloredlogs.install(
                fmt='%(asctime)s - %(name)s - %(message)s',
                level="WARNING" if running_as_package else "DEBUG",
                logger=logger)
        except ModuleNotFoundError:
            # The logger is shared by every instance, so only attach the
            # console handler once or each message would be printed N times
            if not logger.handlers:
                formatter = logging.Formatter('%(asctime)s - %(name)s - %(message)s')
                console_handler = logging.StreamHandler()
                console_handler.setFormatter(formatter)
                console_handler.setLevel(logging.DEBUG)
                logger.addHandler(console_handler)
            logger.setLevel(logging.WARNING if running_as_package else logging.INFO)

        # Setting up class variables
        self.logger = logger
        self.data_source = data_source
        # NOTE: used as tqdm's `disable` argument (True hides the progress bars)
        self.tqdm_enabled = running_as_package

    @classmethod
    def _prepare_dataframe(cls, final_daily_df):
        """Rename the climate columns and aggregate them per lon/lat/year/day.

        Raises:
            KeyError: if any of the required columns is missing
        """
        renamed_df = final_daily_df.rename(columns=cls._COLUMN_RENAMES)
        return renamed_df.groupby(cls._KEY_COLUMNS)[cls._VALUE_COLUMNS].sum().reset_index()

    def _iter_coordinates(self, lat_range, lon_range):
        """Yield every (lat, lon) combination, showing tqdm progress bars.

        The longest range drives the outer loop ("lat" wins ties).
        """
        lat_is_primary = len(lat_range) >= len(lon_range)
        if lat_is_primary:
            primary_var_desc, primary_var = "lat", lat_range
            secondary_var_desc, secondary_var = "lon", lon_range
        else:
            primary_var_desc, primary_var = "lon", lon_range
            secondary_var_desc, secondary_var = "lat", lat_range

        for primary_data_point in tqdm(primary_var, ascii=True, desc=primary_var_desc, disable=self.tqdm_enabled):
            for secondary_data_point in tqdm(secondary_var, ascii=True, desc=secondary_var_desc, disable=self.tqdm_enabled):
                if lat_is_primary:
                    yield primary_data_point, secondary_data_point
                else:
                    yield secondary_data_point, primary_data_point

    @staticmethod
    def _slice_coordinate(final_daily_df, lat, lon, drop_coordinates=False):
        """Return the rows for a single lat/lon pair, optionally without the lat/lon columns."""
        coordinate_slice_df = final_daily_df[(final_daily_df.lon == lon) & (final_daily_df.lat == lat)]
        if drop_coordinates:
            # drop() builds a new frame instead of deleting columns on a slice
            return coordinate_slice_df.drop(columns=['lat', 'lon'])
        return coordinate_slice_df

    @staticmethod
    def _calculate_tav_amp(dataframe, decimals):
        """Calculate the tav and amp values for a single-location dataframe.

        The daily mean temperature (mean of maxt and mint) is averaged per
        calendar month; tav is the mean of those monthly means and amp is the
        difference between the highest and the lowest monthly mean. The input
        dataframe is not modified.

        Args:
            dataframe (pandas.core.frame.DataFrame): needs the columns
                'year', 'day' (day of the year), 'maxt' and 'mint'
            decimals (int): number of decimals to round the results to

        Returns:
            tuple: (tav, amp) as floats
        """
        years = dataframe['year'].to_numpy(dtype='int64')
        days = dataframe['day'].to_numpy(dtype='int64')
        # Build the real calendar date (January 1st of the year plus day-1 days)
        # so that leap years assign every day to its correct month
        year_start = (years - 1970).astype('datetime64[Y]').astype('datetime64[D]')
        dates = year_start + (days - 1).astype('timedelta64[D]')
        # datetime64[M] counts months since 1970-01, hence the modulo
        months = dates.astype('datetime64[M]').astype('int64') % 12 + 1

        tmean = dataframe[['maxt', 'mint']].mean(axis=1)
        tmean_by_month = tmean.groupby(months).mean()
        amp = np.round(tmean_by_month.max() - tmean_by_month.min(), decimals=decimals)
        tav = np.round(tmean_by_month.mean(), decimals=decimals)
        return float(tav), float(amp)

    def _describe_data_source(self):
        """Return the header description of the data source (the raw value if unknown)."""
        return self._DATA_SOURCE_DESCRIPTIONS.get(self.data_source, str(self.data_source))

    def generate_output(self, final_daily_df, lat_range, lon_range, outputdir=None, output_type="met"):
        """Generate required Output based on Output Type selected

        Args:
            final_daily_df (pandas.core.frame.DataFrame): the pandas dataframe containing all the values that are going to be parsed into a specific output
            lat_range (numpy.ndarray): an array of latitude values to select from the final_daily_df
            lon_range (numpy.ndarray): an array of longitude values to select from the final_daily_df
            outputdir (str or pathlib.Path): the folder that will be used to store the output files (required for "met", "wth" and "csv")
            output_type (str, optional): the output type: stdout, met, wth, dataframe or csv. Defaults to "met".

        Returns:
            pandas.core.frame.DataFrame: the renamed and aggregated dataframe when output_type is "dataframe", None otherwise
        """
        if output_type == "stdout":
            final_daily_df = self._prepare_dataframe(final_daily_df)
            for lat, lon in self._iter_coordinates(lat_range, lon_range):
                coordinate_slice_df = self._slice_coordinate(final_daily_df, lat, lon)
                # We shall output the plain final DataFrame to stdout using tabulate
                print("\n")
                print(tabulate(
                    coordinate_slice_df,
                    headers=coordinate_slice_df.keys(),
                    tablefmt='psql',
                    numalign='right',
                    stralign='right',
                    showindex=False))
                print("\n")

        elif output_type == "met":
            # An empty dataframe cannot produce any file
            if final_daily_df.empty:
                self.logger.error("No data in final dataframe. No file can be generated. Exiting...")
                return
            try:
                # Rename df columns and sort them to match order expected by MET
                final_daily_df = self._prepare_dataframe(final_daily_df)
                self.logger.info("Proceeding to the generation of MET files")
                for lat, lon in self._iter_coordinates(lat_range, lon_range):
                    coordinate_slice_df = self._slice_coordinate(final_daily_df, lat, lon, drop_coordinates=True)
                    self.generate_met(outputdir, coordinate_slice_df, lat, lon)
            except KeyError as e:
                self.logger.error("Could not find all required climate variables to generate MET: {}".format(str(e)))

        elif output_type == "wth":
            # An empty dataframe cannot produce any file
            if final_daily_df.empty:
                self.logger.error("No data in final dataframe. No file can be generated. Exiting...")
                return
            try:
                # Rename df columns and sort them to match order expected by DSSAT
                final_daily_df = self._prepare_dataframe(final_daily_df)
                # DSSAT date format: last two digits of the year followed by
                # the day of the year zero-padded to three digits
                dssat_year_series = final_daily_df['year'].astype(int).astype(str).str[-2:]
                dssat_julian_day_series = final_daily_df['day'].astype(int).astype(str).str.zfill(3)
                # Add DSSAT julian day values as first column
                final_daily_df.insert(0, 'dssatday', dssat_year_series + dssat_julian_day_series)
                self.logger.info("Proceeding to the generation of WTH files")
                for lat, lon in self._iter_coordinates(lat_range, lon_range):
                    coordinate_slice_df = self._slice_coordinate(final_daily_df, lat, lon, drop_coordinates=True)
                    self.generate_wth(outputdir, coordinate_slice_df, lat, lon)
            except KeyError as e:
                self.logger.error("Could not find all required climate variables to generate WTH file: {}".format(str(e)))

        elif output_type == "dataframe":
            try:
                return self._prepare_dataframe(final_daily_df)
            except Exception as e:
                self.logger.error(e)

        elif output_type == "csv":
            if outputdir is None or not Path(outputdir).is_dir():
                self.logger.error("Output directory {} does not exist. No CSV file can be generated.".format(outputdir))
                return
            output_path = Path(outputdir)
            try:
                final_daily_df = self._prepare_dataframe(final_daily_df)
                for lat, lon in self._iter_coordinates(lat_range, lon_range):
                    coordinate_slice_df = self._slice_coordinate(final_daily_df, lat, lon)
                    # One CSV per lat/lon combination, named after the
                    # coordinates and the climate data source used
                    csv_file_name = '{}-{}.{}.csv'.format(lat, lon, self.data_source)
                    full_output_path = output_path/csv_file_name
                    self.logger.debug('Writting CSV file {} to {}'.format(csv_file_name, full_output_path))
                    # NOTE(review): mode='a' appends (header included) when the
                    # file already exists - confirm this is intended
                    coordinate_slice_df.to_csv(full_output_path, sep=',', index=False, mode='a', float_format='%.2f')
                # Let's also create a CSV containing all the datapoints
                csv_file_name = 'bestiapop-beastly-dataframe.csv'
                full_output_path = output_path/csv_file_name
                self.logger.debug('Writting BEAST DATAFRAME :) CSV file {} to {}'.format(csv_file_name, full_output_path))
                final_daily_df.to_csv(full_output_path, sep=',', na_rep=np.nan, index=False, mode='w', float_format='%.2f')
            except Exception as e:
                self.logger.error(e)

    def generate_met(self, outputdir, met_dataframe, lat, lon):
        """Generate APSIM MET File

        The file is written to ``<outputdir>/<lat>-<lon>.met``. The input
        dataframe is not modified.

        Args:
            outputdir (str or pathlib.Path): the folder where the generated MET files will be stored
            met_dataframe (pandas.core.frame.DataFrame): the pandas dataframe slice to convert to MET file, with the columns year, day, radn, maxt, mint and rain
            lat (float): the latitude for which this MET file is being generated
            lon (float): the longitude for which this MET file is being generated
        """
        # Without a target, to_csv returns the text instead of writing a file
        met_df_text_output = met_dataframe.to_csv(sep=" ", header=False, na_rep="NaN", index=False, float_format='%.1f')
        # Get rid of carriage returns or they would add an extra new line between lines
        met_df_text_output = met_df_text_output.replace("\r\n", "\n")

        tav, amp = self._calculate_tav_amp(met_dataframe, decimals=5)

        in_memory_met = Template(self._MET_TEMPLATE).render(
            lat=lat,
            lon=lon,
            tav=tav,
            amp=amp,
            data_source=self._describe_data_source(),
            current_date=datetime.now().strftime("%d%m%Y"),
            year_from=met_dataframe.year.min(),
            year_to=met_dataframe.year.max(),
            vardata=met_df_text_output
        )

        full_output_path = Path(outputdir)/'{}-{}.met'.format(lat, lon)
        with open(full_output_path, 'w+') as f:
            self.logger.info('Writting MET file {}'.format(full_output_path))
            f.write(in_memory_met)

    def generate_wth(self, outputdir, wth_dataframe, lat, lon):
        """Generate WTH File

        The input dataframe is not modified. Nothing is written (and an error
        is logged) when the dataframe is empty.

        Args:
            outputdir (str or pathlib.Path): the folder where the generated WTH files will be stored
            wth_dataframe (pandas.core.frame.DataFrame): the pandas dataframe slice to convert to WTH file, with the columns dssatday, year, day, radn, maxt, mint and rain
            lat (float): the latitude for which this WTH file is being generated
            lon (float): the longitude for which this WTH file is being generated
        """
        if wth_dataframe.empty:
            # The file name needs at least one record to extract the year from
            self.logger.error("No data for lat {} lon {}. No WTH file can be generated.".format(lat, lon))
            return

        # Year and day are only needed to calculate TAV and AMP, they are not
        # part of the WTH data table
        wth_table_df = wth_dataframe.drop(columns=['year', 'day'])
        # rename columns to match expected values in preparation for "tabulate" and right alignment
        wth_table_df = wth_table_df.rename(columns={'dssatday':'@DATE', 'rain':'RAIN', 'mint':'TMIN', 'maxt':'TMAX', 'radn':'SRAD'})
        wth_df_text_output = tabulate(
            wth_table_df.set_index('@DATE'),
            tablefmt='plain',
            numalign='right',
            stralign='right',
            headers=wth_table_df.columns.values)
        # Get rid of Tabulate's double-space padding
        wth_df_text_output = re.sub(r'^\s\s', '', wth_df_text_output)
        wth_df_text_output = re.sub(r'\n\s\s', '\n', wth_df_text_output)

        tav, amp = self._calculate_tav_amp(wth_dataframe, decimals=1)

        # Create WTH Header values. -99 is written for the values that are
        # not available here (elevation, REFHT, WNDHT)
        elev = -99
        wth_header_dict = {
            '@ INSI': 'BPOP',
            'LAT': [lat],
            'LONG': [lon],
            'ELEV': [elev],
            'TAV': [tav],
            'AMP': [amp],
            'REFHT': [-99],
            'WNDHT': [-99],
        }
        wth_dssat_header = pd.DataFrame(wth_header_dict)
        wth_header = tabulate(
            wth_dssat_header.set_index('@ INSI'),
            tablefmt='plain',
            numalign='right',
            stralign='right',
            headers=wth_dssat_header.columns.values,
            floatfmt=('', '.2f', '.2f', '.1f', '.1f', '.1f', '.1f', '.1f')
        )
        # Get rid of Tabulate's double-space padding
        wth_header = re.sub(r"^\s\s", "", wth_header)
        wth_header = re.sub(r"\n\s\s", "\n", wth_header)

        # Values used to build the WTH file name: coordinates without the
        # decimal point, first two-digit year and number of distinct years
        flat = str(lat).replace(".", "")
        flon = str(lon).replace(".", "")
        # NOTE(review): int() drops the leading zero of years 2000-2009
        # ("05" -> 5) - confirm whether the file name should be zero padded
        fyear_array = wth_dataframe['dssatday'].astype(str).str[:2].astype(int).unique()
        fyear = fyear_array[0]
        fyear_len = len(fyear_array)

        in_memory_dssat = Template(self._WTH_TEMPLATE).render(
            lat=lat,
            lon=lon,
            wth_header=wth_header,
            vardata=wth_df_text_output)

        full_output_path = Path(outputdir)/'{}{}{}{}.WTH'.format(flat, flon, fyear, fyear_len)
        with open(full_output_path, 'w+') as f:
            self.logger.info('Writting WTH file {}'.format(full_output_path))
            f.write(in_memory_dssat)