\(\newcommand{\B}[1]{ {\bf #1} }\) \(\newcommand{\R}[1]{ {\rm #1} }\) \(\newcommand{\W}[1]{ \; #1 \; }\)
user_hes_random.py¶
View page sourceCheck the Hessian of the Random Effects Objective¶
Purpose¶
This example checks the computation of the Hessian of the random effects objective used by the asymptotic sampling method.
Reference¶
See the
theory
section of the cppad_mixed documentation.
Notation¶
\(\theta\) |
model incidence for the parent region |
\(u_0\) |
incidence random effect for first child |
\(u_1\) |
incidence random effect for second child |
\(y_0\) |
measured incidence for the first child |
\(y_1\) |
measured incidence for the second child |
\(s\) |
standard deviation for data and random effects |
The only fixed effect in this model is \(\theta\) (sometimes written theta ) the incidence level for the world. The random effects are \(u_0\) and \(u_1\).
Problem Parameters¶
import csv
import math
import time
theta_true = 0.5
u_true = [ 0.5, -0.5 ]
y_0 = theta_true * math.exp( u_true[0] )
y_1 = theta_true * math.exp( u_true[1] )
standard_dev = theta_true / 10.0 # the standard deviation s
random_seed = int( time.time() )
number_sample = 4000
Random Likelihood¶
The negative log-likelihood for this example, ignoring the constant of integration, is
Gradient w.r.t. Random Effects¶
Hessian w.r.t. Random Effects¶
Asymptotic Statistics¶
The asymptotic posterior distribution for random effects is normal with mean \(\hat{u} ( \theta )\) and variance \(f_{u,u} [ \theta , \hat{u} ( \theta ) ]^{-1}\) where \(\hat{u} ( \theta )\) minimizes the random effects objective \(f( \theta , \cdot )\).
Source Code¶
import numpy
import sys
import os
import copy
test_program = 'example/user/hes_random.py'
check_program = sys.argv[0].replace('\\', '/')
if check_program != test_program or len(sys.argv) != 1 :
usage = 'python3 ' + test_program + '\n'
usage += 'where python3 is the python 3 program on your system\n'
usage += 'and working directory is the dismod_at distribution directory\n'
sys.exit(usage)
print(test_program)
#
# import dismod_at
local_dir = os.getcwd() + '/python'
if( os.path.isdir( local_dir + '/dismod_at' ) ) :
sys.path.insert(0, local_dir)
import dismod_at
#
# change into the build/example/user directory
if not os.path.exists('build/example/user') :
os.makedirs('build/example/user')
os.chdir('build/example/user')
#
def example_db (file_name) :
def fun_rate_child(a, t) :
return ('prior_gauss_zero', None, None)
def fun_rate_parent(a, t) :
return ('prior_rate_parent', None, None)
# ----------------------------------------------------------------------
# age table
age_list = [ 0.0, 100.0 ]
#
# time table
time_list = [ 1995.0, 2015.0 ]
#
# integrand table
integrand_table = [ { 'name':'Sincidence' } ]
#
node_table = [
{ 'name':'world', 'parent':'' },
{ 'name':'child_0', 'parent':'world' },
{ 'name':'child_1', 'parent':'world' },
]
#
# weight table:
weight_table = list()
#
# covariate table: no covariates
covariate_table = list()
#
# mulcov table
mulcov_table = list()
#
# avgint table:
avgint_table = list()
#
# nslist_dict:
nslist_dict = dict()
# ----------------------------------------------------------------------
# data table:
data_table = list()
row = {
'node': 'child_0',
'subgroup': 'world',
'density': 'gaussian',
'weight': '',
'hold_out': False,
'time_lower': 2000.0,
'time_upper': 2000.0,
'age_lower': 50.0,
'age_upper': 50.0,
'integrand': 'Sincidence',
'meas_value': y_0,
'meas_std': standard_dev,
}
data_table.append( copy.copy(row) )
#
row['node'] = 'child_1'
row['meas_value'] = y_1
data_table.append( copy.copy(row) )
#
# ----------------------------------------------------------------------
# prior_table
prior_table = [
{ # prior_rate_parent
'name': 'prior_rate_parent',
'density': 'uniform',
'lower': 1e-2 * theta_true,
'upper': 1e+2 * theta_true,
'mean': theta_true / 3.0, # only used for start and scale
},{ # prior_gauss_zero
'name': 'prior_gauss_zero',
'density': 'gaussian',
'mean': 0.0,
'std': standard_dev,
}
]
# ----------------------------------------------------------------------
# smooth table
smooth_table = [
{ # smooth_rate_parent
'name': 'smooth_rate_parent',
'age_id': [ 0 ],
'time_id': [ 0 ],
'fun': fun_rate_parent
}, { # smooth_rate_child
'name': 'smooth_rate_child',
'age_id': [ 0 ],
'time_id': [ 0 ],
'fun': fun_rate_child
}
]
# ----------------------------------------------------------------------
# rate table
rate_table = [
{
'name': 'iota',
'parent_smooth': 'smooth_rate_parent',
'child_smooth': 'smooth_rate_child',
}
]
# ----------------------------------------------------------------------
# option_table
option_table = [
{ 'name':'parent_node_name', 'value':'world' },
{ 'name':'ode_step_size', 'value':'10.0' },
{ 'name':'random_seed', 'value':str(random_seed) },
{ 'name':'rate_case', 'value':'iota_pos_rho_zero' },
{ 'name':'quasi_fixed', 'value':'true' },
{ 'name':'derivative_test_fixed', 'value':'none' },
{ 'name':'max_num_iter_fixed', 'value':'100' },
{ 'name':'print_level_fixed', 'value':'0' },
{ 'name':'tolerance_fixed', 'value':'1e-12' },
{ 'name':'derivative_test_random', 'value':'none' },
{ 'name':'max_num_iter_random', 'value':'100' },
{ 'name':'print_level_random', 'value':'0' },
{ 'name':'tolerance_random', 'value':'1e-12' }
]
# ----------------------------------------------------------------------
# subgroup_table
subgroup_table = [ { 'subgroup':'world', 'group':'world' } ]
# ----------------------------------------------------------------------
# create database
dismod_at.create_database(
file_name,
age_list,
time_list,
integrand_table,
node_table,
subgroup_table,
weight_table,
covariate_table,
avgint_table,
data_table,
prior_table,
smooth_table,
nslist_dict,
rate_table,
mulcov_table,
option_table
)
# ===========================================================================
file_name = 'example.db'
example_db(file_name)
#
program = '../../devel/dismod_at'
dismod_at.system_command_prc([ program, file_name, 'init' ])
dismod_at.system_command_prc([ program, file_name, 'fit', 'both' ])
dismod_at.system_command_prc(
[ program, file_name, 'sample', 'asymptotic', 'both', str(number_sample) ]
)
# -----------------------------------------------------------------------
# get tables
connection = dismod_at.create_connection(
file_name, new = False, readonly = True
)
var_table = dismod_at.get_table_dict(connection, 'var')
node_table = dismod_at.get_table_dict(connection, 'node')
rate_table = dismod_at.get_table_dict(connection, 'rate')
fit_var_table = dismod_at.get_table_dict(connection, 'fit_var')
hes_random_before = dismod_at.get_table_dict(connection, 'hes_random')
sample_table = dismod_at.get_table_dict(connection, 'sample')
hes_random_after = dismod_at.get_table_dict(connection, 'hes_random')
connection.close()
dismod_at.db2csv_command(file_name)
# -----------------------------------------------------------------------
# var_id2name
var_id2node_name = dict()
assert len(var_table) == 3
for var_id in range(len(var_table) ) :
assert var_id < 3
row = var_table[var_id]
assert row['var_type'] == 'rate'
assert rate_table[row['rate_id']]['rate_name'] == 'iota'
node_name = node_table[row['node_id']]['node_name']
var_id2node_name[var_id] = node_name
if node_name == 'world' :
theta = fit_var_table[var_id]['fit_var_value']
# -----------------------------------------------------------------------
# check the Hessian of the random effects objective
#
# The Hessian of the random effects objective it diagonal
# and there are two random effects
assert hes_random_before == hes_random_after
assert len(hes_random_after) == 2
covariance = numpy.zeros((2, 2), dtype = float)
uhat = numpy.zeros(2, dtype = float)
for row in hes_random_after :
assert row['row_var_id'] == row['col_var_id']
var_id = row['row_var_id']
hes_random_value = row['hes_random_value']
u = fit_var_table[var_id]['fit_var_value']
node_name = var_id2node_name[var_id]
if node_name == 'child_0' :
uhat[0] = u
covariance[0,0] = 1.0 /hes_random_value
y = y_0
elif node_name == 'child_1' :
uhat[1] = u
covariance[1,1] = 1.0 / hes_random_value
y = y_1
else :
assert False
exp_2u = math.exp( 2.0 * u )
exp_u = math.exp( u )
check = 2.0 * theta * theta * exp_2u - y * theta * exp_u + 1.0
check /= standard_dev * standard_dev
rel_error = hes_random_value / check - 1.0
assert abs(rel_error) < 1e-15
#
# compute sample statistics
assert len(sample_table) == number_sample * 3
sample_array = numpy.zeros( (2, number_sample), dtype = float)
for row in sample_table :
var_id = row['var_id']
sample_index = row['sample_index']
node_name = var_id2node_name[var_id]
if node_name == 'child_0' :
sample_array[0, sample_index] = row['var_value']
elif node_name == 'child_1' :
sample_array[1, sample_index] = row['var_value']
else :
assert node_name == 'world'
#
# check sample averages
sample_avg = numpy.average(sample_array, axis=1)
sample_cov = numpy.cov(sample_array, ddof=1)
rel_error = sample_avg / uhat - 1.0
assert all( abs(rel_error) < 2e-2 )
#
# check sample covariance
scale = math.sqrt( covariance[0,0] * covariance[1,1] )
for i in range(2) :
for j in range(2) :
if i == j :
rel_error = sample_cov[i,i] / covariance[i,i] - 1.0
else :
rel_error = sample_cov[i,j] / scale
assert abs(rel_error) < 1e-1
#
# check db2csv output of hes_random.csv
# (assumes database is in current working directory)
file_ptr = open('hes_random.csv', 'r')
hes_random_csv = list()
reader = csv.DictReader(file_ptr)
for row in reader :
hes_random_csv.append(row)
file_ptr.close()
#
for hes_random_id in range( len(hes_random_after) ) :
row_table = hes_random_after[hes_random_id]
row_csv = hes_random_csv[hes_random_id]
#
value_table = row_table['row_var_id']
value_csv = int( row_csv['row_var_id'] )
assert value_table == value_csv
#
value_table = row_table['col_var_id']
value_csv = int( row_csv['col_var_id'] )
assert value_table == value_csv
#
value_table = row_table['hes_random_value']
value_csv = float( row_csv['hes_random_value'] )
rel_error = value_csv / value_table - 1.0
eps9 = 9.0 * numpy.finfo(float).eps
assert abs(rel_error) < eps9
print('hes_random.py: OK')