simulator
import "github.com/umbralcalc/stochadex/pkg/simulator"
Package simulator provides the core simulation engine and infrastructure for stochadex simulations. It includes the main simulation loop, state management, partition coordination, and execution control mechanisms.
Key Features:
- Partition-based simulation architecture
- Concurrent execution with goroutine coordination
- State history management and time tracking
- Configurable termination and output conditions
- Flexible timestep control
- Storage and persistence utilities
Architecture: The simulator uses a partition-based approach where simulations are divided into independent partitions that can be executed concurrently. Each partition maintains its own state history and can communicate with other partitions through defined interfaces.
Usage Patterns:
- Configure and run complex multi-partition simulations
- Manage simulation state across multiple timesteps
- Coordinate concurrent execution of simulation components
- Store and retrieve simulation results and intermediate states
- Implement custom termination and output conditions
Index
- Variables
- func RunWithHarnesses(settings *Settings, implementations *Implementations) error
- type ConfigGenerator
- func NewConfigGenerator() *ConfigGenerator
- func (c *ConfigGenerator) GenerateConfigs() (*Settings, *Implementations)
- func (c *ConfigGenerator) GetGlobalSeed() uint64
- func (c *ConfigGenerator) GetPartition(name string) *PartitionConfig
- func (c *ConfigGenerator) GetSimulation() *SimulationConfig
- func (c *ConfigGenerator) ResetPartition(name string, config *PartitionConfig)
- func (c *ConfigGenerator) SetGlobalSeed(seed uint64)
- func (c *ConfigGenerator) SetPartition(config *PartitionConfig)
- func (c *ConfigGenerator) SetSimulation(config *SimulationConfig)
- type ConstantTimestepFunction
- type CumulativeTimestepsHistory
- type DownstreamStateValues
- type EveryNStepsOutputCondition
- type EveryStepOutputCondition
- type ExponentialDistributionTimestepFunction
- type Implementations
- type Iteration
- type IterationSettings
- type IterationTestHarness
- type IteratorInputMessage
- type JsonLogChannelOutputFunction
- func NewJsonLogChannelOutputFunction(filePath string) *JsonLogChannelOutputFunction
- func (j *JsonLogChannelOutputFunction) Close()
- func (j *JsonLogChannelOutputFunction) Configure(*Settings)
- func (j *JsonLogChannelOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
- type JsonLogEntry
- type JsonLogOutputFunction
- type NamedPartitionIndex
- type NamedUpstreamConfig
- type NilOutputCondition
- type NilOutputFunction
- type NumberOfStepsTerminationCondition
- type OnlyGivenPartitionsOutputCondition
- type OutputCondition
- type OutputFunction
- type Params
- func NewParams(params map[string][]float64) Params
- func (p *Params) Get(name string) []float64
- func (p *Params) GetCopy(name string) []float64
- func (p *Params) GetCopyOk(name string) ([]float64, bool)
- func (p *Params) GetIndex(name string, index int) float64
- func (p *Params) GetOk(name string) ([]float64, bool)
- func (p *Params) Set(name string, values []float64)
- func (p *Params) SetIndex(name string, index int, value float64)
- func (p *Params) SetPartitionName(name string)
- type PartitionConfig
- type PartitionConfigOrdering
- type PartitionCoordinator
- func NewPartitionCoordinator(settings *Settings, implementations *Implementations) *PartitionCoordinator
- func (c *PartitionCoordinator) ReadyToTerminate() bool
- func (c *PartitionCoordinator) RequestMoreIterations(wg *sync.WaitGroup)
- func (c *PartitionCoordinator) Run()
- func (c *PartitionCoordinator) Step(wg *sync.WaitGroup)
- func (c *PartitionCoordinator) UpdateHistory(wg *sync.WaitGroup)
- type PartitionState
- func (*PartitionState) Descriptor() ([]byte, []int)
- func (x *PartitionState) GetCumulativeTimesteps() float64
- func (x *PartitionState) GetPartitionName() string
- func (x *PartitionState) GetState() []float64
- func (*PartitionState) ProtoMessage()
- func (x *PartitionState) ProtoReflect() protoreflect.Message
- func (x *PartitionState) Reset()
- func (x *PartitionState) String() string
- type Settings
- type SimulationConfig
- type SimulationConfigStrings
- type StateHistory
- type StateIterator
- func NewStateIterator(iteration Iteration, params Params, partitionName string, partitionIndex int, valueChannels StateValueChannels, outputCondition OutputCondition, outputFunction OutputFunction, initState []float64, timestepsHistory *CumulativeTimestepsHistory) *StateIterator
- func (s *StateIterator) Iterate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) []float64
- func (s *StateIterator) ReceiveAndIteratePending(inputChannel <-chan *IteratorInputMessage)
- func (s *StateIterator) UpdateHistory(inputChannel <-chan *IteratorInputMessage)
- type StateTimeStorage
- func NewStateTimeStorage() *StateTimeStorage
- func (s *StateTimeStorage) Append(name string, time float64, values []float64)
- func (s *StateTimeStorage) AppendByIndex(index int, time float64, values []float64)
- func (s *StateTimeStorage) GetIndex(name string) int
- func (s *StateTimeStorage) GetNames() []string
- func (s *StateTimeStorage) GetTimes() []float64
- func (s *StateTimeStorage) GetValues(name string) [][]float64
- func (s *StateTimeStorage) IndexOf(name string) (int, bool)
- func (s *StateTimeStorage) PreRegisterPartitions(names []string)
- func (s *StateTimeStorage) SetTimes(times []float64)
- func (s *StateTimeStorage) SetValues(name string, values [][]float64)
- type StateTimeStorageOutputFunction
- type StateValueChannels
- type StdoutOutputFunction
- type TerminationCondition
- type TimeElapsedTerminationCondition
- type TimestepFunction
- type UpstreamConfig
- type UpstreamStateValues
- type WebsocketOutputFunction
Variables
var File_cmd_messages_partition_state_proto protoreflect.FileDescriptor
func RunWithHarnesses
func RunWithHarnesses(settings *Settings, implementations *Implementations) error
RunWithHarnesses runs all iterations, each wrapped in a test harness, and returns any errors found. The simulation is also run twice to check for statefulness residues.
type ConfigGenerator
ConfigGenerator builds Settings and Implementations programmatically and can generate runnable configs on demand.
type ConfigGenerator struct {
// contains filtered or unexported fields
}
func NewConfigGenerator
func NewConfigGenerator() *ConfigGenerator
NewConfigGenerator creates a new ConfigGenerator with empty ordering.
func (*ConfigGenerator) GenerateConfigs
func (c *ConfigGenerator) GenerateConfigs() (*Settings, *Implementations)
GenerateConfigs constructs Settings and Implementations ready to run. It computes state widths, converts named references, and configures iterations with their partition indices.
func (*ConfigGenerator) GetGlobalSeed
func (c *ConfigGenerator) GetGlobalSeed() uint64
GetGlobalSeed returns the current global seed.
func (*ConfigGenerator) GetPartition
func (c *ConfigGenerator) GetPartition(name string) *PartitionConfig
GetPartition retrieves a partition config by name.
func (*ConfigGenerator) GetSimulation
func (c *ConfigGenerator) GetSimulation() *SimulationConfig
GetSimulation returns the current simulation config.
func (*ConfigGenerator) ResetPartition
func (c *ConfigGenerator) ResetPartition(name string, config *PartitionConfig)
ResetPartition replaces the config for a partition by name.
func (*ConfigGenerator) SetGlobalSeed
func (c *ConfigGenerator) SetGlobalSeed(seed uint64)
SetGlobalSeed assigns a random seed to each partition derived from the provided global seed.
func (*ConfigGenerator) SetPartition
func (c *ConfigGenerator) SetPartition(config *PartitionConfig)
SetPartition adds a new partition config. Names must be unique.
func (*ConfigGenerator) SetSimulation
func (c *ConfigGenerator) SetSimulation(config *SimulationConfig)
SetSimulation sets the current simulation config.
type ConstantTimestepFunction
ConstantTimestepFunction uses a fixed stepsize.
type ConstantTimestepFunction struct {
Stepsize float64
}
func (*ConstantTimestepFunction) NextIncrement
func (t *ConstantTimestepFunction) NextIncrement(timestepsHistory *CumulativeTimestepsHistory) float64
type CumulativeTimestepsHistory
CumulativeTimestepsHistory is a rolling window of cumulative timesteps with NextIncrement and CurrentStepNumber.
type CumulativeTimestepsHistory struct {
NextIncrement float64
Values *mat.VecDense
CurrentStepNumber int
StateHistoryDepth int
}
type DownstreamStateValues
DownstreamStateValues contains information to broadcast state values to downstream iterators via channel.
type DownstreamStateValues struct {
Channel chan []float64
Copies int
}
type EveryNStepsOutputCondition
EveryNStepsOutputCondition emits output once every N steps.
type EveryNStepsOutputCondition struct {
N int
}
func (*EveryNStepsOutputCondition) IsOutputStep
func (c *EveryNStepsOutputCondition) IsOutputStep(partitionName string, state []float64, timestepsHistory *CumulativeTimestepsHistory) bool
type EveryStepOutputCondition
EveryStepOutputCondition calls the OutputFunction at every step.
type EveryStepOutputCondition struct{}
func (*EveryStepOutputCondition) IsOutputStep
func (c *EveryStepOutputCondition) IsOutputStep(partitionName string, state []float64, timestepsHistory *CumulativeTimestepsHistory) bool
type ExponentialDistributionTimestepFunction
ExponentialDistributionTimestepFunction draws dt from an exponential distribution parameterised by Mean and Seed.
type ExponentialDistributionTimestepFunction struct {
Mean float64
Seed uint64
// contains filtered or unexported fields
}
func NewExponentialDistributionTimestepFunction
func NewExponentialDistributionTimestepFunction(mean float64, seed uint64) *ExponentialDistributionTimestepFunction
NewExponentialDistributionTimestepFunction constructs an exponential-dt timestep function given mean and seed.
func (*ExponentialDistributionTimestepFunction) NextIncrement
func (t *ExponentialDistributionTimestepFunction) NextIncrement(timestepsHistory *CumulativeTimestepsHistory) float64
type Implementations
Implementations provides concrete implementations for a simulation run.
type Implementations struct {
Iterations []Iteration
OutputCondition OutputCondition
OutputFunction OutputFunction
TerminationCondition TerminationCondition
TimestepFunction TimestepFunction
}
type Iteration
Iteration defines the interface for per-partition state update functions in stochadex simulations.
The Iteration interface is the fundamental building block for defining how simulation state evolves over time. Each partition in a simulation uses an Iteration to compute its next state values based on the current state, parameters, and time information.
Design Philosophy: The Iteration interface emphasizes modularity and composability. By providing a simple, well-defined interface, it enables the creation of complex simulations through the combination of simple, focused iterations. This design supports both built-in iteration types and custom user-defined iterations.
Interface Methods:
- Configure: Initialize the iteration with simulation settings (called once)
- Iterate: Compute the next state values (called each simulation step)
Configuration Phase: Configure is called once per partition during simulation setup. It receives:
- partitionIndex: The index of this partition in the simulation
- settings: Global simulation settings and configuration
This phase is used for:
- Initializing random number generators
- Setting up internal data structures
- Configuring iteration-specific parameters
- Validating configuration parameters
Iteration Phase: Iterate is called each simulation step to compute the next state values. It receives:
- params: Current simulation parameters for this partition
- partitionIndex: The index of this partition
- stateHistories: State histories for all partitions (for cross-partition access)
- timestepsHistory: Time and timestep information
It must return:
- []float64: The next state values for this partition
Implementation Requirements:
- Configure must be called before Iterate
- Iterate must return a slice of the correct length (matching state width)
- Iterate should not modify the input parameters or state histories
- Iterate should be deterministic given the same inputs and initial seed (for reproducible simulations)
Example Usage:
type MyIteration struct {
	// Internal state
}
func (m *MyIteration) Configure(partitionIndex int, settings *Settings) {
	// Initialize iteration
}
func (m *MyIteration) Iterate(params *Params, partitionIndex int,
	stateHistories []*StateHistory,
	timestepsHistory *CumulativeTimestepsHistory) []float64 {
	// Compute next state values
	return []float64{newValue1, newValue2, ...}
}
Common Iteration Types:
- Stochastic processes: WienerProcessIteration, PoissonProcessIteration
- Deterministic functions: ValuesFunctionIteration, ConstantValuesIteration
- Aggregation functions: VectorMeanIteration, GroupedAggregationIteration
- User-defined iterations: Custom implementations for specific needs
Performance Considerations:
- Iterate is called frequently during simulation execution
- Implementations should be optimized for performance
- Avoid expensive computations or memory allocations in Iterate
- Consider caching expensive computations in Configure
Thread Safety:
- Iterate may be called concurrently from multiple goroutines
- Implementations should be thread-safe or stateless
type Iteration interface {
Configure(partitionIndex int, settings *Settings)
Iterate(
params *Params,
partitionIndex int,
stateHistories []*StateHistory,
timestepsHistory *CumulativeTimestepsHistory,
) []float64
}
type IterationSettings
IterationSettings is the YAML-loadable per-partition configuration.
Usage hints:
- Name is used to address partitions in other configs and params maps.
- ParamsFromUpstream forwards outputs from upstream partitions into Params.
- StateWidth and StateHistoryDepth control the size and depth of state.
type IterationSettings struct {
Name string `yaml:"name"`
Params Params `yaml:"params"`
ParamsFromUpstream map[string]UpstreamConfig `yaml:"params_from_upstream,omitempty"`
InitStateValues []float64 `yaml:"init_state_values"`
Seed uint64 `yaml:"seed"`
StateWidth int `yaml:"state_width"`
StateHistoryDepth int `yaml:"state_history_depth"`
}
type IterationTestHarness
IterationTestHarness wraps an iteration and performs checks on its behaviour while running.
type IterationTestHarness struct {
Iteration Iteration
Err error
// contains filtered or unexported fields
}
func (*IterationTestHarness) Configure
func (h *IterationTestHarness) Configure(partitionIndex int, settings *Settings)
func (*IterationTestHarness) Iterate
func (h *IterationTestHarness) Iterate(params *Params, partitionIndex int, stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) []float64
type IteratorInputMessage
IteratorInputMessage carries shared histories into iterator jobs.
type IteratorInputMessage struct {
StateHistories []*StateHistory
TimestepsHistory *CumulativeTimestepsHistory
}
type JsonLogChannelOutputFunction
JsonLogChannelOutputFunction writes JSON log entries via a background goroutine using a channel for improved throughput.
type JsonLogChannelOutputFunction struct {
// contains filtered or unexported fields
}
func NewJsonLogChannelOutputFunction
func NewJsonLogChannelOutputFunction(filePath string) *JsonLogChannelOutputFunction
NewJsonLogChannelOutputFunction creates a JsonLogChannelOutputFunction. Call Close (defer it) to ensure flushing at the end of a run.
func (*JsonLogChannelOutputFunction) Close
func (j *JsonLogChannelOutputFunction) Close()
Close flushes and stops the background writer. Defer it after construction.
func (*JsonLogChannelOutputFunction) Configure
func (j *JsonLogChannelOutputFunction) Configure(*Settings)
func (*JsonLogChannelOutputFunction) Output
func (j *JsonLogChannelOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type JsonLogEntry
JsonLogEntry is the serialised record format used by JSON log outputs.
type JsonLogEntry struct {
PartitionName string `json:"partition_name"`
State []float64 `json:"state"`
CumulativeTimesteps float64 `json:"time"`
}
type JsonLogOutputFunction
JsonLogOutputFunction writes newline-delimited JSON log entries.
type JsonLogOutputFunction struct {
// contains filtered or unexported fields
}
func NewJsonLogOutputFunction
func NewJsonLogOutputFunction(filePath string) *JsonLogOutputFunction
NewJsonLogOutputFunction creates a new JsonLogOutputFunction.
func (*JsonLogOutputFunction) Configure
func (j *JsonLogOutputFunction) Configure(*Settings)
func (*JsonLogOutputFunction) Output
func (j *JsonLogOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type NamedPartitionIndex
NamedPartitionIndex pairs the name of a partition with the partition index assigned to it by the PartitionCoordinator.
type NamedPartitionIndex struct {
Name string
Index int
}
type NamedUpstreamConfig
NamedUpstreamConfig is like UpstreamConfig but refers to upstream by name.
type NamedUpstreamConfig struct {
Upstream string `yaml:"upstream"`
Indices []int `yaml:"indices,omitempty"`
}
type NilOutputCondition
NilOutputCondition never outputs.
type NilOutputCondition struct{}
func (*NilOutputCondition) IsOutputStep
func (c *NilOutputCondition) IsOutputStep(partitionName string, state []float64, timestepsHistory *CumulativeTimestepsHistory) bool
type NilOutputFunction
NilOutputFunction outputs nothing from the simulation.
type NilOutputFunction struct{}
func (*NilOutputFunction) Configure
func (f *NilOutputFunction) Configure(*Settings)
func (*NilOutputFunction) Output
func (f *NilOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type NumberOfStepsTerminationCondition
NumberOfStepsTerminationCondition terminates after MaxNumberOfSteps.
type NumberOfStepsTerminationCondition struct {
MaxNumberOfSteps int
}
func (*NumberOfStepsTerminationCondition) Terminate
func (t *NumberOfStepsTerminationCondition) Terminate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) bool
type OnlyGivenPartitionsOutputCondition
OnlyGivenPartitionsOutputCondition emits output only for listed partitions.
type OnlyGivenPartitionsOutputCondition struct {
Partitions map[string]bool
}
func (*OnlyGivenPartitionsOutputCondition) IsOutputStep
func (o *OnlyGivenPartitionsOutputCondition) IsOutputStep(partitionName string, state []float64, timestepsHistory *CumulativeTimestepsHistory) bool
type OutputCondition
OutputCondition decides whether an output should be emitted this step.
type OutputCondition interface {
IsOutputStep(partitionName string, state []float64, timestepsHistory *CumulativeTimestepsHistory) bool
}
type OutputFunction
OutputFunction writes state/time to an output sink when the OutputCondition is met.
Configure is called once before parallel output begins (from NewPartitionCoordinator). Use it to pre-register partition names, cache indices, or open resources. Implementations that need no setup can leave it empty.
type OutputFunction interface {
Configure(settings *Settings)
Output(partitionName string, state []float64, cumulativeTimesteps float64)
}
type Params
Params stores per-partition parameter values.
Usage hints:
- Use Get/GetIndex helpers to retrieve, Set/SetIndex to update.
- SetPartitionName improves error messages for missing params.
type Params struct {
Map map[string][]float64 `yaml:",inline"`
// contains filtered or unexported fields
}
func NewParams
func NewParams(params map[string][]float64) Params
NewParams constructs a Params instance.
func (*Params) Get
func (p *Params) Get(name string) []float64
Get returns parameter values or panics with a helpful message.
func (*Params) GetCopy
func (p *Params) GetCopy(name string) []float64
GetCopy returns a copy of parameter values or panics with a helpful message.
func (*Params) GetCopyOk
func (p *Params) GetCopyOk(name string) ([]float64, bool)
GetCopyOk returns a copy of parameter values if present along with a flag.
func (*Params) GetIndex
func (p *Params) GetIndex(name string, index int) float64
GetIndex returns a single parameter value or panics.
func (*Params) GetOk
func (p *Params) GetOk(name string) ([]float64, bool)
GetOk returns parameter values if present along with a boolean flag.
func (*Params) Set
func (p *Params) Set(name string, values []float64)
Set creates or updates parameter values by name.
func (*Params) SetIndex
func (p *Params) SetIndex(name string, index int, value float64)
SetIndex updates a single parameter value or panics on invalid index.
func (*Params) SetPartitionName
func (p *Params) SetPartitionName(name string)
SetPartitionName attaches the owning partition name for better errors.
type PartitionConfig
PartitionConfig defines a partition to add to a simulation.
Usage hints:
- Iteration is not YAML-serialised; set it programmatically.
- ParamsAsPartitions allows passing partition indices via their names.
- ParamsFromUpstream forwards outputs from named upstream partitions.
type PartitionConfig struct {
Name string `yaml:"name"`
Iteration Iteration `yaml:"-"`
Params Params `yaml:"params"`
ParamsAsPartitions map[string][]string `yaml:"params_as_partitions,omitempty"`
ParamsFromUpstream map[string]NamedUpstreamConfig `yaml:"params_from_upstream,omitempty"`
InitStateValues []float64 `yaml:"init_state_values"`
StateHistoryDepth int `yaml:"state_history_depth"`
Seed uint64 `yaml:"seed"`
}
func LoadPartitionConfigFromYaml
func LoadPartitionConfigFromYaml(path string) *PartitionConfig
LoadPartitionConfigFromYaml loads PartitionConfig from a YAML file path.
Usage hints:
- Calls Init to populate missing defaults after unmarshalling.
func (*PartitionConfig) Init
func (p *PartitionConfig) Init()
Init ensures params maps are initialised; call after unmarshalling YAML.
type PartitionConfigOrdering
PartitionConfigOrdering maintains the ordering and lookup for partitions. Can be updated dynamically via Append.
type PartitionConfigOrdering struct {
Names []string
IndexByName map[string]int
ConfigByName map[string]*PartitionConfig
}
func (*PartitionConfigOrdering) Append
func (p *PartitionConfigOrdering) Append(config *PartitionConfig)
Append inserts another partition into the ordering and updates lookups.
type PartitionCoordinator
PartitionCoordinator orchestrates iteration work across partitions and applies state/time history updates in a coordinated manner.
The PartitionCoordinator is the central component that manages the execution of all partitions in a simulation. It coordinates the timing, communication, and state updates across all partitions, ensuring proper synchronization and maintaining simulation consistency.
Architecture: The coordinator uses a two-phase execution model:
- Iteration Phase: All partitions compute their next state values
- Update Phase: State and time histories are updated with new values
This design ensures that all partitions see consistent state information during each iteration, preventing race conditions and maintaining simulation determinism.
Concurrency Model:
- Each partition runs in its own goroutine for parallel execution
- Channels are used for inter-partition communication
- WaitGroups ensure proper synchronization between phases
- Shared state is protected by the coordinator’s control flow
Execution Flow:
- Compute next timestep increment using TimestepFunction
- Request iterations from all partitions (parallel execution)
- Wait for all iterations to complete
- Update state and time histories (parallel execution)
- Check termination condition
- Repeat until termination
Fields:
- Iterators: List of StateIterators, one per partition
- Shared: Shared state and time information accessible to all partitions
- TimestepFunction: Function that determines the next timestep increment
- TerminationCondition: Condition that determines when to stop the simulation
- newWorkChannels: Communication channels for coordinating partition work
Example Usage:
coordinator := NewPartitionCoordinator(settings, implementations)
// Run simulation until termination
coordinator.Run()
// Or step-by-step control
for !coordinator.ReadyToTerminate() {
	var wg sync.WaitGroup
	coordinator.Step(&wg)
}
Performance:
- O(p) time complexity where p is the number of partitions
- Parallel execution of partition iterations
- Efficient channel-based communication
- Memory usage scales with partition count and state size
Thread Safety:
- Safe for concurrent access to coordinator methods
- Internal synchronization ensures consistent state updates
- Partition communication is thread-safe through channels
type PartitionCoordinator struct {
Iterators []*StateIterator
Shared *IteratorInputMessage
TimestepFunction TimestepFunction
TerminationCondition TerminationCondition
// contains filtered or unexported fields
}
func NewPartitionCoordinator
func NewPartitionCoordinator(settings *Settings, implementations *Implementations) *PartitionCoordinator
NewPartitionCoordinator wires Settings and Implementations into a runnable coordinator with initial state/time histories and channels.
func (*PartitionCoordinator) ReadyToTerminate
func (c *PartitionCoordinator) ReadyToTerminate() bool
ReadyToTerminate returns whether the TerminationCondition is met.
func (*PartitionCoordinator) RequestMoreIterations
func (c *PartitionCoordinator) RequestMoreIterations(wg *sync.WaitGroup)
RequestMoreIterations spawns a goroutine per partition to run ReceiveAndIteratePending.
func (*PartitionCoordinator) Run
func (c *PartitionCoordinator) Run()
Run advances by repeatedly calling Step until termination.
func (*PartitionCoordinator) Step
func (c *PartitionCoordinator) Step(wg *sync.WaitGroup)
Step performs one simulation tick: compute dt, request iterations, then apply state/time updates.
func (*PartitionCoordinator) UpdateHistory
func (c *PartitionCoordinator) UpdateHistory(wg *sync.WaitGroup)
UpdateHistory spawns a goroutine per partition to run UpdateHistory and shifts time history forward, adding NextIncrement to t[0].
type PartitionState
type PartitionState struct {
CumulativeTimesteps float64 `protobuf:"fixed64,1,opt,name=cumulative_timesteps,json=cumulativeTimesteps,proto3" json:"cumulative_timesteps,omitempty"`
PartitionName string `protobuf:"bytes,2,opt,name=partition_name,json=partitionName,proto3" json:"partition_name,omitempty"`
State []float64 `protobuf:"fixed64,3,rep,packed,name=state,proto3" json:"state,omitempty"`
// contains filtered or unexported fields
}
func (*PartitionState) Descriptor
func (*PartitionState) Descriptor() ([]byte, []int)
Deprecated: Use PartitionState.ProtoReflect.Descriptor instead.
func (*PartitionState) GetCumulativeTimesteps
func (x *PartitionState) GetCumulativeTimesteps() float64
func (*PartitionState) GetPartitionName
func (x *PartitionState) GetPartitionName() string
func (*PartitionState) GetState
func (x *PartitionState) GetState() []float64
func (*PartitionState) ProtoMessage
func (*PartitionState) ProtoMessage()
func (*PartitionState) ProtoReflect
func (x *PartitionState) ProtoReflect() protoreflect.Message
func (*PartitionState) Reset
func (x *PartitionState) Reset()
func (*PartitionState) String
func (x *PartitionState) String() string
type Settings
Settings is the YAML-loadable top-level simulation configuration.
type Settings struct {
Iterations []IterationSettings `yaml:"iterations"`
InitTimeValue float64 `yaml:"init_time_value"`
TimestepsHistoryDepth int `yaml:"timesteps_history_depth"`
}
func LoadSettingsFromYaml
func LoadSettingsFromYaml(path string) *Settings
LoadSettingsFromYaml loads Settings from a YAML file path.
Usage hints:
- Calls Init to populate missing defaults after unmarshalling.
func (*Settings) Init
func (s *Settings) Init()
Init fills in defaults and ensures maps are initialised. Call immediately after unmarshalling from YAML.
type SimulationConfig
SimulationConfig defines additional run-level configuration.
type SimulationConfig struct {
OutputCondition OutputCondition
OutputFunction OutputFunction
TerminationCondition TerminationCondition
TimestepFunction TimestepFunction
InitTimeValue float64
}
type SimulationConfigStrings
SimulationConfigStrings is the YAML-loadable version of SimulationConfig, referring to implementations by type names for templating.
type SimulationConfigStrings struct {
OutputCondition string `yaml:"output_condition"`
OutputFunction string `yaml:"output_function"`
TerminationCondition string `yaml:"termination_condition"`
TimestepFunction string `yaml:"timestep_function"`
InitTimeValue float64 `yaml:"init_time_value"`
}
func LoadSimulationConfigStringsFromYaml
func LoadSimulationConfigStringsFromYaml(path string) *SimulationConfigStrings
LoadSimulationConfigStringsFromYaml loads SimulationConfigStrings from YAML.
type StateHistory
StateHistory is a rolling window of state vectors.
Usage hints:
- Values holds rows of state (row 0 is most recent by convention).
- Use GetNextStateRowToUpdate when updating in multi-row histories.
type StateHistory struct {
// each row is a different state in the history, by convention,
// starting with the most recent at index = 0
Values *mat.Dense
// should be of length = StateWidth
NextValues []float64
StateWidth int
StateHistoryDepth int
}
func (*StateHistory) CopyStateRow
func (s *StateHistory) CopyStateRow(index int) []float64
CopyStateRow copies a row from the state history given the index.
func (*StateHistory) GetNextStateRowToUpdate
func (s *StateHistory) GetNextStateRowToUpdate() []float64
GetNextStateRowToUpdate determines whether or not it is necessary to copy the previous row or simply expose it based on whether a history longer than 1 is needed.
type StateIterator
StateIterator runs an Iteration for a partition on a goroutine and manages reads/writes to history and output.
type StateIterator struct {
Iteration Iteration
Params Params
Partition NamedPartitionIndex
ValueChannels StateValueChannels
OutputCondition OutputCondition
OutputFunction OutputFunction
}
func NewStateIterator
func NewStateIterator(iteration Iteration, params Params, partitionName string, partitionIndex int, valueChannels StateValueChannels, outputCondition OutputCondition, outputFunction OutputFunction, initState []float64, timestepsHistory *CumulativeTimestepsHistory) *StateIterator
NewStateIterator creates a StateIterator and may emit initial output if the condition is met by the initial state/time.
func (*StateIterator) Iterate
func (s *StateIterator) Iterate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) []float64
Iterate runs the Iteration and optionally triggers output if the condition is met for the new state/time.
func (*StateIterator) ReceiveAndIteratePending
func (s *StateIterator) ReceiveAndIteratePending(inputChannel <-chan *IteratorInputMessage)
ReceiveAndIteratePending listens for an IteratorInputMessage, updates upstream-driven params, runs Iterate, and stores a pending state update.
func (*StateIterator) UpdateHistory
func (s *StateIterator) UpdateHistory(inputChannel <-chan *IteratorInputMessage)
UpdateHistory applies the pending state update to the partition history.
type StateTimeStorage
StateTimeStorage stores simulation time series data organised by partition name.
Two append paths serve different use cases:
- AppendByIndex — the simulation hot path. Lock-free. Requires all names pre-registered via PreRegisterPartitions (done automatically by NewPartitionCoordinator), one goroutine per partition index, and no concurrent reads during output.
- Append — for single-goroutine data loading (CSV, JSON log, database). Not safe for concurrent use.
GetValues, GetTimes, GetNames, SetValues, SetTimes and the registration methods (PreRegisterPartitions, GetIndex, IndexOf) are all intended for single-goroutine setup or post-simulation use.
The only internal synchronisation that remains is a mutex guarding the shared times slice, since N partition goroutines may all call appendTimeIfNew with the same timestamp; an atomic fast-path skips the mutex in the common case where the timestamp is already recorded.
type StateTimeStorage struct {
// contains filtered or unexported fields
}
func NewStateTimeStorage
func NewStateTimeStorage() *StateTimeStorage
NewStateTimeStorage constructs a new StateTimeStorage.
func (*StateTimeStorage) Append
func (s *StateTimeStorage) Append(name string, time float64, values []float64)
Append appends values for name and records time. Not safe for concurrent use; intended for single-goroutine data loading.
func (*StateTimeStorage) AppendByIndex
func (s *StateTimeStorage) AppendByIndex(index int, time float64, values []float64)
AppendByIndex appends values for a pre-registered partition index and records time at most once per unique timestamp.
Lock-free for the store. Preconditions (all hold under normal coordinator use):
- All names pre-registered via PreRegisterPartitions
- One goroutine per partition index
- No concurrent GetValues calls
func (*StateTimeStorage) GetIndex
func (s *StateTimeStorage) GetIndex(name string) int
GetIndex returns or creates the index for name.
func (*StateTimeStorage) GetNames
func (s *StateTimeStorage) GetNames() []string
GetNames returns all registered partition names.
func (*StateTimeStorage) GetTimes
func (s *StateTimeStorage) GetTimes() []float64
GetTimes returns a snapshot of the time axis.
func (*StateTimeStorage) GetValues
func (s *StateTimeStorage) GetValues(name string) [][]float64
GetValues returns a snapshot of all time series rows for name, panicking if absent.
func (*StateTimeStorage) IndexOf
func (s *StateTimeStorage) IndexOf(name string) (int, bool)
IndexOf returns the index and true if name is registered, or 0 and false.
func (*StateTimeStorage) PreRegisterPartitions
func (s *StateTimeStorage) PreRegisterPartitions(names []string)
PreRegisterPartitions ensures each name has a stable index and an empty row buffer before AppendByIndex is called concurrently. Idempotent.
func (*StateTimeStorage) SetTimes
func (s *StateTimeStorage) SetTimes(times []float64)
SetTimes replaces the time axis.
func (*StateTimeStorage) SetValues
func (s *StateTimeStorage) SetValues(name string, values [][]float64)
SetValues replaces the entire series for name.
type StateTimeStorageOutputFunction
StateTimeStorageOutputFunction stores output into StateTimeStorage when the condition is met.
type StateTimeStorageOutputFunction struct {
Store *StateTimeStorage
// contains filtered or unexported fields
}
func (*StateTimeStorageOutputFunction) Configure
func (f *StateTimeStorageOutputFunction) Configure(settings *Settings)
Configure pre-registers all partition names on Store and caches their indices for lock-free lookup in Output. Safe to call multiple times.
func (*StateTimeStorageOutputFunction) Output
func (f *StateTimeStorageOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type StateValueChannels
StateValueChannels provides upstream/downstream channels for inter-iterator communication.
type StateValueChannels struct {
Upstreams map[string]*UpstreamStateValues
Downstream *DownstreamStateValues
}
func (*StateValueChannels) BroadcastDownstream
func (s *StateValueChannels) BroadcastDownstream(stateValues []float64)
BroadcastDownstream sends state values to all configured downstream copies. Each listener receives an independent copy so params wiring cannot mutate a slice shared with other partitions or with the producer’s state buffer.
func (*StateValueChannels) UpdateUpstreamParams
func (s *StateValueChannels) UpdateUpstreamParams(params *Params)
UpdateUpstreamParams updates Params with values received from upstream channels.
type StdoutOutputFunction
StdoutOutputFunction outputs the state to the terminal.
type StdoutOutputFunction struct{}
func (*StdoutOutputFunction) Configure
func (s *StdoutOutputFunction) Configure(*Settings)
func (*StdoutOutputFunction) Output
func (s *StdoutOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type TerminationCondition
TerminationCondition decides when the simulation should end.
type TerminationCondition interface {
Terminate(
stateHistories []*StateHistory,
timestepsHistory *CumulativeTimestepsHistory,
) bool
}
type TimeElapsedTerminationCondition
TimeElapsedTerminationCondition terminates after MaxTimeElapsed.
type TimeElapsedTerminationCondition struct {
MaxTimeElapsed float64
}
func (*TimeElapsedTerminationCondition) Terminate
func (t *TimeElapsedTerminationCondition) Terminate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) bool
type TimestepFunction
TimestepFunction computes the next time increment.
type TimestepFunction interface {
NextIncrement(
timestepsHistory *CumulativeTimestepsHistory,
) float64
}
type UpstreamConfig
UpstreamConfig is the YAML-loadable representation of a slice of data from the output of a partition which is computationally upstream.
type UpstreamConfig struct {
Upstream int `yaml:"upstream"`
Indices []int `yaml:"indices,omitempty"`
}
type UpstreamStateValues
UpstreamStateValues contains information to receive state values from an upstream iterator via channel.
type UpstreamStateValues struct {
Channel chan []float64
Indices []int
}
type WebsocketOutputFunction
WebsocketOutputFunction serialises and sends outputs via a websocket connection when the condition is met.
type WebsocketOutputFunction struct {
// contains filtered or unexported fields
}
func NewWebsocketOutputFunction
func NewWebsocketOutputFunction(connection *websocket.Conn, mutex *sync.Mutex) *WebsocketOutputFunction
NewWebsocketOutputFunction constructs a WebsocketOutputFunction with a connection and a mutex for safe concurrent writes.
func (*WebsocketOutputFunction) Configure
func (w *WebsocketOutputFunction) Configure(*Settings)
func (*WebsocketOutputFunction) Output
func (w *WebsocketOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
Generated by gomarkdoc