Skip to content

db

pinky_streamlit.db

Generic Snowflake data-access patterns for Streamlit in Snowflake dashboards.

Convention

- `query_` prefix → snowpark.DataFrame (lazy, Snowflake-side, no cache)
- `load_` prefix → pd.DataFrame (materialized, caller chooses cache strategy)

Three cache strategies are provided, plus an uncached loader for development:

- load_static — process-lifetime (reference data, deployed assets)
- load_analytic — TTL 1h per session (read-only dashboards and KPIs)
- load_crud — session_state with manual invalidation (editable tables)
- load_nocache — no cache, reload on every rerun (dev/debug only)

add_status_icons(df, column_name, mapping_status_icons, default_icon='-')

Add a {COLUMN_NAME}_ICON column by mapping status values to icons.

Parameters:

Name Type Description Default
df DataFrame

Source pandas DataFrame.

required
column_name str

Status column name (coerced to uppercase).

required
mapping_status_icons dict[str, str]

Dict mapping status values to icon strings (e.g. {"Active": ":material/person_check:"}).

required
default_icon str

Fallback icon for unmapped values (default "-").

'-'

Returns:

Type Description
DataFrame

DataFrame with the new {COLUMN_NAME}_ICON column appended.

Source code in src/pinky_streamlit/core/db.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def add_status_icons(
    df: pd.DataFrame,
    column_name: str,
    mapping_status_icons: dict[str, str],
    default_icon: str = "-",
) -> pd.DataFrame:
    """Attach an icon column derived from a status column.

    The status column is looked up under the upper-cased ``column_name``; each
    of its values is translated through ``mapping_status_icons`` and the result
    is stored as ``{COLUMN_NAME}_ICON``. The column is written onto ``df``
    itself (the input is modified in place) and that same object is returned.

    Args:
        df:                   pandas DataFrame holding the status column.
        column_name:          Name of the status column (upper-cased before
                              the lookup).
        mapping_status_icons: Status value → icon string
                              (e.g. {"Active": ":material/person_check:"}).
        default_icon:         Icon used wherever the mapping yields no value
                              (default "-").

    Returns:
        The input DataFrame, now carrying the ``{COLUMN_NAME}_ICON`` column.
    """
    status_col = column_name.upper()
    icon_col = f"{status_col}_ICON"
    # Statuses absent from the mapping come back as missing values...
    icons = df[status_col].map(mapping_status_icons)
    # ...and are then replaced by the fallback icon.
    df[icon_col] = icons.fillna(default_icon)
    return df

insert_data(session, data, key, table, metadata=None, on_success=None)

Insert new rows into a Snowflake table (MERGE / INSERT IF NOT MATCHED).

Deduplicates on the key column: existing rows are not overwritten.

Parameters:

Name Type Description Default
session Session

Active Snowpark session.

required
data list[dict[str, Any]]

List of dicts representing rows to insert.

required
key str

Unique identifier column for deduplication.

required
table str

Target table name.

required
metadata dict[str, Any] | None

Extra columns to inject automatically (e.g. {"CREATED_AT": current_timestamp(), "CREATED_BY": lit(login)}).

None
on_success Callable[[], None] | None

Callback invoked after a successful insert (e.g. cache refresh).

None

Returns:

Type Description
MergeResult

snowpark.MergeResult with the number of inserted rows.

Source code in src/pinky_streamlit/core/db.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def insert_data(
    session: snowpark.Session,
    data: list[dict[str, Any]],
    key: str,
    table: str,
    metadata: dict[str, Any] | None = None,
    on_success: Callable[[], None] | None = None,
) -> snowpark.MergeResult:
    """Insert new rows into a Snowflake table (MERGE / INSERT IF NOT MATCHED).

    Deduplicates on the key column: the MERGE only carries a
    "when not matched → insert" clause, so existing rows are not overwritten.

    Args:
        session:    Active Snowpark session.
        data:       List of dicts representing rows to insert.
        key:        Unique identifier column for deduplication.
        table:      Target table name.
        metadata:   Extra columns to inject automatically
                    (e.g. {"CREATED_AT": current_timestamp(), "CREATED_BY": lit(login)}).
        on_success: Callback invoked after a successful insert (e.g. cache refresh).

    Returns:
        snowpark.MergeResult with the number of inserted rows.

    Raises:
        Exception: Any error raised while building or executing the MERGE is
            logged with its traceback and re-raised unchanged. An error raised
            by ``on_success`` propagates as-is and is not reported as an
            insert failure, because the MERGE has already completed.
    """
    try:
        target: snowpark.DataFrame = session.table(table)
        new_values: snowpark.DataFrame = session.create_dataframe(data)
        if metadata:
            # Append the metadata columns (names, expressions) to every row.
            new_values = new_values.with_columns(
                list(metadata.keys()), list(metadata.values())
            )
        # Every source column, key included, is written on insert.
        assignments = {k: new_values[k] for k in new_values.columns}
        result = target.merge(
            new_values,
            cast(Column, target[key] == new_values[key]),
            clauses=[when_not_matched().insert(assignments)],
        )
    except Exception as e:
        logger.error("[insert_data] INSERT %s failed: %s", table, e, exc_info=True)
        raise

    # Outside the guarded section: the MERGE has succeeded, so a failing
    # callback must not be logged as "INSERT ... failed".
    logger.info("[insert_data] INSERT %s: %d row(s)", table, result.rows_inserted)
    if on_success:
        on_success()
    return result  # type: ignore[no-any-return]

invalidate_crud(key)

Force a CRUD-cached DataFrame to reload on the next rerun.

Call this after an INSERT / UPDATE / DELETE to refresh stale data.

Parameters:

Name Type Description Default
key str

The key passed to load_crud for the dataset to invalidate.

required
Source code in src/pinky_streamlit/core/db.py
253
254
255
256
257
258
259
260
261
def invalidate_crud(key: str) -> None:
    """Mark the CRUD-cached dataset identified by ``key`` as stale.

    Sets the ``refresh_{key}`` flag in ``st.session_state`` to ``True``.
    ``load_crud`` reads that flag and re-runs its query when it is set, so
    call this after an INSERT / UPDATE / DELETE to refresh stale data.

    Args:
        key: The key passed to load_crud for the dataset to invalidate.
    """
    refresh_flag = f"refresh_{key}"
    st.session_state[refresh_flag] = True

load_analytic(_session, _query_fn, key, _transform=None)

Load and cache a DataFrame with a 1-hour TTL (analytic cache).

Suitable for dashboards, KPIs, and read-only aggregated data. Parameters prefixed with _ are not hashed by Streamlit (non-serializable or unstable). key is the sole cache discriminant — must be unique per logical dataset.

Parameters:

Name Type Description Default
_session Session

Active Snowpark session.

required
_query_fn Callable[[Session], DataFrame]

Query function (session) → snowpark.DataFrame.

required
key str

Cache discriminant.

required
_transform Callable[[DataFrame], DataFrame] | None

Optional transform applied to the pandas DataFrame after materialization.

None

Returns:

Type Description
DataFrame

Cached pandas DataFrame.

Source code in src/pinky_streamlit/core/db.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@st.cache_data(ttl=3600, show_spinner=False)  # type: ignore[untyped-decorator]
def load_analytic(
    _session: snowpark.Session,
    _query_fn: Callable[[snowpark.Session], snowpark.DataFrame],
    key: str,
    _transform: Callable[[pd.DataFrame], pd.DataFrame] | None = None,
) -> pd.DataFrame:
    """Materialize a query as a pandas DataFrame, cached for one hour.

    Intended for dashboards, KPIs, and other read-only aggregated data.
    The function is wrapped in ``st.cache_data`` with ``ttl=3600``. Arguments
    whose names start with ``_`` are excluded from Streamlit's hashing
    (non-serializable or unstable), which leaves ``key`` as the sole cache
    discriminant — it must be unique per logical dataset.

    Args:
        _session:   Active Snowpark session.
        _query_fn:  Query function (session) → snowpark.DataFrame.
        key:        Cache discriminant; also used in the log message.
        _transform: Optional transform applied to the pandas DataFrame after
                    materialization.

    Returns:
        The materialized (and optionally transformed) pandas DataFrame.
    """
    logger.info("[load_analytic] Loading %s", key)
    raw: pd.DataFrame = _query_fn(_session).to_pandas()
    # Keep cells that are not missing; put None wherever notna() is False.
    df: pd.DataFrame = raw.where(raw.notna(), None)
    if not _transform:
        return df
    return _transform(df)

load_crud(session, query_fn, key, transform=None)

Load a DataFrame via session_state with a manual refresh flag (CRUD cache).

Suitable for editable reference tables. Reload is triggered by calling invalidate_crud(key) after a write operation.

Parameters:

Name Type Description Default
session Session

Active Snowpark session.

required
query_fn Callable[[Session], DataFrame]

Query function (session) → snowpark.DataFrame.

required
key str

Unique key (prefixed with "df_" in session_state).

required
transform Callable[[DataFrame], DataFrame] | None

Optional transform applied after materialization.

None

Returns:

Type Description
DataFrame

pandas DataFrame stored in st.session_state.

Source code in src/pinky_streamlit/core/db.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def load_crud(
    session: snowpark.Session,
    query_fn: Callable[[snowpark.Session], snowpark.DataFrame],
    key: str,
    transform: Callable[[pd.DataFrame], pd.DataFrame] | None = None,
) -> pd.DataFrame:
    """Serve a DataFrame from ``st.session_state``, reloading on demand (CRUD cache).

    Meant for editable reference tables. The DataFrame lives under
    ``df_{key}``; it is (re)loaded when that entry is missing or when the
    ``refresh_{key}`` flag is truthy — the flag that ``invalidate_crud(key)``
    sets after a write operation. The flag is reset to ``False`` once the
    reload has been stored.

    Args:
        session:   Active Snowpark session.
        query_fn:  Query function (session) → snowpark.DataFrame.
        key:       Unique key (prefixed with "df_" in session_state).
        transform: Optional transform applied after materialization.

    Returns:
        pandas DataFrame stored in st.session_state.
    """
    data_slot = f"df_{key}"
    flag_slot = f"refresh_{key}"

    # Fast path: data already present and no refresh requested.
    if data_slot in st.session_state and not st.session_state.get(flag_slot, False):
        return st.session_state[data_slot]

    # "init" is logged when the refresh flag has never been set for this key.
    logger.info(
        "[load_crud] Loading %s (refresh=%s)",
        key,
        st.session_state.get(flag_slot, "init"),
    )
    raw = query_fn(session).to_pandas()
    # Keep cells that are not missing; put None wherever notna() is False.
    df = raw.where(raw.notna(), None)
    if transform:
        df = transform(df)
    st.session_state[data_slot] = df
    st.session_state[flag_slot] = False
    return st.session_state[data_slot]

load_nocache(session, query_fn, key, transform=None)

Load a DataFrame with no caching — reloads on every rerun.

For development and debugging only.

Parameters:

Name Type Description Default
session Session

Active Snowpark session.

required
query_fn Callable[[Session], DataFrame]

Query function (session) → snowpark.DataFrame.

required
key str

Key used for logging only.

required
transform Callable[[DataFrame], DataFrame] | None

Optional transform applied after materialization.

None

Returns:

Type Description
DataFrame

Fresh pandas DataFrame.

Source code in src/pinky_streamlit/core/db.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def load_nocache(
    session: snowpark.Session,
    query_fn: Callable[[snowpark.Session], snowpark.DataFrame],
    key: str,
    transform: Callable[[pd.DataFrame], pd.DataFrame] | None = None,
) -> pd.DataFrame:
    """Materialize a query as a pandas DataFrame without any caching.

    The query runs on every call, i.e. on every rerun of the app.
    For development and debugging only.

    Args:
        session:   Active Snowpark session.
        query_fn:  Query function (session) → snowpark.DataFrame.
        key:       Key used for logging only.
        transform: Optional transform applied after materialization.

    Returns:
        Fresh pandas DataFrame.
    """
    logger.info("[load_nocache] Loading %s", key)
    raw = query_fn(session).to_pandas()
    # Keep cells that are not missing; put None wherever notna() is False.
    df = raw.where(raw.notna(), None)
    if not transform:
        return df
    return transform(df)

load_static(_session, _query_fn, key)

Load a resource once for the lifetime of the process (static cache).

Suitable for deployed assets: JSON, YAML, reference files on Snowflake stage. @st.cache_resource shares the result across all sessions — never reloaded or serialized. Parameters prefixed with _ are not hashed; key is the sole cache discriminant.

Example

partial(load_static, _query_fn=lambda s: s.read.json("@STAGE/file.json"), key="cfg")

Parameters:

Name Type Description Default
_session Session

Active Snowpark session.

required
_query_fn Callable[[Session], Any]

Callable (session) → Any returning the resource to load.

required
key str

Cache discriminant (unique per logical dataset).

required

Returns:

Type Description
Any

The loaded resource (type depends on _query_fn).

Source code in src/pinky_streamlit/core/db.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
@st.cache_resource  # type: ignore[untyped-decorator]
def load_static(
    _session: snowpark.Session,
    _query_fn: Callable[[snowpark.Session], Any],
    key: str,
) -> Any:
    """Load a resource through ``st.cache_resource`` (static cache).

    Suitable for deployed assets: JSON, YAML, reference files on Snowflake stage.
    @st.cache_resource shares the result across all sessions — never reloaded or
    serialized. Arguments whose names start with ``_`` are not hashed, so
    ``key`` is the sole cache discriminant.

    Example:
        partial(load_static, _query_fn=lambda s: s.read.json("@STAGE/file.json"), key="cfg")

    Args:
        _session:  Active Snowpark session.
        _query_fn: Callable (session) → Any returning the resource to load.
        key:       Cache discriminant (unique per logical dataset); also used
                   in the log message.

    Returns:
        Whatever ``_query_fn`` returns for the given session.
    """
    logger.info("[load_static] Loading %s", key)
    resource = _query_fn(_session)
    return resource

update_data(session, data, key, table, metadata=None, on_success=None)

Update existing rows in a Snowflake table.

Builds a DataFrame from data, optionally adds metadata columns, then executes an UPDATE conditioned on the key column.

Parameters:

Name Type Description Default
session Session

Active Snowpark session.

required
data list[dict[str, Any]]

List of dicts representing rows to update (must contain key).

required
key str

Unique identifier column name (e.g. "ID").

required
table str

Target table name.

required
metadata dict[str, Any] | None

Extra columns to inject automatically (e.g. {"UPDATED_AT": current_timestamp(), "UPDATED_BY": lit(login)}).

None
on_success Callable[[], None] | None

Callback invoked after a successful update (e.g. cache refresh).

None

Returns:

Type Description
UpdateResult

snowpark.UpdateResult with the number of updated rows.

Source code in src/pinky_streamlit/core/db.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def update_data(
    session: snowpark.Session,
    data: list[dict[str, Any]],
    key: str,
    table: str,
    metadata: dict[str, Any] | None = None,
    on_success: Callable[[], None] | None = None,
) -> snowpark.UpdateResult:
    """Update existing rows in a Snowflake table.

    Builds a DataFrame from data, optionally adds metadata columns,
    then executes an UPDATE conditioned on the key column. Every column of
    the built DataFrame except the key is assigned on matching rows.

    Args:
        session:    Active Snowpark session.
        data:       List of dicts representing rows to update (must contain key).
        key:        Unique identifier column name (e.g. "ID").
        table:      Target table name.
        metadata:   Extra columns to inject automatically
                    (e.g. {"UPDATED_AT": current_timestamp(), "UPDATED_BY": lit(login)}).
        on_success: Callback invoked after a successful update (e.g. cache refresh).

    Returns:
        snowpark.UpdateResult with the number of updated rows.

    Raises:
        Exception: Any error raised while building or executing the UPDATE is
            logged with its traceback and re-raised unchanged. An error raised
            by ``on_success`` propagates as-is and is not reported as an
            update failure, because the UPDATE has already completed.
    """
    try:
        target: snowpark.DataFrame = session.table(table)
        changes: snowpark.DataFrame = session.create_dataframe(data)
        if metadata:
            # Append the metadata columns (names, expressions) to every row.
            changes = changes.with_columns(
                list(metadata.keys()), list(metadata.values())
            )
        # The key only selects the row; it is never reassigned.
        assignments = {k: changes[k] for k in changes.columns if k != key}
        condition: Column = cast(Column, target[key] == changes[key])
        result = target.update(
            assignments=assignments, condition=condition, source=changes
        )
    except Exception as e:
        logger.error("[update_data] UPDATE %s failed: %s", table, e, exc_info=True)
        raise

    # Outside the guarded section: the UPDATE has succeeded, so a failing
    # callback must not be logged as "UPDATE ... failed".
    logger.info("[update_data] UPDATE %s: %d row(s)", table, result.rows_updated)
    if on_success:
        on_success()
    return result  # type: ignore[no-any-return]