/* * Interactive Relay Pool Test Program * * Interactive command-line interface for testing nostr_relay_pool functionality. * All output is logged to pool.log while the menu runs in the terminal. * * Usage: ./pool_test */ #define _POSIX_C_SOURCE 200809L #include #include #include #include #include #include #include #include #include #include "../nostr_core/nostr_core.h" #include "../cjson/cJSON.h" // Global variables volatile sig_atomic_t running = 1; nostr_relay_pool_t* pool = NULL; nostr_pool_subscription_t** subscriptions = NULL; int subscription_count = 0; int subscription_capacity = 0; pthread_t poll_thread; int log_fd = -1; // Signal handler for clean shutdown void signal_handler(int signum) { (void)signum; running = 0; } // Event callback - called when an event is received void on_event(cJSON* event, const char* relay_url, void* user_data) { (void)user_data; // Extract basic event information cJSON* id = cJSON_GetObjectItem(event, "id"); cJSON* pubkey = cJSON_GetObjectItem(event, "pubkey"); cJSON* created_at = cJSON_GetObjectItem(event, "created_at"); cJSON* kind = cJSON_GetObjectItem(event, "kind"); cJSON* content = cJSON_GetObjectItem(event, "content"); time_t now = time(NULL); char timestamp[26]; ctime_r(&now, timestamp); timestamp[24] = '\0'; // Remove newline dprintf(log_fd, "[%s] ๐Ÿ“จ EVENT from %s\n", timestamp, relay_url); dprintf(log_fd, "โ”œโ”€โ”€ ID: %.12s...\n", id && cJSON_IsString(id) ? cJSON_GetStringValue(id) : "unknown"); dprintf(log_fd, "โ”œโ”€โ”€ Pubkey: %.12s...\n", pubkey && cJSON_IsString(pubkey) ? cJSON_GetStringValue(pubkey) : "unknown"); dprintf(log_fd, "โ”œโ”€โ”€ Kind: %d\n", kind && cJSON_IsNumber(kind) ? (int)cJSON_GetNumberValue(kind) : -1); dprintf(log_fd, "โ”œโ”€โ”€ Created: %lld\n", created_at && cJSON_IsNumber(created_at) ? (long long)cJSON_GetNumberValue(created_at) : 0); // Truncate content if too long if (content && cJSON_IsString(content)) { const char* content_str = cJSON_GetStringValue(content); size_t content_len = strlen(content_str); if (content_len > 100) { dprintf(log_fd, "โ””โ”€โ”€ Content: %.97s...\n", content_str); } else { dprintf(log_fd, "โ””โ”€โ”€ Content: %s\n", content_str); } } else { dprintf(log_fd, "โ””โ”€โ”€ Content: (empty)\n"); } dprintf(log_fd, "\n"); } // EOSE callback - called when End of Stored Events is received void on_eose(cJSON** events, int event_count, void* user_data) { (void)user_data; time_t now = time(NULL); char timestamp[26]; ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] ๐Ÿ“‹ EOSE received - %d events collected\n", timestamp, event_count); // Log collected events if any for (int i = 0; i < event_count; i++) { cJSON* id = cJSON_GetObjectItem(events[i], "id"); if (id && cJSON_IsString(id)) { dprintf(log_fd, " Event %d: %.12s...\n", i + 1, cJSON_GetStringValue(id)); } } dprintf(log_fd, "\n"); } // Background polling thread void* poll_thread_func(void* arg) { (void)arg; while (running) { if (pool) { nostr_relay_pool_poll(pool, 100); } struct timespec ts = {0, 10000000}; // 10ms nanosleep(&ts, NULL); } return NULL; } // Print menu void print_menu() { printf("\n=== NOSTR Relay Pool Test Menu ===\n"); printf("1. Start Pool (wss://relay.laantungir.net)\n"); printf("2. Stop Pool\n"); printf("3. Add relay to pool\n"); printf("4. Remove relay from pool\n"); printf("5. Add subscription\n"); printf("6. Remove subscription\n"); printf("7. Show pool status\n"); printf("8. Test reconnection (simulate disconnect)\n"); printf("9. Exit\n"); printf("Choice: "); } // Get user input with default char* get_input(const char* prompt, const char* default_value) { static char buffer[1024]; printf("%s", prompt); if (default_value) { printf(" [%s]", default_value); } printf(": "); if (!fgets(buffer, sizeof(buffer), stdin)) { return NULL; } // Remove newline size_t len = strlen(buffer); if (len > 0 && buffer[len-1] == '\n') { buffer[len-1] = '\0'; } // Return default if empty if (strlen(buffer) == 0 && default_value) { return strdup(default_value); } return strdup(buffer); } // Parse comma-separated list into cJSON array cJSON* parse_comma_list(const char* input, int is_number) { if (!input || strlen(input) == 0) { return NULL; } cJSON* array = cJSON_CreateArray(); if (!array) return NULL; char* input_copy = strdup(input); char* token = strtok(input_copy, ","); while (token) { // Trim whitespace while (*token == ' ') token++; char* end = token + strlen(token) - 1; while (end > token && *end == ' ') *end-- = '\0'; if (is_number) { int num = atoi(token); cJSON_AddItemToArray(array, cJSON_CreateNumber(num)); } else { cJSON_AddItemToArray(array, cJSON_CreateString(token)); } token = strtok(NULL, ","); } free(input_copy); return array; } // Add subscription interactively void add_subscription() { if (!pool) { printf("โŒ Pool not started\n"); return; } printf("\n--- Add Subscription ---\n"); printf("Enter filter values (press Enter for no value):\n"); cJSON* filter = cJSON_CreateObject(); // ids char* ids_input = get_input("ids (comma-separated event ids)", NULL); if (ids_input && strlen(ids_input) > 0) { cJSON* ids = parse_comma_list(ids_input, 0); if (ids) cJSON_AddItemToObject(filter, "ids", ids); } free(ids_input); // authors char* authors_input = get_input("authors (comma-separated pubkeys)", NULL); if (authors_input && strlen(authors_input) > 0) { cJSON* authors = parse_comma_list(authors_input, 0); if (authors) cJSON_AddItemToObject(filter, "authors", authors); } free(authors_input); // kinds char* kinds_input = get_input("kinds (comma-separated numbers)", NULL); if (kinds_input && strlen(kinds_input) > 0) { cJSON* kinds = parse_comma_list(kinds_input, 1); if (kinds) cJSON_AddItemToObject(filter, "kinds", kinds); } free(kinds_input); // #e tag char* e_input = get_input("#e (comma-separated event ids)", NULL); if (e_input && strlen(e_input) > 0) { cJSON* e_array = parse_comma_list(e_input, 0); if (e_array) cJSON_AddItemToObject(filter, "#e", e_array); } free(e_input); // #p tag char* p_input = get_input("#p (comma-separated pubkeys)", NULL); if (p_input && strlen(p_input) > 0) { cJSON* p_array = parse_comma_list(p_input, 0); if (p_array) cJSON_AddItemToObject(filter, "#p", p_array); } free(p_input); // since char* since_input = get_input("since (unix timestamp or 'n' for now)", NULL); if (since_input && strlen(since_input) > 0) { if (strcmp(since_input, "n") == 0) { // Use current timestamp time_t now = time(NULL); cJSON_AddItemToObject(filter, "since", cJSON_CreateNumber((int)now)); printf("Using current timestamp: %ld\n", now); } else { int since = atoi(since_input); if (since > 0) cJSON_AddItemToObject(filter, "since", cJSON_CreateNumber(since)); } } free(since_input); // until char* until_input = get_input("until (unix timestamp)", NULL); if (until_input && strlen(until_input) > 0) { int until = atoi(until_input); if (until > 0) cJSON_AddItemToObject(filter, "until", cJSON_CreateNumber(until)); } free(until_input); // limit char* limit_input = get_input("limit (max events)", "10"); if (limit_input && strlen(limit_input) > 0) { int limit = atoi(limit_input); if (limit > 0) cJSON_AddItemToObject(filter, "limit", cJSON_CreateNumber(limit)); } free(limit_input); // Get relay URLs from pool char** relay_urls = NULL; nostr_pool_relay_status_t* statuses = NULL; int relay_count = nostr_relay_pool_list_relays(pool, &relay_urls, &statuses); if (relay_count <= 0) { printf("โŒ No relays in pool\n"); cJSON_Delete(filter); return; } // Ask about close_on_eose behavior char* close_input = get_input("Close subscription on EOSE? (y/n)", "n"); int close_on_eose = (close_input && strcmp(close_input, "y") == 0) ? 1 : 0; free(close_input); // Create subscription with new parameters nostr_pool_subscription_t* sub = nostr_relay_pool_subscribe( pool, (const char**)relay_urls, relay_count, filter, on_event, on_eose, NULL, close_on_eose, 1, // enable_deduplication NOSTR_POOL_EOSE_FULL_SET, // result_mode 30, // relay_timeout_seconds 60 // eose_timeout_seconds ); // Free relay URLs for (int i = 0; i < relay_count; i++) { free(relay_urls[i]); } free(relay_urls); free(statuses); if (!sub) { printf("โŒ Failed to create subscription\n"); cJSON_Delete(filter); return; } // Store subscription if (subscription_count >= subscription_capacity) { subscription_capacity = subscription_capacity == 0 ? 10 : subscription_capacity * 2; subscriptions = realloc(subscriptions, subscription_capacity * sizeof(nostr_pool_subscription_t*)); } subscriptions[subscription_count++] = sub; printf("โœ… Subscription created (ID: %d)\n", subscription_count); // Log the filter char* filter_json = cJSON_Print(filter); time_t now = time(NULL); char timestamp[26]; ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] ๐Ÿ” New subscription created (ID: %d)\n", timestamp, subscription_count); dprintf(log_fd, "Filter: %s\n\n", filter_json); free(filter_json); } // Remove subscription void remove_subscription() { if (subscription_count == 0) { printf("โŒ No subscriptions to remove\n"); return; } printf("\n--- Remove Subscription ---\n"); printf("Available subscriptions:\n"); for (int i = 0; i < subscription_count; i++) { printf("%d. Subscription %d\n", i + 1, i + 1); } char* choice_input = get_input("Enter subscription number to remove", NULL); if (!choice_input || strlen(choice_input) == 0) { free(choice_input); return; } int choice = atoi(choice_input) - 1; free(choice_input); if (choice < 0 || choice >= subscription_count) { printf("โŒ Invalid subscription number\n"); return; } nostr_pool_subscription_close(subscriptions[choice]); // Shift remaining subscriptions for (int i = choice; i < subscription_count - 1; i++) { subscriptions[i] = subscriptions[i + 1]; } subscription_count--; printf("โœ… Subscription removed\n"); time_t now = time(NULL); char timestamp[26]; ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] ๐Ÿ—‘๏ธ Subscription removed (was ID: %d)\n\n", timestamp, choice + 1); } // Show pool status void show_pool_status() { if (!pool) { printf("โŒ Pool not started\n"); return; } // Give polling thread time to establish connections printf("โณ Waiting for connections to establish...\n"); sleep(3); char** relay_urls = NULL; nostr_pool_relay_status_t* statuses = NULL; int relay_count = nostr_relay_pool_list_relays(pool, &relay_urls, &statuses); printf("\n๐Ÿ“Š POOL STATUS\n"); printf("Relays: %d\n", relay_count); printf("Subscriptions: %d\n", subscription_count); if (relay_count > 0) { printf("\nRelay Details:\n"); for (int i = 0; i < relay_count; i++) { const char* status_str; switch (statuses[i]) { case NOSTR_POOL_RELAY_CONNECTED: status_str = "๐ŸŸข CONNECTED"; break; case NOSTR_POOL_RELAY_CONNECTING: status_str = "๐ŸŸก CONNECTING"; break; case NOSTR_POOL_RELAY_DISCONNECTED: status_str = "โšช DISCONNECTED"; break; case NOSTR_POOL_RELAY_ERROR: status_str = "๐Ÿ”ด ERROR"; break; default: status_str = "โ“ UNKNOWN"; break; } printf("โ”œโ”€โ”€ %s: %s\n", relay_urls[i], status_str); const nostr_relay_stats_t* stats = nostr_relay_pool_get_relay_stats(pool, relay_urls[i]); if (stats) { printf("โ”‚ โ”œโ”€โ”€ Events received: %d\n", stats->events_received); printf("โ”‚ โ”œโ”€โ”€ Connection attempts: %d\n", stats->connection_attempts); printf("โ”‚ โ”œโ”€โ”€ Connection failures: %d\n", stats->connection_failures); printf("โ”‚ โ”œโ”€โ”€ Ping latency: %.2f ms\n", stats->ping_latency_current); printf("โ”‚ โ””โ”€โ”€ Query latency: %.2f ms\n", stats->query_latency_avg); } free(relay_urls[i]); } free(relay_urls); free(statuses); } printf("\n"); } int main() { // Setup logging to file log_fd = open("pool.log", O_WRONLY | O_CREAT | O_TRUNC, 0644); if (log_fd == -1) { fprintf(stderr, "โŒ Failed to open pool.log for writing\n"); return 1; } // Initialize NOSTR library if (nostr_init() != NOSTR_SUCCESS) { fprintf(stderr, "โŒ Failed to initialize NOSTR library\n"); close(log_fd); return 1; } // Setup signal handler signal(SIGINT, signal_handler); signal(SIGTERM, signal_handler); // Start polling thread if (pthread_create(&poll_thread, NULL, poll_thread_func, NULL) != 0) { fprintf(stderr, "โŒ Failed to create polling thread\n"); nostr_cleanup(); close(log_fd); return 1; } printf("๐Ÿ”— NOSTR Relay Pool Interactive Test\n"); printf("=====================================\n"); printf("All event output is logged to pool.log\n"); printf("Press Ctrl+C to exit\n\n"); time_t now = time(NULL); char timestamp[26]; ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] ๐Ÿš€ Pool test started\n\n", timestamp); // Main menu loop while (running) { print_menu(); char choice; if (scanf("%c", &choice) != 1) { break; } // Consume newline int c; while ((c = getchar()) != '\n' && c != EOF); switch (choice) { case '1': { // Start Pool if (pool) { printf("โŒ Pool already started\n"); break; } // Create pool with custom reconnection configuration for faster testing nostr_pool_reconnect_config_t config = *nostr_pool_reconnect_config_default(); config.ping_interval_seconds = 5; // Ping every 5 seconds for testing pool = nostr_relay_pool_create(&config); if (!pool) { printf("โŒ Failed to create pool\n"); break; } if (nostr_relay_pool_add_relay(pool, "wss://relay.laantungir.net") != NOSTR_SUCCESS) { printf("โŒ Failed to add default relay\n"); nostr_relay_pool_destroy(pool); pool = NULL; break; } printf("โœ… Pool started with wss://relay.laantungir.net\n"); now = time(NULL); ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] ๐ŸŠ Pool started with default relay\n\n", timestamp); break; } case '2': { // Stop Pool if (!pool) { printf("โŒ Pool not started\n"); break; } // Close all subscriptions for (int i = 0; i < subscription_count; i++) { if (subscriptions[i]) { nostr_pool_subscription_close(subscriptions[i]); } } free(subscriptions); subscriptions = NULL; subscription_count = 0; subscription_capacity = 0; nostr_relay_pool_destroy(pool); pool = NULL; printf("โœ… Pool stopped\n"); now = time(NULL); ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] ๐Ÿ›‘ Pool stopped\n\n", timestamp); break; } case '3': { // Add relay if (!pool) { printf("โŒ Pool not started\n"); break; } char* url = get_input("Enter relay URL", "wss://relay.example.com"); if (url && strlen(url) > 0) { if (nostr_relay_pool_add_relay(pool, url) == NOSTR_SUCCESS) { printf("โœ… Relay added: %s\n", url); now = time(NULL); ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] โž• Relay added: %s\n\n", timestamp, url); } else { printf("โŒ Failed to add relay\n"); } } free(url); break; } case '4': { // Remove relay if (!pool) { printf("โŒ Pool not started\n"); break; } char* url = get_input("Enter relay URL to remove", NULL); if (url && strlen(url) > 0) { if (nostr_relay_pool_remove_relay(pool, url) == NOSTR_SUCCESS) { printf("โœ… Relay removed: %s\n", url); now = time(NULL); ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] โž– Relay removed: %s\n\n", timestamp, url); } else { printf("โŒ Failed to remove relay\n"); } } free(url); break; } case '5': // Add subscription add_subscription(); break; case '6': // Remove subscription remove_subscription(); break; case '7': // Show status show_pool_status(); break; case '8': { // Test reconnection if (!pool) { printf("โŒ Pool not started\n"); break; } char** relay_urls = NULL; nostr_pool_relay_status_t* statuses = NULL; int relay_count = nostr_relay_pool_list_relays(pool, &relay_urls, &statuses); if (relay_count <= 0) { printf("โŒ No relays in pool\n"); break; } printf("\n--- Test Reconnection ---\n"); printf("Available relays:\n"); for (int i = 0; i < relay_count; i++) { printf("%d. %s (%s)\n", i + 1, relay_urls[i], statuses[i] == NOSTR_POOL_RELAY_CONNECTED ? "CONNECTED" : "NOT CONNECTED"); } char* choice_input = get_input("Enter relay number to test reconnection with", NULL); if (!choice_input || strlen(choice_input) == 0) { for (int i = 0; i < relay_count; i++) free(relay_urls[i]); free(relay_urls); free(statuses); free(choice_input); break; } int choice = atoi(choice_input) - 1; free(choice_input); if (choice < 0 || choice >= relay_count) { printf("โŒ Invalid relay number\n"); for (int i = 0; i < relay_count; i++) free(relay_urls[i]); free(relay_urls); free(statuses); break; } printf("๐Ÿ”„ Testing reconnection with %s...\n", relay_urls[choice]); printf(" The pool is configured with automatic reconnection enabled.\n"); printf(" If the connection drops, it will automatically attempt to reconnect\n"); printf(" with exponential backoff (1s โ†’ 2s โ†’ 4s โ†’ 8s โ†’ 16s โ†’ 30s max).\n"); printf(" Connection health is monitored with ping/pong every 30 seconds.\n"); time_t now = time(NULL); char timestamp[26]; ctime_r(&now, timestamp); timestamp[24] = '\0'; dprintf(log_fd, "[%s] ๐Ÿ”„ TEST: Testing reconnection behavior with %s\n", timestamp, relay_urls[choice]); dprintf(log_fd, " Pool configured with: auto-reconnect=ON, max_attempts=10, ping_interval=30s\n\n"); printf("โœ… Reconnection test initiated. Monitor the status and logs for reconnection activity.\n"); for (int i = 0; i < relay_count; i++) free(relay_urls[i]); free(relay_urls); free(statuses); break; } case '9': // Exit running = 0; break; default: printf("โŒ Invalid choice\n"); break; } } printf("\n๐Ÿงน Cleaning up...\n"); // Stop polling thread running = 0; pthread_join(poll_thread, NULL); // Clean up pool and subscriptions if (pool) { for (int i = 0; i < subscription_count; i++) { if (subscriptions[i]) { nostr_pool_subscription_close(subscriptions[i]); } } free(subscriptions); nostr_relay_pool_destroy(pool); printf("โœ… Pool destroyed\n"); } // Cleanup nostr_cleanup(); close(log_fd); printf("๐Ÿ‘‹ Test completed\n"); return 0; }