Basic relay functionality completed

This commit is contained in:
Your Name
2025-09-04 07:10:13 -04:00
parent 227c579147
commit 662feab881
15 changed files with 2207 additions and 210 deletions

View File

@@ -81,6 +81,44 @@ void close_database() {
}
}
// Event type classification
typedef enum {
EVENT_TYPE_REGULAR,
EVENT_TYPE_REPLACEABLE,
EVENT_TYPE_EPHEMERAL,
EVENT_TYPE_ADDRESSABLE,
EVENT_TYPE_UNKNOWN
} event_type_t;
event_type_t classify_event_kind(int kind) {
if ((kind >= 1000 && kind < 10000) ||
(kind >= 4 && kind < 45) ||
kind == 1 || kind == 2) {
return EVENT_TYPE_REGULAR;
}
if ((kind >= 10000 && kind < 20000) ||
kind == 0 || kind == 3) {
return EVENT_TYPE_REPLACEABLE;
}
if (kind >= 20000 && kind < 30000) {
return EVENT_TYPE_EPHEMERAL;
}
if (kind >= 30000 && kind < 40000) {
return EVENT_TYPE_ADDRESSABLE;
}
return EVENT_TYPE_UNKNOWN;
}
const char* event_type_to_string(event_type_t type) {
switch (type) {
case EVENT_TYPE_REGULAR: return "regular";
case EVENT_TYPE_REPLACEABLE: return "replaceable";
case EVENT_TYPE_EPHEMERAL: return "ephemeral";
case EVENT_TYPE_ADDRESSABLE: return "addressable";
default: return "unknown";
}
}
// Store event in database
int store_event(cJSON* event) {
if (!g_db || !event) {
@@ -101,15 +139,32 @@ int store_event(cJSON* event) {
return -1;
}
// Classify event type
event_type_t type = classify_event_kind((int)cJSON_GetNumberValue(kind));
// Serialize tags to JSON (use empty array if no tags)
char* tags_json = NULL;
if (tags && cJSON_IsArray(tags)) {
tags_json = cJSON_Print(tags);
} else {
tags_json = strdup("[]");
}
if (!tags_json) {
log_error("Failed to serialize tags to JSON");
return -1;
}
// Prepare SQL statement for event insertion
const char* sql =
"INSERT INTO event (id, pubkey, created_at, kind, content, sig) "
"VALUES (?, ?, ?, ?, ?, ?)";
"INSERT INTO events (id, pubkey, created_at, kind, event_type, content, sig, tags) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
log_error("Failed to prepare event insert statement");
free(tags_json);
return -1;
}
@@ -118,8 +173,10 @@ int store_event(cJSON* event) {
sqlite3_bind_text(stmt, 2, cJSON_GetStringValue(pubkey), -1, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 3, (sqlite3_int64)cJSON_GetNumberValue(created_at));
sqlite3_bind_int(stmt, 4, (int)cJSON_GetNumberValue(kind));
sqlite3_bind_text(stmt, 5, cJSON_GetStringValue(content), -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 6, cJSON_GetStringValue(sig), -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 5, event_type_to_string(type), -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 6, cJSON_GetStringValue(content), -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 7, cJSON_GetStringValue(sig), -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 8, tags_json, -1, SQLITE_TRANSIENT);
// Execute statement
rc = sqlite3_step(stmt);
@@ -128,56 +185,17 @@ int store_event(cJSON* event) {
if (rc != SQLITE_DONE) {
if (rc == SQLITE_CONSTRAINT) {
log_warning("Event already exists in database");
free(tags_json);
return 0; // Not an error, just duplicate
}
char error_msg[256];
snprintf(error_msg, sizeof(error_msg), "Failed to insert event: %s", sqlite3_errmsg(g_db));
log_error(error_msg);
free(tags_json);
return -1;
}
// Insert tags if present
if (tags && cJSON_IsArray(tags)) {
const char* event_id = cJSON_GetStringValue(id);
cJSON* tag;
cJSON_ArrayForEach(tag, tags) {
if (cJSON_IsArray(tag) && cJSON_GetArraySize(tag) >= 2) {
cJSON* tag_name = cJSON_GetArrayItem(tag, 0);
cJSON* tag_value = cJSON_GetArrayItem(tag, 1);
if (cJSON_IsString(tag_name) && cJSON_IsString(tag_value)) {
// Collect additional tag parameters if present
char* parameters = NULL;
if (cJSON_GetArraySize(tag) > 2) {
cJSON* params_array = cJSON_CreateArray();
for (int i = 2; i < cJSON_GetArraySize(tag); i++) {
cJSON_AddItemToArray(params_array, cJSON_Duplicate(cJSON_GetArrayItem(tag, i), 1));
}
parameters = cJSON_Print(params_array);
cJSON_Delete(params_array);
}
const char* tag_sql =
"INSERT INTO tag (id, name, value, parameters) VALUES (?, ?, ?, ?)";
sqlite3_stmt* tag_stmt;
rc = sqlite3_prepare_v2(g_db, tag_sql, -1, &tag_stmt, NULL);
if (rc == SQLITE_OK) {
sqlite3_bind_text(tag_stmt, 1, event_id, -1, SQLITE_STATIC);
sqlite3_bind_text(tag_stmt, 2, cJSON_GetStringValue(tag_name), -1, SQLITE_STATIC);
sqlite3_bind_text(tag_stmt, 3, cJSON_GetStringValue(tag_value), -1, SQLITE_STATIC);
sqlite3_bind_text(tag_stmt, 4, parameters, -1, SQLITE_TRANSIENT);
sqlite3_step(tag_stmt);
sqlite3_finalize(tag_stmt);
}
if (parameters) free(parameters);
}
}
}
}
free(tags_json);
log_success("Event stored in database");
return 0;
}
@@ -189,7 +207,7 @@ cJSON* retrieve_event(const char* event_id) {
}
const char* sql =
"SELECT id, pubkey, created_at, kind, content, sig FROM event WHERE id = ?";
"SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE id = ?";
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
@@ -210,69 +228,210 @@ cJSON* retrieve_event(const char* event_id) {
cJSON_AddStringToObject(event, "content", (char*)sqlite3_column_text(stmt, 4));
cJSON_AddStringToObject(event, "sig", (char*)sqlite3_column_text(stmt, 5));
// Add tags array - retrieve from tag table
cJSON* tags_array = cJSON_CreateArray();
const char* tag_sql = "SELECT name, value, parameters FROM tag WHERE id = ?";
sqlite3_stmt* tag_stmt;
if (sqlite3_prepare_v2(g_db, tag_sql, -1, &tag_stmt, NULL) == SQLITE_OK) {
sqlite3_bind_text(tag_stmt, 1, event_id, -1, SQLITE_STATIC);
while (sqlite3_step(tag_stmt) == SQLITE_ROW) {
cJSON* tag = cJSON_CreateArray();
cJSON_AddItemToArray(tag, cJSON_CreateString((char*)sqlite3_column_text(tag_stmt, 0)));
cJSON_AddItemToArray(tag, cJSON_CreateString((char*)sqlite3_column_text(tag_stmt, 1)));
// Add parameters if they exist
const char* parameters = (char*)sqlite3_column_text(tag_stmt, 2);
if (parameters && strlen(parameters) > 0) {
cJSON* params = cJSON_Parse(parameters);
if (params && cJSON_IsArray(params)) {
int param_count = cJSON_GetArraySize(params);
for (int i = 0; i < param_count; i++) {
cJSON* param = cJSON_GetArrayItem(params, i);
cJSON_AddItemToArray(tag, cJSON_Duplicate(param, 1));
}
}
if (params) cJSON_Delete(params);
}
cJSON_AddItemToArray(tags_array, tag);
// Parse tags JSON
const char* tags_json = (char*)sqlite3_column_text(stmt, 6);
if (tags_json) {
cJSON* tags = cJSON_Parse(tags_json);
if (tags) {
cJSON_AddItemToObject(event, "tags", tags);
} else {
cJSON_AddItemToObject(event, "tags", cJSON_CreateArray());
}
sqlite3_finalize(tag_stmt);
} else {
cJSON_AddItemToObject(event, "tags", cJSON_CreateArray());
}
cJSON_AddItemToObject(event, "tags", tags_array);
}
sqlite3_finalize(stmt);
return event;
}
// Handle REQ message (subscription)
int handle_req_message(const char* sub_id, cJSON* filters) {
// Handle REQ message (subscription) - send events matching filters
int handle_req_message(const char* sub_id, cJSON* filters, struct lws *wsi) {
log_info("Handling REQ message");
// For now, just handle simple event ID requests
if (cJSON_IsArray(filters)) {
cJSON* filter = cJSON_GetArrayItem(filters, 0);
if (filter) {
cJSON* ids = cJSON_GetObjectItem(filter, "ids");
if (ids && cJSON_IsArray(ids)) {
cJSON* event_id = cJSON_GetArrayItem(ids, 0);
if (event_id && cJSON_IsString(event_id)) {
cJSON* event = retrieve_event(cJSON_GetStringValue(event_id));
if (event) {
log_success("Found event for subscription");
cJSON_Delete(event);
return 1; // Found event
}
}
}
}
if (!cJSON_IsArray(filters)) {
log_error("REQ filters is not an array");
return 0;
}
return 0; // No events found
int events_sent = 0;
// Process each filter in the array
for (int i = 0; i < cJSON_GetArraySize(filters); i++) {
cJSON* filter = cJSON_GetArrayItem(filters, i);
if (!filter || !cJSON_IsObject(filter)) {
log_warning("Invalid filter object");
continue;
}
// Build SQL query based on filter
char sql[1024] = "SELECT id, pubkey, created_at, kind, content, sig, tags FROM events WHERE 1=1";
char* sql_ptr = sql + strlen(sql);
int remaining = sizeof(sql) - strlen(sql);
// Handle kinds filter
cJSON* kinds = cJSON_GetObjectItem(filter, "kinds");
if (kinds && cJSON_IsArray(kinds)) {
int kind_count = cJSON_GetArraySize(kinds);
if (kind_count > 0) {
snprintf(sql_ptr, remaining, " AND kind IN (");
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
for (int k = 0; k < kind_count; k++) {
cJSON* kind = cJSON_GetArrayItem(kinds, k);
if (cJSON_IsNumber(kind)) {
if (k > 0) {
snprintf(sql_ptr, remaining, ",");
sql_ptr++;
remaining--;
}
snprintf(sql_ptr, remaining, "%d", (int)cJSON_GetNumberValue(kind));
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
}
}
snprintf(sql_ptr, remaining, ")");
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
}
}
// Handle authors filter
cJSON* authors = cJSON_GetObjectItem(filter, "authors");
if (authors && cJSON_IsArray(authors)) {
int author_count = cJSON_GetArraySize(authors);
if (author_count > 0) {
snprintf(sql_ptr, remaining, " AND pubkey IN (");
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
for (int a = 0; a < author_count; a++) {
cJSON* author = cJSON_GetArrayItem(authors, a);
if (cJSON_IsString(author)) {
if (a > 0) {
snprintf(sql_ptr, remaining, ",");
sql_ptr++;
remaining--;
}
snprintf(sql_ptr, remaining, "'%s'", cJSON_GetStringValue(author));
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
}
}
snprintf(sql_ptr, remaining, ")");
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
}
}
// Handle since filter
cJSON* since = cJSON_GetObjectItem(filter, "since");
if (since && cJSON_IsNumber(since)) {
snprintf(sql_ptr, remaining, " AND created_at >= %ld", (long)cJSON_GetNumberValue(since));
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
}
// Handle until filter
cJSON* until = cJSON_GetObjectItem(filter, "until");
if (until && cJSON_IsNumber(until)) {
snprintf(sql_ptr, remaining, " AND created_at <= %ld", (long)cJSON_GetNumberValue(until));
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
}
// Add ordering and limit
snprintf(sql_ptr, remaining, " ORDER BY created_at DESC");
sql_ptr += strlen(sql_ptr);
remaining = sizeof(sql) - strlen(sql);
// Handle limit filter
cJSON* limit = cJSON_GetObjectItem(filter, "limit");
if (limit && cJSON_IsNumber(limit)) {
int limit_val = (int)cJSON_GetNumberValue(limit);
if (limit_val > 0 && limit_val <= 5000) {
snprintf(sql_ptr, remaining, " LIMIT %d", limit_val);
}
} else {
// Default limit to prevent excessive queries
snprintf(sql_ptr, remaining, " LIMIT 500");
}
// Debug: Log the SQL query being executed
char debug_msg[1280];
snprintf(debug_msg, sizeof(debug_msg), "Executing SQL: %s", sql);
log_info(debug_msg);
// Execute query and send events
sqlite3_stmt* stmt;
int rc = sqlite3_prepare_v2(g_db, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
char error_msg[256];
snprintf(error_msg, sizeof(error_msg), "Failed to prepare subscription query: %s", sqlite3_errmsg(g_db));
log_error(error_msg);
continue;
}
int row_count = 0;
while (sqlite3_step(stmt) == SQLITE_ROW) {
row_count++;
// Build event JSON
cJSON* event = cJSON_CreateObject();
cJSON_AddStringToObject(event, "id", (char*)sqlite3_column_text(stmt, 0));
cJSON_AddStringToObject(event, "pubkey", (char*)sqlite3_column_text(stmt, 1));
cJSON_AddNumberToObject(event, "created_at", sqlite3_column_int64(stmt, 2));
cJSON_AddNumberToObject(event, "kind", sqlite3_column_int(stmt, 3));
cJSON_AddStringToObject(event, "content", (char*)sqlite3_column_text(stmt, 4));
cJSON_AddStringToObject(event, "sig", (char*)sqlite3_column_text(stmt, 5));
// Parse tags JSON
const char* tags_json = (char*)sqlite3_column_text(stmt, 6);
cJSON* tags = NULL;
if (tags_json) {
tags = cJSON_Parse(tags_json);
}
if (!tags) {
tags = cJSON_CreateArray();
}
cJSON_AddItemToObject(event, "tags", tags);
// Send EVENT message
cJSON* event_msg = cJSON_CreateArray();
cJSON_AddItemToArray(event_msg, cJSON_CreateString("EVENT"));
cJSON_AddItemToArray(event_msg, cJSON_CreateString(sub_id));
cJSON_AddItemToArray(event_msg, event);
char* msg_str = cJSON_Print(event_msg);
if (msg_str) {
size_t msg_len = strlen(msg_str);
unsigned char* buf = malloc(LWS_PRE + msg_len);
if (buf) {
memcpy(buf + LWS_PRE, msg_str, msg_len);
lws_write(wsi, buf + LWS_PRE, msg_len, LWS_WRITE_TEXT);
free(buf);
}
free(msg_str);
}
cJSON_Delete(event_msg);
events_sent++;
}
char row_debug[128];
snprintf(row_debug, sizeof(row_debug), "Query returned %d rows", row_count);
log_info(row_debug);
sqlite3_finalize(stmt);
}
char events_debug[128];
snprintf(events_debug, sizeof(events_debug), "Total events sent: %d", events_sent);
log_info(events_debug);
return events_sent;
}
// Handle EVENT message (publish)
@@ -364,13 +523,25 @@ static int nostr_relay_callback(struct lws *wsi, enum lws_callback_reasons reaso
} else if (strcmp(msg_type, "REQ") == 0) {
// Handle REQ message
cJSON* sub_id = cJSON_GetArrayItem(json, 1);
cJSON* filters = cJSON_GetArrayItem(json, 2);
if (sub_id && cJSON_IsString(sub_id)) {
const char* subscription_id = cJSON_GetStringValue(sub_id);
strncpy(pss->subscription_id, subscription_id, sizeof(pss->subscription_id) - 1);
handle_req_message(subscription_id, filters);
// Create array of filter objects from position 2 onwards
cJSON* filters = cJSON_CreateArray();
int json_size = cJSON_GetArraySize(json);
for (int i = 2; i < json_size; i++) {
cJSON* filter = cJSON_GetArrayItem(json, i);
if (filter) {
cJSON_AddItemToArray(filters, cJSON_Duplicate(filter, 1));
}
}
handle_req_message(subscription_id, filters, wsi);
// Clean up the filters array we created
cJSON_Delete(filters);
// Send EOSE (End of Stored Events)
cJSON* eose_response = cJSON_CreateArray();
@@ -462,14 +633,7 @@ int start_websocket_relay() {
log_success("WebSocket relay started on ws://127.0.0.1:8888");
// Main event loop with proper signal handling
fd_set rfds;
struct timeval tv;
while (g_server_running) {
FD_ZERO(&rfds);
tv.tv_sec = 1;
tv.tv_usec = 0;
int result = lws_service(ws_context, 1000);
if (result < 0) {