\(\newcommand{\B}[1]{ {\bf #1} }\) \(\newcommand{\R}[1]{ {\rm #1} }\) \(\newcommand{\W}[1]{ \; #1 \; }\)
user_hes_fixed_math.py¶
View page sourceCheck the Hessian of the Fixed Effects Objective¶
Purpose¶
This is a detailed mathematical explanation of computing Hessian of the fixed effects objective used by the asymptotic sampling method.
Reference¶
See the
theory
section of the cppad_mixed documentation.
Notation¶
\(\theta\) |
model incidence for the parent region |
\(u_0\) |
incidence random effect for first child |
\(u_1\) |
incidence random effect for second child |
\(y_0\) |
measured incidence for the first child |
\(y_1\) |
measured incidence for the second child |
\(s\) |
standard deviation for data and random effects |
The only fixed effect in this model is \(\theta\) (sometimes written theta ) the incidence level for the world. The random effects are \(u_0\) and \(u_1\).
Problem Parameters¶
import math
import time
theta_true = 0.5
u_true = [ 0.5, -0.5 ]
y_0 = theta_true * math.exp( u_true[0] )
y_1 = theta_true * math.exp( u_true[1] )
standard_dev = theta_true # the standard deviation s
random_seed = int( time.time() )
number_sample = 1000
eta_in_prior = 1e-6 # if None, the fixed effects are not scaled
Random Likelihood¶
The negative log-likelihood for this example, ignoring the constant of integration, is
Gradient w.r.t. Fixed Effects¶
Gradient w.r.t. Random Effects¶
Hessian w.r.t. Fixed Effects¶
Hessian w.r.t. Random Effects¶
Hessian Cross Term¶
Optimal Random Effects¶
Implicit Function Definition¶
The optimal random effects \(\hat{u} ( \theta )\) solve the equation
Derivatives of Optimal Random Effects¶
Using the implicit function theorem we have
Substituting in the formulas above for the Hessian terms on the right hand side we obtain:
We can compute \(\hat{u}_i ( \theta )\) by optimizing the random effects corresponding to the fixed effects being \(\theta\). We define \(g_i ( \theta )\) by the equation
Give \(\hat{u}_i ( \theta )\) we can compute \(g_i ( \theta )\). Given \(g_i ( \theta )\), we can compute the derivative \(\hat{u}_i^{(1)} ( \theta )\) using
Given \(\hat{u}^{(1)} ( \theta )\), we can compute the derivative \(g_i^{(1)} ( \theta )\) using
Given \(g_i^{(1)} ( \theta )\), we can compute the second derivative \(\hat{u}_i^{(2)} ( \theta )\) using
Given \(\hat{u}^{(2)} ( \theta )\), we can compute the second derivative \(g_i^{(2)} ( \theta )\) using
Laplace Approximation¶
Up to a constant, the Laplace approximation (fixed effects objective), as a function of the fixed effects, is:
where \(F( \theta )\) and \(G( \theta )\) are defined by
The derivative of \(F( \theta )\) is given by
It follows from the definition of \(\hat{u} ( \theta )\) that \(f_u [ \theta , \hat{u} ( \theta ) ] = 0\) and
Taking the derivative of the equation above we obtain
Combining the definition of \(G( \theta )\), \(g_i ( \theta )\) and the formula for \(f_{u,u} ( \theta , u )\) we have
Defining \(h_i ( \theta )\) by
we obtain the representation
The first and second derivative of \(h_i ( \theta )\) and \(G( \theta )\) are given by
Asymptotic Statistics¶
The asymptotic posterior distribution for the optimal estimate of \(\theta\) give the data \(y\) is a normal with variance equal to the inverse of
Scaling Fixed Effects¶
If eta is not null, the Hessian is with respect to \(\alpha = \log( \theta + \eta )\). Inverting this relation we define \(\theta ( \alpha ) = \exp( \alpha ) - \eta\). The fixed effects objective as a function of \(\alpha\) is
Taking the first and second derivatives, we have
Optimal Fixed Effects¶
The first order necessary conditions for \(\hat{\alpha}\) to be a minimizer of the fixed effects object is \(H^{(1)} ( \hat{\alpha} ) = 0\). In this case we can simplify the Hessian scaling as follows:
where \(\hat{\theta} = \theta( \hat{\alpha} )\).
Source Code¶
import numpy
import sys
import os
import copy
test_program = 'example/user/hes_fixed_math.py'
check_program = sys.argv[0].replace('\\', '/')
if check_program != test_program or len(sys.argv) != 1 :
usage = 'python3 ' + test_program + '\n'
usage += 'where python3 is the python 3 program on your system\n'
usage += 'and working directory is the dismod_at distribution directory\n'
sys.exit(usage)
print(test_program)
#
# import dismod_at
local_dir = os.getcwd() + '/python'
if( os.path.isdir( local_dir + '/dismod_at' ) ) :
sys.path.insert(0, local_dir)
import dismod_at
#
# change into the build/example/user directory
if not os.path.exists('build/example/user') :
os.makedirs('build/example/user')
os.chdir('build/example/user')
#
def example_db (file_name) :
def fun_rate_child(a, t) :
return ('prior_gauss_zero', None, None)
def fun_rate_parent(a, t) :
return ('prior_rate_parent', None, None)
# ----------------------------------------------------------------------
# age table
age_list = [ 0.0, 100.0 ]
#
# time table
time_list = [ 1995.0, 2015.0 ]
#
# integrand table
integrand_table = [ { 'name':'Sincidence' } ]
#
node_table = [
{ 'name':'world', 'parent':'' },
{ 'name':'child_0', 'parent':'world' },
{ 'name':'child_1', 'parent':'world' },
]
#
# weight table:
weight_table = list()
#
# covariate table: no covariates
covariate_table = list()
#
# mulcov table
mulcov_table = list()
#
# avgint table:
avgint_table = list()
#
# nslist_dict:
nslist_dict = dict()
# ----------------------------------------------------------------------
# data table:
data_table = list()
row = {
'node': 'child_0',
'subgroup': 'world',
'density': 'gaussian',
'weight': '',
'hold_out': False,
'time_lower': 2000.0,
'time_upper': 2000.0,
'age_lower': 50.0,
'age_upper': 50.0,
'integrand': 'Sincidence',
'meas_value': y_0,
'meas_std': standard_dev,
}
data_table.append( copy.copy(row) )
#
row['node'] = 'child_1'
row['meas_value'] = y_1
data_table.append( copy.copy(row) )
#
# ----------------------------------------------------------------------
# prior_table
prior_table = [
{ # prior_rate_parent
'name': 'prior_rate_parent',
'density': 'uniform',
'lower': 1e-2 * theta_true,
'upper': 1e+2 * theta_true,
'mean': theta_true / 3.0, # only used for start and scale
'eta': eta_in_prior,
},{ # prior_gauss_zero
'name': 'prior_gauss_zero',
'density': 'gaussian',
'mean': 0.0,
'std': standard_dev,
}
]
# ----------------------------------------------------------------------
# smooth table
smooth_table = [
{ # smooth_rate_parent
'name': 'smooth_rate_parent',
'age_id': [ 0 ],
'time_id': [ 0 ],
'fun': fun_rate_parent
}, { # smooth_rate_child
'name': 'smooth_rate_child',
'age_id': [ 0 ],
'time_id': [ 0 ],
'fun': fun_rate_child
}
]
# ----------------------------------------------------------------------
# rate table
rate_table = [
{
'name': 'iota',
'parent_smooth': 'smooth_rate_parent',
'child_smooth': 'smooth_rate_child',
}
]
# ----------------------------------------------------------------------
# option_table
option_table = [
{ 'name':'parent_node_name', 'value':'world' },
{ 'name':'ode_step_size', 'value':'10.0' },
{ 'name':'random_seed', 'value':str(random_seed) },
{ 'name':'rate_case', 'value':'iota_pos_rho_zero' },
{ 'name':'quasi_fixed', 'value':'true' },
{ 'name':'derivative_test_fixed', 'value':'none' },
{ 'name':'max_num_iter_fixed', 'value':'100' },
{ 'name':'print_level_fixed', 'value':'0' },
{ 'name':'tolerance_fixed', 'value':'1e-12' },
{ 'name':'derivative_test_random', 'value':'none' },
{ 'name':'max_num_iter_random', 'value':'100' },
{ 'name':'print_level_random', 'value':'0' },
{ 'name':'tolerance_random', 'value':'1e-12' }
]
# ----------------------------------------------------------------------
# subgroup_table
subgroup_table = [ { 'subgroup':'world', 'group':'world' } ]
# ----------------------------------------------------------------------
# create database
dismod_at.create_database(
file_name,
age_list,
time_list,
integrand_table,
node_table,
subgroup_table,
weight_table,
covariate_table,
avgint_table,
data_table,
prior_table,
smooth_table,
nslist_dict,
rate_table,
mulcov_table,
option_table
)
# ===========================================================================
file_name = 'example.db'
example_db(file_name)
#
program = '../../devel/dismod_at'
dismod_at.system_command_prc([ program, file_name, 'init' ])
dismod_at.system_command_prc([ program, file_name, 'fit', 'both' ])
dismod_at.system_command_prc(
[ program, file_name, 'sample', 'asymptotic', 'both', str(number_sample) ]
)
# -----------------------------------------------------------------------
# get tables
connection = dismod_at.create_connection(
file_name, new = False, readonly = True
)
var_table = dismod_at.get_table_dict(connection, 'var')
node_table = dismod_at.get_table_dict(connection, 'node')
rate_table = dismod_at.get_table_dict(connection, 'rate')
fit_var_table = dismod_at.get_table_dict(connection, 'fit_var')
sample_table = dismod_at.get_table_dict(connection, 'sample')
hes_fixed_table = dismod_at.get_table_dict(connection, 'hes_fixed')
connection.close()
dismod_at.db2csv_command(file_name)
# -----------------------------------------------------------------------
# node_name2var_id
node_name2var_id = dict()
assert len(var_table) == 3
for var_id in range(len(var_table) ) :
assert var_id < 3
row = var_table[var_id]
assert row['var_type'] == 'rate'
assert rate_table[row['rate_id']]['rate_name'] == 'iota'
node_name = node_table[row['node_id']]['node_name']
node_name2var_id[node_name] = var_id
# -----------------------------------------------------------------------
# uhat(theta)
def optimal_random_effect(fixed_effect) :
#
# set start_var value for the fixed effects
var_id = node_name2var_id['world']
sql_cmd = 'UPDATE start_var SET start_var_value = ' + str(fixed_effect)
sql_cmd += ' WHERE start_var_id == ' + str(var_id)
connection = dismod_at.create_connection(
file_name, new = False, readonly = False
)
dismod_at.sql_command(connection, sql_cmd)
connection.close()
#
# optimize the random effects
dismod_at.system_command_prc([ program, file_name, 'fit', 'random' ])
#
# retrieve the optimal random effects
connection = dismod_at.create_connection(
file_name, new = False, readonly = True
)
fit_var_table = dismod_at.get_table_dict(connection, 'fit_var')
connection.close()
#
var_id = node_name2var_id['child_0']
uhat_0 = fit_var_table[var_id]['fit_var_value']
#
var_id = node_name2var_id['child_1']
uhat_1 = fit_var_table[var_id]['fit_var_value']
#
uhat = numpy.array( [ uhat_0, uhat_1 ] )
return uhat
# ------------------------------------------------------------------------
# f(theta, u)
def random_likelihood(fixed_effect, random_effect) :
theta = fixed_effect
u = random_effect
y = numpy.array( [y_0, y_1] )
residual = y - theta * numpy.exp(u)
residual = numpy.append(residual, u)
sum_sq = numpy.sum( residual * residual )
return sum_sq / (2.0 * standard_dev * standard_dev)
# -----------------------------------------------------------------------
# log det f_{u,u} ( theta , u )
def log_det_random_hessian(fixed_effect, random_effect) :
theta = fixed_effect
u = random_effect
exp_u = numpy.exp( u )
exp_2u = numpy.exp( 2.0 * u )
g = 2.0 * theta * exp_2u - y * exp_u
h = theta * g + 1.0
s_sq = standard_dev * standard_dev
log_det = numpy.sum( numpy.log(h / s_sq) )
return log_det
# -----------------------------------------------------------------------
def check_rel_error(check, value, tolerance) :
rel_error = abs( check / value - 1.0 )
# print(rel_error, tolerance)
if numpy.isscalar( rel_error ) :
ok = rel_error < tolerance
else :
ok = all( rel_error < tolerance )
if not ok :
print("random_seed =", random_seed)
assert ok
# -----------------------------------------------------------------------
# Some values
#
# s_sq
s_sq = standard_dev * standard_dev
#
# y
y = numpy.array( [ y_0, y_1 ] )
#
# optimal theta (from the fit_var table)
var_id = node_name2var_id['world']
theta = fit_var_table[var_id]['fit_var_value']
#
# delta_theta
delta_theta = theta / 1000.0
theta_plus = theta + delta_theta
theta_minus = theta - delta_theta
#
# uhat
uhat = optimal_random_effect(theta)
uhat_plus = optimal_random_effect(theta_plus)
uhat_minus = optimal_random_effect(theta_minus)
#
# exp_u
exp_u = numpy.exp( uhat )
exp_u_plus = numpy.exp( uhat_plus )
exp_u_minus = numpy.exp( uhat_minus )
#
# exp_2u
exp_2u = numpy.exp( 2.0 * uhat )
exp_2u_plus = numpy.exp( 2.0 * uhat_plus )
exp_2u_minus = numpy.exp( 2.0 * uhat_minus )
#
# g(theta)
g = 2.0 * theta * exp_2u - y * exp_u
g_plus = 2.0 * theta_plus * exp_2u_plus - y * exp_u_plus
g_minus = 2.0 * theta_minus * exp_2u_minus - y * exp_u_minus
#
# h(theta)
h = theta * g + 1
#
# F(theta)
F = random_likelihood(theta, uhat)
F_plus = random_likelihood(theta_plus, uhat_plus)
F_minus = random_likelihood(theta_minus, uhat_minus)
#
# G(theta)
G = log_det_random_hessian(theta, uhat) / 2.0
G_plus = log_det_random_hessian(theta_plus, uhat_plus) / 2.0
G_minus = log_det_random_hessian(theta_minus, uhat_minus) / 2.0
#
# L(theta)
L = F + G
L_plus = F_plus + G_plus
L_minus = F_minus + G_minus
#
# f_u (theta, uhat)
f_u = ( theta * theta * exp_2u - theta * y * exp_u + uhat ) / s_sq
#
# f_theta (theta, uhat)
f_theta = numpy.sum(theta * exp_2u - y * exp_u) / s_sq
#
# f_{theta,theta} ( theta , uhat )
f_theta_theta = numpy.sum( exp_2u ) / s_sq
#
# f_{theta, u} ( theta , uhat )
f_theta_u = ( 2 * theta * exp_2u - y * exp_u ) / s_sq
#
# dF_dtheta = F^{(1)} ( theta )
dF_dtheta = f_theta
#
# duhat_dtheta = uhat^{(1)} ( theta )
duhat_dtheta = - g / (theta * g + 1)
#
# dg_dtheta = g^{(1)} ( theta )
dg_dtheta = 2.0 * exp_2u + (4.0 * theta * exp_2u - y * exp_u) * duhat_dtheta
#
# d2uhat_dtheta = uhat^{(2)} ( theta )
d2uhat_d2theta = (g * g - dg_dtheta) / ( (theta * g + 1) * (theta * g + 1) )
#
# d2F_d2theta = F^{(2)} ( theta )
d2F_d2theta = f_theta_theta
d2F_d2theta += numpy.dot( f_theta_u, duhat_dtheta )
#
# dh_dtheta = h^{(1)} ( theta )
dh_dtheta = g + theta * dg_dtheta
#
# dG_dtheta = G^{(1)} ( theta )
dG_dtheta = numpy.sum( dh_dtheta / h ) / 2.0
#
# d2g_d2theta = g^{(2)} ( theta )
duhat_dtheta_sq = duhat_dtheta * duhat_dtheta
d2g_d2theta = 8.0 * exp_2u * duhat_dtheta
d2g_d2theta += (8.0 * theta * exp_2u - y * exp_u) * duhat_dtheta_sq
d2g_d2theta += (4.0 * theta * exp_2u - y * exp_u) * d2uhat_d2theta
#
# dh2_d2theta = h^{(2)} ( theta )
d2h_d2theta = 2.0 * dg_dtheta + theta * d2g_d2theta
#
# d2G_d2theta = G^{(2)} ( theta )
d2G_d2theta = (h * d2h_d2theta - dh_dtheta * dh_dtheta) / (2.0 * h * h)
d2G_d2theta = numpy.sum( d2G_d2theta )
# ----------------------------------------------------------------------------
# check that f_u ( theta , uhat ) = 0
assert all( abs(f_u) < 1e-13 )
#
# check that dF_dtheta + dG_dtheta = 0
assert abs( dF_dtheta + dG_dtheta ) < 1e-11
#
# check duhat_dtheta
check = (uhat_plus - uhat_minus) / (2.0 * delta_theta)
check_rel_error(check, duhat_dtheta, 1e-5)
#
# check dg_dtheta
check = (g_plus - g_minus) / (2.0 * delta_theta)
check_rel_error(check, dg_dtheta, 1e-6)
#
# check d2uhat_d2theta
check = (uhat_plus - 2.0 * uhat + uhat_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2uhat_d2theta, 1e-6)
#
# check d2F_d2theta
check = (F_plus - 2.0 * F + F_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2F_d2theta, 1e-6)
#
# check dG_dtheta
check = (G_plus - G_minus) / (2.0 * delta_theta)
check_rel_error(check, dG_dtheta, 1e-6)
#
check = (g_plus - 2.0 * g + g_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2g_d2theta, 1e-6)
#
# check d2G_d2theta
check = (G_plus - 2.0 * G + G_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2G_d2theta, 1e-5)
# ============================================================================
# check the Hessian of the fixed effects objective
#
# The world rate for incidence is the only fixed effect
world_var_id = node_name2var_id['world']
assert len(hes_fixed_table) == 1
row = hes_fixed_table[0]
assert row['row_var_id'] == world_var_id
assert row['col_var_id'] == world_var_id
hes_fixed_value = row['hes_fixed_value']
if eta_in_prior == None :
dL2_d2theta = d2F_d2theta + d2G_d2theta
check_rel_error(dL2_d2theta, hes_fixed_value, 1e-14)
else :
alpha = math.log(theta + eta_in_prior)
exp_alpha = math.exp(alpha)
exp_2alpha = math.exp( 2.0 * alpha )
dL_dtheta = dF_dtheta + dG_dtheta
d2L_d2theta = d2F_d2theta + d2G_d2theta
d2H_d2alpha = dL_dtheta * exp_alpha + d2L_d2theta * exp_2alpha
check_rel_error(d2H_d2alpha, hes_fixed_value, 1e-14)
#
# compute sample statistics
assert len(sample_table) == number_sample * len(var_table)
sample_array = numpy.zeros(number_sample, dtype = float)
for row in sample_table :
if row['var_id'] == var_id :
sample_index = row['sample_index']
if eta_in_prior == None :
sample_value = row['var_value']
else :
sample_value = math.log( row['var_value'] + eta_in_prior )
sample_array[sample_index] = sample_value
sample_avg = numpy.average(sample_array)
sample_var = numpy.var(sample_array, ddof=1)
#
# check sample statistics
if eta_in_prior == None :
check_rel_error(theta, sample_avg, 1e-1)
else :
check_rel_error(math.log(theta + eta_in_prior), sample_avg, 1e-1)
check_rel_error(1.0 / hes_fixed_value, sample_var, 1e-1)
#
print('hes_fixed_math.py: OK')