-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathiceberg_create_insert.py
88 lines (73 loc) · 2.75 KB
/
iceberg_create_insert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import logging
import numpy as np
import pandas as pd
from pyarrow import Table
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.types import (
IntegerType,
NestedField,
StringType,
TimestampType,
)
# Root-logger setup: INFO level, timestamped "time - LEVEL - message" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def load_catalog_iceberg(provider: str, warehouse_bucket: str, region: str) -> Catalog:
    """Build and return a pyiceberg Catalog.

    The provider string (e.g. "glue") is used both as the catalog name
    and as its "type" property.
    """
    catalog_properties = {
        "type": provider,
        "warehouse": warehouse_bucket,
        "region": region,
    }
    return load_catalog(provider, **catalog_properties)
def create_table_iceberg(catalog: Catalog, schema: "Schema", table_name: str,
                         table_location: str, partition_spec: "PartitionSpec") -> None:
    """Create an Iceberg table unless it already exists (idempotent).

    Args:
        catalog: Loaded pyiceberg catalog.
        schema: pyiceberg Schema. (The original annotation said ``dict``,
            but the caller passes a ``pyiceberg.schema.Schema``.)
        table_name: Fully qualified identifier, e.g. "db.table".
        table_location: Storage location (s3:// URI) for the table's data.
        partition_spec: pyiceberg PartitionSpec. (The original annotation
            said ``str``, but the caller passes a ``PartitionSpec``.)
    """
    # Skip creation when the identifier is already registered in the catalog.
    if catalog.table_exists(table_name):
        logging.info(f"Table '{table_name}' already exists!")
        return
    catalog.create_table(
        identifier=table_name,
        location=table_location,
        schema=schema,
        partition_spec=partition_spec,
    )
def insert_data_iceberg(data: dict,
                        iceberg_table: "pyiceberg.table.Table") -> None:
    """Append the given rows to an Iceberg table in one transaction.

    Args:
        data: Column-name -> values mapping accepted by ``pandas.DataFrame``.
        iceberg_table: A *pyiceberg* table (it must expose ``.transaction()``).
            The original annotation used the ``Table`` name imported from
            pyarrow, which is a different class — pyarrow tables have no
            ``transaction`` method.
    """
    df = pd.DataFrame(data)
    # ``Table`` here is pyarrow.Table: convert pandas -> Arrow for the append.
    arrow_table = Table.from_pandas(df)
    # The transaction commits atomically when the context manager exits.
    with iceberg_table.transaction() as txn:
        txn.append(arrow_table)
def read_data_iceberg(table: "pyiceberg.table.Table") -> pd.DataFrame:
    """Materialize an entire Iceberg table as a pandas DataFrame.

    Note: the original annotation was pyarrow's ``Table``, but that class
    has no ``.scan()`` — a loaded pyiceberg table is required here.

    Returns:
        DataFrame containing every row (full, unfiltered table scan).
    """
    # Scan everything, fetch as Arrow, then convert to pandas.
    rows = table.scan().to_arrow().to_pandas()
    return rows
if __name__ == "__main__":
    # Demo driver: create a Glue-backed Iceberg table, insert one row, read it back.
    provider = "glue"
    warehouse_bucket = "s3://<your-bucket-name>/"
    region = "eu-north-1"

    catalog = load_catalog_iceberg(provider, warehouse_bucket, region)
    logging.info("Catalog loaded")

    database_name = "db1"
    table_name = "example_table"
    iceberg_table_identifier = f"{database_name}.{table_name}"
    iceberg_table_location = f"{warehouse_bucket}{database_name}/{table_name}"

    # Every field is optional (required=False); field_ids are unique per schema.
    schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
        NestedField(
            field_id=3,
            name="created_at",
            field_type=TimestampType(),
            required=False,
        ),
    )
    # Spec with no partition fields -> an unpartitioned table.
    partition_spec = PartitionSpec(spec_id=0)

    create_table_iceberg(
        catalog,
        schema,
        iceberg_table_identifier,
        iceberg_table_location,
        partition_spec,
    )
    logging.info("Table created")

    # One sample row; dtypes picked to line up with the Iceberg schema
    # (Int32 -> IntegerType, datetime64[us] -> TimestampType).
    data = {
        "id": pd.Series([1], dtype="Int32"),
        "name": ["jacinto"],
        "created_at": np.array(['2024-01-01 12:00:00'], dtype='datetime64[us]'),
    }
    iceberg_table = catalog.load_table(iceberg_table_identifier)
    insert_data_iceberg(data, iceberg_table)
    logging.info("Data inserted")

    rows = read_data_iceberg(iceberg_table)
    logging.info(rows)