-- NOTE(review): SCHEDULE is not a built-in DuckDB type. This statement computes
-- the top of the next hour (date_trunc('hour', now()) + 1h) and presumably hands
-- it to the host runner as the next execution time for this script — confirm
-- against the scheduler that consumes it.
SELECT (date_trunc('hour', now()) + INTERVAL '1h')::SCHEDULE;
-- The community http_client extension provides the http_post / http_get
-- functions used by the login and search statements in this script.
INSTALL http_client FROM community;
LOAD http_client;
-- Storage for ingested Bluesky posts. Both objects are created only when
-- missing, so re-running the script keeps existing data (and leaves an
-- already-existing table definition untouched).
CREATE SCHEMA IF NOT EXISTS bsky;

CREATE TABLE IF NOT EXISTS bsky.posts (
    topic          VARCHAR,    -- tracked-topic label the post was found under
    created_at     TIMESTAMP,  -- the post record's createdAt, as returned by the API
    cid            VARCHAR,    -- the post's cid, as returned by the API
    author_handle  VARCHAR,
    url            VARCHAR,    -- bsky.app web link to the post
    text           VARCHAR,
    like_count     INT,
    reply_count    INT,
    quote_count    INT,
    repost_count   INT,
    -- Insert time. now() is a TIMESTAMPTZ; storing it in a plain TIMESTAMP
    -- records wall-clock time in the session's TimeZone setting, not UTC.
    loaded_at      TIMESTAMP DEFAULT now()
);
-- Log in to Bluesky and keep the session access token in the `access_jwt`
-- variable for the authenticated search requests later in the script.
-- The request body is the single row of ./bluesky_credentials.json
-- (presumably {"identifier": ..., "password": ...} — confirm).
-- If the response carries no accessJwt (bad credentials, rate limiting,
-- outage), fail loudly with the response body instead of continuing with an
-- empty bearer token, which would make every search return nothing and the
-- run silently ingest zero posts.
SET VARIABLE access_jwt = (
    WITH session AS (
        SELECT http_post(
            'https://bsky.social/xrpc/com.atproto.server.createSession',
            headers => MAP {
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            body => (SELECT c FROM './bluesky_credentials.json' c)
        ) ->> 'body' AS response_body
    )
    SELECT
        CASE
            WHEN response_body ->> 'accessJwt' IS NULL THEN error(concat(
                'Bluesky createSession returned no accessJwt; response body: ',
                response_body
            ))
            ELSE response_body ->> 'accessJwt'
        END
    FROM session
);
-- Incremental ingest: search Bluesky for each tracked topic and append the
-- posts not yet stored for that topic to bsky.posts.
WITH topics AS (
    -- Tracked topics: display label (stored in bsky.posts.topic) and the
    -- searchPosts query string used to find it.
    SELECT col0 AS topic, col1 AS query_string
    FROM (
        VALUES
            ('DuckDB', 'duckdb'),
            ('Data Engineering', '"data-engineering" "data engineering" "dataengineering"'),
            ('#databs', '#databs')
    )
),
topics_with_ts AS (
    -- Per-topic watermark for the API's `since` filter: the latest loaded_at
    -- for the topic, or 30 days ago when the topic has no rows yet.
    -- loaded_at is a naive TIMESTAMP holding session-TimeZone wall-clock time
    -- (DEFAULT now()). Casting it back to TIMESTAMPTZ re-attaches that zone,
    -- so AT TIME ZONE 'UTC' yields the UTC time that the 'Z'-suffixed `since`
    -- string below claims. With TimeZone = 'UTC' the value is unchanged.
    SELECT
        topic,
        query_string,
        (
            coalesce(max(loaded_at)::TIMESTAMPTZ, now() - INTERVAL '30 days')
            AT TIME ZONE 'UTC'
        ) AS last_loaded_at_utc
    FROM topics
    LEFT JOIN bsky.posts USING (topic)
    GROUP BY ALL
),
json_posts AS (
    -- One row per post JSON object returned by searchPosts for each topic.
    -- NOTE(review): only a single page of at most 100 results is fetched (the
    -- response cursor is not followed). If more posts match since the
    -- watermark, the remainder is skipped and the next run's watermark moves
    -- past them. Add cursor pagination if topic volume grows.
    -- NOTE(review): an HTTP error response has no `posts`, so that topic
    -- silently contributes zero rows for this run.
    SELECT
        topic,
        (http_get(
            'https://bsky.social/xrpc/app.bsky.feed.searchPosts',
            headers => MAP {
                'Accept': 'application/json',
                'Authorization': concat('Bearer ', getvariable('access_jwt')),
            },
            params => MAP {
                'q': query_string,
                'limit': '100',
                'since': strftime(last_loaded_at_utc, '%Y-%m-%dT%H:%M:%SZ'),
            }
        ) ->> 'body' -> '$.posts[*]').unnest() AS p
    FROM topics_with_ts
)
INSERT INTO bsky.posts BY NAME (
    SELECT
        topic,
        (p ->> '$.record.createdAt')::TIMESTAMP AS created_at,
        p ->> 'cid' AS cid,
        p ->> '$.author.handle' AS author_handle,
        -- Web link: profile handle plus the last path segment of the post's `uri`.
        concat('https://bsky.app/profile/', author_handle, '/post/', split_part(p ->> 'uri', '/', -1)) AS url,
        p ->> '$.record.text' AS text,
        (p -> 'likeCount')::INT AS like_count,
        (p -> 'replyCount')::INT AS reply_count,
        (p -> 'quoteCount')::INT AS quote_count,
        (p -> 'repostCount')::INT AS repost_count
    FROM json_posts
    -- `since` is inclusive, truncated to whole seconds, and based on loaded_at
    -- (transaction-start now(), which predates the searches), so consecutive
    -- runs overlap. Skip posts already stored for the same topic so re-runs
    -- do not insert duplicates.
    WHERE NOT EXISTS (
        SELECT 1
        FROM bsky.posts AS existing
        WHERE existing.topic = json_posts.topic
          AND existing.cid = (json_posts.p ->> 'cid')
    )
);