DevGang
Авторизоваться

Перенос данных из PostgreSQL в Google BigQuery с помощью Python

В этой статье мы покажем, как создать скрипт Python для переноса данных из локальной базы данных PostgreSQL в Google BigQuery. Процесс включает в себя извлечение данных из PostgreSQL, их преобразование в соответствии со схемой BigQuery, загрузку в Google Cloud Storage (GCS) и, наконец, импорт в BigQuery. 

Мы подробно рассмотрим каждый из этапов процесса, предоставив примеры кода и пояснения.

Для начала вам необходимо установить необходимые библиотеки Python, такие как psycopg2 для взаимодействия с PostgreSQL, google-cloud-storage для работы с GCS и google-cloud-bigquery для взаимодействия с BigQuery. 

pip install pandas sqlalchemy psycopg2 google-cloud-storage google-cloud-bigquery

После установки необходимых библиотек нам потребуется настроить переменные среды, которые содержат учетные данные для доступа к PostgreSQL и Google Cloud.

import os
import pandas as pd
from sqlalchemy import create_engine
from google.cloud import storage, bigquery

# PostgreSQL connection details
POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', 5432)
POSTGRES_DB = os.getenv('POSTGRES_DB')

# GCS and BigQuery details
GCS_BUCKET_NAME = os.getenv('GCS_BUCKET_NAME')
GCS_FILE_NAME = 'data.csv'
BQ_PROJECT_ID = os.getenv('BQ_PROJECT_ID')
BQ_DATASET_ID = os.getenv('BQ_DATASET_ID')
BQ_TABLE_ID = os.getenv('BQ_TABLE_ID')

# Initialize clients
storage_client = storage.Client()
bigquery_client = bigquery.Client()

# PostgreSQL connection URL
postgres_url = f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}'

# Extract data from PostgreSQL
def extract_data(query):
    engine = create_engine(postgres_url)
    with engine.connect() as connection:
        df = pd.read_sql(query, connection)
    return df

# Transform data to match BigQuery schema
def transform_data(df):
    # Add transformation logic if necessary
    return df

# Upload data to GCS
def upload_to_gcs(df):
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(GCS_FILE_NAME)
    df.to_csv(GCS_FILE_NAME, index=False)
    blob.upload_from_filename(GCS_FILE_NAME)
    os.remove(GCS_FILE_NAME)
    print(f'File {GCS_FILE_NAME} uploaded to GCS bucket {GCS_BUCKET_NAME}.')

# Load data from GCS to BigQuery
def load_to_bigquery():
    table_id = f'{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}'
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )

    uri = f'gs://{GCS_BUCKET_NAME}/{GCS_FILE_NAME}'
    load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Wait for the job to complete
    print(f'Loaded data into BigQuery table {table_id}.')

# Main function
def main():
    query = "SELECT * FROM your_table_name"
    df = extract_data(query)
    df = transform_data(df)
    upload_to_gcs(df)
    load_to_bigquery()

if __name__ == '__main__':
    main()

Пример использования динамической схемы для адаптивного преобразования данных

def extract_data(query):
    engine = create_engine(postgres_url)
    with engine.connect() as connection:
        df = pd.read_sql(query, connection)
    return df

def transform_data(df):
    # Dynamic schema detection
    expected_columns = ['client_id', 'hospital_id', 'plan_name', 'active_status']
    current_columns = df.columns.tolist()

    # Reorder columns if necessary
    if set(expected_columns) == set(current_columns):
        df = df[expected_columns]
    else:
        raise ValueError("Schema mismatch detected.")
    
    return df

Вариант 2

from google.cloud import bigquery
from google.cloud import storage
import psycopg2

def extract_data(query, gcs_bucket_name=None):
  """Extracts data from PostgreSQL and uploads to Cloud Storage (optional)"""
  try:
    pg_conn = psycopg2.connect(**pg_conn_params)  # Replace with your connection details
    cur = pg_conn.cursor()
    cur.execute(query)
    data = cur.fetchall()
    cur.close()
    pg_conn.close()

    if gcs_bucket_name:
      # Convert data to CSV (basic transformation)
      csv_data = [",".join(str(x) for x in row) for row in data]
      csv_content = "\n".join(csv_data)

      # Upload data to Cloud Storage
      gcs_client = storage.Client()
      bucket = gcs_client.bucket(gcs_bucket_name)
      blob = bucket.blob("data.csv")
      blob.upload_from_string(csv_content)
      
      print(f"Data extracted and uploaded to gs://{gcs_bucket_name}/data.csv")
      return f"gs://{gcs_bucket_name}/data.csv"
    else:
      return data  # Return data directly if no Cloud Storage used

  except Exception as e:
    print(f"Error extracting data: {e}")
    return None

def load_to_bigquery(data_uri, table_name, schema):
  """Loads data from Cloud Storage (or directly) to BigQuery table"""
  bq_client = bigquery.Client()
  dataset_ref = bq_client.dataset(bq_client.project)
  table_ref = dataset_ref.table(table_name)
  
  if data_uri.startswith("gs://"):
    load_job_config = bigquery.LoadJobConfig(
        schema=schema,
        skip_leading_rows=1,  # Assuming first row is header in CSV
        field_delimiter=","
    )
    load_job = bq_client.load_job_from_uri(
        data_uri,
        table_ref,
        job_config=load_job_config
    )
  else:
    load_job = bq_client.load_table_from_dataframe(data, table_ref, schema=schema)
  
  load_job.result()  # Wait for load to complete
  print(f"Data loaded to BigQuery table: {table_name}")

# Replace with your PostgreSQL connection details
pg_conn_params = {
  "host": "your_postgres_host",
  "database": "your_database",
  "user": "your_user",
  "password": "your_password"
}

# Replace with your BigQuery schema (adjust data types as needed)
schema = [
    bigquery.SchemaField("column1", bigquery.STRING),
    bigquery.SchemaField("column2", bigquery.INTEGER),
]

# Replace with your PostgreSQL query
query = "SELECT column1, column2 FROM your_table"

# Choose one approach based on your needs:
# Option 1: Extract directly to BigQuery (no downtime)
data = extract_data(query)
load_to_bigquery(data, "your_dataset.your_table", schema)

# Option 2: Extract to Cloud Storage, then load to BigQuery (minimal downtime)
gcs_bucket_name = "your_gcs_bucket"  # Uncomment if using Cloud Storage
# data_uri = extract_data(query, gcs_bucket_name)
# if data_uri:
#   load_to_bigquery(data_uri, "your_dataset.your_table", schema)

Мы надеемся, что наше руководство помогло вам разобраться в том, как перенести данные из локальной базы данных PostgreSQL в Google BigQuery. Мы подробно описали каждый шаг процесса, показав, как выполнить каждую задачу для достижения конечного результата. Мы также предоставили несколько вариантов реализации скрипта, чтобы вы могли выбрать наиболее подходящий для ваших нужд. Спасибо за ваше внимание! 

Источник: https://medium.com/@bgiri-gcloud/python-script-to-migrate-data-from-an-on-premises-postgresql-database-to-google-bigquery-2d1f538848d2

#PostgreSQL #Python #Google Cloud Platform #Pandas
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу