ETL: Извлечение, преобразование и загрузка в Go
Вы собираетесь посетить Бостон и хотите попробовать вкусную еду. Вы спрашиваете своего друга, который живет там, где можно поесть. Он отвечает: «Везде хорошо, не прогадаешь». Что заставляет задуматься, может, мне стоит проверить, где не стоит есть.
В вас возникает компьютерный компьютерщик, и вы обнаруживаете, что в городе Бостон есть набор данных о пищевых нарушениях. Вы скачиваете его и решаете посмотреть.
Данные в формате CSV (который вы ненавидите). Поскольку вы собираетесь поиграть с данными, вы решаете загрузить эти данные в базу данных SQL. Как только данные попадают в базу данных, вы можете динамически запрашивать их с помощью SQL или даже использовать необычные инструменты, такие как Grafana или Redash, для визуализации данных.
Этот пост будет посвящен процессу загрузки данных из различных источников в базу данных SQL, известную как ETL.
Извлечь, преобразовать, загрузить (ETL)
ETL означает «Извлечь, преобразовать, загрузить». Многие данные в необработанном формате можно найти в файлах (логи, CSV, JSON…), и распространенной практикой является загрузка этих данных в общее место (иногда называемое хранилищем данных или озером данных), где специалисты по данным могут лучше анализировать их.
Три этапа ETL:
- Extract (извлечение): данные в логах, CSV и других форматах необходимо проанализировать (или извлечь). Иногда нам нужны только части данных.
- Transform (преобразование): здесь мы переименовываем поле, преобразуем типы данных, обогащаем (например, добавляем геолокацию) и т.д.
- Load (загрузка): наконец, мы загружаем данные в место назначения.
Примечание: иногда порядок меняется. Сначала мы извлекаем и загружаем, а затем в базе данных выполняются преобразования.
Первый взгляд на данные
Данные CSV имеют много-много ошибок, но их легко просмотреть, поскольку они текстовые. Посмотрим на строку заголовка.
Листинг 1. Право первого выбора
$ wc -l boston-food.csv
655317 boston-food.csv
$ head -1 boston-food.csv
businessname,dbaname,legalowner,namelast,namefirst,licenseno,issdttm,expdttm,licstatus,licensecat,descript,result,resultdttm,violation,viollevel,violdesc,violdttm,violstatus,statusdate,comments,address,city,state,zip,property_id,location
В листинге 1 мы используем команду wc
, чтобы увидеть, сколько строк у нас есть, а затем используем команду head
, чтобы увидеть первую строку, содержащую имена столбцов.
Некоторые имена, например businessname
, имеют смысл. Некоторые, например expdttm
, более загадочные. Быстрый поиск в Интернете находит описание данных.
Прочитав описание данных, вы решаете использовать только некоторые поля и переименовать некоторые из них, чтобы получить более понятные имена.
businessname
станетbusiness_name
licstatus
станетlicense_status
violdesc
станетdescription
violstatus
станетstatus
viollevel
который либо*
,**
либо***
станетlevel
(1, 2 или 3 - целое число)result
,comments
,address
,city
иzip
останутся такие как есть
Все остальные поля мы проигнорируем
Листинг 2: Схема базы данных
CREATE TABLE IF NOT EXISTS violations (
business_name TEXT,
license_status TEXT,
result TEXT,
description TEXT,
time TIMESTAMP,
status TEXT,
level INTEGER,
comments TEXT,
address TEXT,
city TEXT,
zip TEXT
);
Зависимости
Листинг 3: go.mod
module github.com/353words/food
go 1.17
require (
github.com/jmoiron/sqlx v1.3.4
github.com/jszwec/csvutil v1.5.1
github.com/mattn/go-sqlite3 v1.14.8
)
В листинге 3 показано содержимое go.mod
. Чтобы проанализировать CSV-файл, мы будем использовать csvutil. Для базы данных мы будем использовать go-sqlite3 (я люблю SQLite) и sqlx.
Программа
Листинг 4: импорт
import (
_ "embed"
"encoding/csv"
"fmt"
"io"
"log"
"os"
"time"
"github.com/jmoiron/sqlx"
"github.com/jszwec/csvutil"
_ "github.com/mattn/go-sqlite3"
)
В листинге 4 показан наш импорт. Во 2 строке мы импортируем пакет embed
. Мы собираемся написать SQL в файлах .sql, а затем встроить их в исполняемый файл с помощью директив //go:embed
.
Листинг 5: операторы SQL
//go:embed schema.sql
var schemaSQL string
//go:embed insert.sql
var insertSQL string
Мы используем директиву //go:embed
для встраивания SQL, написанного в файлах .sql, в наш код. Это позволяет нам писать SQL вне кода Go и при этом поставлять единственный исполняемый файл.
Листинг 6: Row
type Row struct {
Business string `csv:"businessname" db:"business_name"`
Licstatus string `csv:"licstatus" db:"license_status"`
Result string `csv:"result" db:"result"`
Violdesc string `csv:"violdesc" db:"description"`
Violdttm time.Time `csv:"violdttm" db:"time"`
Violstatus string `csv:"violstatus" db:"status"`
Viollevel string `csv:"viollevel" db:"-"`
Level int `db:"level"`
Comments string `csv:"comments" db:"comments"`
Address string `csv:"address" db:"address"`
City string `csv:"city" db:"city"`
Zip string `csv:"zip" db:"zip"`
}
Тут мы определяем структуру Row
. Она используется как в csvutil
для анализа строк в файле CSV, так и в sqlx
для вставки значений в базу данных. Мы используем теги полей, чтобы указать соответствующие столбцы в CSV и базе данных.
Когда вы посмотрите на поле viollevel
в CSV-файле (вы можете использовать shuf boston-food.csv| head
, чтобы увидеть несколько случайных строк), вы увидите *
, **
либо ***
. Ниже мы воспользуемся parseLevel
, чтобы преобразовать *
в целое число и заполнить Level
.
Листинг 7: parseLevel
func parseLevel(value string) int {
switch value {
case "*":
return 1
case "**":
return 2
case "***":
return 3
}
return -1
}
В листинге 7 показана функция parseLevel
преобразования *
в число. В конце функции мы возвращаем -1 для неизвестных значений. Решение вернуть -1, а не ошибку - это решение дизайна данных, в этом случае мы решили, что иметь недопустимые (-1) уровни в базе данных - это нормально.
Листинг 8: unmarshalTime
func unmarshalTime(data []byte, t *time.Time) error {
var err error
*t, err = time.Parse("2006-01-02 15:04:05", string(data))
return err
}
В листинге 8 функция unmarshalTime
, которая используется csvutil
для анализа значений времени в файле CSV.
Листинг 9: ETL
func ETL(csvFile io.Reader, tx *sqlx.Tx) (int, int, error) {
r := csv.NewReader(csvFile)
dec, err := csvutil.NewDecoder(r)
if err != nil {
return 0, 0, err
}
dec.Register(unmarshalTime)
numRecords := 0
numErrors := 0
for {
numRecords++
var row Row
err = dec.Decode(&row)
if err == io.EOF {
break
}
if err != nil {
log.Printf("error: %d: %s", numRecords, err)
numErrors++
continue
}
row.Level = parseLevel(row.Viollevel)
if _, err := tx.NamedExec(insertSQL, &row); err != nil {
return 0, 0, err
}
}
return numRecords, numErrors, nil
}
В листинге 8 показана функция ETL
. Мы видим, что функция ETL
получает io.Reader
как CSV-файл и транзакцию, которая используется для вставки значений в базу данных, ETL
возвращает количество записей, количество плохих записей и значение ошибки.
Внутри функции мы используем unmarshalTime
для обработки значений времени. Далее мы инициализируем количество записей и количество ошибок, которые возвращает ETL
.
Внутри цикла for мы декодируем строку из CSV, далее мы проверяем, указывает ли возвращенная ошибка io.EOF
на конец файла. Затем мы проверяем наличие других ошибок, и если они есть, мы регистрируем их и увеличиваем numErrors
.
Листинг 10: main
func main() {
file, err := os.Open("boston-food.csv")
if err != nil {
log.Fatal(err)
}
defer file.Close()
db, err := sqlx.Open("sqlite3", "./food.db")
if err != nil {
log.Fatal(err)
}
defer db.Close()
if _, err := db.Exec(schemaSQL); err != nil {
log.Fatal(err)
}
tx, err := db.Beginx()
if err != nil {
log.Fatal(err)
}
start := time.Now()
numRecords, numErrors, err := ETL(file, tx)
duration := time.Since(start)
if err != nil {
tx.Rollback()
log.Fatal(err)
}
frac := float64(numErrors) / float64(numRecords)
if frac > 0.1 {
tx.Rollback()
log.Fatalf("too many errors: %d/%d = %f", numErrors, numRecords, frac)
}
tx.Commit()
fmt.Printf("%d records (%.2f errors) in %v\n", numRecords, frac, duration)
}
Использование транзакций
Вставка данных через транзакцию означает, что либо вставлены все данные, либо не вставлены никакие. Если бы мы не использовали транзакции, а входила половина данных - у нас была серьезная проблема. Нам нужно либо перезапустить ETL с середины, либо удалить данные, которым удалось их получить. Оба варианта сложно реализовать, и они усложнят ваш код. Транзакции - одна из основных причин (помимо SQL) того, что я люблю использовать транзакционные базы данных, такие как SQLite, PostgreSQL и другие.
Запуск ETL
Листинг 11: Запуск ETL
$ go run etl.go
... (many log lines reducted)
2021/09/11 12:21:44 error: 655301: parsing time " " as "2006-01-02 15:04:05": cannot parse " " as "2006"
655317 records (0.06 errors) in 13.879984129s
Около 6% строк содержали ошибки, в основном, нехватку времени. Это нормально, поскольку мы определили порог ошибки равным 10%.
Анализ данных
Когда данные находятся в базе данных, мы можем использовать SQL для их запроса.
Листинг 12: Запрос
SELECT
business_name, COUNT(business_name) as num_violations
FROM
violations
WHERE
license_status = 'Active' AND
time >= '2016-01-01'
GROUP BY business_name
ORDER BY num_violations DESC
LIMIT 20
В листинге 12 показан SQL-запрос для выбора 20 крупнейших предприятий, у которых было наибольшее количество нарушений за последние 5 лет. Во второй строке мы выбираем столбец business_name
и его количество. В строках с пятой по седьмую мы ограничиваем записи до тех, которые являются активными, и время после 2016 года. В строке 8 мы группируем строку по business_name
, а в строке 9 мы упорядочиваем результаты по количеству нарушений и, наконец в строке 10 мы ограничиваем 20 результатами.
Листинг 13: Выполнение запроса
$ sqlite3 food.db < query.sql
Dunkin Donuts|1031
Subway|996
The Real Deal|756
Mcdonalds|723
Caffe Nero|640
ORIENTAL HOUSE|599
Burger King|537
Dumpling Palace|463
Sweetgreen|454
The Upper Crust|453
Dunkin' Donuts|436
Yamato II|435
Anh Hong Restaurant|413
Chilacates|408
Yely's Coffee Shop|401
India Quality|386
Domino's Pizza|374
Fan Fan Restaurant|362
Pavement Coffeehouse|357
FLAMES RESTAURANT|357
В листинге 13 показано, как выполнить запрос с помощью утилиты командной строки sqlite3
.
Последние мысли
В науке о данных и анализе данных преобладают, ну, ну, данные :) Конвейеры данных и ETL - это то, что доставляет данные в место, где вы можете запрашивать и анализировать их. Go отлично подходит для запуска ETL, он быстрый, эффективный и имеет отличные библиотеки.
Использование транзакций и SQL сэкономит вам много усилий в долгосрочной перспективе. Вам не нужно быть экспертом по SQL (я не такой), чтобы использовать их.