diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index 564b53d..76bd235 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -48,7 +48,7 @@ type ClickHouse struct { db *sql.DB Log telegraf.Logger metricQueue []telegraf.Metric - metricLock sync.Mutex + metricLock sync.RWMutex metricTrigger chan struct{} } @@ -441,12 +441,17 @@ func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { } func (ch *ClickHouse) Write(metrics []telegraf.Metric) error { - ch.metricLock.Lock() - if len(ch.metricQueue) >= ch.QueueMaxSize { - ch.Log.Errorf("Metrics queue is full (%d/%d), dropping metrics", len(ch.metricQueue), ch.QueueMaxSize) - } else { - ch.metricQueue = append(ch.metricQueue, metrics...) + ch.metricLock.RLock() + metricsLength := len(ch.metricQueue) + ch.metricLock.RUnlock() + + if metricsLength >= ch.QueueMaxSize { + ch.Log.Errorf("Metrics queue is full (%d/%d), dropping metrics", metricsLength, ch.QueueMaxSize) + return nil } + + ch.metricLock.Lock() + ch.metricQueue = append(ch.metricQueue, metrics...) ch.metricLock.Unlock() select { @@ -464,12 +469,15 @@ func (ch *ClickHouse) backgroundWriter(delay time.Duration) { for { select { case <-timer.C: - ch.metricLock.Lock() + ch.metricLock.RLock() metrics := ch.metricQueue - ch.metricQueue = nil - ch.metricLock.Unlock() + ch.metricLock.RUnlock() if len(metrics) > 0 { + ch.metricLock.Lock() + ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize) + ch.metricLock.Unlock() + if ch.TableMode == "single" { err := ch.WriteSingleTable(metrics) if err != nil { @@ -486,9 +494,9 @@ func (ch *ClickHouse) backgroundWriter(delay time.Duration) { timer.Reset(delay) case <-ch.metricTrigger: - ch.metricLock.Lock() + ch.metricLock.RLock() metricsLength := len(ch.metricQueue) - ch.metricLock.Unlock() + ch.metricLock.RUnlock() if metricsLength < ch.QueueFlushSize { if !timer.Stop() { diff --git a/plugins/outputs/clickhouse/sample.conf b/plugins/outputs/clickhouse/sample.conf index 3cdf7e0..4bdf91e 100644 --- a/plugins/outputs/clickhouse/sample.conf +++ b/plugins/outputs/clickhouse/sample.conf @@ -31,11 +31,11 @@ ## Queue resizes automatically if the queue becomes too large. # queue_initial_size = 100000 - ## Maximum queue size, 0 means unlimited + ## Maximum queue size, "0" means unlimited ## If the queue reaches this size, new writes will be dropped until it drains. # queue_max_size = 0 - ## Queue flush size, 0 means unlimited + ## Queue flush size, "0" means unlimited ## The flush interval will not be reset if the queue is larger than this. # queue_flush_size = 0