201 lines
5.7 KiB
Go
201 lines
5.7 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"otaupdate/services"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/clickhouse"
|
|
"github.com/fiskerinc/cloud-services/pkg/common"
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/validator"
|
|
|
|
ch "github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/gorilla/schema"
|
|
"github.com/pkg/errors"
|
|
"github.com/fiskerinc/cloud-services/pkg/loggerdataresp"
|
|
)
|
|
|
|
const (
|
|
limit = 100000
|
|
)
|
|
|
|
// HandleCanSignalVINGet godoc
|
|
// @Summary Export CAN signals for a specific VIN
|
|
// @Description Exports CAN signals for a specific VIN based on specified time range and CAN signals. Requires API token permission.
|
|
// @Accept json
|
|
// @Produce octet-stream
|
|
// @Param Authorization header string false "Bearer <ID token>"
|
|
// @Param Api-Key header string false "<API token>"
|
|
// @Param select_all query boolean false "Select All CAN Signals"
|
|
// @Param can_signals query []string false "CAN Signals"
|
|
// @Param timestamp_start query float64 true "Start time must be included"
|
|
// @Param timestamp_end query float64 true "End time must be included"
|
|
// @Param vin query string true "VIN must be included in the query"
|
|
// @Success 200 {file} CSV file with the specified CAN signals data
|
|
// @Failure 400 {object} common.JSONError "Bad request"
|
|
// @Failure 401 {object} common.JSONError "Unauthorized"
|
|
// @Failure 503 {object} common.JSONError "Service unavailable"
|
|
// @Router /can_signals_export [get]
|
|
func HandleCanSignalVINGet(w http.ResponseWriter, r *http.Request) {
|
|
filter, err := parseCANSignalFilter(r)
|
|
if loggerdataresp.BadDataErrorResp(w, err, http.StatusBadRequest) {
|
|
return
|
|
}
|
|
filter.Limit = limit
|
|
|
|
conn, err := services.GetClickhouseConn()
|
|
if loggerdataresp.BadDataErrorResp(w, err, http.StatusServiceUnavailable) {
|
|
logger.Error().Err(err).Msg("cannot get clickhouse client")
|
|
return
|
|
}
|
|
|
|
if filter.SelectAll {
|
|
allCanSignals, err := getListOfAllCanSignals(conn)
|
|
if loggerdataresp.BadDataErrorResp(w, err, http.StatusServiceUnavailable) {
|
|
return
|
|
}
|
|
filter.CanSignals = []string{allCanSignals}
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/csv")
|
|
res := 0
|
|
|
|
for res == filter.Limit || res == 0 {
|
|
res, err = getCanSignalVin(conn, filter, w)
|
|
if loggerdataresp.BadDataErrorResp(w, err, http.StatusServiceUnavailable) {
|
|
return
|
|
}
|
|
if res == 0 {
|
|
return
|
|
}
|
|
filter.Offset += res
|
|
}
|
|
|
|
if loggerdataresp.BadDataErrorResp(w, err, http.StatusServiceUnavailable) {
|
|
return
|
|
}
|
|
}
|
|
|
|
func parseCANSignalFilter(r *http.Request) (common.CANSignalQuery, error) {
|
|
sch := schema.NewDecoder()
|
|
filter := common.CANSignalQuery{}
|
|
sch.SetAliasTag("json")
|
|
|
|
//parse, err := r.URL.Parse(r.URL.String())
|
|
err := sch.Decode(&filter, r.URL.Query())
|
|
if err != nil {
|
|
return common.CANSignalQuery{}, errors.WithStack(err)
|
|
}
|
|
|
|
err = validator.GetValidator().Struct(filter)
|
|
if err != nil {
|
|
return common.CANSignalQuery{}, errors.WithStack(err)
|
|
}
|
|
|
|
if len(filter.CanSignals) == 0 && !filter.SelectAll {
|
|
return common.CANSignalQuery{}, errors.New("either Select All of a list of CAN Signals required")
|
|
}
|
|
|
|
return filter, nil
|
|
}
|
|
|
|
func getCanSignalVin(conn clickhouse.ConnInterface, filter common.CANSignalQuery, w http.ResponseWriter) (int, error) {
|
|
chCtx := ch.Context(context.Background())
|
|
|
|
query := fmt.Sprintf(`SELECT * from (SELECT VIN, Timestamp, %s FROM feature_table WHERE VIN = '%s' AND Timestamp BETWEEN %f AND %f LIMIT %d, %d) ORDER BY Timestamp DESC`,
|
|
strings.Join(filter.CanSignals, ", "),
|
|
filter.VIN,
|
|
filter.TimestampStart,
|
|
filter.TimestampEnd,
|
|
filter.Offset,
|
|
filter.Limit)
|
|
rows, err := conn.Query(chCtx, query)
|
|
if err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
|
|
if filter.Offset == 0 {
|
|
headerLine := "VIN,Timestamp"
|
|
for _, signal := range filter.CanSignals {
|
|
headerLine += "," + signal
|
|
}
|
|
headerLine += "\n"
|
|
_, err = w.Write([]byte(headerLine))
|
|
if err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
columnTypes := rows.ColumnTypes()
|
|
row := make([]interface{}, len(columnTypes))
|
|
for i, cType := range columnTypes {
|
|
kind := cType.ScanType().Kind()
|
|
scanName := cType.ScanType().Name()
|
|
|
|
if kind == reflect.String || strings.Contains(scanName, "string") {
|
|
row[i] = new(string)
|
|
} else if kind == reflect.Float64 || kind == reflect.Float32 || strings.Contains(scanName, "float") || strings.Contains(scanName, "decimal") {
|
|
row[i] = new(float64)
|
|
} else if kind == reflect.Int64 || kind == reflect.Int || strings.Contains(scanName, "int") {
|
|
row[i] = new(int64)
|
|
} else if strings.Contains(scanName, "Time") {
|
|
row[i] = new(time.Time)
|
|
} else {
|
|
row[i] = new(string)
|
|
}
|
|
}
|
|
|
|
var canSignals []string
|
|
if len(filter.CanSignals) == 1 {
|
|
canSignals = strings.Split(filter.CanSignals[0], ",")
|
|
}
|
|
rowCounter := 0
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
rowCounter++
|
|
err := rows.Scan(row...)
|
|
if err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
|
|
vin := *row[0].(*string)
|
|
timestamp := *row[1].(*time.Time)
|
|
dataline := vin + "," + timestamp.Format("2006-01-02 15:04:05.000000")
|
|
|
|
for i := 2; i < len(canSignals)+2; i++ {
|
|
val := "0"
|
|
if v, ok := row[i].(*float64); ok {
|
|
val = strconv.FormatFloat(*v, 'f', -1, 64)
|
|
}
|
|
dataline += "," + val
|
|
}
|
|
|
|
w.Write([]byte(dataline + "\n"))
|
|
}
|
|
w.(http.Flusher).Flush()
|
|
return rowCounter, nil
|
|
}
|
|
|
|
func getListOfAllCanSignals(conn clickhouse.ConnInterface) (string, error) {
|
|
var allCanSignals []string
|
|
|
|
var canlist []common.CANSignalNameList
|
|
chCtx := ch.Context(context.Background())
|
|
err := conn.Select(chCtx, &canlist, "select Signal_Name from ml_var_list_table where Is_Feature=True")
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
for _, signalName := range canlist {
|
|
allCanSignals = append(allCanSignals, signalName.Signal_Name)
|
|
}
|
|
|
|
return strings.Join(allCanSignals, ","), nil
|
|
}
|