From 558fb3a423727d9dcfe3dc6267ba495eacd27f22 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Wed, 28 Jun 2023 19:52:27 +0300 Subject: [PATCH] Fix batching Signed-off-by: Nikolaos Karaolidis --- README.md | 37 +----- cmd/main.go | 26 +--- plugins/outputs/clickhouse/clickhouse.go | 155 +++++++++++++---------- plugins/outputs/clickhouse/sample.conf | 3 - 4 files changed, 90 insertions(+), 131 deletions(-) diff --git a/README.md b/README.md index a08fcb3..6718b75 100644 --- a/README.md +++ b/README.md @@ -18,41 +18,6 @@ When the plugin first connects it runs SQL from the init_sql setting, allowing y Before inserting a row, the plugin checks whether the table exists. If it doesn't exist, the plugin creates the table. -The name of the timestamp column is "timestamp" but it can be changed with the timestamp_column setting. The timestamp column can be completely disabled by setting it to "". - -## Configuration - -```toml @sample.conf -# Save metrics to an SQL Database -[[outputs.sql]] - ## Data source name - # data_source_name = "" - - ## Timestamp column name - # timestamp_column = "timestamp" - - ## Initialization SQL - # init_sql = "" - - ## Maximum amount of time a connection may be idle. "0s" means connections are - ## never closed due to idle time. - # connection_max_idle_time = "0s" - - ## Maximum amount of time a connection may be reused. "0s" means connections - ## are never closed due to age. - # connection_max_lifetime = "0s" - - ## Maximum number of connections in the idle connection pool. 0 means unlimited. - # connection_max_idle = 2 - - ## Maximum number of open connections to the database. 0 means unlimited. - # connection_max_open = 0 -``` - -## Driver-specific information - -### clickhouse - -#### DSN +## DSN Currently, Telegraf's sql output plugin depends on [clickhouse-go v1.5.4](https://github.com/ClickHouse/clickhouse-go/tree/v1.5.4) which uses a [different DSN format](https://github.com/ClickHouse/clickhouse-go/tree/v1.5.4#dsn) than its newer `v2.*` version. diff --git a/cmd/main.go b/cmd/main.go index 527c0cc..bb0f7fa 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,50 +12,26 @@ import ( ) var pollInterval = flag.Duration("poll_interval", 1*time.Second, "how often to send metrics") - var pollIntervalDisabled = flag.Bool( "poll_interval_disabled", false, "set to true to disable polling. You want to use this when you are sending metrics on your own schedule", ) -var configFile = flag.String("config", "sample.conf", "path to the config file for this plugin") +var configFile = flag.String("config", "", "path to the config file for this plugin") var err error -// This is designed to be simple; Just change the import above, and you're good. -// -// However, if you want to do all your config in code, you can like so: -// -// // initialize your plugin with any settings you want -// -// myInput := &mypluginname.MyPlugin{ -// DefaultSettingHere: 3, -// } -// -// shim := shim.New() -// -// shim.AddInput(myInput) -// -// // now the shim.Run() call as below. Note the shim is only intended to run a single plugin. func main() { - // parse command line options flag.Parse() if *pollIntervalDisabled { *pollInterval = shim.PollIntervalDisabled } - // create the shim. This is what will run your plugins. shimLayer := shim.New() - - // If no config is specified, all imported plugins are loaded. - // otherwise, follow what the config asks for. - // Check for settings from a config toml file, - // (or just use whatever plugins were imported above) if err = shimLayer.LoadConfig(configFile); err != nil { fmt.Fprintf(os.Stderr, "Err loading input: %s\n", err) os.Exit(1) } - // run a single plugin until stdin closes, or we receive a termination signal if err = shimLayer.Run(*pollInterval); err != nil { fmt.Fprintf(os.Stderr, "Err: %s\n", err) os.Exit(1) diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index 609d23f..d5ece18 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -18,24 +18,23 @@ import ( //go:embed sample.conf var sampleConfig string -type SQL struct { - DataSourceName string - TimestampColumn string - InitSQL string `toml:"init_sql"` - ConnectionMaxIdleTime config.Duration - ConnectionMaxLifetime config.Duration - ConnectionMaxIdle int - ConnectionMaxOpen int +type ClickHouse struct { + DataSourceName string `toml:"data_source_name"` + InitSQL string `toml:"init_sql"` + ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"` + ConnectionMaxLifetime config.Duration `toml:"connection_max_lifetime"` + ConnectionMaxIdle int `toml:"connection_max_idle"` + ConnectionMaxOpen int `toml:"connection_max_open"` db *gosql.DB Log telegraf.Logger `toml:"-"` } -func (*SQL) SampleConfig() string { +func (*ClickHouse) SampleConfig() string { return sampleConfig } -func (p *SQL) Connect() error { +func (p *ClickHouse) Connect() error { db, err := gosql.Open("clickhouse", p.DataSourceName) if err != nil { return err @@ -60,10 +59,12 @@ func (p *SQL) Connect() error { p.db = db + fmt.Println("Connected to ClickHouse!") + return nil } -func (p *SQL) Close() error { +func (p *ClickHouse) Close() error { return p.db.Close() } @@ -86,7 +87,7 @@ func sanitizeQuoted(in string) string { }, in) } -func (p *SQL) deriveDatatype(value interface{}) string { +func (p *ClickHouse) deriveDatatype(value interface{}) string { var datatype string switch value.(type) { @@ -100,32 +101,34 @@ func (p *SQL) deriveDatatype(value interface{}) string { datatype = "String" case bool: datatype = "UInt8" + case time.Time: + datatype = "DateTime" default: datatype = "String" + p.Log.Errorf("unknown datatype for value %v", value) } + return datatype } -func (p *SQL) generateCreateTable(tablename string, columns map[string]struct{}) string { +func (p *ClickHouse) generateCreateTable(tablename string, columns []string, datatypes map[string]string) string { columnDefs := make([]string, 0, len(columns)) - for column := range columns { - dataType := p.deriveDatatype(column) - columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(column), dataType)) + for _, column := range columns { + columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(column), datatypes[column])) } - return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", + return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY (host,timestamp) PARTITION BY toYYYYMM(timestamp) TTL timestamp + INTERVAL 3 MONTH", quoteIdent(tablename), strings.Join(columnDefs, ",")) } -func (p *SQL) generateAlterTable(tablename string, columns map[string]struct{}) string { +func (p *ClickHouse) generateAlterTable(tablename string, columns []string, datatypes map[string]string) string { alterDefs := make([]string, 0, len(columns)) - for column := range columns { - dataType := p.deriveDatatype(column) + for _, column := range columns { alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", - quoteIdent(column), dataType)) + quoteIdent(column), datatypes[column])) } return fmt.Sprintf("ALTER TABLE %s %s", @@ -133,9 +136,9 @@ func (p *SQL) generateAlterTable(tablename string, columns map[string]struct{}) strings.Join(alterDefs, ",")) } -func (p *SQL) generateInsert(tablename string, columns map[string]struct{}, batchSize int) string { +func (p *ClickHouse) generateInsert(tablename string, columns []string, batchSize int) string { quotedColumns := make([]string, 0, len(columns)) - for column := range columns { + for _, column := range columns { quotedColumns = append(quotedColumns, quoteIdent(column)) } @@ -144,55 +147,74 @@ func (p *SQL) generateInsert(tablename string, columns map[string]struct{}, batc return fmt.Sprintf("INSERT INTO %s(%s) VALUES %s", quoteIdent(tablename), - strings.Join(quotedColumns, ","), + strings.Join(quotedColumns, ", "), placeholders) } -func (p *SQL) isUnknownTableErr(err error) bool { +func (p *ClickHouse) isUnknownTableErr(err error) bool { if err == nil { return false } - return strings.Contains(err.Error(), "UNKNOWN_TABLE") + return strings.Contains(err.Error(), "code: 60") } -func (p *SQL) isNoSuchColumnInTableErr(err error) bool { +func (p *ClickHouse) isNoSuchColumnInTableErr(err error) bool { if err == nil { return false } - return strings.Contains(err.Error(), "NO_SUCH_COLUMN_IN_TABLE") + return strings.Contains(err.Error(), "code: 16") } -func (p *SQL) Write(metrics []telegraf.Metric) error { - metricsByTable := make(map[string][]telegraf.Metric) +func (p *ClickHouse) Write(metrics []telegraf.Metric) error { + metricsByTable := make(map[string][]map[string]interface{}) + columDatatypes := make(map[string]map[string]string) + tableColumns := make(map[string][]string) tableLengths := make(map[string]int) - tableColumns := make(map[string]map[string]struct{}) for _, metric := range metrics { tablename := metric.Name() - metricsByTable[tablename] = append(metricsByTable[tablename], metric) - tableLengths[tablename]++ + + if _, ok := metricsByTable[tablename]; !ok { + metricsByTable[tablename] = make([]map[string]interface{}, 0, len(metrics)) + } + + if _, ok := columDatatypes[tablename]; !ok { + columDatatypes[tablename] = make(map[string]string) + tableColumns[tablename] = make([]string, 0, len(metric.FieldList())+len(metric.TagList())+1) + } + + if _, ok := tableLengths[tablename]; !ok { + tableLengths[tablename] = 0 + } + + metricEntry := make(map[string]interface{}) + + metricEntry["timestamp"] = metric.Time() + columDatatypes[tablename]["timestamp"] = p.deriveDatatype(metric.Time()) + tableColumns[tablename] = append(tableColumns[tablename], "timestamp") for _, tag := range metric.TagList() { - if _, ok := tableColumns[tablename]; !ok { - tableColumns[tablename] = make(map[string]struct{}) - } - tableColumns[tablename][tag.Key] = struct{}{} + metricEntry[tag.Key] = tag.Value + columDatatypes[tablename][tag.Key] = p.deriveDatatype(tag.Value) + tableColumns[tablename] = append(tableColumns[tablename], tag.Key) } for _, field := range metric.FieldList() { - if _, ok := tableColumns[tablename]; !ok { - tableColumns[tablename] = make(map[string]struct{}) - } - tableColumns[tablename][field.Key] = struct{}{} + metricEntry[field.Key] = field.Value + columDatatypes[tablename][field.Key] = p.deriveDatatype(field.Value) + tableColumns[tablename] = append(tableColumns[tablename], field.Key) } + + metricsByTable[tablename] = append(metricsByTable[tablename], metricEntry) + tableLengths[tablename]++ } - for tablename, tableMetrics := range metricsByTable { + for tablename, metrics := range metricsByTable { for { sql := p.generateInsert(tablename, tableColumns[tablename], tableLengths[tablename]) - values := make([]interface{}, 0, len(tableMetrics)*len(tableColumns[tablename])) + values := make([]interface{}, 0, tableLengths[tablename]*len(columDatatypes[tablename])) tx, err := p.db.Begin() if err != nil { @@ -200,32 +222,39 @@ func (p *SQL) Write(metrics []telegraf.Metric) error { } stmt, err := tx.Prepare(sql) - if err != nil { - return fmt.Errorf("prepare failed: %w", err) - } - defer stmt.Close() - - _, err = stmt.Exec(values...) if err != nil { if p.isUnknownTableErr(err) { - createTableStmt := p.generateCreateTable(tablename, tableColumns[tablename]) + createTableStmt := p.generateCreateTable(tablename, tableColumns[tablename], columDatatypes[tablename]) + _, err = p.db.Exec(createTableStmt) if err != nil { - return fmt.Errorf("create table failed: %w", err) + return fmt.Errorf("CREATE TABLE failed: %w", err) } continue } if p.isNoSuchColumnInTableErr(err) { - alterTableStmt := p.generateAlterTable(tablename, tableColumns[tablename]) + alterTableStmt := p.generateAlterTable(tablename, tableColumns[tablename], columDatatypes[tablename]) _, err = p.db.Exec(alterTableStmt) if err != nil { - return fmt.Errorf("alter table failed: %w", err) + return fmt.Errorf("ALTER TABLE failed: %w", err) } continue } - return fmt.Errorf("execution failed: %w", err) + return fmt.Errorf("prepare failed: %w", err) + } + defer stmt.Close() + + for _, metric := range metrics { + for _, column := range tableColumns[tablename] { + values = append(values, metric[column]) + } + } + + _, err = stmt.Exec(values...) + if err != nil { + return fmt.Errorf("exec failed: %w", err) } err = tx.Commit() @@ -240,17 +269,9 @@ func (p *SQL) Write(metrics []telegraf.Metric) error { } func init() { - outputs.Add("sql", func() telegraf.Output { return newSQL() }) -} - -func newSQL() *SQL { - return &SQL{ - TimestampColumn: "timestamp", - // Defaults for the connection settings (ConnectionMaxIdleTime, - // ConnectionMaxLifetime, ConnectionMaxIdle, and ConnectionMaxOpen) - // mirror the golang defaults. As of go 1.18 all of them default to 0 - // except max idle connections which is 2. See - // https://pkg.go.dev/database/sql#DB.SetMaxIdleConns - ConnectionMaxIdle: 2, - } + outputs.Add("clickhouse", func() telegraf.Output { + return &ClickHouse{ + ConnectionMaxIdle: 2, + } + }) } diff --git a/plugins/outputs/clickhouse/sample.conf b/plugins/outputs/clickhouse/sample.conf index 4f91438..f3b8771 100644 --- a/plugins/outputs/clickhouse/sample.conf +++ b/plugins/outputs/clickhouse/sample.conf @@ -3,9 +3,6 @@ ## Data source name # data_source_name = "" - ## Timestamp column name - # timestamp_column = "timestamp" - ## Initialization SQL # init_sql = ""