From da3d08912dd41532f299976ad9c341b7db85fafc Mon Sep 17 00:00:00 2001
From: Nikolaos Karaolidis
Date: Thu, 29 Jun 2023 16:21:33 +0300
Subject: [PATCH] Rename clickhouse variable

Signed-off-by: Nikolaos Karaolidis
---
 plugins/outputs/clickhouse/clickhouse.go | 215 +++++++++++------------
 1 file changed, 107 insertions(+), 108 deletions(-)

diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go
index 8e55526..b6a7724 100644
--- a/plugins/outputs/clickhouse/clickhouse.go
+++ b/plugins/outputs/clickhouse/clickhouse.go
@@ -55,53 +55,53 @@ func (*ClickHouse) SampleConfig() string {
 	return sampleConfig
 }
 
-func (p *ClickHouse) Init() error {
-	if p.DataSourceName == "" {
+func (ch *ClickHouse) Init() error {
+	if ch.DataSourceName == "" {
 		return fmt.Errorf("data_source_name is a required configuration option")
 	}
 
-	if p.TimestampColumn == "" {
-		p.TimestampColumn = "timestamp"
-		p.Log.Info("timestamp_column is not set, using default value:", p.TimestampColumn)
+	if ch.TimestampColumn == "" {
+		ch.TimestampColumn = "timestamp"
+		ch.Log.Info("timestamp_column is not set, using default value:", ch.TimestampColumn)
 	}
 
-	if p.TableMode == "" {
-		p.TableMode = "multi"
-		p.Log.Info("table_mode is not set, using default value:", p.TableMode)
-	} else if p.TableMode != "single" && p.TableMode != "multi" {
+	if ch.TableMode == "" {
+		ch.TableMode = "multi"
+		ch.Log.Info("table_mode is not set, using default value:", ch.TableMode)
+	} else if ch.TableMode != "single" && ch.TableMode != "multi" {
 		return fmt.Errorf("table_mode must be one of: single, multi")
 	}
 
-	if p.TableMode == "single" && p.SingleTableOptions.TableName == "" {
-		p.SingleTableOptions.TableName = "telegraf"
-		p.Log.Info("table_name is not set, using default value:", p.SingleTableOptions.TableName)
+	if ch.TableMode == "single" && ch.SingleTableOptions.TableName == "" {
+		ch.SingleTableOptions.TableName = "telegraf"
+		ch.Log.Info("table_name is not set, using default value:", ch.SingleTableOptions.TableName)
 	}
 
-	if p.QueueSize <= 0 {
-		p.QueueSize = 100000
-		p.Log.Info("queue_size is not set, using default value:", p.QueueSize)
+	if ch.QueueSize <= 0 {
+		ch.QueueSize = 100000
+		ch.Log.Info("queue_size is not set, using default value:", ch.QueueSize)
 	}
 
-	if p.QueueLimit <= 0 {
-		p.QueueLimit = int(math.MaxUint64 >> 1)
-		p.Log.Info("queue_limit is not set, using default value:", p.QueueLimit)
+	if ch.QueueLimit <= 0 {
+		ch.QueueLimit = int(math.MaxUint64 >> 1)
+		ch.Log.Info("queue_limit is not set, using default value:", ch.QueueLimit)
 	}
 
-	if p.FlushInterval <= 0 {
-		p.FlushInterval = 5 * time.Second
-		p.Log.Info("flush_interval is not set, using default value:", p.FlushInterval)
+	if ch.FlushInterval <= 0 {
+		ch.FlushInterval = 5 * time.Second
+		ch.Log.Info("flush_interval is not set, using default value:", ch.FlushInterval)
 	}
 
-	p.metricQueue = make([]telegraf.Metric, 0, p.QueueSize)
-	p.metricTrigger = make(chan struct{}, 1)
+	ch.metricQueue = make([]telegraf.Metric, 0, ch.QueueSize)
+	ch.metricTrigger = make(chan struct{}, 1)
 
-	go p.metricWriter(p.FlushInterval)
+	go ch.metricWriter(ch.FlushInterval)
 
 	return nil
 }
 
-func (p *ClickHouse) Connect() error {
-	db, err := sql.Open("clickhouse", p.DataSourceName)
+func (ch *ClickHouse) Connect() error {
+	db, err := sql.Open("clickhouse", ch.DataSourceName)
 	if err != nil {
 		return err
 	}
@@ -111,26 +111,26 @@ func (p *ClickHouse) Connect() error {
 		return err
 	}
 
-	db.SetConnMaxIdleTime(time.Duration(p.ConnectionMaxIdleTime))
-	db.SetConnMaxLifetime(time.Duration(p.ConnectionMaxLifetime))
-	db.SetMaxIdleConns(p.ConnectionMaxIdle)
-	db.SetMaxOpenConns(p.ConnectionMaxOpen)
+	db.SetConnMaxIdleTime(time.Duration(ch.ConnectionMaxIdleTime))
+	db.SetConnMaxLifetime(time.Duration(ch.ConnectionMaxLifetime))
+	db.SetMaxIdleConns(ch.ConnectionMaxIdle)
+	db.SetMaxOpenConns(ch.ConnectionMaxOpen)
 
-	if p.InitSQL != "" {
-		_, err = db.Exec(p.InitSQL)
+	if ch.InitSQL != "" {
+		_, err = db.Exec(ch.InitSQL)
 		if err != nil {
 			return err
 		}
 	}
 
-	p.db = db
-	p.Log.Info("Connected to ClickHouse!")
+	ch.db = db
+	ch.Log.Info("Connected to ClickHouse!")
 
 	return nil
 }
 
-func (p *ClickHouse) Close() error {
-	return p.db.Close()
+func (ch *ClickHouse) Close() error {
+	return ch.db.Close()
 }
 
 func sanitizeQuoted(in string) string {
@@ -150,7 +150,7 @@ func quoteIdent(name string) string {
 	return `"` + strings.ReplaceAll(sanitizeQuoted(name), `"`, `""`) + `"`
 }
 
-func (p *ClickHouse) valueToDatatype(value interface{}) string {
+func (ch *ClickHouse) valueToDatatype(value interface{}) string {
 	var datatype string
 
 	switch value.(type) {
@@ -168,43 +168,42 @@
 		datatype = "DateTime"
 	default:
 		datatype = "String"
-		p.Log.Errorf("unknown datatype for value %v", value)
+		ch.Log.Errorf("unknown datatype for value %v", value)
 	}
 
 	return datatype
 }
 
-// TODO: fix this fuckyness
 type NullUint64 struct {
 	Uint64 uint64
 	Valid  bool
 }
 
-func (p *ClickHouse) datatypeToNullable(datatype string) interface{} {
+func (ch *ClickHouse) datatypeToNullable(datatype string) interface{} {
 	var nullable interface{}
 
 	switch datatype {
 	case "Int64":
-		nullable = int64(0) // sql.NullInt64{}
+		nullable = sql.NullInt64{Int64: 0, Valid: false}
 	case "UInt64":
-		nullable = uint64(0) // NullUint64{}
+		nullable = NullUint64{Uint64: 0, Valid: false}
 	case "Float64":
-		nullable = float64(0) // sql.NullFloat64{}
+		nullable = sql.NullFloat64{Float64: 0, Valid: false}
 	case "String":
-		nullable = "" // sql.NullString{}
+		nullable = sql.NullString{String: "", Valid: false}
 	case "UInt8":
-		nullable = false // sql.NullBool{}
+		nullable = sql.NullBool{Bool: false, Valid: false}
 	case "DateTime":
-		nullable = time.Unix(0, 0) // sql.NullTime{}
+		nullable = sql.NullTime{Time: time.Unix(0, 0), Valid: false}
 	default:
-		nullable = "" // sql.NullString{}
-		p.Log.Errorf("unknown datatype %s", datatype)
+		nullable = sql.NullString{String: "", Valid: false}
+		ch.Log.Errorf("unknown datatype %s", datatype)
 	}
 
 	return nullable
 }
 
-func (p *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string {
+func (ch *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string {
 	columnDefs := make([]string, 0, columns.Len())
 
 	for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
@@ -215,7 +214,7 @@ func (p *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.O
 	if _, ok := columns.Get("host"); ok {
 		orderBy = append(orderBy, "host")
 	}
-	orderBy = append(orderBy, quoteIdent(p.TimestampColumn))
+	orderBy = append(orderBy, quoteIdent(ch.TimestampColumn))
 	if _, ok := columns.Get("measurement"); ok {
 		orderBy = append(orderBy, "measurement")
 	}
@@ -224,16 +223,16 @@
 		quoteIdent(tablename),
 		strings.Join(columnDefs, ","),
 		strings.Join(orderBy, ","),
-		quoteIdent(p.TimestampColumn))
+		quoteIdent(ch.TimestampColumn))
 
-	if p.TTL != "" {
-		createTable += fmt.Sprintf(" TTL %s + INTERVAL %s", quoteIdent(p.TimestampColumn), p.TTL)
+	if ch.TTL != "" {
+		createTable += fmt.Sprintf(" TTL %s + INTERVAL %s", quoteIdent(ch.TimestampColumn), ch.TTL)
 	}
 
 	return createTable
 }
 
-func (p *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string {
+func (ch *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string {
 	alterDefs := make([]string, 0, columns.Len())
 
 	for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
@@ -247,21 +246,21 @@ func (p *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.Or
 		strings.Join(alterDefs, ","))
 }
 
-func (p *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMap[string, string]) error {
+func (ch *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMap[string, string]) error {
 	var res *sql.Rows
 	var err error
 
 	for {
-		res, err = p.db.Query(fmt.Sprintf("DESCRIBE TABLE %s", quoteIdent(tablename)))
+		res, err = ch.db.Query(fmt.Sprintf("DESCRIBE TABLE %s", quoteIdent(tablename)))
 		if err != nil {
 			// Unknown Table Error
 			if strings.Contains(err.Error(), "code: 60") {
-				_, err = p.db.Exec(p.generateCreateTable(tablename, columns))
+				_, err = ch.db.Exec(ch.generateCreateTable(tablename, columns))
 				if err != nil {
 					return err
 				}
-				p.Log.Info("Created table", tablename)
+				ch.Log.Info("Created table", tablename)
 				continue
 			}
 			return err
@@ -286,11 +285,11 @@ func (p *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMa
 
 	for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
 		if _, ok := tableColumns[pair.Key]; !ok {
-			_, err = p.db.Exec(p.generateAlterTable(tablename, columns))
+			_, err = ch.db.Exec(ch.generateAlterTable(tablename, columns))
 			if err != nil {
 				return err
 			}
-			p.Log.Info("Altered table", tablename)
+			ch.Log.Info("Altered table", tablename)
 			break
 		}
 	}
@@ -298,7 +297,7 @@ func (p *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMa
 	return nil
 }
 
-func (p *ClickHouse) generateInsert(tablename string, columns *orderedmap.OrderedMap[string, string], batchSize int) string {
+func (ch *ClickHouse) generateInsert(tablename string, columns *orderedmap.OrderedMap[string, string], batchSize int) string {
 	quotedColumns := make([]string, 0, columns.Len())
 	for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
 		quotedColumns = append(quotedColumns, quoteIdent(pair.Key))
@@ -313,25 +312,25 @@ func (p *ClickHouse) generateInsert(tablename string, columns *orderedmap.Ordere
 		placeholders)
 }
 
-func (p *ClickHouse) nullifyMissingValues(columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) {
+func (ch *ClickHouse) nullifyMissingValues(columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) {
 	for _, metric := range metrics {
 		for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
 			if _, ok := metric[pair.Key]; !ok {
-				metric[pair.Key] = p.datatypeToNullable(pair.Value)
+				metric[pair.Key] = ch.datatypeToNullable(pair.Value)
 			}
 		}
 	}
 }
 
-func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error {
-	err := p.ensureTable(tablename, columns)
+func (ch *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error {
+	err := ch.ensureTable(tablename, columns)
 	if err != nil {
 		return err
 	}
 
-	sql := p.generateInsert(tablename, columns, len(metrics))
+	sql := ch.generateInsert(tablename, columns, len(metrics))
 
-	tx, err := p.db.Begin()
+	tx, err := ch.db.Begin()
 	if err != nil {
 		return fmt.Errorf("begin failed: %w", err)
 	}
@@ -369,7 +368,7 @@ func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedM
 	return nil
 }
 
-func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
+func (ch *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
 	metricsData := make(map[string][]map[string]interface{})
 	columns := make(map[string]*orderedmap.OrderedMap[string, string])
 
@@ -377,8 +376,8 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
 	for _, metric := range metrics {
 		tablename := metric.Name()
 
-		if p.MultiTableOptions.TablePrefix != "" {
-			tablename = p.MultiTableOptions.TablePrefix + "_" + tablename
+		if ch.MultiTableOptions.TablePrefix != "" {
+			tablename = ch.MultiTableOptions.TablePrefix + "_" + tablename
 		}
 
 		if _, ok := metricsData[tablename]; !ok {
@@ -391,41 +390,41 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
 
 		metricEntry := make(map[string]interface{})
 
-		metricEntry[p.TimestampColumn] = metric.Time()
-		columns[tablename].Set(p.TimestampColumn, p.valueToDatatype(metric.Time()))
+		metricEntry[ch.TimestampColumn] = metric.Time()
+		columns[tablename].Set(ch.TimestampColumn, ch.valueToDatatype(metric.Time()))
 
 		for _, tag := range metric.TagList() {
 			metricEntry[tag.Key] = tag.Value
-			columns[tablename].Set(tag.Key, p.valueToDatatype(tag.Value))
+			columns[tablename].Set(tag.Key, ch.valueToDatatype(tag.Value))
 		}
 
 		for _, field := range metric.FieldList() {
 			metricEntry[field.Key] = field.Value
-			columns[tablename].Set(field.Key, p.valueToDatatype(field.Value))
+			columns[tablename].Set(field.Key, ch.valueToDatatype(field.Value))
 		}
 
 		metricsData[tablename] = append(metricsData[tablename], metricEntry)
 	}
 
 	for tablename, metrics := range metricsData {
-		p.nullifyMissingValues(columns[tablename], metrics)
+		ch.nullifyMissingValues(columns[tablename], metrics)
 	}
-	p.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
+	ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
 
 	start = time.Now()
 	for tablename, metrics := range metricsData {
-		err := p.writeMetrics(tablename, columns[tablename], metrics)
+		err := ch.writeMetrics(tablename, columns[tablename], metrics)
 		if err != nil {
 			return err
 		}
 	}
-	p.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start))
+	ch.Log.Infof("Wrote %d metrics to %d tables in %s\n", len(metrics), len(metricsData), time.Since(start))
 
 	return nil
 }
 
-func (p *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error {
-	tablename := p.SingleTableOptions.TableName
+func (ch *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error {
+	tablename := ch.SingleTableOptions.TableName
 
 	metricsData := make([]map[string]interface{}, 0, len(metrics))
 	columns := orderedmap.New[string, string](len(metrics))
@@ -434,92 +433,92 @@ func (p *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error {
 		metricName := metric.Name()
 		metricEntry := make(map[string]interface{})
 
-		metricEntry[p.TimestampColumn] = metric.Time()
-		columns.Set(p.TimestampColumn, p.valueToDatatype(metric.Time()))
+		metricEntry[ch.TimestampColumn] = metric.Time()
+		columns.Set(ch.TimestampColumn, ch.valueToDatatype(metric.Time()))
 
 		metricEntry["measurement"] = metricName
-		columns.Set("measurement", p.valueToDatatype(metricName))
+		columns.Set("measurement", ch.valueToDatatype(metricName))
 
 		for _, tag := range metric.TagList() {
 			colName := fmt.Sprintf("%s_%s", metricName, tag.Key)
 			metricEntry[colName] = tag.Value
-			columns.Set(colName, p.valueToDatatype(tag.Value))
+			columns.Set(colName, ch.valueToDatatype(tag.Value))
 		}
 
 		for _, field := range metric.FieldList() {
 			colName := fmt.Sprintf("%s_%s", metricName, field.Key)
 			metricEntry[colName] = field.Value
-			columns.Set(colName, p.valueToDatatype(field.Value))
+			columns.Set(colName, ch.valueToDatatype(field.Value))
 		}
 
 		for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
 			if _, ok := metricEntry[pair.Key]; !ok {
-				metricEntry[pair.Key] = p.datatypeToNullable(pair.Value)
+				metricEntry[pair.Key] = ch.datatypeToNullable(pair.Value)
 			}
		}
 
 		metricsData = append(metricsData, metricEntry)
 	}
 
-	p.nullifyMissingValues(columns, metricsData)
-	p.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
+	ch.nullifyMissingValues(columns, metricsData)
+	ch.Log.Infof("Prepared %d metrics for writing in %s\n", len(metrics), time.Since(start))
 
 	start = time.Now()
-	err := p.writeMetrics(tablename, columns, metricsData)
+	err := ch.writeMetrics(tablename, columns, metricsData)
 	if err != nil {
 		return err
 	}
-	p.Log.Infof("Wrote %d metrics to %s in %s\n", len(metrics), tablename, time.Since(start))
+	ch.Log.Infof("Wrote %d metrics to %s in %s\n", len(metrics), tablename, time.Since(start))
 
 	return nil
 }
 
-func (p *ClickHouse) Write(metrics []telegraf.Metric) error {
-	p.metricLock.Lock()
-	if len(p.metricQueue) >= p.QueueLimit {
-		p.Log.Errorf("Metrics queue is full (%d/%d), dropping metrics", len(p.metricQueue), p.QueueLimit)
+func (ch *ClickHouse) Write(metrics []telegraf.Metric) error {
+	ch.metricLock.Lock()
+	if len(ch.metricQueue) >= ch.QueueLimit {
+		ch.Log.Errorf("Metrics queue is full (%d/%d), dropping metrics", len(ch.metricQueue), ch.QueueLimit)
 	} else {
-		p.metricQueue = append(p.metricQueue, metrics...)
+		ch.metricQueue = append(ch.metricQueue, metrics...)
 	}
-	p.metricLock.Unlock()
+	ch.metricLock.Unlock()
 
 	select {
-	case p.metricTrigger <- struct{}{}:
+	case ch.metricTrigger <- struct{}{}:
 	default:
 	}
 
 	return nil
 }
 
-func (p *ClickHouse) metricWriter(delay time.Duration) {
+func (ch *ClickHouse) metricWriter(delay time.Duration) {
 	timer := time.NewTimer(delay)
 	defer timer.Stop()
 
 	for {
 		select {
 		case <-timer.C:
-			p.metricLock.Lock()
-			metrics := p.metricQueue
-			p.metricQueue = nil
-			p.metricLock.Unlock()
+			ch.metricLock.Lock()
+			metrics := ch.metricQueue
+			ch.metricQueue = nil
+			ch.metricLock.Unlock()
 
 			if len(metrics) > 0 {
-				if p.TableMode == "single" {
-					err := p.WriteSingleTable(metrics)
+				if ch.TableMode == "single" {
+					err := ch.WriteSingleTable(metrics)
 					if err != nil {
-						p.Log.Errorf("Error writing to ClickHouse: %s", err)
+						ch.Log.Errorf("Error writing to ClickHouse: %s", err)
 					}
 				} else {
-					err := p.WriteMultiTable(metrics)
+					err := ch.WriteMultiTable(metrics)
 					if err != nil {
-						p.Log.Errorf("Error writing to ClickHouse: %s", err)
+						ch.Log.Errorf("Error writing to ClickHouse: %s", err)
 					}
 				}
 			}
 
 			timer.Reset(delay)
-		case <-p.metricTrigger:
+		case <-ch.metricTrigger:
 			if !timer.Stop() {
 				<-timer.C
 			}