diff --git a/clickhouse b/clickhouse new file mode 100755 index 0000000..27e6088 Binary files /dev/null and b/clickhouse differ diff --git a/plugins/outputs/clickhouse/clickhouse.go b/plugins/outputs/clickhouse/clickhouse.go index c35fe9e..ae98a80 100644 --- a/plugins/outputs/clickhouse/clickhouse.go +++ b/plugins/outputs/clickhouse/clickhouse.go @@ -255,6 +255,44 @@ func (p *ClickHouse) generateInsert(tablename string, columns *orderedmap.Ordere placeholders) } +func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error { + err := p.ensureTable(tablename, columns) + if err != nil { + return err + } + + sql := p.generateInsert(tablename, columns, len(metrics)) + values := make([]interface{}, 0, len(metrics)*columns.Len()) + + tx, err := p.db.Begin() + if err != nil { + return fmt.Errorf("begin failed: %w", err) + } + + stmt, err := tx.Prepare(sql) + if err != nil { + return fmt.Errorf("prepare failed: %w", err) + } + defer stmt.Close() + + for _, metric := range metrics { + for pair := columns.Oldest(); pair != nil; pair = pair.Next() { + values = append(values, metric[pair.Key]) + } + } + + _, err = stmt.Exec(values...) + if err != nil { + return fmt.Errorf("exec failed: %w", err) + } + + err = tx.Commit() + if err != nil { + return fmt.Errorf("commit failed: %w", err) + } + return nil +} + func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { metricsByTable := make(map[string][]map[string]interface{}) columns := make(map[string]*orderedmap.OrderedMap[string, string]) @@ -293,40 +331,10 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error { } for tablename, metrics := range metricsByTable { - err := p.ensureTable(tablename, columns[tablename]) + err := p.writeMetrics(tablename, columns[tablename], metrics) if err != nil { return err } - - sql := p.generateInsert(tablename, columns[tablename], len(metrics)) - values := make([]interface{}, 0, len(metrics)*columns[tablename].Len()) - - tx, err := p.db.Begin() - if err != nil { - return fmt.Errorf("begin failed: %w", err) - } - - stmt, err := tx.Prepare(sql) - if err != nil { - return fmt.Errorf("prepare failed: %w", err) - } - defer stmt.Close() - - for _, metric := range metrics { - for pair := columns[tablename].Oldest(); pair != nil; pair = pair.Next() { - values = append(values, metric[pair.Key]) - } - } - - _, err = stmt.Exec(values...) - if err != nil { - return fmt.Errorf("exec failed: %w", err) - } - - err = tx.Commit() - if err != nil { - return fmt.Errorf("commit failed: %w", err) - } } return nil