user_hes_fixed_math.py#
Check the Hessian of the Fixed Effects Objective#
Purpose#
This is a detailed mathematical explanation of computing the Hessian of the fixed effects objective used by the asymptotic sampling method.
Reference#
See the theory section of the cppad_mixed documentation.
Notation#
\(\theta\) | model incidence for the parent region
\(u_0\) | incidence random effect for the first child
\(u_1\) | incidence random effect for the second child
\(y_0\) | measured incidence for the first child
\(y_1\) | measured incidence for the second child
\(s\) | standard deviation for the data and random effects
The only fixed effect in this model is \(\theta\) (sometimes written as theta), the incidence level for the world (the parent region). The random effects are \(u_0\) and \(u_1\).
Problem Parameters#
import math
import time
theta_true = 0.5
u_true = [ 0.5, -0.5 ]
y_0 = theta_true * math.exp( u_true[0] )
y_1 = theta_true * math.exp( u_true[1] )
standard_dev = theta_true # the standard deviation s
random_seed = int( time.time() )
number_sample = 1000
eta_in_prior = 1e-6 # if None, the fixed effects are not scaled
Random Likelihood#
The negative log-likelihood for this example, ignoring the constant of integration, is
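\[ f ( \theta , u ) = \frac{1}{2 s^2} \left[ ( y_0 - \theta e^{u_0} )^2 + ( y_1 - \theta e^{u_1} )^2 + u_0^2 + u_1^2 \right] \]
This is the function computed by random_likelihood in the source code below.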
Gradient w.r.t. Fixed Effects#
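The partial derivative of \(f\) with respect to the fixed effect \(\theta\) is
\[ f_\theta ( \theta , u ) = \frac{1}{s^2} \sum_i \left( \theta e^{2 u_i} - y_i e^{u_i} \right) \]
(computed as f_theta in the source code below).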
Gradient w.r.t. Random Effects#
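The partial derivative of \(f\) with respect to the random effect \(u_i\) is
\[ f_{u_i} ( \theta , u ) = \frac{1}{s^2} \left( \theta^2 e^{2 u_i} - \theta y_i e^{u_i} + u_i \right) \]
(computed as f_u in the source code below).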
Hessian w.r.t. Fixed Effects#
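Differentiating \(f_\theta\) once more with respect to \(\theta\),
\[ f_{\theta , \theta} ( \theta , u ) = \frac{1}{s^2} \sum_i e^{2 u_i} \]
(computed as f_theta_theta in the source code below).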
Hessian w.r.t. Random Effects#
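Since each term of \(f\) involves at most one random effect, this Hessian is diagonal with
\[ f_{u_i , u_i} ( \theta , u ) = \frac{1}{s^2} \left( 2 \theta^2 e^{2 u_i} - \theta y_i e^{u_i} + 1 \right) \]
and zero off-diagonal entries (see log_det_random_hessian in the source code below).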
Hessian Cross Term#
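The mixed partial with respect to \(\theta\) and \(u_i\) is
\[ f_{\theta , u_i} ( \theta , u ) = \frac{1}{s^2} \left( 2 \theta e^{2 u_i} - y_i e^{u_i} \right) \]
(computed as f_theta_u in the source code below).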
Optimal Random Effects#
Implicit Function Definition#
The optimal random effects \(\hat{u} ( \theta )\) solve the equation
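\[ f_u [ \theta , \hat{u} ( \theta ) ] = 0 \]
(the source code below checks this condition at the optimal fit).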
Derivatives of Optimal Random Effects#
Using the implicit function theorem we have
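\[ \hat{u}^{(1)} ( \theta ) = - f_{u,u} [ \theta , \hat{u} ( \theta ) ]^{-1} f_{u,\theta} [ \theta , \hat{u} ( \theta ) ] \]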
Substituting in the formulas above for the Hessian terms on the right hand side we obtain:
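\[ \hat{u}_i^{(1)} ( \theta ) = - \frac{ 2 \theta e^{2 \hat{u}_i ( \theta )} - y_i e^{\hat{u}_i ( \theta )} }{ 2 \theta^2 e^{2 \hat{u}_i ( \theta )} - \theta y_i e^{\hat{u}_i ( \theta )} + 1 } \]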
We can compute \(\hat{u}_i ( \theta )\) by optimizing the random effects corresponding to the fixed effects being \(\theta\). We define \(g_i ( \theta )\) by the equation
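\[ g_i ( \theta ) = 2 \theta e^{ 2 \hat{u}_i ( \theta ) } - y_i e^{ \hat{u}_i ( \theta ) } \]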
Given \(\hat{u}_i ( \theta )\), we can compute \(g_i ( \theta )\). Given \(g_i ( \theta )\), we can compute the derivative \(\hat{u}_i^{(1)} ( \theta )\) using
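\[ \hat{u}_i^{(1)} ( \theta ) = - \frac{ g_i ( \theta ) }{ \theta g_i ( \theta ) + 1 } \]
(computed as duhat_dtheta in the source code below).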
Given \(\hat{u}^{(1)} ( \theta )\), we can compute the derivative \(g_i^{(1)} ( \theta )\) using
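\[ g_i^{(1)} ( \theta ) = 2 e^{2 \hat{u}_i ( \theta )} + \left( 4 \theta e^{2 \hat{u}_i ( \theta )} - y_i e^{\hat{u}_i ( \theta )} \right) \hat{u}_i^{(1)} ( \theta ) \]
(computed as dg_dtheta in the source code below).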
Given \(g_i^{(1)} ( \theta )\), we can compute the second derivative \(\hat{u}_i^{(2)} ( \theta )\) using
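\[ \hat{u}_i^{(2)} ( \theta ) = \frac{ g_i ( \theta )^2 - g_i^{(1)} ( \theta ) }{ [ \theta g_i ( \theta ) + 1 ]^2 } \]
(computed as d2uhat_d2theta in the source code below).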
Given \(\hat{u}^{(2)} ( \theta )\), we can compute the second derivative \(g_i^{(2)} ( \theta )\) using
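\[ g_i^{(2)} ( \theta ) = 8 e^{2 \hat{u}_i ( \theta )} \hat{u}_i^{(1)} ( \theta ) + \left( 8 \theta e^{2 \hat{u}_i ( \theta )} - y_i e^{\hat{u}_i ( \theta )} \right) [ \hat{u}_i^{(1)} ( \theta ) ]^2 + \left( 4 \theta e^{2 \hat{u}_i ( \theta )} - y_i e^{\hat{u}_i ( \theta )} \right) \hat{u}_i^{(2)} ( \theta ) \]
(computed as d2g_d2theta in the source code below).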
Laplace Approximation#
Up to a constant, the Laplace approximation (fixed effects objective), as a function of the fixed effects, is:
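\[ L ( \theta ) = F ( \theta ) + G ( \theta ) \]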
where \(F( \theta )\) and \(G( \theta )\) are defined by
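\[ F ( \theta ) = f [ \theta , \hat{u} ( \theta ) ] \; , \quad G ( \theta ) = \frac{1}{2} \log \det f_{u,u} [ \theta , \hat{u} ( \theta ) ] \]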
The derivative of \(F( \theta )\) is given by
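\[ F^{(1)} ( \theta ) = f_\theta [ \theta , \hat{u} ( \theta ) ] + f_u [ \theta , \hat{u} ( \theta ) ] \, \hat{u}^{(1)} ( \theta ) \]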
It follows from the definition of \(\hat{u} ( \theta )\) that \(f_u [ \theta , \hat{u} ( \theta ) ] = 0\) and
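\[ F^{(1)} ( \theta ) = f_\theta [ \theta , \hat{u} ( \theta ) ] \]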
Taking the derivative of the equation above we obtain
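\[ F^{(2)} ( \theta ) = f_{\theta,\theta} [ \theta , \hat{u} ( \theta ) ] + f_{\theta,u} [ \theta , \hat{u} ( \theta ) ] \, \hat{u}^{(1)} ( \theta ) \]
(computed as d2F_d2theta in the source code below).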
Combining the definition of \(G( \theta )\), \(g_i ( \theta )\) and the formula for \(f_{u,u} ( \theta , u )\) we have
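\[ G ( \theta ) = \frac{1}{2} \sum_i \log \left( \frac{ \theta g_i ( \theta ) + 1 }{ s^2 } \right) \]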
Defining \(h_i ( \theta )\) by
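\[ h_i ( \theta ) = \theta g_i ( \theta ) + 1 \]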
we obtain the representation
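\[ G ( \theta ) = \frac{1}{2} \sum_i \log \left( \frac{ h_i ( \theta ) }{ s^2 } \right) \]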
The first and second derivative of \(h_i ( \theta )\) and \(G( \theta )\) are given by
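\[ \begin{aligned} h_i^{(1)} ( \theta ) & = g_i ( \theta ) + \theta \, g_i^{(1)} ( \theta ) \\ h_i^{(2)} ( \theta ) & = 2 g_i^{(1)} ( \theta ) + \theta \, g_i^{(2)} ( \theta ) \\ G^{(1)} ( \theta ) & = \frac{1}{2} \sum_i \frac{ h_i^{(1)} ( \theta ) }{ h_i ( \theta ) } \\ G^{(2)} ( \theta ) & = \frac{1}{2} \sum_i \frac{ h_i ( \theta ) \, h_i^{(2)} ( \theta ) - [ h_i^{(1)} ( \theta ) ]^2 }{ h_i ( \theta )^2 } \end{aligned} \]
(computed as dh_dtheta, d2h_d2theta, dG_dtheta, and d2G_d2theta in the source code below).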
Asymptotic Statistics#
The asymptotic posterior distribution for the optimal estimate of \(\theta\), given the data \(y\), is normal with variance equal to the inverse of
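\[ L^{(2)} ( \hat{\theta} ) = F^{(2)} ( \hat{\theta} ) + G^{(2)} ( \hat{\theta} ) \]
when the fixed effects are not scaled; the next section covers the scaled case.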
Scaling Fixed Effects#
If eta is not null, the Hessian is with respect to \(\alpha = \log( \theta + \eta )\). Inverting this relation we define \(\theta ( \alpha ) = \exp( \alpha ) - \eta\). The fixed effects objective as a function of \(\alpha\) is
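\[ H ( \alpha ) = L [ \theta ( \alpha ) ] \]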
Taking the first and second derivatives, we have
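\[ \begin{aligned} H^{(1)} ( \alpha ) & = L^{(1)} [ \theta ( \alpha ) ] \, e^{\alpha} \\ H^{(2)} ( \alpha ) & = L^{(1)} [ \theta ( \alpha ) ] \, e^{\alpha} + L^{(2)} [ \theta ( \alpha ) ] \, e^{2 \alpha} \end{aligned} \]
(the second expression is computed as d2H_d2alpha in the source code below).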
Optimal Fixed Effects#
The first order necessary condition for \(\hat{\alpha}\) to be a minimizer of the fixed effects objective is \(H^{(1)} ( \hat{\alpha} ) = 0\). In this case we can simplify the Hessian scaling as follows:
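\[ H^{(2)} ( \hat{\alpha} ) = L^{(2)} ( \hat{\theta} ) \, e^{2 \hat{\alpha}} = L^{(2)} ( \hat{\theta} ) \, ( \hat{\theta} + \eta )^2 \]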
where \(\hat{\theta} = \theta( \hat{\alpha} )\).
Source Code#
import numpy
import sys
import os
import copy
test_program = 'example/user/hes_fixed_math.py'
if sys.argv[0] != test_program or len(sys.argv) != 1 :
    usage = 'python3 ' + test_program + '\n'
    usage += 'where python3 is the python 3 program on your system\n'
    usage += 'and working directory is the dismod_at distribution directory\n'
    sys.exit(usage)
print(test_program)
#
# import dismod_at
local_dir = os.getcwd() + '/python'
if( os.path.isdir( local_dir + '/dismod_at' ) ) :
    sys.path.insert(0, local_dir)
import dismod_at
#
# change into the build/example/user directory
if not os.path.exists('build/example/user') :
    os.makedirs('build/example/user')
os.chdir('build/example/user')
#
def example_db (file_name) :
    def fun_rate_child(a, t) :
        return ('prior_gauss_zero', None, None)
    def fun_rate_parent(a, t) :
        return ('prior_rate_parent', None, None)
    # ----------------------------------------------------------------------
    # age table
    age_list = [ 0.0, 100.0 ]
    #
    # time table
    time_list = [ 1995.0, 2015.0 ]
    #
    # integrand table
    integrand_table = [ { 'name':'Sincidence' } ]
    #
    # node table
    node_table = [
        { 'name':'world', 'parent':'' },
        { 'name':'child_0', 'parent':'world' },
        { 'name':'child_1', 'parent':'world' },
    ]
    #
    # weight table:
    weight_table = list()
    #
    # covariate table: no covariates
    covariate_table = list()
    #
    # mulcov table
    mulcov_table = list()
    #
    # avgint table:
    avgint_table = list()
    #
    # nslist_dict:
    nslist_dict = dict()
    # ----------------------------------------------------------------------
    # data table:
    data_table = list()
    row = {
        'node': 'child_0',
        'subgroup': 'world',
        'density': 'gaussian',
        'weight': '',
        'hold_out': False,
        'time_lower': 2000.0,
        'time_upper': 2000.0,
        'age_lower': 50.0,
        'age_upper': 50.0,
        'integrand': 'Sincidence',
        'meas_value': y_0,
        'meas_std': standard_dev,
    }
    data_table.append( copy.copy(row) )
    #
    row['node'] = 'child_1'
    row['meas_value'] = y_1
    data_table.append( copy.copy(row) )
    #
    # ----------------------------------------------------------------------
    # prior_table
    prior_table = [
        { # prior_rate_parent
            'name': 'prior_rate_parent',
            'density': 'uniform',
            'lower': 1e-2 * theta_true,
            'upper': 1e+2 * theta_true,
            'mean': theta_true / 3.0, # only used for start and scale
            'eta': eta_in_prior,
        },{ # prior_gauss_zero
            'name': 'prior_gauss_zero',
            'density': 'gaussian',
            'mean': 0.0,
            'std': standard_dev,
        }
    ]
    # ----------------------------------------------------------------------
    # smooth table
    smooth_table = [
        { # smooth_rate_parent
            'name': 'smooth_rate_parent',
            'age_id': [ 0 ],
            'time_id': [ 0 ],
            'fun': fun_rate_parent
        }, { # smooth_rate_child
            'name': 'smooth_rate_child',
            'age_id': [ 0 ],
            'time_id': [ 0 ],
            'fun': fun_rate_child
        }
    ]
    # ----------------------------------------------------------------------
    # rate table
    rate_table = [
        {
            'name': 'iota',
            'parent_smooth': 'smooth_rate_parent',
            'child_smooth': 'smooth_rate_child',
        }
    ]
    # ----------------------------------------------------------------------
    # option_table
    option_table = [
        { 'name':'parent_node_name', 'value':'world' },
        { 'name':'ode_step_size', 'value':'10.0' },
        { 'name':'random_seed', 'value':str(random_seed) },
        { 'name':'rate_case', 'value':'iota_pos_rho_zero' },
        { 'name':'quasi_fixed', 'value':'true' },
        { 'name':'derivative_test_fixed', 'value':'none' },
        { 'name':'max_num_iter_fixed', 'value':'100' },
        { 'name':'print_level_fixed', 'value':'0' },
        { 'name':'tolerance_fixed', 'value':'1e-12' },
        { 'name':'derivative_test_random', 'value':'none' },
        { 'name':'max_num_iter_random', 'value':'100' },
        { 'name':'print_level_random', 'value':'0' },
        { 'name':'tolerance_random', 'value':'1e-12' }
    ]
    # ----------------------------------------------------------------------
    # subgroup_table
    subgroup_table = [ { 'subgroup':'world', 'group':'world' } ]
    # ----------------------------------------------------------------------
    # create database
    dismod_at.create_database(
        file_name,
        age_list,
        time_list,
        integrand_table,
        node_table,
        subgroup_table,
        weight_table,
        covariate_table,
        avgint_table,
        data_table,
        prior_table,
        smooth_table,
        nslist_dict,
        rate_table,
        mulcov_table,
        option_table
    )
# ===========================================================================
file_name = 'example.db'
example_db(file_name)
#
program = '../../devel/dismod_at'
dismod_at.system_command_prc([ program, file_name, 'init' ])
dismod_at.system_command_prc([ program, file_name, 'fit', 'both' ])
dismod_at.system_command_prc(
[ program, file_name, 'sample', 'asymptotic', 'both', str(number_sample) ]
)
# -----------------------------------------------------------------------
# get tables
connection = dismod_at.create_connection(
file_name, new = False, readonly = True
)
var_table = dismod_at.get_table_dict(connection, 'var')
node_table = dismod_at.get_table_dict(connection, 'node')
rate_table = dismod_at.get_table_dict(connection, 'rate')
fit_var_table = dismod_at.get_table_dict(connection, 'fit_var')
sample_table = dismod_at.get_table_dict(connection, 'sample')
hes_fixed_table = dismod_at.get_table_dict(connection, 'hes_fixed')
connection.close()
dismod_at.db2csv_command(file_name)
# -----------------------------------------------------------------------
# node_name2var_id
node_name2var_id = dict()
assert len(var_table) == 3
for var_id in range(len(var_table) ) :
    assert var_id < 3
    row = var_table[var_id]
    assert row['var_type'] == 'rate'
    assert rate_table[row['rate_id']]['rate_name'] == 'iota'
    node_name = node_table[row['node_id']]['node_name']
    node_name2var_id[node_name] = var_id
# -----------------------------------------------------------------------
# uhat(theta)
def optimal_random_effect(fixed_effect) :
    #
    # set start_var value for the fixed effects
    var_id = node_name2var_id['world']
    sql_cmd = 'UPDATE start_var SET start_var_value = ' + str(fixed_effect)
    sql_cmd += ' WHERE start_var_id == ' + str(var_id)
    connection = dismod_at.create_connection(
        file_name, new = False, readonly = False
    )
    dismod_at.sql_command(connection, sql_cmd)
    connection.close()
    #
    # optimize the random effects
    dismod_at.system_command_prc([ program, file_name, 'fit', 'random' ])
    #
    # retrieve the optimal random effects
    connection = dismod_at.create_connection(
        file_name, new = False, readonly = True
    )
    fit_var_table = dismod_at.get_table_dict(connection, 'fit_var')
    #
    var_id = node_name2var_id['child_0']
    uhat_0 = fit_var_table[var_id]['fit_var_value']
    #
    var_id = node_name2var_id['child_1']
    uhat_1 = fit_var_table[var_id]['fit_var_value']
    #
    uhat = numpy.array( [ uhat_0, uhat_1 ] )
    return uhat
# ------------------------------------------------------------------------
# f(theta, u)
def random_likelihood(fixed_effect, random_effect) :
    theta = fixed_effect
    u = random_effect
    y = numpy.array( [y_0, y_1] )
    residual = y - theta * numpy.exp(u)
    residual = numpy.append(residual, u)
    sum_sq = numpy.sum( residual * residual )
    return sum_sq / (2.0 * standard_dev * standard_dev)
# -----------------------------------------------------------------------
# log det f_{u,u} ( theta , u )
def log_det_random_hessian(fixed_effect, random_effect) :
    theta = fixed_effect
    u = random_effect
    exp_u = numpy.exp( u )
    exp_2u = numpy.exp( 2.0 * u )
    g = 2.0 * theta * exp_2u - y * exp_u
    h = theta * g + 1.0
    s_sq = standard_dev * standard_dev
    log_det = numpy.sum( numpy.log(h / s_sq) )
    return log_det
# -----------------------------------------------------------------------
def check_rel_error(check, value, tolerance) :
    rel_error = abs( check / value - 1.0 )
    # print(rel_error, tolerance)
    if numpy.isscalar( rel_error ) :
        ok = rel_error < tolerance
    else :
        ok = all( rel_error < tolerance )
    if not ok :
        print("random_seed =", random_seed)
    assert ok
# -----------------------------------------------------------------------
# Some values
#
# s_sq
s_sq = standard_dev * standard_dev
#
# y
y = numpy.array( [ y_0, y_1 ] )
#
# optimal theta (from the fit_var table)
var_id = node_name2var_id['world']
theta = fit_var_table[var_id]['fit_var_value']
#
# delta_theta
delta_theta = theta / 1000.0
theta_plus = theta + delta_theta
theta_minus = theta - delta_theta
#
# uhat
uhat = optimal_random_effect(theta)
uhat_plus = optimal_random_effect(theta_plus)
uhat_minus = optimal_random_effect(theta_minus)
#
# exp_u
exp_u = numpy.exp( uhat )
exp_u_plus = numpy.exp( uhat_plus )
exp_u_minus = numpy.exp( uhat_minus )
#
# exp_2u
exp_2u = numpy.exp( 2.0 * uhat )
exp_2u_plus = numpy.exp( 2.0 * uhat_plus )
exp_2u_minus = numpy.exp( 2.0 * uhat_minus )
#
# g(theta)
g = 2.0 * theta * exp_2u - y * exp_u
g_plus = 2.0 * theta_plus * exp_2u_plus - y * exp_u_plus
g_minus = 2.0 * theta_minus * exp_2u_minus - y * exp_u_minus
#
# h(theta)
h = theta * g + 1
#
# F(theta)
F = random_likelihood(theta, uhat)
F_plus = random_likelihood(theta_plus, uhat_plus)
F_minus = random_likelihood(theta_minus, uhat_minus)
#
# G(theta)
G = log_det_random_hessian(theta, uhat) / 2.0
G_plus = log_det_random_hessian(theta_plus, uhat_plus) / 2.0
G_minus = log_det_random_hessian(theta_minus, uhat_minus) / 2.0
#
# L(theta)
L = F + G
L_plus = F_plus + G_plus
L_minus = F_minus + G_minus
#
# f_u (theta, uhat)
f_u = ( theta * theta * exp_2u - theta * y * exp_u + uhat ) / s_sq
#
# f_theta (theta, uhat)
f_theta = numpy.sum(theta * exp_2u - y * exp_u) / s_sq
#
# f_{theta,theta} ( theta , uhat )
f_theta_theta = numpy.sum( exp_2u ) / s_sq
#
# f_{theta, u} ( theta , uhat )
f_theta_u = ( 2 * theta * exp_2u - y * exp_u ) / s_sq
#
# dF_dtheta = F^{(1)} ( theta )
dF_dtheta = f_theta
#
# duhat_dtheta = uhat^{(1)} ( theta )
duhat_dtheta = - g / (theta * g + 1)
#
# dg_dtheta = g^{(1)} ( theta )
dg_dtheta = 2.0 * exp_2u + (4.0 * theta * exp_2u - y * exp_u) * duhat_dtheta
#
# d2uhat_d2theta = uhat^{(2)} ( theta )
d2uhat_d2theta = (g * g - dg_dtheta) / ( (theta * g + 1) * (theta * g + 1) )
#
# d2F_d2theta = F^{(2)} ( theta )
d2F_d2theta = f_theta_theta
d2F_d2theta += numpy.dot( f_theta_u, duhat_dtheta )
#
# dh_dtheta = h^{(1)} ( theta )
dh_dtheta = g + theta * dg_dtheta
#
# dG_dtheta = G^{(1)} ( theta )
dG_dtheta = numpy.sum( dh_dtheta / h ) / 2.0
#
# d2g_d2theta = g^{(2)} ( theta )
duhat_dtheta_sq = duhat_dtheta * duhat_dtheta
d2g_d2theta = 8.0 * exp_2u * duhat_dtheta
d2g_d2theta += (8.0 * theta * exp_2u - y * exp_u) * duhat_dtheta_sq
d2g_d2theta += (4.0 * theta * exp_2u - y * exp_u) * d2uhat_d2theta
#
# d2h_d2theta = h^{(2)} ( theta )
d2h_d2theta = 2.0 * dg_dtheta + theta * d2g_d2theta
#
# d2G_d2theta = G^{(2)} ( theta )
d2G_d2theta = (h * d2h_d2theta - dh_dtheta * dh_dtheta) / (2.0 * h * h)
d2G_d2theta = numpy.sum( d2G_d2theta )
# ----------------------------------------------------------------------------
# check that f_u ( theta , uhat ) = 0
assert all( abs(f_u) < 1e-13 )
#
# check that dF_dtheta + dG_dtheta = 0
assert abs( dF_dtheta + dG_dtheta ) < 1e-11
#
# check duhat_dtheta
check = (uhat_plus - uhat_minus) / (2.0 * delta_theta)
check_rel_error(check, duhat_dtheta, 1e-5)
#
# check dg_dtheta
check = (g_plus - g_minus) / (2.0 * delta_theta)
check_rel_error(check, dg_dtheta, 1e-6)
#
# check d2uhat_d2theta
check = (uhat_plus - 2.0 * uhat + uhat_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2uhat_d2theta, 1e-6)
#
# check d2F_d2theta
check = (F_plus - 2.0 * F + F_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2F_d2theta, 1e-6)
#
# check dG_dtheta
check = (G_plus - G_minus) / (2.0 * delta_theta)
check_rel_error(check, dG_dtheta, 1e-6)
#
check = (g_plus - 2.0 * g + g_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2g_d2theta, 1e-6)
#
# check d2G_d2theta
check = (G_plus - 2.0 * G + G_minus) / (delta_theta * delta_theta)
check_rel_error(check, d2G_d2theta, 1e-5)
# ============================================================================
# check the Hessian of the fixed effects objective
#
# The world rate for incidence is the only fixed effect
world_var_id = node_name2var_id['world']
assert len(hes_fixed_table) == 1
row = hes_fixed_table[0]
assert row['row_var_id'] == world_var_id
assert row['col_var_id'] == world_var_id
hes_fixed_value = row['hes_fixed_value']
if eta_in_prior is None :
    dL2_d2theta = d2F_d2theta + d2G_d2theta
    check_rel_error(dL2_d2theta, hes_fixed_value, 1e-14)
else :
    alpha = math.log(theta + eta_in_prior)
    exp_alpha = math.exp(alpha)
    exp_2alpha = math.exp( 2.0 * alpha )
    dL_dtheta = dF_dtheta + dG_dtheta
    d2L_d2theta = d2F_d2theta + d2G_d2theta
    d2H_d2alpha = dL_dtheta * exp_alpha + d2L_d2theta * exp_2alpha
    check_rel_error(d2H_d2alpha, hes_fixed_value, 1e-14)
#
# compute sample statistics
assert len(sample_table) == number_sample * len(var_table)
sample_array = numpy.zeros(number_sample, dtype = float)
for row in sample_table :
    if row['var_id'] == var_id :
        sample_index = row['sample_index']
        if eta_in_prior is None :
            sample_value = row['var_value']
        else :
            sample_value = math.log( row['var_value'] + eta_in_prior )
        sample_array[sample_index] = sample_value
sample_avg = numpy.average(sample_array)
sample_var = numpy.var(sample_array, ddof=1)
#
# check sample statistics
if eta_in_prior is None :
    check_rel_error(theta, sample_avg, 1e-1)
else :
    check_rel_error(math.log(theta + eta_in_prior), sample_avg, 1e-1)
check_rel_error(1.0 / hes_fixed_value, sample_var, 1e-1)
#
print('hes_fixed_math.py: OK')