# -*- coding: utf-8 -*-
"""
Created on Fri Nov 25 12:13:29 2016
Description: Reading policy values and modifying matrices for scenarios
Scope: Modelling the Circular Economy in EEIO
@author: Franco Donati
@institution: Leiden University CML
"""
import pandas as pd
import numpy as np
from pycirk.positions import make_coord_array as coord
from pycirk.positions import single_position as sing_pos
from pycirk.fundamental_operations import Operations as ops
from copy import deepcopy
import warnings
[docs]def make_counterfactuals(data, scen_no, scen_file, labels):
"""
Calculate all the counterfactual IO matrices
Parameters
----------
data : obj
An object containing all necessary matrices of the IO system
scen_no : int
the identification number of the scenario to reference in scen_file
scen_file : str
the directory where the scenarios.xlsx file is store
labels : obj
an object containing all labels for the IO matrices
Outputs
-------
An object contaning a mofified IO system
"""
# set basic data and variables
data = deepcopy(data)
x_ = ops.IOT.x(data.Z, data.Y)
diag_x_ = np.diag(x_)
inv_diag_x_ = ops.inv(diag_x_)
diag_yj = np.diag(data.Y.sum(axis=0))
inv_diag_yj = ops.inv(diag_yj)
w = ops.IOT.B(data.W, inv_diag_x_)
e = ops.IOT.B(data.E, inv_diag_x_)
r = ops.IOT.B(data.R, inv_diag_x_)
m = ops.IOT.B(data.M, inv_diag_x_)
eY = ops.IOT.bY(data.EY, inv_diag_yj)
rY = ops.IOT.bY(data.RY, inv_diag_yj)
mY = ops.IOT.bY(data.MY, inv_diag_yj)
# Apply policy to economic matrices
data.Z = counterfactual(scen_file, scen_no, data.Z, "Z", labels)
inv_diag_x_int = np.diag(ops.inv(ops.IOT.x(data.Z, data.Y)))
A = ops.IOT.A(data.Z, inv_diag_x_int)
data.A = counterfactual(scen_file, scen_no, A, "A", labels)
data.Y = counterfactual(scen_file, scen_no, data.Y, "Y", labels)
L = ops.IOT.L(data.A)
x_new = ops.IOT.x_IAy(L, data.Y.sum(1))
diag_x_new = np.diag(x_new)
diag_yj_new = np.diag(data.Y.sum(axis=0))
# Apply policy to intermediate extension intensities
data.w = counterfactual(scen_file, scen_no, w, "w", labels)
data.e = counterfactual(scen_file, scen_no, e, "e", labels)
data.r = counterfactual(scen_file, scen_no, r, "r", labels)
data.m = counterfactual(scen_file, scen_no, m, "m", labels)
# Apply policy to final demand extension intensities
data.eY = counterfactual(scen_file, scen_no, eY, "eY", labels)
data.rY = counterfactual(scen_file, scen_no, rY, "rY", labels)
data.mY = counterfactual(scen_file, scen_no, mY, "mY", labels)
data.Z = ops.IOT.Z(data.A, diag_x_new)
data.W = ops.IOT.R(data.w, diag_x_new) # primary inputs coef
data.E = ops.IOT.R(data.e, diag_x_new) # emissions ext coef
data.R = ops.IOT.R(data.r, diag_x_new) # resource ext coef
data.M = ops.IOT.R(data.m, diag_x_new) # material ext coef
data.EY = ops.IOT.RY(data.eY, diag_yj_new) # emissions ext FD coef
data.RY = ops.IOT.RY(data.rY, diag_yj_new) # resource ext FD coef
data.MY = ops.IOT.RY(data.mY, diag_yj_new) # material ext FD coef
data.W = counterfactual(scen_file, scen_no, data.W, "W", labels)
# Apply policy to intermediate extension coefficient matrices
data.E = counterfactual(scen_file, scen_no, data.E, "E", labels)
data.R = counterfactual(scen_file, scen_no, data.R, "R", labels)
data.M = counterfactual(scen_file, scen_no, data.M, "M", labels)
# Apply policy to final demand extension coefficient matrices
data.EY = counterfactual(scen_file, scen_no, data.EY, "EY", labels)
data.RY = counterfactual(scen_file, scen_no, data.RY, "RY", labels)
data.MY = counterfactual(scen_file, scen_no, data.MY, "MY", labels)
# print((1-np.sum(x_)/np.sum(x_new))*100)
return data
[docs]def counterfactual(scen_file, scen_no, M, M_name, labels):
"""
Separates changes by matrix subject to intervention and apply them on a
specific matrix
Parameters
----------
scen_file: str
directory of the file in which the scenarios are specified
scen_no : int
specific scenario e.g "1" or "scenario_1"
M : numpy.array
matrix affected by the policies
M_name : str
matrix name as diplayed under sheet_name["matrix"]
labels : obj
matrix labels
Output
------
A numpy array modified according to the specified changes in the scenario file
"""
if type(scen_no) is int:
scen_no = "scenario_" + str(scen_no)
elif scen_no.startswith("scenario_"):
pass
else:
raise KeyError("only integer or explicit name (scenario_x)" +
"are allowed")
scenario = pd.read_excel(scen_file, sheet_name=scen_no, header=1, index=None)
if scenario["matrix"].isnull().values.all():
matrix = M
warnings.warn("\n\nA scenario sheet was found but no parameters" +
"were set, please check: \n\n file: scenarios.xls" +
"\n sheet: " + scen_no + "\n\n")
else:
filtered_changes = scenario.loc[scenario['matrix'] == M_name]
matrix = make_new(filtered_changes, M, M_name, labels)
return matrix
[docs]def basic_mult(ide, a, kt, kp):
"""
Policy intervention
It may be a primary intervention or an acillary action.
Parameters
----------
a: numpy.array
a supply chain or a point in it subject to a change
kt: float
technical coefficient (max achievable technically)
kp: float
penetration coefficient (level of market penet. of the policy)
ide: int
identification number of the intervention in case of missing information
Output
------
A a numpy.array of the same order/shape of a
"""
if np.isnan(kt):
d = a
elif np.isnan(kp):
raise ValueError("please specify penetration coefficient for" +
" technical change - Policy identifier "
+ str(ide))
else:
kt = kt * 1e-2
kp = kp * 1e-2
totk = 1 - -kt * kp
d = a * totk
d = np.nan_to_num(d)
return d
[docs]def basic_add(a, at):
"""
Adds values together
Parameters
----------
a : numpy.array
at : numpy array
"""
if np.isnan(at):
at = 0
a += at
return(a)
[docs]def substitution(d, s, fx_kp):
"""
Moves the value from one or multiple cells (in the same row or column)
Substitution: Material subsitution or certain types of rebound effects
If the size of the array of the original value is different from that
of the destination (substituted), we obtain the total of the value to
be substituted and the substitution is implemented by dividing the tot
by the number of elements on the destination array and then added to
the destination array (equally distributed)
Parameters
----------
d : numpy.array
transaction with which we are substituting
s : numpy.array
original transaction that was subject to changes
(the transactions from which the value is coming from)
fx_kp : float
relative size of c that is added on the transaction to expand d
Output
------
A numpy.array of modified d
"""
fx_kp = fx_kp * 1e-2
if d.shape != s.shape: # checking whether we need to distribute values
mask = (d == 0)
no_non_zeros = np.count_nonzero(d)
d[~mask] = d[~mask] + (np.sum(s.sum())/no_non_zeros) * fx_kp
else:
d = d + np.array(s) * fx_kp
return d
[docs]def counterfactual_engine(M, inter_sets, subs=False, copy=False):
"""
This function allows for the proccessing of the specified interventions
onto a selected matrix. It calls various functions to modify the values
in a specified matrix.
Parameters
----------
M : numpy.array
matrix of reference
inter_sets: dict
contains all specfication concerning the changes to be applied
(intervention sets)
subs : bool
If True it will call the subsitution function according to
specifications in scenarios.xlsx
copy : bool
if True it will copy value from one part of the matrix to another
according to specifications in scenarios.xlsx
Others
-------
i : index coordinate
g : column coordinate
ide : intervention identification number
kt : technical change coefficient
kp : market penetration coefficient
fx_kp : market penetration coeffient applicable to substitution
expan : expansion coef. (used only for simple transaction changes)
Output
------
A numpy.array of the modified matrix
"""
ide = inter_sets["ide"]
i = inter_sets["i"] # row
g = inter_sets["g"] # columns
a = M[np.ix_(i, g)]
if copy is True:
i1 = inter_sets["i1"]
g1 = inter_sets["g1"]
d = M[np.ix_(i1, g1)]
if np.isnan(inter_sets["swk"]):
raise ValueError(f"I can't copy the values. You forgot to add the weighing factor for identifier no: {ide}")
else:
M[np.ix_(i1, g1)] = d + (a * inter_sets["swk"]*1e-2)
else:
int1 = inter_sets["kt1"]
int1 = basic_mult(ide, a, int1["kt"], int1["kp"])
int2 = inter_sets["kt2"]
int2 = basic_mult(ide, int1, int2["kt"], int2["kp"])
int3 = inter_sets["at1"]
int3 = basic_add(int2, int3)
int4 = inter_sets["at2"]
int4 = basic_add(int3, int4)
M[np.ix_(i, g)] = int4
if subs is True:
i1 = inter_sets["i1"]
g1 = inter_sets["g1"]
sub = {"1": inter_sets["sk1"],
"2": inter_sets["sk2"],
"3": inter_sets["sk3"],
"4": inter_sets["sk4"]
}
for key, value in sub.items():
if value == "x":
if key == "1":
ref = a # if it is the first sub then it will look
# at the difference b/ the original value and 1st inter
else:
key = int(key) - 1
ref = eval("int" + str(key))
s = ref - eval("int" + str(key))
d = M[np.ix_(i1, g1)]
if np.isnan(inter_sets["swk"]):
raise ValueError(f"I can't substitute the values. You forgot to add the weighing factor for identifier no: {ide}")
else:
hey = substitution(d, s, inter_sets["swk"])
M[np.ix_(i1, g1)] = hey
return M
[docs]def make_new(filtered_changes, M, M_name, labels):
"""
Organizes the data concerning the changes and calls the functions
to modified matrices based on specied scenarios
Parameters
----------
filterd_changes: pandas.DataFrame
A table filtered by matrix name containing all changes to be applied
M : numpy.array
matrix on which to implement the changes
M_name : str
nomenclature referring to the matrix to be changed
labels: obj
object containing all matrix labels
Others
------
g is any column in the matrix
i is any row row in the matrix
Output
------
A numpy.array of the processed matrix
"""
M = np.array(M)
spec_labels = labels.identify_labels(M_name)
reg_labels = spec_labels["reg_labels"]
row_labels = spec_labels["i_labels"]
column_labels = spec_labels["g_labels"]
no_row_labs = spec_labels["no_i"]
no_col_labs = spec_labels["no_g"]
no_reg_labs = spec_labels["no_reg"]
if len(filtered_changes) == 0:
return (M)
else:
for l, entry in filtered_changes.iterrows():
try:
change_type = entry.change_type
ide = entry.identifier # used during debugging
# Collecting the specified coordinates for the intevention
# coordinates for region and category
# Row items (i) => Supplied category or extension category
reg_o = sing_pos(entry.reg_o, reg_labels)
cat_o = sing_pos(entry.cat_o, row_labels)
# Column items (g) => Consumption / manufacturing activity
reg_d = sing_pos(entry.reg_d, reg_labels)
cat_d = sing_pos(entry.cat_d, column_labels)
# Identify coordinates
orig_coor = coord(cat_o, reg_o, no_reg_labs, no_row_labs)
dest_coor = coord(cat_d, reg_d, no_reg_labs, no_col_labs)
# organize main changes
kt1 = {"kt": entry.kt1, "kp": entry.kp1}
kt2 = {"kt": entry.kt2, "kp": entry.kp2}
intervention = {"change_type": change_type,
"ide": ide,
"i": orig_coor,
"g": dest_coor,
"kt1": kt1,
"kt2": kt2,
"at1": entry.at1,
"at2": entry.at2,
}
substitution = False
copy = False
# the following is only relevant for susbtitution
if "x" in [entry.Sub, entry.Copy]:
sub_reg_o = sing_pos(entry.reg_o_sc, reg_labels)
sub_cat_o = sing_pos(entry.cat_o_sc, row_labels)
# Column items => Consumption / manufacturing activity
sub_reg_d = sing_pos(entry.reg_d_sc, reg_labels)
sub_cat_d = sing_pos(entry.cat_d_sc, column_labels)
# Translate coordinates from str to numerical position
sub_orig_coor = coord(sub_cat_o, sub_reg_o, no_reg_labs, no_row_labs)
sub_dest_coor = coord(sub_cat_d, sub_reg_d, no_reg_labs, no_col_labs)
intervention["swk"] = entry.swk
intervention["i1"] = sub_orig_coor
intervention["g1"] = sub_dest_coor
intervention["sk1"] = entry.sk1
intervention["sk2"] = entry.sk2
intervention["sk3"] = entry.sk3
intervention["sk4"] = entry.sk4
if entry.Copy == "x":
copy = True
elif entry.Sub == "x":
substitution = True
except Exception:
raise ValueError(f"Check in this entry for potential coordinate errors in your scenario settings:\n{entry} ")
M = counterfactual_engine(M, intervention, substitution, copy)
return M
# =============================================================================
# =============================================================================
# # Here I put work that I started but I still need to finish
# =============================================================================
# =============================================================================
# =============================================================================
#
#
# def make_counterfactuals_SUT(data, scen_no, scen_file, labels):
# """
# Calculate all the counterfactual SUT matrices
#
# Parameters
# ----------
# data : obj
# An object containing all necessary matrices of the SUT system
#
# scen_no : int
# the identification number of the scenario to reference in scen_file
#
# scen_file : str
# the directory where the scenarios.xlsx file is store
#
# labels : obj
# an object containing all labels for the SUT matrices
#
# Outputs
# -------
# An object contaning a mofified SUT system
# """
#
# met = ops.PxP_ITA_MSC
#
# w = ops.IOT.B(data.W, data.inv_diag_g) # Primary input coef
# e = ops.IOT.B(data.E, data.inv_diag_g) # emissions extension coef
# r = ops.IOT.B(data.R, data.inv_diag_g) # Resources extension coef
# m = ops.IOT.B(data.M, data.inv_diag_g) # Materials extension coef
# S = met.S(data.U, data.inv_diag_g) # industry coefficients for intermediate use table
#
# # Start first from a supply approach
# # Supply matrix counterfactual
# data.V = counterfactual(scen_file, scen_no, data.V, "V", labels)
# # new total industry output
# g1 = np.sum(data.V, axis=0)
# # industry use coefficients counterfactual
# S_ = counterfactual(scen_file, scen_no, S, "S", labels)
#
# data.U = counterfactual(scen_file, scen_no, S_ @ np.diag(g1), "U", labels) # industry use transactions counterfactual
#
# W_ = np.array(ops.IOT.R(w, np.diag(g1)))
#
# g2 = np.array(W_[:9].sum(0)) + data.U.sum(0) # recalculate total industry output
#
# g_dif = np.multiply(g2, ops.inv(g1))*100 # calculate the difference between original and new total industry input
#
# # print([round((1-l)*100,4) for l in g_dif if 1-l>.5e-3 and l!=0])
# q2 = np.sum(data.U, axis=1) + np.sum(data.Y, axis=1)
#
# # updating the supply table to match the new total industry input
# D = met.D(data.V, np.diag(ops.inv(data.V.sum(1))))
# data.V = D @ np.diag(q2)
#
# q1 = np.sum(data.V, axis=0) # total product output
#
# q_dif = np.multiply(q2, ops.inv(q1))
#
# g1 = np.sum(data.V, axis=1)
#
# data.E = met.R(e, np.diag(x))
#
# data.R = met.R(r, np.diag(x))
#
# data.M = met.R(m, np.diag(x))
#
#
# return(IOT)
# def balancing_operation(V, U, Y, W):
# """
# Re-balancing of supply-use tables after data changes
#
# Parameters
# ----------
# V (supply) : numpy.array
#
# U (use) : numpy.array
#
# Y (final_demand) : numpy.array
#
# W (primary_inputs) : numpy.array
#
# Output
# ------
# output : dict
#
# It outputs a dictionary containing a re-balanced supply-use tables system
# where:
# V = supply table
#
# U = use table
#
# Y = final demand
#
# W = primary inputs
#
# """
# =============================================================================