1
+ from policyengine .economic_impact .base_metric_calculator import BaseMetricCalculator
2
+ from policyengine_uk import Microsimulation
3
+ from policyengine_core .reforms import Reform
4
+ from microdf import MicroSeries
5
+ import numpy as np
6
+
7
+ class IncomeLSR (BaseMetricCalculator ):
8
+ def __init__ (self , baseline : Microsimulation , reformed : Microsimulation , default_period : int = 2024 ) -> None :
9
+ super ().__init__ (baseline , reformed , default_period )
10
+ self .baseline = baseline
11
+ self .reformed = reformed
12
+
13
+ def calculate (self ):
14
+ # Calculate household counts
15
+ # household_count_people_baseline = self.baseline.calculate("household_count_people")
16
+ # household_count_people_reformed = self.reformed.calculate("household_count_people")
17
+
18
+ # Calculate baseline and reformed substitution LSR
19
+ substitution_lsr_hh_baseline = self ._calculate_substitution_lsr (self .baseline )
20
+ substitution_lsr_hh_reformed = self ._calculate_substitution_lsr (self .reformed )
21
+
22
+ # Compute the change in substitution LSR
23
+ substitution_lsr_hh = np .array (substitution_lsr_hh_reformed ) - np .array (substitution_lsr_hh_baseline )
24
+
25
+ # Calculate baseline and reformed income LSR
26
+ income_lsr_hh_baseline = self ._calculate_income_lsr (self .baseline )
27
+ income_lsr_hh_reformed = self ._calculate_income_lsr (self .reformed )
28
+
29
+ # Compute the change in income LSR
30
+ income_lsr_hh = np .array (income_lsr_hh_reformed ) - np .array (income_lsr_hh_baseline )
31
+
32
+ # Calculate weights and earnings
33
+ household_weight = self .baseline .calculate ("household_weight" )
34
+ emp_income = MicroSeries (
35
+ self .baseline .calculate ("employment_income" , map_to = "household" ).astype (float ).tolist (),
36
+ weights = household_weight
37
+ )
38
+ self_emp_income = MicroSeries (
39
+ self .baseline .calculate ("self_employment_income" , map_to = "household" ).astype (float ).tolist (),
40
+ weights = household_weight
41
+ )
42
+ total_lsr_hh = substitution_lsr_hh + income_lsr_hh
43
+ earnings = emp_income + self_emp_income
44
+ original_earnings = earnings - total_lsr_hh
45
+
46
+ # Calculate income LSR ratio
47
+ income_lsr_hh = MicroSeries (income_lsr_hh , weights = household_weight )
48
+ income = income_lsr_hh .sum () / original_earnings .sum () * 100
49
+
50
+ return {"income" : round (income ,2 )}
51
+
52
+ def _calculate_substitution_lsr (self , simulation : Microsimulation ):
53
+ """Calculate substitution LSR for a given simulation."""
54
+ if "employment_income_behavioral_response" in simulation .tax_benefit_system .variables :
55
+ if any (simulation .calculate ("employment_income_behavioral_response" ) != 0 ):
56
+ return simulation .calculate ("substitution_elasticity_lsr" , map_to = "household" ).astype (float ).tolist ()
57
+ return (self .baseline .calculate ("household_count_people" ) * 0 ).astype (float ).tolist ()
58
+
59
+ def _calculate_income_lsr (self , simulation : Microsimulation ):
60
+ """Calculate income LSR for a given simulation."""
61
+ if "employment_income_behavioral_response" in simulation .tax_benefit_system .variables :
62
+ if any (simulation .calculate ("employment_income_behavioral_response" ) != 0 ):
63
+ return simulation .calculate ("income_elasticity_lsr" , map_to = "household" ).astype (float ).tolist ()
64
+ return (self .baseline .calculate ("household_count_people" ) * 0 ).astype (float ).tolist ()
65
+
66
+
67
+ class SubstitutionLSR (BaseMetricCalculator ):
68
+ def __init__ (self , baseline : Microsimulation , reformed : Microsimulation , default_period : int = 2024 ) -> None :
69
+ super ().__init__ (baseline , reformed , default_period )
70
+ self .baseline = baseline
71
+ self .reformed = reformed
72
+
73
+ def calculate (self ):
74
+ # Calculate household counts
75
+ # household_count_people_baseline = self.baseline.calculate("household_count_people")
76
+ # household_count_people_reformed = self.reformed.calculate("household_count_people")
77
+
78
+ # Calculate baseline and reformed income LSR
79
+ income_lsr_hh_baseline = self ._calculate_income_lsr (self .baseline )
80
+ income_lsr_hh_reformed = self ._calculate_income_lsr (self .reformed )
81
+
82
+ # Calculate baseline and reformed substitution LSR
83
+ substitution_lsr_hh_baseline = self ._calculate_substitution_lsr (self .baseline )
84
+ substitution_lsr_hh_reformed = self ._calculate_substitution_lsr (self .reformed )
85
+
86
+ # Compute the change in substitution LSR
87
+ substitution_lsr_hh = np .array (substitution_lsr_hh_reformed ) - np .array (substitution_lsr_hh_baseline )
88
+ household_weight = self .baseline .calculate ("household_weight" )
89
+
90
+ # Convert to MicroSeries and compute total LSR
91
+ substitution_lsr_hh = MicroSeries (substitution_lsr_hh , weights = household_weight )
92
+ income_lsr_hh = np .array (income_lsr_hh_reformed ) - np .array (income_lsr_hh_baseline )
93
+ total_lsr_hh = substitution_lsr_hh + income_lsr_hh
94
+
95
+ # Calculate earnings
96
+ emp_income = MicroSeries (
97
+ self .baseline .calculate ("employment_income" , map_to = "household" ).astype (float ).tolist (),
98
+ weights = household_weight
99
+ )
100
+ self_emp_income = MicroSeries (
101
+ self .baseline .calculate ("self_employment_income" , map_to = "household" ).astype (float ).tolist (),
102
+ weights = household_weight
103
+ )
104
+ earnings = emp_income + self_emp_income
105
+ original_earnings = earnings - total_lsr_hh
106
+
107
+ # Calculate substitution ratio
108
+ substitution = substitution_lsr_hh .sum () / original_earnings .sum () * 100
109
+
110
+ return {"substitution" : round (substitution ,2 )}
111
+
112
+ def _calculate_substitution_lsr (self , simulation : Microsimulation ):
113
+ """Calculate substitution LSR for a given simulation."""
114
+ if "employment_income_behavioral_response" in simulation .tax_benefit_system .variables :
115
+ if any (simulation .calculate ("employment_income_behavioral_response" ) != 0 ):
116
+ return simulation .calculate ("substitution_elasticity_lsr" , map_to = "household" ).astype (float ).tolist ()
117
+ return (self .baseline .calculate ("household_count_people" ) * 0 ).astype (float ).tolist ()
118
+
119
+ def _calculate_income_lsr (self , simulation : Microsimulation ):
120
+ """Calculate income LSR for a given simulation."""
121
+ if "employment_income_behavioral_response" in simulation .tax_benefit_system .variables :
122
+ if any (simulation .calculate ("employment_income_behavioral_response" ) != 0 ):
123
+ return simulation .calculate ("income_elasticity_lsr" , map_to = "household" ).astype (float ).tolist ()
124
+ return (self .baseline .calculate ("household_count_people" ) * 0 ).astype (float ).tolist ()
125
+
126
+
127
+
128
+ class NetLSRChange (BaseMetricCalculator ):
129
+ def __init__ (self , baseline : Microsimulation , reformed : Microsimulation , default_period : int = 2024 ) -> None :
130
+ super ().__init__ (baseline , reformed , default_period )
131
+ self .baseline = baseline
132
+ self .reformed = reformed
133
+
134
+ def calculate (self ):
135
+ income_result = IncomeLSR (baseline = self .baseline , reformed = self .reformed ).calculate ()
136
+ substitution_result = SubstitutionLSR (baseline = self .baseline , reformed = self .reformed ).calculate ()
137
+
138
+ income_value = income_result ["income" ]
139
+ substitution_value = substitution_result ["substitution" ]
140
+
141
+ net_change = (income_value + substitution_value )
142
+
143
+ return {"net_change" : round (net_change ,2 )}
0 commit comments