Perform simulation task including data preparation, energy system creation, optimization,
and result processing.
This function is a Celery task that interacts with a database to fetch simulation and
scenario data. It creates an energy system model, optimizes using the oemof library,
processes the results, and updates the database with the results of the simulation.
Detailed actions performed by the function include:
- Fetching scenario and simulation details from the database.
- Preparing necessary directory structures and dumping input data.
- Configuring and initializing the oemof energy system.
- Solving an optimization model using specified solver parameters.
- Writing optimization results to files for further analysis.
- Updating the status of the simulation task in the database.
Parameters:
Name |
Type |
Description |
Default |
scenario_id
|
int
|
Identifier of the scenario to be simulated.
|
required
|
simulation_id
|
int
|
Identifier of the simulation instance.
|
required
|
Returns:
Type |
Description |
NoneType
|
|
Source code in backend/app/celery.py
| @celery_app.task(name="ensys.optimization")
def simulation_task(scenario_id: int, simulation_id: int):
"""
Perform simulation task including data preparation, energy system creation, optimization,
and result processing.
This function is a Celery task that interacts with a database to fetch simulation and
scenario data. It creates an energy system model, optimizes using the oemof library,
processes the results, and updates the database with the results of the simulation.
Detailed actions performed by the function include:
- Fetching scenario and simulation details from the database.
- Preparing necessary directory structures and dumping input data.
- Configuring and initializing the oemof energy system.
- Solving an optimization model using specified solver parameters.
- Writing optimization results to files for further analysis.
- Updating the status of the simulation task in the database.
:param scenario_id: Identifier of the scenario to be simulated.
:type scenario_id: int
:param simulation_id: Identifier of the simulation instance.
:type simulation_id: int
:return: None
:rtype: NoneType
"""
task_counter.inc()
task_in_progress.inc()
db = Session(create_engine(os.getenv("DATABASE_URL")))
scenario = db.get(EnScenarioDB, scenario_id)
simulation = db.get(EnSimulationDB, simulation_id)
simulation_token = simulation.sim_token
dump_path = os.path.join(os.getenv("LOCAL_DATADIR"), simulation_token, "dump")
log_path = os.path.join(os.getenv("LOCAL_DATADIR"), simulation_token, "log")
os.makedirs(dump_path, exist_ok=True)
os.makedirs(log_path, exist_ok=True)
logger = get_task_logger(__name__)
# Create Energysystem to be stored
energysystem_api = scenario.energysystem_model
simulation_model = EnModel(
energysystem=energysystem_api
)
simulation_folder = os.path.abspath(os.path.join(os.getenv("LOCAL_DATADIR"), simulation_token))
os.makedirs(
name=simulation_folder,
exist_ok=True
)
with open(os.path.join(simulation_folder, "es_" + simulation_token + ".json"), "wt") as f:
f.write(simulation_model.model_dump_json())
logger.info("read scenario data from database")
scenario = db.exec(select(EnScenarioDB).where(EnScenarioDB.id == scenario_id)).first()
print(f"Scenario Interval:{scenario.interval}")
print(f"Scenario Timesteps:{scenario.time_steps}")
print(f"Scenario Startdate:{scenario.start_date}")
print(f"Scenario Startdate:{type(scenario.start_date)}")
print(f"Scenario Startdate:{scenario.start_date.year}")
logger.info("create oemof energy system")
timeindex = solph.create_time_index(
start=scenario.start_date,
number=scenario.time_steps,
interval=scenario.interval,
)
print(f"timeindex:{timeindex}")
oemof_es: solph.EnergySystem = solph.EnergySystem(
timeindex=timeindex,
infer_last_interval=False
)
oemof_es = simulation_model.energysystem.to_oemof_energysystem(oemof_es)
# create the model for optimization
logger.info("create simulation model")
oemof_model = solph.Model(oemof_es)
# solve the optimization model
logger.info("solve optimization model")
oemof_model.solve(
solver=str(simulation_model.solver.value),
solve_kwargs=simulation_model.solver_kwargs if hasattr(simulation_model, "solver_kwargs") else {"tee": True},
cmdline_opts={"logfile": os.path.join(log_path, "solver.log")}
)
logger.info("simulation finished")
# write the lp file for specific analysis
logger.info("write lp file")
oemof_model.write(
filename=os.path.join(dump_path, "oemof_model.lp"),
io_options={"symbolic_solver_labels": True}
)
logger.info("collect results")
oemof_es.results["main"] = solph.processing.results(oemof_model)
oemof_es.results["meta"] = solph.processing.meta_results(oemof_model)
logger.info("dump results")
oemof_es.dump(
dpath=dump_path,
filename="oemof_es.dump"
)
logger.info("update database")
simulation.status = Status.FINISHED.value
simulation.end_date = datetime.now()
db.commit()
db.refresh(simulation)
logger.info("backgroundtask finished")
task_in_progress.dec()
|