==================== SNOWFLAKE CHEAT SHEET ====================
==== SNOWFLAKE CONNECTORS ====
1. SNOWPIPE (Continuous Data Loading)
Automated, serverless data ingestion from cloud storage
-- Create a Snowpipe
CREATE OR REPLACE PIPE my_pipe
AUTO_INGEST = TRUE
AS
COPY INTO my_database.my_schema.my_table
FROM @my_stage
FILE_FORMAT = (TYPE = 'CSV');
-- Show pipes
SHOW PIPES;
-- Pipe status and history
SELECT SYSTEM$PIPE_STATUS('my_pipe');
SELECT * FROM TABLE(INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
DATE_RANGE_START=>DATEADD('days', -7, CURRENT_DATE())
));
-- Refresh pipe (manual trigger)
ALTER PIPE my_pipe REFRESH;
-- Pause/Resume pipe
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
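If auto-ingest loads seem to be missing, a quick sketch for surfacing file-level load errors (the pipe name and time window are illustrative):
-- Show load errors for files the pipe processed in the last hour
SELECT * FROM TABLE(INFORMATION_SCHEMA.VALIDATE_PIPE_LOAD(
PIPE_NAME => 'my_pipe',
START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
));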
2. SNOWSQL (Command-Line Interface)
Interactive SQL client for Snowflake
Installation
Download and run the SnowSQL installer from the Snowflake website (SnowSQL is not distributed via pip)
Connect
snowsql -a <account_identifier> -u <username>
Connect with specific database/warehouse
snowsql -a <account_identifier> -u <username> -d <database> -s <schema> -w <warehouse>
Execute SQL file
snowsql -a <account_identifier> -u <username> -f script.sql
Common SnowSQL commands
!help # Show help
!quit or !exit # Exit SnowSQL
!source script.sql # Execute SQL from file
!set variable_name=value # Set variable
!spool output.txt # Save results to file
!queries # Show running queries
3. PYTHON CONNECTOR
Python library for Snowflake integration
Installation
pip install snowflake-connector-python
Basic connection
import snowflake.connector
conn = snowflake.connector.connect(
    user='USERNAME',
    password='PASSWORD',
    account='ACCOUNT_NAME',
    warehouse='WAREHOUSE_NAME',
    database='DATABASE_NAME',
    schema='SCHEMA_NAME'
)
Execute query
cur = conn.cursor()
cur.execute("SELECT * FROM my_table LIMIT 10")
results = cur.fetchall()
Execute with parameters (prevents SQL injection)
cur.execute(
    "SELECT * FROM my_table WHERE id = %s",
    (123,)
)
Fetch methods
cur.fetchone() # Fetch one row
cur.fetchall() # Fetch all rows
cur.fetchmany(100) # Fetch specified number of rows
Using pandas
import pandas as pd
df = pd.read_sql("SELECT * FROM my_table", conn)
Write DataFrame to Snowflake
from snowflake.connector.pandas_tools import write_pandas
write_pandas(conn, df, 'TABLE_NAME')
Close connection
cur.close()
conn.close()
Context manager (auto-closes)
with snowflake.connector.connect(**conn_params) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM my_table")
        results = cur.fetchall()
4. JDBC CONNECTOR
Java Database Connectivity
Connection string
jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?
warehouse=<warehouse>&
db=<database>&
schema=<schema>
5. ODBC CONNECTOR
Open Database Connectivity for various applications
Connection string example
Driver=SnowflakeDSIIDriver;
Server=<account_identifier>.snowflakecomputing.com;
Database=<database>;
Warehouse=<warehouse>;
Schema=<schema>;
6. SPARK CONNECTOR
Integration with Apache Spark
Scala/Python example
val options = Map(
  "sfURL" -> "<account_identifier>.snowflakecomputing.com",
  "sfUser" -> "<username>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database>",
  "sfSchema" -> "<schema>",
  "sfWarehouse" -> "<warehouse>"
)
df.write
  .format("snowflake")
  .options(options)
  .option("dbtable", "table_name")
  .save()
==== CORE SQL COMMANDS ====
DATABASE OPERATIONS
CREATE DATABASE my_database;
USE DATABASE my_database;
SHOW DATABASES;
DROP DATABASE my_database;
SCHEMA OPERATIONS
CREATE SCHEMA my_schema;
USE SCHEMA my_schema;
SHOW SCHEMAS;
DROP SCHEMA my_schema;
TABLE OPERATIONS
-- Create table
CREATE TABLE employees (
id INTEGER,
name VARCHAR(100),
salary DECIMAL(10,2),
hire_date DATE,
department VARCHAR(50)
);
-- Create table from query
CREATE TABLE high_earners AS
SELECT * FROM employees WHERE salary > 100000;
-- Create temporary table
CREATE TEMPORARY TABLE temp_data (
id INT,
value VARCHAR
);
-- Clone table (zero-copy clone)
CREATE TABLE employees_backup CLONE employees;
-- Show tables
SHOW TABLES;
-- Describe table
DESC TABLE employees;
SHOW COLUMNS IN employees;
-- Drop table
DROP TABLE employees;
-- Truncate table
TRUNCATE TABLE employees;
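A few common ALTER TABLE forms round out the table operations above (the column names here are illustrative):
-- Alter table
ALTER TABLE employees ADD COLUMN email VARCHAR(255);
ALTER TABLE employees RENAME COLUMN name TO full_name;
ALTER TABLE employees DROP COLUMN email;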
DATA LOADING (COPY INTO)
-- Load from stage
COPY INTO my_table
FROM @my_stage/path/to/files
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1);
-- Load with pattern matching
COPY INTO my_table
FROM @my_stage
PATTERN = '.*sales.*.csv'
FILE_FORMAT = my_csv_format;
-- Load JSON data
COPY INTO my_table
FROM @my_stage/data.json
FILE_FORMAT = (TYPE = 'JSON');
-- Load with transformations
COPY INTO my_table (id, name, upper_name)
FROM (
SELECT $1, $2, UPPER($2)
FROM @my_stage/file.csv
)
FILE_FORMAT = (TYPE = 'CSV');
-- Check load history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'MY_TABLE',
START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
));
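Two COPY INTO options worth keeping at hand, sketched against the stage and file format used above: VALIDATION_MODE dry-runs a load and returns errors without inserting rows, while ON_ERROR controls how bad rows are handled.
-- Validate files without loading them
COPY INTO my_table
FROM @my_stage
FILE_FORMAT = my_csv_format
VALIDATION_MODE = RETURN_ERRORS;
-- Skip bad rows instead of failing the whole load
COPY INTO my_table
FROM @my_stage
FILE_FORMAT = my_csv_format
ON_ERROR = CONTINUE;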
STAGES (Data Storage Locations)
-- Create internal stage
CREATE STAGE my_internal_stage;
-- Create external stage (S3)
CREATE STAGE my_s3_stage
URL = 's3://my-bucket/path/'
CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'xxx');
-- Create external stage (Azure)
CREATE STAGE my_azure_stage
URL = 'azure://myaccount.blob.core.windows.net/mycontainer/path/'
CREDENTIALS = (AZURE_SAS_TOKEN = 'xxx');
-- Create external stage (GCS)
CREATE STAGE my_gcs_stage
URL = 'gcs://my-bucket/path/'
STORAGE_INTEGRATION = my_gcs_integration;
-- List files in stage
LIST @my_stage;
-- Upload to stage (SnowSQL)
PUT file://local/path/file.csv @my_stage;
-- Download from stage
GET @my_stage/file.csv file://local/path/;
-- Remove files from stage
REMOVE @my_stage/file.csv;
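Staged files can also be queried in place before loading, using positional $1, $2 column references; a minimal sketch (my_csv_format is defined in the next section):
-- Query a staged file directly
SELECT t.$1, t.$2
FROM @my_stage/file.csv (FILE_FORMAT => 'my_csv_format') t;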
FILE FORMATS
-- Create CSV file format
CREATE FILE FORMAT my_csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
NULL_IF = ('NULL', 'null', '')
EMPTY_FIELD_AS_NULL = TRUE
COMPRESSION = 'GZIP';
-- Create JSON file format
CREATE FILE FORMAT my_json_format
TYPE = 'JSON'
COMPRESSION = 'AUTO';
-- Create Parquet file format
CREATE FILE FORMAT my_parquet_format
TYPE = 'PARQUET'
COMPRESSION = 'SNAPPY';
-- Show file formats
SHOW FILE FORMATS;
WAREHOUSES (Compute Resources)
-- Create warehouse
CREATE WAREHOUSE my_warehouse
WITH WAREHOUSE_SIZE = 'LARGE'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3
SCALING_POLICY = 'STANDARD';
-- Warehouse sizes: X-SMALL, SMALL, MEDIUM, LARGE, X-LARGE, 2X-LARGE, 3X-LARGE, 4X-LARGE
-- Alter warehouse
ALTER WAREHOUSE my_warehouse SET WAREHOUSE_SIZE = 'MEDIUM';
ALTER WAREHOUSE my_warehouse SUSPEND;
ALTER WAREHOUSE my_warehouse RESUME;
-- Use warehouse
USE WAREHOUSE my_warehouse;
-- Show warehouses
SHOW WAREHOUSES;
-- Drop warehouse
DROP WAREHOUSE my_warehouse;
QUERY OPERATIONS
-- Basic SELECT
SELECT * FROM employees WHERE salary > 50000;
-- Join tables
SELECT e.name, d.department_name
FROM employees e
JOIN departments d ON e.dept_id = d.id;
-- Aggregate functions
SELECT
department,
COUNT(*) as emp_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM employees
GROUP BY department
HAVING COUNT(*) > 10;
-- Window functions
SELECT
name,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees;
-- Common Table Expressions (CTE)
WITH high_earners AS (
SELECT * FROM employees WHERE salary > 100000
)
SELECT department, COUNT(*)
FROM high_earners
GROUP BY department;
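Snowflake also supports QUALIFY, which filters on window-function results without a subquery; a small sketch returning the top earner per department:
-- QUALIFY (filter on window functions)
SELECT name, department, salary
FROM employees
QUALIFY ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) = 1;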
TIME TRAVEL & FAIL-SAFE
-- Query historical data (up to 90 days for Enterprise)
SELECT * FROM employees AT(OFFSET => -3600); -- 1 hour ago
SELECT * FROM employees BEFORE(STATEMENT => 'query_id');
SELECT * FROM employees AT(TIMESTAMP => '2024-01-01 00:00:00'::timestamp);
-- Undrop objects
UNDROP TABLE employees;
UNDROP SCHEMA my_schema;
UNDROP DATABASE my_database;
-- Clone at specific point in time
CREATE TABLE employees_yesterday CLONE employees
AT(OFFSET => -86400); -- 24 hours ago
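The Time Travel window is controlled per object by DATA_RETENTION_TIME_IN_DAYS (the maximum depends on your edition); a sketch of setting and checking it:
-- Set and inspect the retention period (days)
ALTER TABLE employees SET DATA_RETENTION_TIME_IN_DAYS = 30;
SHOW PARAMETERS LIKE 'DATA_RETENTION_TIME_IN_DAYS' IN TABLE employees;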
STREAMS (Change Data Capture)
-- Create stream
CREATE STREAM employee_stream ON TABLE employees;
-- Query stream
SELECT * FROM employee_stream;
-- Show streams
SHOW STREAMS;
-- Stream metadata columns
-- METADATA$ACTION: INSERT, DELETE
-- METADATA$ISUPDATE: TRUE/FALSE
-- METADATA$ROW_ID: Unique row identifier
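Selecting from a stream does not consume it; the offset advances only when the stream is read inside a DML statement. A minimal sketch, assuming a hypothetical employees_audit target table:
-- Consume the stream (advances its offset)
INSERT INTO employees_audit -- employees_audit is a hypothetical target table
SELECT id, name, METADATA$ACTION, METADATA$ISUPDATE, CURRENT_TIMESTAMP()
FROM employee_stream;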
TASKS (Job Scheduling)
-- Create task
CREATE TASK daily_load_task
WAREHOUSE = my_warehouse
SCHEDULE = 'USING CRON 0 9 * * * America/New_York'
AS
COPY INTO my_table FROM @my_stage;
-- Create task with dependency
CREATE TASK process_task
WAREHOUSE = my_warehouse
AFTER daily_load_task
AS
INSERT INTO summary_table SELECT * FROM staging_table;
-- Task operations
ALTER TASK daily_load_task RESUME;
ALTER TASK daily_load_task SUSPEND;
EXECUTE TASK daily_load_task;
-- Show tasks
SHOW TASKS;
-- Task history
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START => DATEADD('day', -1, CURRENT_TIMESTAMP())
));
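Tasks pair naturally with streams; a sketch of a task that runs only when the stream has new data (reusing the hypothetical employees_audit table from the streams section):
-- Create a task gated on stream contents
CREATE TASK consume_stream_task
WAREHOUSE = my_warehouse
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('employee_stream')
AS
INSERT INTO employees_audit
SELECT id, name, METADATA$ACTION, METADATA$ISUPDATE, CURRENT_TIMESTAMP()
FROM employee_stream;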
VIEWS
-- Create view
CREATE VIEW high_salary_employees AS
SELECT * FROM employees WHERE salary > 100000;
-- Create materialized view
CREATE MATERIALIZED VIEW dept_summary AS
SELECT
department,
COUNT(*) as emp_count,
AVG(salary) as avg_salary
FROM employees
GROUP BY department;
-- Secure view (hides definition)
CREATE SECURE VIEW secure_employee_view AS
SELECT id, name FROM employees;
USERS & ROLES
-- Create user
CREATE USER john_doe PASSWORD = 'SecurePass123'
DEFAULT_ROLE = analyst
DEFAULT_WAREHOUSE = compute_wh;
-- Create role
CREATE ROLE data_analyst;
-- Grant privileges
GRANT USAGE ON WAREHOUSE compute_wh TO ROLE data_analyst;
GRANT USAGE ON DATABASE my_db TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA my_schema TO ROLE data_analyst;
-- Grant role to user
GRANT ROLE data_analyst TO USER john_doe;
-- Switch role
USE ROLE data_analyst;
-- Show grants
SHOW GRANTS TO ROLE data_analyst;
SHOW GRANTS ON TABLE employees;
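Grants on existing tables do not cover tables created later; future grants close that gap. A sketch using the role and schema above:
-- Grant on future objects
GRANT SELECT ON FUTURE TABLES IN SCHEMA my_db.my_schema TO ROLE data_analyst;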
==== BEST PRACTICES ====
1. QUERY OPTIMIZATION
-- Use clustering keys for large tables
ALTER TABLE large_table CLUSTER BY (date_column, category);
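To judge whether a clustering key is actually helping, a quick sketch using Snowflake's clustering metadata function (table and columns as above):
-- Check clustering quality for candidate keys
SELECT SYSTEM$CLUSTERING_INFORMATION('large_table', '(date_column, category)');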
-- Use LIMIT for exploratory queries
SELECT * FROM large_table LIMIT 100;
-- Select specific columns (not SELECT *)
SELECT id, name, salary FROM employees;
-- Filter early in queries
SELECT * FROM large_table
WHERE date > '2024-01-01' -- Filter first
ORDER BY id;
-- Use appropriate data types
-- Use NUMBER instead of FLOAT for precision
-- Use VARCHAR with appropriate length
2. WAREHOUSE MANAGEMENT
-- Auto-suspend to save costs
ALTER WAREHOUSE my_wh SET AUTO_SUSPEND = 60; -- seconds
-- Use multi-cluster for concurrency
ALTER WAREHOUSE my_wh SET
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 5
SCALING_POLICY = 'STANDARD';
-- Right-size warehouses
-- Start small and scale up if needed
-- Monitor query performance
3. CACHING
-- Result cache (automatic, 24 hours)
-- Use identical queries to leverage cache
-- Warehouse cache (local disk cache)
-- Remains active while warehouse is running
-- Clear cache for testing
ALTER SESSION SET USE_CACHED_RESULT = FALSE;
4. DATA LOADING
-- Use COPY INTO for bulk loads (not INSERT)
-- Use Snowpipe for continuous loading
-- Compress files before loading (GZIP, BZIP2)
-- Split large files into smaller chunks (100-250MB recommended)
-- Use appropriate file formats (Parquet for columnar data)
5. MONITORING & MAINTENANCE
-- Query history (failed queries in the last day)
SELECT * FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
END_TIME_RANGE_START => DATEADD('day', -1, CURRENT_TIMESTAMP())
))
WHERE EXECUTION_STATUS LIKE 'FAILED%'
ORDER BY START_TIME DESC;
-- Warehouse usage
SELECT * FROM TABLE(INFORMATION_SCHEMA.WAREHOUSE_METERING_HISTORY(
DATE_RANGE_START => DATEADD('month', -1, CURRENT_TIMESTAMP())
));
-- Storage usage
SELECT * FROM TABLE(INFORMATION_SCHEMA.DATABASE_STORAGE_USAGE_HISTORY(
DATE_RANGE_START => DATEADD('month', -1, CURRENT_TIMESTAMP())
));
-- Long-running queries (currently running)
SELECT * FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE EXECUTION_STATUS = 'RUNNING';
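A runaway query can be stopped by its query ID (taken from the history output above); a minimal sketch:
-- Cancel a query by ID
SELECT SYSTEM$CANCEL_QUERY('<query_id>');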
6. SECURITY
-- Enable Multi-Factor Authentication (MFA)
-- Use Network Policies to restrict access
-- Implement column-level security
-- Use Dynamic Data Masking
CREATE MASKING POLICY ssn_mask AS (val STRING) RETURNS STRING ->
CASE WHEN CURRENT_ROLE() IN ('ADMIN') THEN val
ELSE '***-**-****'
END;
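A masking policy has no effect until it is attached to a column; a sketch assuming an illustrative ssn column:
-- Apply the masking policy (ssn is an illustrative column name)
ALTER TABLE employees MODIFY COLUMN ssn SET MASKING POLICY ssn_mask;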
7. COST OPTIMIZATION
-- Monitor credit usage regularly
-- Use resource monitors
CREATE RESOURCE MONITOR monthly_limit
WITH CREDIT_QUOTA = 1000
TRIGGERS ON 75 PERCENT DO NOTIFY
ON 100 PERCENT DO SUSPEND;
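A resource monitor only takes effect once it is attached; a sketch attaching the monitor above at the warehouse or account level:
-- Attach the resource monitor
ALTER WAREHOUSE my_warehouse SET RESOURCE_MONITOR = monthly_limit;
ALTER ACCOUNT SET RESOURCE_MONITOR = monthly_limit;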
-- Schedule tasks during off-peak hours
-- Archive old data using Time Travel
-- Use transient tables for temporary data (no Fail-safe)
==== USEFUL FUNCTIONS ====
Date/Time
CURRENT_TIMESTAMP()
CURRENT_DATE()
DATEADD('day', 7, CURRENT_DATE())
DATEDIFF('day', start_date, end_date)
DATE_TRUNC('month', date_column)
TO_DATE('2024-01-01', 'YYYY-MM-DD')
String
CONCAT(str1, str2)
UPPER(string)
LOWER(string)
TRIM(string)
SUBSTRING(string, start, length)
SPLIT(string, delimiter)
REGEXP_REPLACE(string, pattern, replacement)
Conversion
TO_NUMBER(string)
TO_VARCHAR(value)
TO_DECIMAL(value, precision, scale)
TRY_CAST(value AS type) -- Returns NULL on error
Conditional
COALESCE(val1, val2, 'default')
NULLIF(val1, val2)
IFF(condition, true_value, false_value)
CASE WHEN condition THEN result END
Aggregate
SUM(), AVG(), COUNT(), MIN(), MAX()
MEDIAN(), MODE()
STDDEV(), VARIANCE()
LISTAGG(column, ',') -- Concatenate values
==== SEMI-STRUCTURED DATA (JSON, XML, Avro, Parquet) ====
-- Create table with VARIANT column
CREATE TABLE json_data (
id INT,
data VARIANT
);
-- Load JSON
COPY INTO json_data
FROM @my_stage/data.json
FILE_FORMAT = (TYPE = 'JSON');
-- Query JSON
SELECT
data:name::STRING as name,
data:age::INT as age,
data:address.city::STRING as city
FROM json_data;
-- Flatten nested arrays
SELECT
value:product::STRING as product,
value:price::NUMBER as price
FROM json_data,
LATERAL FLATTEN(input => data:items);
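JSON can also be inserted directly with PARSE_JSON, which builds a VARIANT from a string; note it generally requires the INSERT ... SELECT form rather than VALUES. A sketch matching the json_data table above:
-- Insert JSON with PARSE_JSON
INSERT INTO json_data (id, data)
SELECT 1, PARSE_JSON('{"name": "Alice", "age": 30, "address": {"city": "Boston"}, "items": [{"product": "Widget", "price": 9.99}]}');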
