Перенос данных из PostgreSQL в Google BigQuery с помощью Python
В этой статье мы покажем, как создать скрипт Python для переноса данных из локальной базы данных PostgreSQL в Google BigQuery. Процесс включает в себя извлечение данных из PostgreSQL, их преобразование в соответствии со схемой BigQuery, загрузку в Google Cloud Storage (GCS) и, наконец, импорт в BigQuery.
Мы подробно рассмотрим каждый из этапов процесса, предоставив примеры кода и пояснения.
Для начала вам необходимо установить необходимые библиотеки Python, такие как psycopg2
для взаимодействия с PostgreSQL, google-cloud-storage
для работы с GCS и google-cloud-bigquery
для взаимодействия с BigQuery.
pip install pandas sqlalchemy psycopg2 google-cloud-storage google-cloud-bigquery
После установки необходимых библиотек нам потребуется настроить переменные среды, которые содержат учетные данные для доступа к PostgreSQL и Google Cloud.
import os
import pandas as pd
from sqlalchemy import create_engine
from google.cloud import storage, bigquery
# PostgreSQL connection details
POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_HOST = os.getenv('POSTGRES_HOST')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', 5432)
POSTGRES_DB = os.getenv('POSTGRES_DB')
# GCS and BigQuery details
GCS_BUCKET_NAME = os.getenv('GCS_BUCKET_NAME')
GCS_FILE_NAME = 'data.csv'
BQ_PROJECT_ID = os.getenv('BQ_PROJECT_ID')
BQ_DATASET_ID = os.getenv('BQ_DATASET_ID')
BQ_TABLE_ID = os.getenv('BQ_TABLE_ID')
# Initialize clients
storage_client = storage.Client()
bigquery_client = bigquery.Client()
# PostgreSQL connection URL
postgres_url = f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}'
# Extract data from PostgreSQL
def extract_data(query):
engine = create_engine(postgres_url)
with engine.connect() as connection:
df = pd.read_sql(query, connection)
return df
# Transform data to match BigQuery schema
def transform_data(df):
# Add transformation logic if necessary
return df
# Upload data to GCS
def upload_to_gcs(df):
bucket = storage_client.bucket(GCS_BUCKET_NAME)
blob = bucket.blob(GCS_FILE_NAME)
df.to_csv(GCS_FILE_NAME, index=False)
blob.upload_from_filename(GCS_FILE_NAME)
os.remove(GCS_FILE_NAME)
print(f'File {GCS_FILE_NAME} uploaded to GCS bucket {GCS_BUCKET_NAME}.')
# Load data from GCS to BigQuery
def load_to_bigquery():
table_id = f'{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}'
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
autodetect=True,
)
uri = f'gs://{GCS_BUCKET_NAME}/{GCS_FILE_NAME}'
load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result() # Wait for the job to complete
print(f'Loaded data into BigQuery table {table_id}.')
# Main function
def main():
query = "SELECT * FROM your_table_name"
df = extract_data(query)
df = transform_data(df)
upload_to_gcs(df)
load_to_bigquery()
if __name__ == '__main__':
main()
Пример использования динамической схемы для адаптивного преобразования данных
def extract_data(query):
engine = create_engine(postgres_url)
with engine.connect() as connection:
df = pd.read_sql(query, connection)
return df
def transform_data(df):
# Dynamic schema detection
expected_columns = ['client_id', 'hospital_id', 'plan_name', 'active_status']
current_columns = df.columns.tolist()
# Reorder columns if necessary
if set(expected_columns) == set(current_columns):
df = df[expected_columns]
else:
raise ValueError("Schema mismatch detected.")
return df
Вариант 2
from google.cloud import bigquery
from google.cloud import storage
import psycopg2
def extract_data(query, gcs_bucket_name=None):
"""Extracts data from PostgreSQL and uploads to Cloud Storage (optional)"""
try:
pg_conn = psycopg2.connect(**pg_conn_params) # Replace with your connection details
cur = pg_conn.cursor()
cur.execute(query)
data = cur.fetchall()
cur.close()
pg_conn.close()
if gcs_bucket_name:
# Convert data to CSV (basic transformation)
csv_data = [",".join(str(x) for x in row) for row in data]
csv_content = "\n".join(csv_data)
# Upload data to Cloud Storage
gcs_client = storage.Client()
bucket = gcs_client.bucket(gcs_bucket_name)
blob = bucket.blob("data.csv")
blob.upload_from_string(csv_content)
print(f"Data extracted and uploaded to gs://{gcs_bucket_name}/data.csv")
return f"gs://{gcs_bucket_name}/data.csv"
else:
return data # Return data directly if no Cloud Storage used
except Exception as e:
print(f"Error extracting data: {e}")
return None
def load_to_bigquery(data_uri, table_name, schema):
"""Loads data from Cloud Storage (or directly) to BigQuery table"""
bq_client = bigquery.Client()
dataset_ref = bq_client.dataset(bq_client.project)
table_ref = dataset_ref.table(table_name)
if data_uri.startswith("gs://"):
load_job_config = bigquery.LoadJobConfig(
schema=schema,
skip_leading_rows=1, # Assuming first row is header in CSV
field_delimiter=","
)
load_job = bq_client.load_job_from_uri(
data_uri,
table_ref,
job_config=load_job_config
)
else:
load_job = bq_client.load_table_from_dataframe(data, table_ref, schema=schema)
load_job.result() # Wait for load to complete
print(f"Data loaded to BigQuery table: {table_name}")
# Replace with your PostgreSQL connection details
pg_conn_params = {
"host": "your_postgres_host",
"database": "your_database",
"user": "your_user",
"password": "your_password"
}
# Replace with your BigQuery schema (adjust data types as needed)
schema = [
bigquery.SchemaField("column1", bigquery.STRING),
bigquery.SchemaField("column2", bigquery.INTEGER),
]
# Replace with your PostgreSQL query
query = "SELECT column1, column2 FROM your_table"
# Choose one approach based on your needs:
# Option 1: Extract directly to BigQuery (no downtime)
data = extract_data(query)
load_to_bigquery(data, "your_dataset.your_table", schema)
# Option 2: Extract to Cloud Storage, then load to BigQuery (minimal downtime)
gcs_bucket_name = "your_gcs_bucket" # Uncomment if using Cloud Storage
# data_uri = extract_data(query, gcs_bucket_name)
# if data_uri:
# load_to_bigquery(data_uri, "your_dataset.your_table", schema)
Мы надеемся, что наше руководство помогло вам разобраться в том, как перенести данные из локальной базы данных PostgreSQL в Google BigQuery. Мы подробно описали каждый шаг процесса, показав, как выполнить каждую задачу для достижения конечного результата. Мы также предоставили несколько вариантов реализации скрипта, чтобы вы могли выбрать наиболее подходящий для ваших нужд. Спасибо за ваше внимание!
Источник: https://medium.com/@bgiri-gcloud/python-script-to-migrate-data-from-an-on-premises-postgresql-database-to-google-bigquery-2d1f538848d2