Add queue_flush_size option

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-06-30 09:32:28 +03:00
parent 0c23f61e04
commit 00e92cae05
2 changed files with 22 additions and 8 deletions

View File

@@ -38,6 +38,7 @@ type ClickHouse struct {
MultiTableOptions MultiTableOptions `toml:"multi_table"` MultiTableOptions MultiTableOptions `toml:"multi_table"`
QueueInitialSize int `toml:"queue_initial_size"` QueueInitialSize int `toml:"queue_initial_size"`
QueueMaxSize int `toml:"queue_max_size"` QueueMaxSize int `toml:"queue_max_size"`
QueueFlushSize int `toml:"queue_flush_size"`
QueueFlushInterval time.Duration `toml:"queue_flush_interval"` QueueFlushInterval time.Duration `toml:"queue_flush_interval"`
ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"` ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"`
ConnectionMaxLifetime time.Duration `toml:"connection_max_lifetime"` ConnectionMaxLifetime time.Duration `toml:"connection_max_lifetime"`
@@ -87,6 +88,11 @@ func (ch *ClickHouse) Init() error {
ch.Log.Info("queue_max_size is not set, using default value: ", ch.QueueMaxSize) ch.Log.Info("queue_max_size is not set, using default value: ", ch.QueueMaxSize)
} }
if ch.QueueFlushSize <= 0 {
ch.QueueFlushSize = int(math.MaxUint64 >> 1)
ch.Log.Info("queue_flush_size is not set, using default value: ", ch.QueueFlushSize)
}
if ch.QueueFlushInterval <= 0 { if ch.QueueFlushInterval <= 0 {
ch.QueueFlushInterval = 1 * time.Second ch.QueueFlushInterval = 1 * time.Second
ch.Log.Info("queue_flush_interval is not set, using default value: ", ch.QueueFlushInterval) ch.Log.Info("queue_flush_interval is not set, using default value: ", ch.QueueFlushInterval)
@@ -333,9 +339,6 @@ func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.Ordered
return fmt.Errorf("exec failed: %w", err) return fmt.Errorf("exec failed: %w", err)
} }
} }
if err != nil {
return fmt.Errorf("exec failed: %w", err)
}
err = tx.Commit() err = tx.Commit()
if err != nil { if err != nil {
@@ -483,12 +486,18 @@ func (ch *ClickHouse) backgroundWriter(delay time.Duration) {
timer.Reset(delay) timer.Reset(delay)
case <-ch.metricTrigger: case <-ch.metricTrigger:
ch.metricLock.Lock()
metricsLength := len(ch.metricQueue)
ch.metricLock.Unlock()
if metricsLength < ch.QueueFlushSize {
if !timer.Stop() { if !timer.Stop() {
<-timer.C <-timer.C
} }
timer.Reset(delay) timer.Reset(delay)
} }
} }
}
} }
func init() { func init() {

View File

@@ -27,13 +27,18 @@
## Table name prefix ## Table name prefix
# table_prefix = "telegraf" # table_prefix = "telegraf"
## Initial metric queue size, resizes automatically if the queue becomes too large ## Initial metric queue size
## Queue resizes automatically if the queue becomes too large.
# queue_initial_size = 100000 # queue_initial_size = 100000
## Maximum queue size, 0 means unlimited. ## Maximum queue size, 0 means unlimited
## If the queue reaches this size, new writes will be dropped until it drains. ## If the queue reaches this size, new writes will be dropped until it drains.
# queue_max_size = 0 # queue_max_size = 0
## Queue flush size, 0 means unlimited
## The flush interval will not be reset if the queue is larger than this.
# queue_flush_size = 0
## Flush interval for the metric queue ## Flush interval for the metric queue
## The agent waits until N seconds have passed without any writes before flushing metrics to ClickHouse. ## The agent waits until N seconds have passed without any writes before flushing metrics to ClickHouse.
# queue_flush_interval = "1" # queue_flush_interval = "1"