Source code for pycirk.labels

# -*- coding: utf-8 -*-
"""
Created on sat Jan 28 2017

Description: Labelling elements for SUTs and IOTs

Scope: Modelling the Circular Economy in EEIO

@author:Franco Donati
@institution:Leiden University CML
"""

from pandas import DataFrame as df
from pandas import MultiIndex as mi
from pandas import read_csv
from munch import Munch


[docs]class Labels: def __init__(self): self.country_labels = None self.region_labels = None self.product_labels = None self.industry_labels = None self.W_labels = None self.E_labels = None self.R_labels = None self.M_labels = None self.Y_labels = None self.Cr_E_labels = None self.Cr_M_labels = None self.Cr_R_labels = None self.Cr_W_labels = None
[docs] def calc_no_of_something(self, labels): """ A general method to calculate the number of unique entries contained in a series """ return len(labels.unique())
[docs] def list_of_something(self, labels): """ A general method to return a list of unique entries contained in a series """ return labels.unique()
[docs] def get_unique_labels(self, dataframe_of_labels, for_units=True): """ Calculates all unique entries (labels) contained in a dataframe and puts them together with their units and total count of unique entries It returns an object... which is munched, not a pretty solution but it works ok for now. Please consider refactoring in the future """ organize = dict() for keys, labels in dataframe_of_labels.items(): if for_units is True: organize[keys] = self.list_of_something(labels) elif for_units is False: if keys == "unit": organize[keys] = labels else: organize[keys] = self.list_of_something(labels) count = max(len(labels) for keys, labels in organize.items()) organize["count"] = count return Munch(organize)
[docs] def organize_unique_labels(self, directory): labels = self.load_labels(directory) for l, v in labels.items(): labels[l] = Munch(v) labels = Munch(labels) labels.products = self.get_unique_labels(labels.prod) labels.industries = self.get_unique_labels(labels.ind) labels.primary = self.get_unique_labels(labels.primary) labels.fin_dem = self.get_unique_labels(labels.fin_dem) labels.emis = self.get_unique_labels(labels.emis, False) labels.res = self.get_unique_labels(labels.res, False) labels.mat = self.get_unique_labels(labels.mat, False) labels.car_emis = self.get_unique_labels(labels.car_emis, False) labels.car_res = self.get_unique_labels(labels.car_res, False) labels.car_mat = self.get_unique_labels(labels.car_mat, False) labels.car_prim = self.get_unique_labels(labels.car_prim, False) return labels
[docs] def load_labels(self, directory): try: ind = read_csv(directory + "/industry.csv") # with unit column except Exception: ind = None try: prod = read_csv(directory + "/products.csv") # with unit column except Exception: prod = None primary = read_csv(directory + "/factor_inputs.csv") fin_dem = read_csv(directory + "/final_demand.csv") emis = read_csv(directory + "/emissions.csv") res = read_csv(directory + "/resources.csv") mat = read_csv(directory + "/materials.csv") car_emis = read_csv(directory + "/charact_emissions.csv") car_res = read_csv(directory + "/charact_resources.csv") car_mat = read_csv(directory + "/charact_materials.csv") car_prim = read_csv(directory + "/charact_factor_inputs.csv") return {"ind": ind, "prod": prod, "primary": primary, "fin_dem": fin_dem, "emis": emis, "res": res, "mat": mat, "car_emis": car_emis, "car_res": car_res, "car_mat": car_mat, "car_prim": car_prim}
[docs] def get_labels(self, matrix): """ Collects labels from a dataframe """ try: return matrix.index.to_frame(index=False) # print("ind",matrix.index[0]) except Exception: # this exception is here only in the case the multi-index is # as a list or flat strings instead of an actual multi-index # it is not the case with our EXIOBASE database but future #adaptation to include other databases may require it return df(list(matrix.index)).copy()
[docs] def save_labels(self, data, directory): """ saves the labels of the database in the labels directory """ try: self.get_labels(data["V"].T).to_csv(directory + "/industry.csv", index=False) except Exception: pass self.get_labels(data["Cr_E_k"]).to_csv(directory + "/charact_emissions.csv", index=False) self.get_labels(data["Cr_R_k"]).to_csv(directory + "/charact_resources.csv", index=False) self.get_labels(data["Cr_M_k"]).to_csv(directory + "/charact_materials.csv", index=False) self.get_labels(data["Cr_W_k"]).to_csv(directory + "/charact_factor_inputs.csv", index=False) self.get_labels(data["Y"]).to_csv(directory + "/products.csv", index=False) # with unit column self.get_labels(data["Y"].T).to_csv(directory + "/final_demand.csv", index=False) try: self.get_labels(data["W"]).to_csv(directory + "/factor_inputs.csv", index=False) self.get_labels(data["E"]).to_csv(directory + "/emissions.csv", index=False) self.get_labels(data["R"]).to_csv(directory + "/resources.csv", index=False) self.get_labels(data["M"]).to_csv(directory + "/materials.csv", index=False) except Exception: self.get_labels(data["w"]).to_csv(directory + "/factor_inputs.csv", index=False) self.get_labels(data["e"]).to_csv(directory + "/emissions.csv", index=False) self.get_labels(data["r"]).to_csv(directory + "/resources.csv", index=False) self.get_labels(data["m"]).to_csv(directory + "/materials.csv", index=False)
[docs] def relabel_to_save(self, data, trans_method, labels_directory): """ This function makes sure that everything is labeled in IOT tables trans_method = 0 is prod x prod , 1 is ind x ind """ lb = Munch(self.load_labels(labels_directory)) if trans_method in [0, 1]: cat = lb.prod elif trans_method not in [0, 1]: cat = lb.ind data = Munch(data) try: # Relabel Main IOT elements data.Z = self.relabel(data.Z, cat.iloc[:, :4], cat) data.Y = self.relabel(data.Y, lb.fin_dem, cat) data.W = self.relabel(data.W, cat.iloc[:, :4], lb.primary) except Exception: cat = lb.ind prod = lb.prod data.V = self.relabel(data.V, cat, prod) # Labeling final demand extensions' data.EY = self.relabel(data.EY, lb.fin_dem, lb.emis) data.RY = self.relabel(data.RY, lb.fin_dem, lb.res) data.MY = self.relabel(data.MY, lb.fin_dem, lb.mat) # Inter-trans extensions' data.E = self.relabel(data.E, cat, lb.emis) data.R = self.relabel(data.R, cat, lb.res) data.M = self.relabel(data.M, cat, lb.mat) # Relabel characterization tables data.Cr_E_k = self.relabel(data.Cr_E_k, lb.emis, lb.car_emis) data.Cr_R_k = self.relabel(data.Cr_R_k, lb.res, lb.car_res) data.Cr_M_k = self.relabel(data.Cr_M_k, lb.mat, lb.car_mat) data.Cr_W_k = self.relabel(data.Cr_W_k, lb.primary, lb.car_prim) return data
[docs] def apply_labels(self, matrix, labels, axis=0): """ Applies labels to a dataframe axis = 0 => Index axis = 1 => columns """ if axis == 0: # apply index matrix.index = mi.from_arrays(labels.values.T) matrix.index.names = labels.columns elif axis == 1: # collects columns matrix.columns = mi.from_arrays(labels.values.T) matrix.columns.names = labels.columns return matrix
[docs] def relabel(self, M, column_labels, index_labels): """ Processes apply_labels and apply _names together """ M = df(M) try: M = self.apply_labels(M, column_labels, axis=1) # columns except Exception: # in case a string is passed for column label for a vector M.columns = [column_labels] return self.apply_labels(M, index_labels, axis=0) # index
[docs] def identify_labels(self, M_name): """ A method to understand what type of labels are being handled depending on the name of the matrix in dataframe type that is being passed """ # identifying colum and index labels if self.country_labels is None: reg_labels = self.region_labels elif self.country_labels is not None: reg_labels = self.country_labels if "Y" in M_name: column_labels = self.Y_labels row_labels = self.product_labels else: column_labels = self.product_labels row_labels = self.product_labels if M_name in ["V", "U", "S", "D"]: column_labels = self.industry_labels name = "" if "Cr" in M_name: name = "Cr_" M_name = M_name[2:] if any(True for l in M_name.lower() if l in ["e", "m", "r", "w"]): name_2 = [l for l in ["e", "m", "r", "w"] if l in M_name.lower()][0].upper() attr_name = name + name_2 + "_labels" row_labels = eval("self." + attr_name) no_row_labs = row_labels.count no_reg_labs = len(reg_labels) no_col_labs = column_labels.count return {"reg_labels": reg_labels, "g_labels": column_labels, "i_labels": row_labels, "no_i": no_row_labs, "no_g": no_col_labs, "no_reg": no_reg_labs}