From 00e92cae05f83d04b3451cde654db5e7ac50c726 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Fri, 30 Jun 2023 09:32:28 +0300 Subject: [PATCH] Add queue_flush_size option Signed-off-by: Nikolaos Karaolidis --- plugins/outputs/clickhouse/clickhouse.go | 21 +++++++++++++++------ plugins/outputs/clickhouse/sample.conf | 9 +++++++-- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index 8e2686a..564b53d 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -38,6 +38,7 @@ type ClickHouse struct { MultiTableOptions MultiTableOptions `toml:"multi_table"` QueueInitialSize int `toml:"queue_initial_size"` QueueMaxSize int `toml:"queue_max_size"` + QueueFlushSize int `toml:"queue_flush_size"` QueueFlushInterval time.Duration `toml:"queue_flush_interval"` ConnectionMaxIdleTime time.Duration `toml:"connection_max_idle_time"` ConnectionMaxLifetime time.Duration `toml:"connection_max_lifetime"` @@ -87,6 +88,11 @@ func (ch *ClickHouse) Init() error { ch.Log.Info("queue_max_size is not set, using default value: ", ch.QueueMaxSize) } + if ch.QueueFlushSize <= 0 { + ch.QueueFlushSize = int(math.MaxUint64 >> 1) + ch.Log.Info("queue_flush_size is not set, using default value: ", ch.QueueFlushSize) + } + if ch.QueueFlushInterval <= 0 { ch.QueueFlushInterval = 1 * time.Second ch.Log.Info("queue_flush_interval is not set, using default value: ", ch.QueueFlushInterval) @@ -333,9 +339,6 @@ func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.Ordered return fmt.Errorf("exec failed: %w", err) } } - if err != nil { - return fmt.Errorf("exec failed: %w", err) - } err = tx.Commit() if err != nil { @@ -483,10 +486,16 @@ func (ch *ClickHouse) backgroundWriter(delay time.Duration) { timer.Reset(delay) case <-ch.metricTrigger: - if !timer.Stop() { - <-timer.C + ch.metricLock.Lock() + metricsLength := len(ch.metricQueue) + ch.metricLock.Unlock() + + if metricsLength < ch.QueueFlushSize { + if !timer.Stop() { + <-timer.C + } + timer.Reset(delay) } - timer.Reset(delay) } } } diff --git a/plugins/outputs/clickhouse/sample.conf b/plugins/outputs/clickhouse/sample.conf index 2b7cc3a..3cdf7e0 100644 --- a/plugins/outputs/clickhouse/sample.conf +++ b/plugins/outputs/clickhouse/sample.conf @@ -27,13 +27,18 @@ ## Table name prefix # table_prefix = "telegraf" - ## Initial metric queue size, resizes automatically if the queue becomes too large + ## Initial metric queue size + ## Queue resizes automatically if the queue becomes too large. # queue_initial_size = 100000 - ## Maximum queue size, 0 means unlimited. + ## Maximum queue size, 0 means unlimited ## If the queue reaches this size, new writes will be dropped until it drains. # queue_max_size = 0 + ## Queue flush size, 0 means unlimited + ## The flush interval will not be reset if the queue is larger than this. + # queue_flush_size = 0 + ## Flush interval for the metric queue ## The agent waits until N seconds have passed without any writes before flushing metrics to ClickHouse. # queue_flush_interval = "1"