diff --git a/go.mod b/go.mod index 208fcfb..a80a89c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,12 @@ require ( github.com/influxdata/telegraf v1.27.1 ) +require ( + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect +) + require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/BurntSushi/toml v1.2.1 // indirect @@ -56,6 +62,7 @@ require ( github.com/stoewer/go-strcase v1.2.0 // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/testcontainers/testcontainers-go v0.18.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect golang.org/x/crypto v0.9.0 // indirect golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect diff --git a/go.sum b/go.sum index ed9b1bd..2f49ddc 100644 --- a/go.sum +++ b/go.sum @@ -23,9 +23,13 @@ github.com/awnumar/memcall v0.1.2 h1:7gOfDTL+BJ6nnbtAp9+HQzUFjtP1hEseRQq8eP055QY github.com/awnumar/memcall v0.1.2/go.mod h1:S911igBPR9CThzd/hYQQmTc9SWNu3ZHIlCGaWsWsoJo= github.com/awnumar/memguard v0.22.3 h1:b4sgUXtbUjhrGELPbuC62wU+BsPQy+8lkWed9Z+pj0Y= github.com/awnumar/memguard v0.22.3/go.mod h1:mmGunnffnLHlxE5rRgQc3j+uwPZ27eYb61ccr8Clz2Y= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/benbjohnson/clock v1.3.3 h1:g+rSsSaAzhHJYcIQE78hJ3AhyjjtQvleKDjlhdBnIhc= github.com/benbjohnson/clock v1.3.3/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= @@ -84,6 +88,7 @@ github.com/influxdata/telegraf v1.27.1/go.mod h1:keCAT+VIMvDB2mTTptf4ijjvgdeMw0U github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65 h1:vvyMtD5LTJc1W9sQKjDkAWdcg0478CszSdzlHtiAXCY= github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65/go.mod h1:zApaNFpP/bTpQItGZNNUMISDMDAnTXu9UqJ4yT3ocz8= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= @@ -96,6 +101,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -155,6 +162,8 @@ github.com/testcontainers/testcontainers-go v0.18.0/go.mod h1:rLC7hR2SWRjJZZNrUY github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index a81e49c..c35fe9e 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/wk8/go-ordered-map/v2" ) //go:embed sample.conf @@ -104,15 +105,9 @@ func (p *ClickHouse) Close() error { return p.db.Close() } -func quoteIdent(name string) string { - return `"` + strings.ReplaceAll(sanitizeQuoted(name), `"`, `""`) + `"` -} - func sanitizeQuoted(in string) string { // https://dev.mysql.com/doc/refman/8.0/en/identifiers.html // https://www.postgresql.org/docs/13/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS - - // Whitelist allowed characters return strings.Map(func(r rune) rune { switch { case r >= '\u0001' && r <= '\uFFFF': @@ -123,6 +118,10 @@ func sanitizeQuoted(in string) string { }, in) } +func quoteIdent(name string) string { + return `"` + strings.ReplaceAll(sanitizeQuoted(name), `"`, `""`) + `"` +} + func (p *ClickHouse) deriveDatatype(value interface{}) string { var datatype string @@ -147,18 +146,19 @@ func (p *ClickHouse) deriveDatatype(value interface{}) string { return datatype } -func (p *ClickHouse) generateCreateTable(tablename string, columns []string, datatypes map[string]string) string { - columnDefs := make([]string, 0, len(columns)) - for _, column := range columns { - columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(column), datatypes[column])) +func (p *ClickHouse) generateCreateTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { + columnDefs := make([]string, 0, columns.Len()) + + for pair := columns.Oldest(); pair != nil; pair = pair.Next() { + columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(pair.Key), pair.Value)) } - orderBy := make([]string, 0, len(columns)) - if _, ok := datatypes["host"]; ok { + orderBy := make([]string, 0, 3) + if _, ok := columns.Get("host"); ok { orderBy = append(orderBy, "host") } orderBy = append(orderBy, quoteIdent(p.TimestampColumn)) - if _, ok := datatypes["measurement"]; ok { + if _, ok := columns.Get("measurement"); ok { orderBy = append(orderBy, "measurement") } @@ -175,12 +175,13 @@ func (p *ClickHouse) generateCreateTable(tablename string, columns []string, dat return createTable } -func (p *ClickHouse) generateAlterTable(tablename string, columns []string, datatypes map[string]string) string { - alterDefs := make([]string, 0, len(columns)) +func (p *ClickHouse) generateAlterTable(tablename string, columns *orderedmap.OrderedMap[string, string]) string { + alterDefs := make([]string, 0, columns.Len()) - for _, column := range columns { + for pair := columns.Oldest(); pair != nil; pair = pair.Next() { alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", - quoteIdent(column), datatypes[column])) + quoteIdent(pair.Key), + pair.Value)) } return fmt.Sprintf("ALTER TABLE %s %s", @@ -188,13 +189,64 @@ func (p *ClickHouse) generateAlterTable(tablename string, columns []string, data strings.Join(alterDefs, ",")) } -func (p *ClickHouse) generateInsert(tablename string, columns []string, batchSize int) string { - quotedColumns := make([]string, 0, len(columns)) - for _, column := range columns { - quotedColumns = append(quotedColumns, quoteIdent(column)) +func (p *ClickHouse) ensureTable(tablename string, columns *orderedmap.OrderedMap[string, string]) error { + var res *gosql.Rows + var err error + + for { + res, err = p.db.Query(fmt.Sprintf("DESCRIBE TABLE %s", quoteIdent(tablename))) + + if err != nil { + // Unknown Table Error + if strings.Contains(err.Error(), "code: 60") { + _, err = p.db.Exec(p.generateCreateTable(tablename, columns)) + if err != nil { + return err + } + fmt.Println("Created table", tablename) + continue + } + return err + } + + defer res.Close() + break } - placeholder := "(" + strings.Repeat("?,", len(columns)-1) + "?)" + tableColumns := make(map[string]struct{}) + for res.Next() { + var name string + var _i string + + err = res.Scan(&name, &_i, &_i, &_i, &_i, &_i, &_i) + if err != nil { + return err + } + + tableColumns[name] = struct{}{} + } + + for pair := columns.Oldest(); pair != nil; pair = pair.Next() { + if _, ok := tableColumns[pair.Key]; !ok { + _, err = p.db.Exec(p.generateAlterTable(tablename, columns)) + if err != nil { + return err + } + fmt.Println("Altered table", tablename) + break + } + } + + return nil +} + +func (p *ClickHouse) generateInsert(tablename string, columns *orderedmap.OrderedMap[string, string], batchSize int) string { + quotedColumns := make([]string, 0, columns.Len()) + for pair := columns.Oldest(); pair != nil; pair = pair.Next() { + quotedColumns = append(quotedColumns, quoteIdent(pair.Key)) + } + + placeholder := "(" + strings.Repeat("?,", columns.Len()-1) + "?)" placeholders := strings.Repeat(placeholder+",", batchSize-1) + placeholder return fmt.Sprintf("INSERT INTO %s(%s) VALUES %s", @@ -203,123 +255,96 @@ func (p *ClickHouse) generateInsert(tablename string, columns []string, batchSiz placeholders) } -func (p *ClickHouse) isUnknownTableErr(err error) bool { - if err == nil { - return false - } - - return strings.Contains(err.Error(), "code: 60") -} - -func (p *ClickHouse) isNoSuchColumnInTableErr(err error) bool { - if err == nil { - return false - } - - return strings.Contains(err.Error(), "code: 16") -} - -func (p *ClickHouse) Write(metrics []telegraf.Metric) error { +func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { metricsByTable := make(map[string][]map[string]interface{}) - columDatatypes := make(map[string]map[string]string) - tableColumns := make(map[string][]string) - tableLengths := make(map[string]int) + columns := make(map[string]*orderedmap.OrderedMap[string, string]) for _, metric := range metrics { tablename := metric.Name() + if p.MultiTableOptions.TablePrefix != "" { + tablename = p.MultiTableOptions.TablePrefix + "_" + tablename + } + if _, ok := metricsByTable[tablename]; !ok { metricsByTable[tablename] = make([]map[string]interface{}, 0, len(metrics)) } - if _, ok := columDatatypes[tablename]; !ok { - columDatatypes[tablename] = make(map[string]string) - tableColumns[tablename] = make([]string, 0, len(metric.FieldList())+len(metric.TagList())+1) - } - - if _, ok := tableLengths[tablename]; !ok { - tableLengths[tablename] = 0 + if _, ok := columns[tablename]; !ok { + columns[tablename] = orderedmap.New[string, string](len(metrics)) } metricEntry := make(map[string]interface{}) - metricEntry["timestamp"] = metric.Time() - columDatatypes[tablename]["timestamp"] = p.deriveDatatype(metric.Time()) - tableColumns[tablename] = append(tableColumns[tablename], "timestamp") + metricEntry[p.TimestampColumn] = metric.Time() + columns[tablename].Set(p.TimestampColumn, p.deriveDatatype(metric.Time())) for _, tag := range metric.TagList() { metricEntry[tag.Key] = tag.Value - columDatatypes[tablename][tag.Key] = p.deriveDatatype(tag.Value) - tableColumns[tablename] = append(tableColumns[tablename], tag.Key) + columns[tablename].Set(tag.Key, p.deriveDatatype(tag.Value)) } for _, field := range metric.FieldList() { metricEntry[field.Key] = field.Value - columDatatypes[tablename][field.Key] = p.deriveDatatype(field.Value) - tableColumns[tablename] = append(tableColumns[tablename], field.Key) + columns[tablename].Set(field.Key, p.deriveDatatype(field.Value)) } metricsByTable[tablename] = append(metricsByTable[tablename], metricEntry) - tableLengths[tablename]++ } for tablename, metrics := range metricsByTable { - for { - sql := p.generateInsert(tablename, tableColumns[tablename], tableLengths[tablename]) - values := make([]interface{}, 0, tableLengths[tablename]*len(columDatatypes[tablename])) + err := p.ensureTable(tablename, columns[tablename]) + if err != nil { + return err + } - tx, err := p.db.Begin() - if err != nil { - return fmt.Errorf("begin failed: %w", err) + sql := p.generateInsert(tablename, columns[tablename], len(metrics)) + values := make([]interface{}, 0, len(metrics)*columns[tablename].Len()) + + tx, err := p.db.Begin() + if err != nil { + return fmt.Errorf("begin failed: %w", err) + } + + stmt, err := tx.Prepare(sql) + if err != nil { + return fmt.Errorf("prepare failed: %w", err) + } + defer stmt.Close() + + for _, metric := range metrics { + for pair := columns[tablename].Oldest(); pair != nil; pair = pair.Next() { + values = append(values, metric[pair.Key]) } + } - stmt, err := tx.Prepare(sql) - if err != nil { - if p.isUnknownTableErr(err) { - createTableStmt := p.generateCreateTable(tablename, tableColumns[tablename], columDatatypes[tablename]) + _, err = stmt.Exec(values...) + if err != nil { + return fmt.Errorf("exec failed: %w", err) + } - _, err = p.db.Exec(createTableStmt) - if err != nil { - return fmt.Errorf("CREATE TABLE failed: %w", err) - } - continue - } - - if p.isNoSuchColumnInTableErr(err) { - alterTableStmt := p.generateAlterTable(tablename, tableColumns[tablename], columDatatypes[tablename]) - _, err = p.db.Exec(alterTableStmt) - if err != nil { - return fmt.Errorf("ALTER TABLE failed: %w", err) - } - continue - } - - return fmt.Errorf("prepare failed: %w", err) - } - defer stmt.Close() - - for _, metric := range metrics { - for _, column := range tableColumns[tablename] { - values = append(values, metric[column]) - } - } - - _, err = stmt.Exec(values...) - if err != nil { - return fmt.Errorf("exec failed: %w", err) - } - - err = tx.Commit() - if err != nil { - return fmt.Errorf("commit failed: %w", err) - } - - break + err = tx.Commit() + if err != nil { + return fmt.Errorf("commit failed: %w", err) } } + return nil } +func (p *ClickHouse) WriteSingleTable(metrics []telegraf.Metric) error { + // TODO + return nil +} + +func (p *ClickHouse) Write(metrics []telegraf.Metric) error { + if p.TableMode == "single" { + return p.WriteSingleTable(metrics) + } + + return p.WriteMultiTable(metrics) +} + func init() { outputs.Add("clickhouse", func() telegraf.Output { return &ClickHouse{