Change Mutex to RWMutex

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-06-30 11:28:44 +03:00
parent 8ef7e13ce8
commit 86956cd4be
2 changed files with 21 additions and 13 deletions

View File

@@ -48,7 +48,7 @@ type ClickHouse struct {
db *sql.DB db *sql.DB
Log telegraf.Logger Log telegraf.Logger
metricQueue []telegraf.Metric metricQueue []telegraf.Metric
metricLock sync.Mutex metricLock sync.RWMutex
metricTrigger chan struct{} metricTrigger chan struct{}
} }
@@ -441,12 +441,17 @@ func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error {
} }
func (ch *ClickHouse) Write(metrics []telegraf.Metric) error { func (ch *ClickHouse) Write(metrics []telegraf.Metric) error {
ch.metricLock.Lock() ch.metricLock.RLock()
if len(ch.metricQueue) >= ch.QueueMaxSize { metricsLength := len(ch.metricQueue)
ch.Log.Errorf("Metrics queue is full (%d/%d), dropping metrics", len(ch.metricQueue), ch.QueueMaxSize) ch.metricLock.RUnlock()
} else {
ch.metricQueue = append(ch.metricQueue, metrics...) if metricsLength >= ch.QueueMaxSize {
ch.Log.Errorf("Metrics queue is full (%d/%d), dropping metrics", metricsLength, ch.QueueMaxSize)
return nil
} }
ch.metricLock.Lock()
ch.metricQueue = append(ch.metricQueue, metrics...)
ch.metricLock.Unlock() ch.metricLock.Unlock()
select { select {
@@ -464,12 +469,15 @@ func (ch *ClickHouse) backgroundWriter(delay time.Duration) {
for { for {
select { select {
case <-timer.C: case <-timer.C:
ch.metricLock.Lock() ch.metricLock.RLock()
metrics := ch.metricQueue metrics := ch.metricQueue
ch.metricQueue = nil ch.metricLock.RUnlock()
ch.metricLock.Unlock()
if len(metrics) > 0 { if len(metrics) > 0 {
ch.metricLock.Lock()
ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueInitialSize)
ch.metricLock.Unlock()
if ch.TableMode == "single" { if ch.TableMode == "single" {
err := ch.WriteSingleTable(metrics) err := ch.WriteSingleTable(metrics)
if err != nil { if err != nil {
@@ -486,9 +494,9 @@ func (ch *ClickHouse) backgroundWriter(delay time.Duration) {
timer.Reset(delay) timer.Reset(delay)
case <-ch.metricTrigger: case <-ch.metricTrigger:
ch.metricLock.Lock() ch.metricLock.RLock()
metricsLength := len(ch.metricQueue) metricsLength := len(ch.metricQueue)
ch.metricLock.Unlock() ch.metricLock.RUnlock()
if metricsLength < ch.QueueFlushSize { if metricsLength < ch.QueueFlushSize {
if !timer.Stop() { if !timer.Stop() {

View File

@@ -31,11 +31,11 @@
## Queue resizes automatically if the queue becomes too large. ## Queue resizes automatically if the queue becomes too large.
# queue_initial_size = 100000 # queue_initial_size = 100000
## Maximum queue size, 0 means unlimited ## Maximum queue size, "0" means unlimited
## If the queue reaches this size, new writes will be dropped until it drains. ## If the queue reaches this size, new writes will be dropped until it drains.
# queue_max_size = 0 # queue_max_size = 0
## Queue flush size, 0 means unlimited ## Queue flush size, "0" means unlimited
## The flush interval will not be reset if the queue is larger than this. ## The flush interval will not be reset if the queue is larger than this.
# queue_flush_size = 0 # queue_flush_size = 0