from scipy.optimize import minimize
import numpy as np
import matplotlib.pyplot as plt
from numba import jit
#TITRE: LE SANDEUR D'ARTEFACT
#mon site internet: artefact.mataroa.blog
#ce code a des bouts en français et en anglais, je m'en excuse. C'est la première fois que je fais un projet aussi compliqué donc j'ai codé de la façon qui me met le plus à l'aise, ce qui n'est pas celle qui vous aidera le mieux à le lire. Malgrés tout j'ai essayé de rendre le code le plus compact possible pour simplifier le débuggage.
#Sandeur - Lisseur de sondages d'opinion.
#Copyright (C) 2026 Artefact, https://artefact.mataroa.blog/
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU General Public License for more details.
#You should have received a copy of the GNU General Public License
#along with this program. If not, see <https://www.gnu.org/licenses/>.
#j'implémente en python, acceléré par numbas, l'algo viens cet article: https://users.aalto.fi/~ssarkka/pub/cats-final.pdf j'implémente uniquement la partie kalman filter
#N nombre de sondages
#n taille de l'échantillon du sondage
#t dates des sondages
#predt dates où l'on souhaite obtenir les prédictions
#obst date des observations (=date des sondages)
@jit(nopython=True) #dit à numbas de compiler le numpy en JIT JUST IN TIME, je sais pas ce que ça veut dire mais ça marche bien) pour accélérer les calculs. La validation croisée beaucoup est trop lente sans ça.
def kfstep(X_hat_prev,P_prev,A,Q,R,H,y): #KF comme Kalman filter
X_hat_=A@X_hat_prev
P_cov_=A@P_prev@A.T+Q
v= y - H @ X_hat_
S= H @ P_cov_ @ H.T + R
K=np.linalg.solve( S.T, (P_cov_ @ H.T).T).T
X_hat=X_hat_ + K @ v
P_cov=P_cov_ - K @ H @ P_cov_
return X_hat,P_cov
@jit(nopython=True)
def kfstep_no_meas(X_hat_prev,P_prev,A,Q): #quoi faire quand le filtre de kalman n'a pas de mesure à se mettre sous la dent
X_hat_=A@X_hat_prev
P_cov_=A@P_prev@A.T+Q
X_hat=X_hat_
P_cov=P_cov_
return X_hat,P_cov
@jit(nopython=True)
def rtsstep(X_hat,X_hat_next_smo,P,P_next_smo,A,Q,R,H):
P_next_= A @ P @ A.T + Q
C=np.linalg.solve( P_next_.T, (P @ A.T).T).T
X_hat_smo = X_hat + C @ ( X_hat_next_smo - A @ X_hat)
P_smo= P + C @ ( P_next_smo - P_next_) @ C.T
return X_hat_smo,P_smo
@jit(nopython=True)
def rtss(As,Qs,Rs,Hs,y): # rtss comme Rauch–Tung–Striebel Smoother. Après avoir filtré avec kalman, il retraverse les prédictions de la dernière à la premier pour apporter l'information du "futur" pour mieux prédire le "passé".
N=len(y)
(x_hats,Ps)=kalman(As,Qs,Rs,Hs,y)
(x_hat_smo,P_smo)=(x_hats[-1],Ps[-1])
for k in range(2,N):
(x_hat_smo,P_smo)=rtsstep(x_hats[N-k],x_hat_smo,Ps[N-k],P_smo,As[N-k],Qs[N-k],Rs[N-k],Hs[N-k])
x_hats[N-k]=x_hat_smo
Ps[N-k]=P_smo
return (x_hats,Ps)
@jit(nopython=True)
def kalman(As,Qs,Rs,Hs,y): #Kalman comme le filtre de Kalman
N=len(y)
(X_hats,Ps)=( np.full((N,2),np.nan) , np.full((N,2,2),np.nan) )
Ps[0]=np.diagflat([Rs[0],1e9])
X_hats[0]=np.array([y[0],0],dtype="float64")
for k in range(0,N-1):
if np.isnan(y[k+1]):
(X_hats[k+1],Ps[k+1])=kfstep_no_meas(X_hats[k],Ps[k],As[k],Qs[k])
else:
(X_hats[k+1],Ps[k+1])=kfstep(X_hats[k],Ps[k],As[k],Qs[k],Rs[k+1],Hs[k+1],y[k+1])
return (X_hats,Ps)
@jit(nopython=True)
def loo_rtss(As,Qs,Rs,Hs,y): #fourni les prédictions pour le leave one out du rtss
N=len(y)
preds=np.full(N,np.nan)
preds[0]=y[0]
for i in range(1,N):
y2=np.copy(y)
y2[i]=np.nan
(X_hats,P_covs)=rtss(As,Qs,Rs,Hs,y2)
preds[i]=X_hats[i,0]
return preds
@jit(nopython=True)
def rtss_error_hat(logratio,As,Qs,Rs,Hs,y): #estimateur de l'erreur de rtss par validation croisée leave one out. Techniquement, c'est biaisé comme estimateur à cause de la corrélation temporelle, mais expérimentalement ça marche bien comme montré dans l'exemple en dessous.
preds=loo_rtss(As,Qs*np.exp(logratio),Rs,Hs,y) #le niveau de smoothing est un ratio, et il est généralement immense ou miniscule, donc illisible sans le log
error=np.var(preds-y)
return error
def choose_noise_ratio(As,Qs,Rs,Hs,y):
args=(As,Qs,Rs,Hs,y)
bornes=[-20,20] #arbitraire mais si la borne inf est trop négative, il y a une matrice devient trop mal conditionnée dans l'algo ce qui engendre une erreur d'inversion.
opt=minimize(rtss_error_hat,-15,args,bounds=[bornes],method="COBYLA") #j'ai testé différent optimiseur de scipy pour des raison obscure celui là marche le mieux
a=np.linspace(*bornes,50)
s=[]
for i in a:
s.append(rtss_error_hat(i,As,Qs,Rs,Hs,y))
fig,ax=plt.subplots()
ax.plot(a,np.log(s)) #trace un graphique de l'erreur pour vérifier que l'optimisation s'est bien passée
ax.set_title("choix du niveau de smoothing optimal")
ax.set_xlabel("logratio")
ax.set_ylabel("logerreur")
ax.axvline(x=opt.x,color="red")
print(opt)
return np.exp(opt.x)
#definition du modèle
#même modèle d'évolution que dans l'article : https://users.aalto.fi/~ssarkka/pub/cats-final.pdf
#le modèle me plait car je pense que l'opinion est lisse, et il est assez flexible malgré tout. Expérimentalement il marche très bien sur des données simulées.
def Q_t(dt):
Q=np.matrix([
[(dt**3)/3,(dt**2)/2],
[(dt**2)/2,dt]
])
return (Q)
def A_t(dt):
A=np.matrix([
[1.0,dt ],
[0,1.0 ],
])
return (A)
def def_model1(t,sample_size,y):
N=len(t)
Qs=np.full((N,2,2),np.nan)
As=np.full((N,2,2),np.nan)
Hs=np.full((N,1,2),np.nan)
Rs= y*(1-y)/sample_size + (1/sample_size)**2 #variance estimée theorique de la loi binomiale plus un peu de régularisation pour quand p est trop proche de 0 ou 1
H=np.array([[1,0]],dtype="float64")
for k in range(1,N):
dt=(t[k]-t[k-1])
Qs[k-1]=(Q_t(dt))
As[k-1]=(A_t(dt))
Hs[k]=H
return As,Qs,Rs,Hs
def def_modeldiff(t,sample_size,y1,y2):
N=len(t)
Qs=np.full((N,2,2),np.nan)
As=np.full((N,2,2),np.nan)
Hs=np.full((N,1,2),np.nan)
Rs= (y1*(1-y1) + y2*(1-y2)) /sample_size + (1/sample_size)**2 #variance estimée theorique de la loi binomiale plus un peu de régularisation pour quand p est trop proche de 0 ou 1
H=np.array([[1,0]],dtype="float64")
for k in range(1,N):
dt=(t[k]-t[k-1])
Qs[k-1]=(Q_t(dt))
As[k-1]=(A_t(dt))
Hs[k]=H
return As,Qs,Rs,Hs
def estim_niv_R_Q(As,Qs,Rs,Hs,y):
N=len(y)
ratio=choose_noise_ratio(As,Qs,Rs,Hs,y) #nivQ/nivR correspond au niveau de lissage. Il est estimé comme l'argument qui minimize l'erreur de validation croisée
(Xs,Ps)=rtss(As,Qs*ratio,Rs,Hs,y)
xest=Xs[:,0]
res_corrected=(xest-y)/np.sqrt(Rs) #On suppose que l'erreur de mesure des sondages est différente à une multiplication par un scalaire près.
niv_R=np.var(res_corrected) #niveau R = multiplicateur de l'erreur de mesure. Il sert à calibrer les intervalles de confiance des prédictions. Si les erreurs de mesures fournies par le modèle sont correcte, il devrait être proche de 1. Cela permet de vérifier que le modèle est calibré.
print("niv_R:",niv_R)
return(ratio,niv_R)
def tracer_avec_marges_derreur(t,y,var_y,fig,ax):
erreurs=1.96*np.sqrt(var_y)
ax.plot(t, y,label="estimation du vote de la population des répondants",zorder=3,color="black")
ax.fill_between(t,y-erreurs,y+erreurs,alpha=0.4,color="darkorange",label="marge d'erreur 95%",zorder=2)
def complet(obst,ns,obsy,predx,fig,ax):
(predy,prederreur)=prediction(obst,ns,obsy,predx)
tracer_avec_marges_derreur(predx,predy,prederreur,fig,ax)
ax.scatter(obst,obsy,marker="+",label="sondages",c="grey",zorder=4)
print("prédiction finale:", obsy[-1]," std erreur finale:",np.sqrt(prederreur[-1]))
return (obsy[-1],np.sqrt(prederreur[-1]))
def prepare(obst,obsy,ns,predx): #honnêtement je me suis tellement cassé la tête c'est quasiment de la magie noire. Mais ça permet de séparer les données des observations de celles avec lesquelles on fait les prédictions. Pour l'algo elles doivent êtré réunies, ce code sert à les placer au bon endroit temporellement, et à se souvenir où on les a mis.
nbobs=len(obst)
unsorted=np.concatenate((obst,predx))
ordrepourtrie=np.argsort(unsorted)
inverse=np.argsort(ordrepourtrie)
x=unsorted[ordrepourtrie]
y=np.full(len(unsorted),np.nan)
ns2=np.full(len(unsorted),np.nan)
obs_tri=inverse[:nbobs]
y[obs_tri]=obsy
ns2[obs_tri]=ns
return (x,y,ns2,inverse)
def getpreds(nbobs,inverse,y,var_y): #obtiens l'ordre original afin de séparer les observations des prédictions
predy=y[inverse][nbobs:]
var_predy=var_y[inverse][nbobs:]
return(predy,var_predy)
def prediction(obst,ns,obsy,predx):
#obst: array de float , dates des sondages doivent être par ordre croissant
#ns: array de int ou float >0 , tailles des sondages
#obsy: array de float entre 0 et 1
#predx : array de float, la date de la première prédiction doit être après celle du premier sondage (sinon bug d'initialisation)
N=len(obst)
(As,Qs,Rs,Hs)=def_model1(obst,ns,obsy)
(ratio,niv_R)=estim_niv_R_Q(As,Qs,Rs,Hs,obsy)
(t2,y2,ns2,reverse)=prepare(obst,obsy,ns,predx)
(As2,Qs2,Rs2,Hs2)=def_model1(t2,ns2,y2)
Rs2_corrected=Rs2*niv_R
Qs2_corrected=ratio*niv_R*Qs2
(Xs,Ps)=rtss(As2,Qs2_corrected,Rs2_corrected,Hs2,y2)
(Xspred,Pspred)=getpreds(N,reverse,Xs,Ps)
return(Xspred[:,0],Pspred[:,0,0]) #renvoie les prédictions aux dates fournies, avec leurs variances estimées
#dans ce modèle factice je fixe le bruit à 1 pour voir si niv_R l'estime correctement.
seed=456
rng=np.random.default_rng(seed)
N=80
t=np.sort(rng.uniform(size=N))
predt=np.linspace(np.min(t)+1e-5,1.1,100) #le 1e-5 pour s'assurer que les endroit où l'on prédit sont APRES les mesures sinon bug
true_value_obst=np.abs(np.sin(2*np.pi*np.sqrt(t)))*0.6+0.1
true_value_predt=np.abs(np.sin(2*np.pi*np.sqrt(predt)))*0.6+0.1
tailles=rng.binomial(100,0.5,size=N)+1 #Tout de même assez puissant qu'avec des échantillon de tailles <200 cela soit aussi précis
y=rng.binomial(tailles,true_value_obst,size=N)/tailles
fig, ax = plt.subplots()
complet(t,tailles,y,predt,fig,ax)
ax.plot(predt,true_value_predt,c="red",label="la véritable opinion qui a produit les observations")
ax.legend()