Fix batching

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-06-28 19:52:27 +03:00
parent dc1277222d
commit 558fb3a423
4 changed files with 90 additions and 131 deletions

View File

@@ -18,41 +18,6 @@ When the plugin first connects it runs SQL from the init_sql setting, allowing y
Before inserting a row, the plugin checks whether the table exists. If it doesn't exist, the plugin creates the table.
The name of the timestamp column is "timestamp" but it can be changed with the timestamp_column setting. The timestamp column can be completely disabled by setting it to "".
## Configuration
```toml @sample.conf
# Save metrics to an SQL Database
[[outputs.sql]]
## Data source name
# data_source_name = ""
## Timestamp column name
# timestamp_column = "timestamp"
## Initialization SQL
# init_sql = ""
## Maximum amount of time a connection may be idle. "0s" means connections are
## never closed due to idle time.
# connection_max_idle_time = "0s"
## Maximum amount of time a connection may be reused. "0s" means connections
## are never closed due to age.
# connection_max_lifetime = "0s"
## Maximum number of connections in the idle connection pool. 0 means unlimited.
# connection_max_idle = 2
## Maximum number of open connections to the database. 0 means unlimited.
# connection_max_open = 0
```
## Driver-specific information
### clickhouse
#### DSN
Currently, Telegraf's sql output plugin depends on [clickhouse-go v1.5.4](https://github.com/ClickHouse/clickhouse-go/tree/v1.5.4) which uses a [different DSN format](https://github.com/ClickHouse/clickhouse-go/tree/v1.5.4#dsn) than its newer `v2.*` version.

View File

@@ -12,50 +12,26 @@ import (
)
var pollInterval = flag.Duration("poll_interval", 1*time.Second, "how often to send metrics")
var pollIntervalDisabled = flag.Bool(
"poll_interval_disabled",
false,
"set to true to disable polling. You want to use this when you are sending metrics on your own schedule",
)
var configFile = flag.String("config", "sample.conf", "path to the config file for this plugin")
var configFile = flag.String("config", "", "path to the config file for this plugin")
var err error
// This is designed to be simple; Just change the import above, and you're good.
//
// However, if you want to do all your config in code, you can like so:
//
// // initialize your plugin with any settings you want
//
// myInput := &mypluginname.MyPlugin{
// DefaultSettingHere: 3,
// }
//
// shim := shim.New()
//
// shim.AddInput(myInput)
//
// // now the shim.Run() call as below. Note the shim is only intended to run a single plugin.
func main() {
// parse command line options
flag.Parse()
if *pollIntervalDisabled {
*pollInterval = shim.PollIntervalDisabled
}
// create the shim. This is what will run your plugins.
shimLayer := shim.New()
// If no config is specified, all imported plugins are loaded.
// otherwise, follow what the config asks for.
// Check for settings from a config toml file,
// (or just use whatever plugins were imported above)
if err = shimLayer.LoadConfig(configFile); err != nil {
fmt.Fprintf(os.Stderr, "Err loading input: %s\n", err)
os.Exit(1)
}
// run a single plugin until stdin closes, or we receive a termination signal
if err = shimLayer.Run(*pollInterval); err != nil {
fmt.Fprintf(os.Stderr, "Err: %s\n", err)
os.Exit(1)

View File

@@ -18,24 +18,23 @@ import (
//go:embed sample.conf
var sampleConfig string
type SQL struct {
DataSourceName string
TimestampColumn string
type ClickHouse struct {
DataSourceName string `toml:"data_source_name"`
InitSQL string `toml:"init_sql"`
ConnectionMaxIdleTime config.Duration
ConnectionMaxLifetime config.Duration
ConnectionMaxIdle int
ConnectionMaxOpen int
ConnectionMaxIdleTime config.Duration `toml:"connection_max_idle_time"`
ConnectionMaxLifetime config.Duration `toml:"connection_max_lifetime"`
ConnectionMaxIdle int `toml:"connection_max_idle"`
ConnectionMaxOpen int `toml:"connection_max_open"`
db *gosql.DB
Log telegraf.Logger `toml:"-"`
}
func (*SQL) SampleConfig() string {
func (*ClickHouse) SampleConfig() string {
return sampleConfig
}
func (p *SQL) Connect() error {
func (p *ClickHouse) Connect() error {
db, err := gosql.Open("clickhouse", p.DataSourceName)
if err != nil {
return err
@@ -60,10 +59,12 @@ func (p *SQL) Connect() error {
p.db = db
fmt.Println("Connected to ClickHouse!")
return nil
}
func (p *SQL) Close() error {
func (p *ClickHouse) Close() error {
return p.db.Close()
}
@@ -86,7 +87,7 @@ func sanitizeQuoted(in string) string {
}, in)
}
func (p *SQL) deriveDatatype(value interface{}) string {
func (p *ClickHouse) deriveDatatype(value interface{}) string {
var datatype string
switch value.(type) {
@@ -100,32 +101,34 @@ func (p *SQL) deriveDatatype(value interface{}) string {
datatype = "String"
case bool:
datatype = "UInt8"
case time.Time:
datatype = "DateTime"
default:
datatype = "String"
p.Log.Errorf("unknown datatype for value %v", value)
}
return datatype
}
func (p *SQL) generateCreateTable(tablename string, columns map[string]struct{}) string {
func (p *ClickHouse) generateCreateTable(tablename string, columns []string, datatypes map[string]string) string {
columnDefs := make([]string, 0, len(columns))
for column := range columns {
dataType := p.deriveDatatype(column)
columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(column), dataType))
for _, column := range columns {
columnDefs = append(columnDefs, fmt.Sprintf("%s %s", quoteIdent(column), datatypes[column]))
}
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)",
return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY (host,timestamp) PARTITION BY toYYYYMM(timestamp) TTL timestamp + INTERVAL 3 MONTH",
quoteIdent(tablename),
strings.Join(columnDefs, ","))
}
func (p *SQL) generateAlterTable(tablename string, columns map[string]struct{}) string {
func (p *ClickHouse) generateAlterTable(tablename string, columns []string, datatypes map[string]string) string {
alterDefs := make([]string, 0, len(columns))
for column := range columns {
dataType := p.deriveDatatype(column)
for _, column := range columns {
alterDefs = append(alterDefs, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s",
quoteIdent(column), dataType))
quoteIdent(column), datatypes[column]))
}
return fmt.Sprintf("ALTER TABLE %s %s",
@@ -133,9 +136,9 @@ func (p *SQL) generateAlterTable(tablename string, columns map[string]struct{})
strings.Join(alterDefs, ","))
}
func (p *SQL) generateInsert(tablename string, columns map[string]struct{}, batchSize int) string {
func (p *ClickHouse) generateInsert(tablename string, columns []string, batchSize int) string {
quotedColumns := make([]string, 0, len(columns))
for column := range columns {
for _, column := range columns {
quotedColumns = append(quotedColumns, quoteIdent(column))
}
@@ -144,55 +147,74 @@ func (p *SQL) generateInsert(tablename string, columns map[string]struct{}, batc
return fmt.Sprintf("INSERT INTO %s(%s) VALUES %s",
quoteIdent(tablename),
strings.Join(quotedColumns, ","),
strings.Join(quotedColumns, ", "),
placeholders)
}
func (p *SQL) isUnknownTableErr(err error) bool {
func (p *ClickHouse) isUnknownTableErr(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "UNKNOWN_TABLE")
return strings.Contains(err.Error(), "code: 60")
}
func (p *SQL) isNoSuchColumnInTableErr(err error) bool {
func (p *ClickHouse) isNoSuchColumnInTableErr(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "NO_SUCH_COLUMN_IN_TABLE")
return strings.Contains(err.Error(), "code: 16")
}
func (p *SQL) Write(metrics []telegraf.Metric) error {
metricsByTable := make(map[string][]telegraf.Metric)
func (p *ClickHouse) Write(metrics []telegraf.Metric) error {
metricsByTable := make(map[string][]map[string]interface{})
columDatatypes := make(map[string]map[string]string)
tableColumns := make(map[string][]string)
tableLengths := make(map[string]int)
tableColumns := make(map[string]map[string]struct{})
for _, metric := range metrics {
tablename := metric.Name()
metricsByTable[tablename] = append(metricsByTable[tablename], metric)
tableLengths[tablename]++
if _, ok := metricsByTable[tablename]; !ok {
metricsByTable[tablename] = make([]map[string]interface{}, 0, len(metrics))
}
if _, ok := columDatatypes[tablename]; !ok {
columDatatypes[tablename] = make(map[string]string)
tableColumns[tablename] = make([]string, 0, len(metric.FieldList())+len(metric.TagList())+1)
}
if _, ok := tableLengths[tablename]; !ok {
tableLengths[tablename] = 0
}
metricEntry := make(map[string]interface{})
metricEntry["timestamp"] = metric.Time()
columDatatypes[tablename]["timestamp"] = p.deriveDatatype(metric.Time())
tableColumns[tablename] = append(tableColumns[tablename], "timestamp")
for _, tag := range metric.TagList() {
if _, ok := tableColumns[tablename]; !ok {
tableColumns[tablename] = make(map[string]struct{})
}
tableColumns[tablename][tag.Key] = struct{}{}
metricEntry[tag.Key] = tag.Value
columDatatypes[tablename][tag.Key] = p.deriveDatatype(tag.Value)
tableColumns[tablename] = append(tableColumns[tablename], tag.Key)
}
for _, field := range metric.FieldList() {
if _, ok := tableColumns[tablename]; !ok {
tableColumns[tablename] = make(map[string]struct{})
}
tableColumns[tablename][field.Key] = struct{}{}
}
metricEntry[field.Key] = field.Value
columDatatypes[tablename][field.Key] = p.deriveDatatype(field.Value)
tableColumns[tablename] = append(tableColumns[tablename], field.Key)
}
for tablename, tableMetrics := range metricsByTable {
metricsByTable[tablename] = append(metricsByTable[tablename], metricEntry)
tableLengths[tablename]++
}
for tablename, metrics := range metricsByTable {
for {
sql := p.generateInsert(tablename, tableColumns[tablename], tableLengths[tablename])
values := make([]interface{}, 0, len(tableMetrics)*len(tableColumns[tablename]))
values := make([]interface{}, 0, tableLengths[tablename]*len(columDatatypes[tablename]))
tx, err := p.db.Begin()
if err != nil {
@@ -200,32 +222,39 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %w", err)
}
defer stmt.Close()
_, err = stmt.Exec(values...)
if err != nil {
if p.isUnknownTableErr(err) {
createTableStmt := p.generateCreateTable(tablename, tableColumns[tablename])
createTableStmt := p.generateCreateTable(tablename, tableColumns[tablename], columDatatypes[tablename])
_, err = p.db.Exec(createTableStmt)
if err != nil {
return fmt.Errorf("create table failed: %w", err)
return fmt.Errorf("CREATE TABLE failed: %w", err)
}
continue
}
if p.isNoSuchColumnInTableErr(err) {
alterTableStmt := p.generateAlterTable(tablename, tableColumns[tablename])
alterTableStmt := p.generateAlterTable(tablename, tableColumns[tablename], columDatatypes[tablename])
_, err = p.db.Exec(alterTableStmt)
if err != nil {
return fmt.Errorf("alter table failed: %w", err)
return fmt.Errorf("ALTER TABLE failed: %w", err)
}
continue
}
return fmt.Errorf("execution failed: %w", err)
return fmt.Errorf("prepare failed: %w", err)
}
defer stmt.Close()
for _, metric := range metrics {
for _, column := range tableColumns[tablename] {
values = append(values, metric[column])
}
}
_, err = stmt.Exec(values...)
if err != nil {
return fmt.Errorf("exec failed: %w", err)
}
err = tx.Commit()
@@ -240,17 +269,9 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
}
func init() {
outputs.Add("sql", func() telegraf.Output { return newSQL() })
}
func newSQL() *SQL {
return &SQL{
TimestampColumn: "timestamp",
// Defaults for the connection settings (ConnectionMaxIdleTime,
// ConnectionMaxLifetime, ConnectionMaxIdle, and ConnectionMaxOpen)
// mirror the golang defaults. As of go 1.18 all of them default to 0
// except max idle connections which is 2. See
// https://pkg.go.dev/database/sql#DB.SetMaxIdleConns
outputs.Add("clickhouse", func() telegraf.Output {
return &ClickHouse{
ConnectionMaxIdle: 2,
}
})
}

View File

@@ -3,9 +3,6 @@
## Data source name
# data_source_name = ""
## Timestamp column name
# timestamp_column = "timestamp"
## Initialization SQL
# init_sql = ""