import numpy as np from casadi import * import matplotlib.pyplot as plt import time # Specify building constants C_INT_SPEC = 10000 # [J/m^2/K] H_UFH_SPEC = 4.4 # [W/m^2/K] V_TS_SPEC = 10 #5 # [l/m^2] V_UFH_SPEC = 5 #5 # [l/m^2] R_SI = 0.13 # [m^2*K/W] RHO_WATER = 997 # Density water [kg/m3] RHO_AIR = 1.225 # Density air [kg/m3] C_WATER_SPEC = 4181 # Spec. heat capacity of water [J/kg/K] C_AIR_SPEC = 1.005 # Spec. heat capacity of air [J/kg/K] # model parameters H_ve= 88 # [W/K], sfh_1969_1978_0_soc H_tr= 493 # [W/K] c_bldg= 45 # [Wh/(m^2K)] Mauerwerk - mittel nach DIN EN ISO 13790 - Tabelle 12 area_floor= 173.2 # [m^2] height_room= 2.5 # [m] volume_air = area_floor * height_room # [m^3] H_ve_tr = H_ve + H_tr # [W/K] H_rad_con = H_UFH_SPEC * area_floor # [W/K] area_internal = 2 * area_floor + 4 * height_room * np.sqrt(area_floor) H_int = 1 / R_SI * area_internal volume_water = (V_TS_SPEC + V_UFH_SPEC) * area_floor / 1000 # [m^3] C_water = RHO_WATER * C_WATER_SPEC * volume_water # [J/K] C_air = RHO_AIR * C_AIR_SPEC * volume_air # [J/K] C_int = C_INT_SPEC * area_floor # [J/K] C_zone = C_air + C_int # [J/K] C_bldg = c_bldg * area_floor * 3600 # [J/K] C_wall = C_bldg - C_zone # [J/K] Qdot_gains = 0 T_amb = 0 mdot_hp = 0.25 # kg/s def shift(T, t0, x0, u, f): # Add noise to the control actions before applying it con_cov = np.diag([1])**2 con = u[:,0].reshape((n_controls,1)) + np.sqrt(con_cov) @ np.random.randn(n_controls,1) st = x0 f_value = f(st, con) st = st + (T * f_value) x0 = np.array(st).flatten() t0 = t0 + T u0 = np.vstack((u[1:, :], u[-1, :])) # Shift the control action return t0, x0, u0 # ----------------------------------------- # Start MPC implementation from here # ----------------------------------------- # dynamic system description N = 24 # prediction horizon T = 3600 # [s] # model states T_room = SX.sym('T_room') T_hp_ret = SX.sym('T_hp_ret') states = vertcat(T_room,T_hp_ret) n_states = states.size()[0] # model controls T_hp_sup = SX.sym('T_hp_sup') controls = vertcat(T_hp_sup) n_controls = controls.size()[0] # model dynamics rhs0 = 1/C_bldg * (Qdot_gains + H_rad_con * (T_hp_ret - T_room) - H_ve_tr * (T_room - T_amb)) # T_room [degC/s] rhs1 = 1/C_water* (mdot_hp * C_WATER_SPEC * (T_hp_sup - T_hp_ret) - H_rad_con * (T_hp_ret - T_room)) # T_hp_ret [degC/s] rhs = vertcat(rhs0,rhs1) # CasADi define model f = Function('f', [states, controls], [rhs]) U = SX.sym('U', n_controls, N) # Decision variables (controls) # parameters (which include at the initial state of the robot and the reference state) P = SX.sym('P', n_states + n_states) # A vector that represents the states over the optimization problem. X = SX.sym('X', n_states, (N+1)) # weighting matrices for objective Q = np.zeros((n_states,n_states)) # weighing matrices (states) Q[0, 0] = 100 Q[1, 1] = 1 R = np.zeros((n_controls,n_controls)) # weighing matrices (controls) R[0, 0] = .1 # objective and constraints for NLP obj = 0 g = [] st = X[:, 0] # initial state g = vertcat(g, st - P[0:n_states]) # initial conditions # multiple shooting discretization for k in range(N): st = X[:, k] con = U[:, k] obj = obj + (st - P[n_states:2*n_states]).T @ Q @ (st - P[n_states:2*n_states]) + con.T @ R @ con st_next = X[:, k+1] f_value = f(st, con) st_next_euler = st + (T * f_value) g = vertcat(g, st_next - st_next_euler) OPT_variables = vertcat(X.reshape((n_states*(N+1), 1)), U.reshape((n_controls*N, 1))) nlp_prob = {'f': obj, 'x': OPT_variables, 'g': g, 'p': P} opts = {'ipopt.max_iter': 2000, 'ipopt.print_level': 0, 'print_time': 0, 'ipopt.acceptable_tol': 1e-8, 'ipopt.acceptable_obj_change_tol': 1e-6} # define solver NLP solver = nlpsol('solver', 'ipopt', nlp_prob, opts) # bounds args = {} args['lbg'] = np.zeros(n_states*(N+1)) args['ubg'] = np.zeros(n_states*(N+1)) args['lbx'] = np.zeros((n_states*(N+1)+n_controls*N)) args['ubx'] = np.ones((n_states*(N+1)+n_controls*N))*60 #args['ubx'][0:n_states*(N+1):n_states] = 60 #args['lbx'][n_states*(N+1)+0:n_states*(N+1)+n_controls*N:n_controls] = 60 t0 = 0 x0 = np.array([15,30]) xs = np.array([20,35]) sim_tim = 5*24*3600 N_iter = int( sim_tim / T) xx = np.zeros((N_iter,n_states)) xx[0,:] = x0 t = np.zeros(N_iter) u0 = np.zeros((N, n_controls)) X0 = np.tile(x0, (N+1, 1)) mpciter = 0 xx1 = [] u_cl = [] # the main MPC simulaton loop... it works as long as the state regulation error is greater # than 10^-6 and the number of mpc steps is less than its maximum value. av=0 while np.linalg.norm(x0 - xs) > 0.05 and mpciter < sim_tim / T-1: tt0 = time.time() args['p'] = np.concatenate((x0, xs)) args['x0'] = np.concatenate((X0.flatten(), u0.flatten())) # solve NLP sol = solver(x0=args['x0'], lbx=args['lbx'], ubx=args['ubx'], lbg=args['lbg'], ubg=args['ubg'], p=args['p']) u = np.reshape(sol['x'][n_states*(N+1):], (N, n_controls)).T xx1.append(np.reshape(sol['x'][:n_states*(N+1)], (N+1, n_states))) if mpciter==0: u_cl=u[:,0] else: u_cl = np.vstack(( u_cl,u[:,0])) t[mpciter] = t0 t0, x0, u0 = shift(T, t0, x0, u, f) xx[mpciter+1, :] = x0 X0 = np.concatenate((X0[1:, :], X0[-1:, :])) mpciter += 1 av+=time.time()-tt0 print('MPC:',mpciter,'iterations, total elapsed time is',av,'s, average time per MPC iteration is',av/mpciter,'s.') ss_error = np.linalg.norm(x0 - xs, 2) xx = xx[:mpciter,:] t = t[:mpciter] # ----------------------------------------- # Plot the MPC simulation results # ----------------------------------------- # plot the ground truth fig, axs = plt.subplots(n_states+n_controls, 1) axs[0].plot(t, xx[:,0], 'b', linewidth=1.) axs[0].set_ylabel('T_room') axs[0].grid(True) axs[1].plot(t, xx[ :,1], 'b', linewidth=1.) axs[1].set_ylabel('T_hp_ret') axs[1].grid(True) axs[2].plot(t, u_cl[:], 'b', linewidth=1.) axs[2].set_ylabel('T_hp_set') axs[2].grid(True) # Synthesize the measurements con_cov = np.diag([.1])**2 meas_cov = np.diag([.5])**2 temp = np.empty((0, 1)) for k in range(xx.shape[0]): temp = np.vstack((temp,xx[k,1]+meas_cov[0, 0] * np.random.randn(1))) y_measurements = temp #np.hstack((r, alpha)) # plot the ground truth measurements VS the noisy measurements fig, axs = plt.subplots(1, 1) axs.plot(t, xx[:,1], 'b', linewidth=1.5) axs.plot(t, temp, 'r', linewidth=1.5) axs.set_ylabel('Temperature sensor') axs.grid(True) axs.legend(['Ground Truth', 'Measurement']) # The two matrices u_cl and y_measurements contain what we know about the system, i.e. # the nominal control actions applied (measured) and the range and bearing # measurements. # --------------------------------------------------------------------- # ----------------------------------------- # Start MHE implementation from here # ----------------------------------------- # Let's now formulate our MHE problem N_MHE = min(10,y_measurements.shape[0]-1) # y_measurements.shape[0]-1 # prediction horizon T = 3600 # [s] # output model outputs = vertcat(T_hp_ret) n_outputss = outputs.size()[0] measurement_rhs = vertcat(T_hp_ret) h = Function('h', [outputs], [measurement_rhs]) # MEASUREMENT MODEL # Decision variables U = SX.sym('U', n_controls, N_MHE) # (controls) X = SX.sym('X', n_states, N_MHE + 1) # (states) [remember multiple shooting] P = SX.sym('P', n_controls, N_MHE + (N_MHE + 1)) # parameters (include measurements as well as controls measurements) V = inv(sqrt(meas_cov)) # weighing matrices (output) y_tilde - y W = inv(sqrt(con_cov)) # weighing matrices (input) u_tilde - u obj = 0 # Objective function g = [] # constraints vector for k in range(N_MHE + 1): st = X[-1, k] h_x = h(st) y_tilde = P[:, k] obj = obj + (y_tilde - h_x).T @ V @ (y_tilde - h_x) # calculate obj for k in range(N_MHE): con = U[:, k] u_tilde = P[:, N_MHE + k] obj = obj + (u_tilde - con).T @ W @ (u_tilde - con) # calculate obj # multiple shooting constraints for k in range(N_MHE): st = X[:, k] con = U[:, k] st_next = X[:, k + 1] f_value = f(st, con) st_next_euler = st + (T*f_value) g = vertcat(g, st_next - st_next_euler) # compute constraints # make the decision variable one column vector OPT_variables = vertcat(reshape(X, n_states*(N_MHE + 1), 1), reshape(U, n_controls*N_MHE, 1)) nlp_mhe = {'f': obj, 'x': OPT_variables, 'g': g, 'p': P} opts = {} opts['ipopt.max_iter'] = 2000 opts['ipopt.print_level'] = 0 # 0,n_states opts['print_time'] = 0 opts['ipopt.acceptable_tol'] = 1e-8 opts['ipopt.acceptable_obj_change_tol'] = 1e-6 solver = nlpsol('solver', 'ipopt', nlp_mhe, opts) args = {} args['lbg'] = np.zeros(n_states*(N_MHE)) args['ubg'] = np.zeros(n_states*(N_MHE)) args['lbx'] = np.zeros(n_states*(N_MHE+1)+n_controls*N_MHE) args['ubx'] = np.ones(n_states*(N_MHE+1)+n_controls*N_MHE)*60 # ---------------------------------------------- # ALL OF THE ABOVE IS JUST A PROBLEM SET UP # MHE Simulation loop starts here # ------------------------------------------ U0 = np.zeros((N_MHE, n_controls)) # two control inputs for each robot X0 = np.zeros((N_MHE+1, n_states)) # initialization of the states decision variables # Start MHE U0 = u_cl[0:N_MHE, :] # initialize the control actions by the measured # initialize the states from the measured range and bearing X0[:, 0:n_states] = np.column_stack((np.ones(N_MHE+1)*xs[0],y_measurements[0:N_MHE+1, 0])) # Get the measurements window and put it as parameters in p args['p'] = np.vstack((y_measurements[:N_MHE+1,:],u_cl[:N_MHE,:])).transpose() # initial value of the optimization variables args['x0'] = np.vstack((X0.reshape(n_states*(N_MHE+1),1),U0.reshape(n_controls*N_MHE,1))) # Solve the optimization problem #sol = solver(x0=args['x0'], lbx=args['lbx'], ubx=args['ubx'], lbg=args['lbg'], ubg=args['ubg'], p=args['p']) # MHE Simulation loop starts here # ------------------------------------------ MHE simulation U0 = np.zeros((N_MHE, n_controls)) # two control inputs for each robot X0 = np.zeros((N_MHE+1, n_states)) # initialization of the states decision variables # initial value of the optimization variables k=0 # Get the measurements window and put it as parameters in p U0 = u_cl[k:k+N_MHE, :] # initialize the control actions by the measured # initialize the states from the measured range and bearing X0[:, 0:2] = np.column_stack(( y_measurements[k:k+N_MHE+1, 0], y_measurements[k:k+N_MHE+1, 0] )) X_estimate = [] U_estimate = [] for k in range(len(y_measurements) - N_MHE): tt1 = time.time() args['p'] = np.vstack((y_measurements[k:k+N_MHE+1,:],u_cl[k:k+N_MHE,:])).transpose() args['x0'] = np.vstack((X0.reshape(n_states*(N_MHE+1),1),U0.reshape(n_controls*N_MHE,1))) sol = solver(x0=args['x0'], lbx=args['lbx'], ubx=args['ubx'], lbg=args['lbg'], ubg=args['ubg'], p=args['p']) U_sol = np.reshape(sol['x'][n_states*(N_MHE+1)::n_controls], (n_controls, N_MHE)).T # get controls only from the solution X_sol = np.column_stack((sol['x'][0:n_states*(N_MHE+1):n_states],sol['x'][1:n_states*(N_MHE+1):n_states])) #np.reshape(sol['x'][:column_stack((n_states*(N_MHE+1)], (n_states, N_MHE+1)).T # get solution TRAJECTORY X_estimate.append(X_sol[-1,:]) U_estimate.append(U_sol[-1,:]) # Shift trajectory to initialize the next step X0 = np.concatenate((X_sol[1:,:], X_sol[-1:,:]), axis=0) U0 = np.concatenate((U_sol[1:,:], U_sol[-1:,:]), axis=0) av+=time.time()-tt1 X_estimate = np.array(X_estimate) U_estimate = np.array(U_estimate) print('MHE:',k+1,'iterations. Total elapsed time is ',av,'s, average time per MHE iteration is ',av/(k+1),'s.') # MHE Simulation loop ends here # ------------------------------------------ # Plot the MHE estimates solution fig, axs = plt.subplots(2, 1) axs[0].plot(t, xx[:,0], 'b', linewidth=1.5) axs[0].set_ylabel('T_room') axs[0].grid(True) axs[1].plot(t, xx[ :,1], 'b', linewidth=1.5) axs[1].set_ylabel('T_hp_ret') axs[1].grid(True) axs[0].plot(t[N_MHE:], X_estimate[:,0], 'g', linewidth=1.5) axs[1].plot(t[N_MHE:], X_estimate[:,-1], 'g', linewidth=1.5) axs[0].legend(['Simulation','MHE']) # Plot the control inputs fig, axs = plt.subplots(1, 1) axs.step(t[:], u_cl[:,0], 'b', linewidth=1.5) axs.step(t[N_MHE:], U_estimate[:], 'g', linewidth=1.5) axs.set_ylabel('T_hp_sup') axs.grid(True) axs.legend(['Simulation','MHE']) plt.show()