DevGang
Авторизоваться

ETL: Извлечение, преобразование и загрузка в Go 

Вы собираетесь посетить Бостон и хотите попробовать вкусную еду. Вы спрашиваете своего друга, который живет там, где можно поесть. Он отвечает: «Везде хорошо, не прогадаешь». Что заставляет задуматься, может, мне стоит проверить, где не стоит есть.

В вас возникает компьютерный компьютерщик, и вы обнаруживаете, что в городе Бостон есть набор данных о пищевых нарушениях. Вы скачиваете его и решаете посмотреть.

Данные в формате CSV (который вы ненавидите). Поскольку вы собираетесь поиграть с данными, вы решаете загрузить эти данные в базу данных SQL. Как только данные попадают в базу данных, вы можете динамически запрашивать их с помощью SQL или даже использовать необычные инструменты, такие как Grafana или Redash, для визуализации данных.

Этот пост будет посвящен процессу загрузки данных из различных источников в базу данных SQL, известную как ETL.

Извлечь, преобразовать, загрузить (ETL)

ETL означает «Извлечь, преобразовать, загрузить». Многие данные в необработанном формате можно найти в файлах (логи, CSV, JSON…), и распространенной практикой является загрузка этих данных в общее место (иногда называемое хранилищем данных или озером данных), где специалисты по данным могут лучше анализировать их.

Три этапа ETL:

  1. Extract (извлечение): данные в логах, CSV и других форматах необходимо проанализировать (или извлечь). Иногда нам нужны только части данных.
  2. Transform (преобразование): здесь мы переименовываем поле, преобразуем типы данных, обогащаем (например, добавляем геолокацию) и т.д.
  3. Load (загрузка): наконец, мы загружаем данные в место назначения.

Примечание: иногда порядок меняется. Сначала мы извлекаем и загружаем, а затем в базе данных выполняются преобразования.

Первый взгляд на данные

Данные CSV имеют много-много ошибок, но их легко просмотреть, поскольку они текстовые. Посмотрим на строку заголовка.

Листинг 1. Право первого выбора

$ wc -l boston-food.csv 
655317 boston-food.csv

$ head -1 boston-food.csv           
businessname,dbaname,legalowner,namelast,namefirst,licenseno,issdttm,expdttm,licstatus,licensecat,descript,result,resultdttm,violation,viollevel,violdesc,violdttm,violstatus,statusdate,comments,address,city,state,zip,property_id,location

В листинге 1 мы используем команду wc, чтобы увидеть, сколько строк у нас есть, а затем используем команду head, чтобы увидеть первую строку, содержащую имена столбцов.

Некоторые имена, например businessname, имеют смысл. Некоторые, например expdttm, более загадочные. Быстрый поиск в Интернете находит описание данных.

Прочитав описание данных, вы решаете использовать только некоторые поля и переименовать некоторые из них, чтобы получить более понятные имена.

  1. businessname станет business_name
  2. licstatus станет license_status
  3. violdesc станет description
  4. violstatus станет status
  5. viollevel который либо *** либо *** станет level (1, 2 или 3 - целое число)
  6. resultcommentsaddresscity и zip останутся такие как есть

Все остальные поля мы проигнорируем

Листинг 2: Схема базы данных

CREATE TABLE IF NOT EXISTS violations (
    business_name TEXT,
    license_status TEXT,
    result TEXT,
    description TEXT,
    time TIMESTAMP,
    status TEXT,
    level INTEGER,
    comments TEXT,
    address TEXT,
    city TEXT,
    zip TEXT
);

Зависимости

Листинг 3: go.mod

module github.com/353words/food

go 1.17

require (
    github.com/jmoiron/sqlx v1.3.4
    github.com/jszwec/csvutil v1.5.1
    github.com/mattn/go-sqlite3 v1.14.8
)

В листинге 3 показано содержимое go.mod. Чтобы проанализировать CSV-файл, мы будем использовать csvutil. Для базы данных мы будем использовать go-sqlite3 (я люблю SQLite) и sqlx.

Программа

Листинг 4: импорт

import (
    _ "embed"
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "time"

    "github.com/jmoiron/sqlx"
    "github.com/jszwec/csvutil"
    _ "github.com/mattn/go-sqlite3"
)

В листинге 4 показан наш импорт. Во 2 строке мы импортируем пакет embed. Мы собираемся написать SQL в файлах .sql, а затем встроить их в исполняемый файл с помощью директив //go:embed.

Листинг 5: операторы SQL

//go:embed schema.sql
var schemaSQL string

//go:embed insert.sql
var insertSQL string

Мы используем директиву //go:embed для встраивания SQL, написанного в файлах .sql, в наш код. Это позволяет нам писать SQL вне кода Go и при этом поставлять единственный исполняемый файл.

Листинг 6: Row

type Row struct {
    Business   string    `csv:"businessname" db:"business_name"`
    Licstatus  string    `csv:"licstatus" db:"license_status"`
    Result     string    `csv:"result" db:"result"`
    Violdesc   string    `csv:"violdesc" db:"description"`
    Violdttm   time.Time `csv:"violdttm" db:"time"`
    Violstatus string    `csv:"violstatus" db:"status"`
    Viollevel  string    `csv:"viollevel" db:"-"`
    Level      int       `db:"level"`
    Comments   string    `csv:"comments" db:"comments"`
    Address    string    `csv:"address" db:"address"`
    City       string    `csv:"city" db:"city"`
    Zip        string    `csv:"zip" db:"zip"`
}

Тут мы определяем структуру Row. Она используется как в csvutil для анализа строк в файле CSV, так и в sqlx для вставки значений в базу данных. Мы используем теги полей, чтобы указать соответствующие столбцы в CSV и базе данных.

Когда вы посмотрите на поле viollevel в CSV-файле (вы можете использовать shuf boston-food.csv| head, чтобы увидеть несколько случайных строк), вы увидите *, ** либо ***. Ниже мы воспользуемся parseLevel, чтобы преобразовать * в целое число и заполнить Level.

Листинг 7: parseLevel

func parseLevel(value string) int {
    switch value {
    case "*":
        return 1
    case "**":
        return 2
    case "***":
        return 3
    }

    return -1
}

В листинге 7 показана функция parseLevel преобразования * в число. В конце функции мы возвращаем -1 для неизвестных значений. Решение вернуть -1, а не ошибку - это решение дизайна данных, в этом случае мы решили, что иметь недопустимые (-1) уровни в базе данных - это нормально.

Листинг 8: unmarshalTime

func unmarshalTime(data []byte, t *time.Time) error {
    var err error
    *t, err = time.Parse("2006-01-02 15:04:05", string(data))
    return err
}

В листинге 8 функция unmarshalTime, которая используется csvutil для анализа значений времени в файле CSV.

Листинг 9: ETL

func ETL(csvFile io.Reader, tx *sqlx.Tx) (int, int, error) {
    r := csv.NewReader(csvFile)
    dec, err := csvutil.NewDecoder(r)
    if err != nil {
        return 0, 0, err
    }
    dec.Register(unmarshalTime)
    numRecords := 0
    numErrors := 0

    for {
        numRecords++
        var row Row
        err = dec.Decode(&row)
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("error: %d: %s", numRecords, err)
            numErrors++
            continue
        }
        row.Level = parseLevel(row.Viollevel)
        if _, err := tx.NamedExec(insertSQL, &row); err != nil {
            return 0, 0, err
        }
    }

    return numRecords, numErrors, nil
}

В листинге 8 показана функция ETL. Мы видим, что функция ETL получает io.Reader как CSV-файл и транзакцию, которая используется для вставки значений в базу данных, ETL возвращает количество записей, количество плохих записей и значение ошибки.

Внутри функции мы используем unmarshalTime для обработки значений времени. Далее мы инициализируем количество записей и количество ошибок, которые возвращает ETL.

Внутри цикла for мы декодируем строку из CSV, далее мы проверяем, указывает ли возвращенная ошибка io.EOF на конец файла. Затем мы проверяем наличие других ошибок, и если они есть, мы регистрируем их и увеличиваем numErrors

Листинг 10: main

func main() {
    file, err := os.Open("boston-food.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    db, err := sqlx.Open("sqlite3", "./food.db")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    if _, err := db.Exec(schemaSQL); err != nil {
        log.Fatal(err)
    }

    tx, err := db.Beginx()
    if err != nil {
        log.Fatal(err)
    }

    start := time.Now()
    numRecords, numErrors, err := ETL(file, tx)
    duration := time.Since(start)
    if err != nil {
        tx.Rollback()
        log.Fatal(err)
    }

    frac := float64(numErrors) / float64(numRecords)
    if frac > 0.1 {
        tx.Rollback()
        log.Fatalf("too many errors: %d/%d = %f", numErrors, numRecords, frac)
    }
    tx.Commit()
    fmt.Printf("%d records (%.2f errors) in %v\n", numRecords, frac, duration)
}

Использование транзакций

Вставка данных через транзакцию означает, что либо вставлены все данные, либо не вставлены никакие. Если бы мы не использовали транзакции, а входила половина данных - у нас была серьезная проблема. Нам нужно либо перезапустить ETL с середины, либо удалить данные, которым удалось их получить. Оба варианта сложно реализовать, и они усложнят ваш код. Транзакции - одна из основных причин (помимо SQL) того, что я люблю использовать транзакционные базы данных, такие как SQLite, PostgreSQL и другие.

Запуск ETL

Листинг 11: Запуск ETL

$ go run etl.go

... (many log lines reducted)
2021/09/11 12:21:44 error: 655301: parsing time " " as "2006-01-02 15:04:05": cannot parse " " as "2006"
655317 records (0.06 errors) in 13.879984129s

Около 6% строк содержали ошибки, в основном, нехватку времени. Это нормально, поскольку мы определили порог ошибки равным 10%.

Анализ данных

Когда данные находятся в базе данных, мы можем использовать SQL для их запроса.

Листинг 12: Запрос

SELECT 
    business_name, COUNT(business_name) as num_violations
FROM
    violations
WHERE
    license_status = 'Active' AND 
    time >= '2016-01-01'
GROUP BY business_name
ORDER BY num_violations DESC
LIMIT 20

В листинге 12 показан SQL-запрос для выбора 20 крупнейших предприятий, у которых было наибольшее количество нарушений за последние 5 лет. Во второй строке мы выбираем столбец business_name и его количество. В строках с пятой по седьмую мы ограничиваем записи до тех, которые являются активными, и время после 2016 года. В строке 8 мы группируем строку по business_name, а в строке 9 мы упорядочиваем результаты по количеству нарушений и, наконец в строке 10 мы ограничиваем 20 результатами.

Листинг 13: Выполнение запроса

$ sqlite3 food.db < query.sql
 
Dunkin Donuts|1031
Subway|996
The Real Deal|756
Mcdonalds|723
Caffe Nero|640
ORIENTAL HOUSE|599
Burger King|537
Dumpling Palace|463
Sweetgreen|454
The Upper Crust|453
Dunkin' Donuts|436
Yamato II|435
Anh Hong Restaurant|413
Chilacates|408
Yely's Coffee Shop|401
India Quality|386
Domino's Pizza|374
Fan Fan Restaurant|362
Pavement Coffeehouse|357
FLAMES RESTAURANT|357

В листинге 13 показано, как выполнить запрос с помощью утилиты командной строки sqlite3.

Последние мысли

В науке о данных и анализе данных преобладают, ну, ну, данные :) Конвейеры данных и ETL - это то, что доставляет данные в место, где вы можете запрашивать и анализировать их. Go отлично подходит для запуска ETL, он быстрый, эффективный и имеет отличные библиотеки.

Использование транзакций и SQL сэкономит вам много усилий в долгосрочной перспективе. Вам не нужно быть экспертом по SQL (я не такой), чтобы использовать их.

#Golang #SQL
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

Поделитесь своим опытом, расскажите о новом инструменте, библиотеке или фреймворке. Для этого не обязательно становится постоянным автором.

Попробовать

В подарок 100$ на счет при регистрации

Получить