Refactor WriteMultiTable()

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-06-29 12:52:54 +03:00
parent c24045dc7d
commit 726df648a5
2 changed files with 39 additions and 31 deletions

BIN
clickhouse Executable file

Binary file not shown.

View File

@@ -255,6 +255,44 @@ func (p *ClickHouse) generateInsert(tablename string, columns *orderedmap.Ordere
placeholders)
}
func (p *ClickHouse) writeMetrics(tablename string, columns *orderedmap.OrderedMap[string, string], metrics []map[string]interface{}) error {
err := p.ensureTable(tablename, columns)
if err != nil {
return err
}
sql := p.generateInsert(tablename, columns, len(metrics))
values := make([]interface{}, 0, len(metrics)*columns.Len())
tx, err := p.db.Begin()
if err != nil {
return fmt.Errorf("begin failed: %w", err)
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %w", err)
}
defer stmt.Close()
for _, metric := range metrics {
for pair := columns.Oldest(); pair != nil; pair = pair.Next() {
values = append(values, metric[pair.Key])
}
}
_, err = stmt.Exec(values...)
if err != nil {
return fmt.Errorf("exec failed: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit failed: %w", err)
}
return nil
}
func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
metricsByTable := make(map[string][]map[string]interface{})
columns := make(map[string]*orderedmap.OrderedMap[string, string])
@@ -293,40 +331,10 @@ func (p *ClickHouse) WriteMultiTable(metrics []telegraf.Metric) error {
}
for tablename, metrics := range metricsByTable {
err := p.ensureTable(tablename, columns[tablename])
err := p.writeMetrics(tablename, columns[tablename], metrics)
if err != nil {
return err
}
sql := p.generateInsert(tablename, columns[tablename], len(metrics))
values := make([]interface{}, 0, len(metrics)*columns[tablename].Len())
tx, err := p.db.Begin()
if err != nil {
return fmt.Errorf("begin failed: %w", err)
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %w", err)
}
defer stmt.Close()
for _, metric := range metrics {
for pair := columns[tablename].Oldest(); pair != nil; pair = pair.Next() {
values = append(values, metric[pair.Key])
}
}
_, err = stmt.Exec(values...)
if err != nil {
return fmt.Errorf("exec failed: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit failed: %w", err)
}
}
return nil