mirror of
https://github.com/DocNR/POWR.git
synced 2025-04-19 10:51:19 +00:00
Fix Following feed transaction conflicts and improve error handling
- Implemented global transaction lock mechanism in SocialFeedCache - Updated ContactCacheService to use the transaction lock - Enhanced Following feed refresh logic with retry mechanism - Extended transaction lock to WorkoutService - Updated documentation with transaction lock details - Cleaned up code and improved error handling
This commit is contained in:
parent
2316a93dc2
commit
4e5ca9fcaf
25
CHANGELOG.md
25
CHANGELOG.md
@ -39,13 +39,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Added debounced subscriptions to prevent rapid resubscriptions
|
||||
- Enhanced error handling to prevent cascading failures
|
||||
- Added proper initialization of SocialFeedCache in RelayInitializer
|
||||
- Fixed following feed refresh issue that caused feed to reset unexpectedly
|
||||
- Implemented contact caching to prevent feed refresh loops
|
||||
- Added schema support for contact list caching (version 12)
|
||||
- Simplified feed refresh logic to prevent unnecessary subscription resets
|
||||
- Enhanced Following feed stability with improved contact management
|
||||
- Fixed database transaction conflicts between SocialFeedCache and ContactCacheService
|
||||
- Implemented global transaction lock mechanism to prevent nested transactions
|
||||
- Added transaction queue for coordinating database operations across services
|
||||
- Enhanced Following feed refresh logic with retry mechanism and better state tracking
|
||||
- Added safeguards to prevent multiple simultaneous refresh attempts
|
||||
- Improved error recovery in contact-based feed refreshes
|
||||
- Enhanced Social Feed Filtering
|
||||
- Updated feed filtering rules to focus on fitness-related content
|
||||
- Implemented consistent tag-based filtering across all feeds
|
||||
- Added comprehensive fitness tag list (#workout, #fitness, #powr, etc.)
|
||||
- Removed article drafts (kind 30024) from all feeds
|
||||
- Created detailed documentation for feed filtering rules
|
||||
- Enhanced POWR feed to only show published content
|
||||
- Updated Community feed (formerly Global) with better content focus
|
||||
- Improved Following feed with consistent filtering rules
|
||||
- Social Feed Caching Implementation
|
||||
@ -558,7 +563,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- TypeScript parameter typing in database services
|
||||
- Null value handling in database operations
|
||||
- Development seeding duplicate prevention
|
||||
- Template category spacing issues
|
||||
- Template category sunctionality
|
||||
- Keyboard overlap isspes in exercise creation form
|
||||
- SQLite traasacingn nesting issues
|
||||
- TypeScript parameter typing i database services
|
||||
- Null visue handlsng in dauabase operations
|
||||
- Development seeding duplicate prevention
|
||||
- Template categore spacing issuess
|
||||
- Exercise list rendering on iOS
|
||||
- Database reset and reseeding behavior
|
||||
- Template details UI overflow issues
|
||||
|
@ -26,12 +26,15 @@ function FollowingScreen() {
|
||||
}
|
||||
}, [contacts.length]);
|
||||
|
||||
// Track if feed has loaded successfully with content
|
||||
// Feed loading state tracking
|
||||
const [hasLoadedWithContent, setHasLoadedWithContent] = useState(false);
|
||||
// Track if we've loaded content with the full contact list
|
||||
const [hasLoadedWithContacts, setHasLoadedWithContacts] = useState(false);
|
||||
// Track the number of contacts we've loaded content with
|
||||
const [loadedContactsCount, setLoadedContactsCount] = useState(0);
|
||||
const [isRefreshingWithContacts, setIsRefreshingWithContacts] = useState(false);
|
||||
|
||||
// Contact refresh retry tracking
|
||||
const [contactRefreshAttempts, setContactRefreshAttempts] = useState(0);
|
||||
const maxContactRefreshAttempts = 3; // Limit to prevent infinite refresh attempts
|
||||
|
||||
// Use the enhanced useSocialFeed hook with the contact list
|
||||
// Always pass an array, even if empty, to ensure consistent behavior
|
||||
@ -81,86 +84,51 @@ function FollowingScreen() {
|
||||
}
|
||||
}, [contacts.length, loadedContactsCount]);
|
||||
|
||||
// Auto-refresh when contacts are loaded with improved retry logic
|
||||
// Auto-refresh when contacts list changes
|
||||
React.useEffect(() => {
|
||||
// Trigger refresh when contacts change from empty to non-empty
|
||||
// OR when we have content but haven't loaded with the full contact list yet
|
||||
if (contacts.length > 0 && !isLoadingContacts &&
|
||||
(!hasLoadedWithContent || !hasLoadedWithContacts)) {
|
||||
console.log('[FollowingScreen] Contacts loaded, triggering auto-refresh');
|
||||
|
||||
// Track retry attempts
|
||||
let retryCount = 0;
|
||||
const maxRetries = 3;
|
||||
|
||||
// Function to attempt refresh with exponential backoff
|
||||
const attemptRefresh = () => {
|
||||
// Increase delay with each retry (1s, 2s, 4s)
|
||||
const delay = 1000 * Math.pow(2, retryCount);
|
||||
|
||||
console.log(`[FollowingScreen] Scheduling refresh attempt ${retryCount + 1}/${maxRetries + 1} in ${delay}ms`);
|
||||
|
||||
return setTimeout(async () => {
|
||||
// Skip if we've loaded content with the full contact list in the meantime
|
||||
if (hasLoadedWithContent && hasLoadedWithContacts) {
|
||||
console.log('[FollowingScreen] Content already loaded with full contact list, skipping refresh');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
console.log(`[FollowingScreen] Executing refresh attempt ${retryCount + 1}/${maxRetries + 1}`);
|
||||
// Use force refresh to bypass cooldown
|
||||
await refresh(true);
|
||||
|
||||
// Check if we got any items after a short delay
|
||||
setTimeout(() => {
|
||||
if (feedItems.length === 0 && retryCount < maxRetries &&
|
||||
(!hasLoadedWithContent || !hasLoadedWithContacts)) {
|
||||
console.log(`[FollowingScreen] No items after refresh attempt ${retryCount + 1}, retrying...`);
|
||||
retryCount++;
|
||||
const nextTimer = attemptRefresh();
|
||||
|
||||
// Store the timer ID in the ref so we can clear it if needed
|
||||
retryTimerRef.current = nextTimer;
|
||||
} else if (feedItems.length > 0) {
|
||||
console.log(`[FollowingScreen] Refresh successful, got ${feedItems.length} items`);
|
||||
setHasLoadedWithContent(true);
|
||||
// Mark as loaded with contacts if we have the full contact list
|
||||
if (contacts.length > 0) {
|
||||
console.log(`[FollowingScreen] Marking as loaded with ${contacts.length} contacts`);
|
||||
setLoadedContactsCount(contacts.length);
|
||||
setHasLoadedWithContacts(true);
|
||||
}
|
||||
} else {
|
||||
console.log(`[FollowingScreen] All refresh attempts completed, got ${feedItems.length} items`);
|
||||
}
|
||||
}, 500);
|
||||
} catch (error) {
|
||||
console.error(`[FollowingScreen] Error during refresh attempt ${retryCount + 1}:`, error);
|
||||
|
||||
// Retry on error if we haven't exceeded max retries
|
||||
if (retryCount < maxRetries && (!hasLoadedWithContent || !hasLoadedWithContacts)) {
|
||||
retryCount++;
|
||||
const nextTimer = attemptRefresh();
|
||||
retryTimerRef.current = nextTimer;
|
||||
}
|
||||
}
|
||||
}, delay);
|
||||
};
|
||||
|
||||
// Start the first attempt with initial delay
|
||||
const timerId = attemptRefresh();
|
||||
retryTimerRef.current = timerId;
|
||||
|
||||
// Clean up any pending timers on unmount
|
||||
return () => {
|
||||
if (retryTimerRef.current) {
|
||||
clearTimeout(retryTimerRef.current);
|
||||
retryTimerRef.current = null;
|
||||
}
|
||||
};
|
||||
// Prevent multiple simultaneous refresh attempts
|
||||
if (isRefreshingWithContacts) {
|
||||
return;
|
||||
}
|
||||
}, [contacts.length, isLoadingContacts, refresh, feedItems.length, hasLoadedWithContent, hasLoadedWithContacts]);
|
||||
|
||||
// Only refresh if we have contacts, aren't currently loading contacts,
|
||||
// and either haven't loaded with contacts yet or have no feed items
|
||||
const shouldRefresh = contacts.length > 0 &&
|
||||
!isLoadingContacts &&
|
||||
(!hasLoadedWithContacts || feedItems.length === 0) &&
|
||||
contactRefreshAttempts < maxContactRefreshAttempts;
|
||||
|
||||
if (shouldRefresh) {
|
||||
console.log(`[FollowingScreen] Refreshing feed with ${contacts.length} contacts (attempt ${contactRefreshAttempts + 1}/${maxContactRefreshAttempts})`);
|
||||
|
||||
setIsRefreshingWithContacts(true);
|
||||
setContactRefreshAttempts(prev => prev + 1);
|
||||
|
||||
refresh(true)
|
||||
.then(() => {
|
||||
setHasLoadedWithContent(true);
|
||||
setHasLoadedWithContacts(true);
|
||||
setIsRefreshingWithContacts(false);
|
||||
})
|
||||
.catch(error => {
|
||||
console.error('[FollowingScreen] Error refreshing feed:', error);
|
||||
setIsRefreshingWithContacts(false);
|
||||
|
||||
// Prevent infinite retries by marking as loaded after max attempts
|
||||
if (contactRefreshAttempts >= maxContactRefreshAttempts - 1) {
|
||||
setHasLoadedWithContacts(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
}, [
|
||||
contacts.length,
|
||||
isLoadingContacts,
|
||||
hasLoadedWithContacts,
|
||||
feedItems.length,
|
||||
refresh,
|
||||
contactRefreshAttempts,
|
||||
isRefreshingWithContacts
|
||||
]);
|
||||
|
||||
const [isRefreshing, setIsRefreshing] = useState(false);
|
||||
const [showNewButton, setShowNewButton] = useState(false);
|
||||
@ -191,44 +159,31 @@ function FollowingScreen() {
|
||||
listRef.current?.scrollToOffset({ offset: 0, animated: true });
|
||||
}, []);
|
||||
|
||||
// Handle refresh - updated to use forceRefresh parameter
|
||||
// Manual refresh handler with improved error handling
|
||||
const handleRefresh = useCallback(async () => {
|
||||
setIsRefreshing(true);
|
||||
|
||||
try {
|
||||
console.log('[FollowingScreen] Starting manual refresh (force=true)');
|
||||
// Reset retry counter on manual refresh
|
||||
setContactRefreshAttempts(0);
|
||||
|
||||
// Check if we have contacts before refreshing
|
||||
if (contacts.length === 0) {
|
||||
console.log('[FollowingScreen] No contacts available for refresh, using fallback');
|
||||
// Still try to refresh with force=true to bypass cooldown
|
||||
}
|
||||
|
||||
// Use force=true to bypass cooldown
|
||||
// Force refresh to bypass cooldown
|
||||
await refresh(true);
|
||||
// Add a slight delay to ensure the UI updates
|
||||
await new Promise(resolve => setTimeout(resolve, 300));
|
||||
console.log('[FollowingScreen] Manual refresh completed successfully');
|
||||
|
||||
// If we get content, mark as loaded with content
|
||||
// Small delay to ensure UI updates
|
||||
await new Promise(resolve => setTimeout(resolve, 300));
|
||||
|
||||
// Update loading states if content is available
|
||||
if (feedItems.length > 0) {
|
||||
setHasLoadedWithContent(true);
|
||||
|
||||
// Mark as loaded with contacts if we have the full contact list
|
||||
if (contacts.length > 0) {
|
||||
console.log(`[FollowingScreen] Marking as loaded with ${contacts.length} contacts after manual refresh`);
|
||||
setLoadedContactsCount(contacts.length);
|
||||
setHasLoadedWithContacts(true);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[FollowingScreen] Error refreshing feed:', error);
|
||||
// Log more detailed error information
|
||||
if (error instanceof Error) {
|
||||
console.error(`[FollowingScreen] Error details: ${error.message}`);
|
||||
if (error.stack) {
|
||||
console.error(`[FollowingScreen] Stack trace: ${error.stack}`);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
setIsRefreshing(false);
|
||||
}
|
||||
|
@ -147,13 +147,261 @@ useEffect(() => {
|
||||
}, [ndk, db]);
|
||||
```
|
||||
|
||||
### Global Transaction Lock Mechanism
|
||||
|
||||
To prevent transaction conflicts between different services (such as SocialFeedCache and ContactCacheService), we've implemented a global transaction lock mechanism in the SocialFeedCache class:
|
||||
|
||||
```typescript
|
||||
// Global transaction lock to prevent transaction conflicts across services
|
||||
private static transactionLock: boolean = false;
|
||||
private static transactionQueue: (() => Promise<void>)[] = [];
|
||||
private static processingQueue: boolean = false;
|
||||
|
||||
/**
|
||||
* Acquire the global transaction lock
|
||||
* @returns True if lock was acquired, false otherwise
|
||||
*/
|
||||
private static acquireTransactionLock(): boolean {
|
||||
if (SocialFeedCache.transactionLock) {
|
||||
return false;
|
||||
}
|
||||
SocialFeedCache.transactionLock = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the global transaction lock
|
||||
*/
|
||||
private static releaseTransactionLock(): void {
|
||||
SocialFeedCache.transactionLock = false;
|
||||
// Process the next transaction in queue if any
|
||||
if (SocialFeedCache.transactionQueue.length > 0 && !SocialFeedCache.processingQueue) {
|
||||
SocialFeedCache.processTransactionQueue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a transaction to the queue
|
||||
* @param transaction Function that performs the transaction
|
||||
*/
|
||||
private static enqueueTransaction(transaction: () => Promise<void>): void {
|
||||
SocialFeedCache.transactionQueue.push(transaction);
|
||||
// Start processing the queue if not already processing
|
||||
if (!SocialFeedCache.processingQueue) {
|
||||
SocialFeedCache.processTransactionQueue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the transaction queue
|
||||
*/
|
||||
private static async processTransactionQueue(): Promise<void> {
|
||||
if (SocialFeedCache.processingQueue || SocialFeedCache.transactionQueue.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
SocialFeedCache.processingQueue = true;
|
||||
|
||||
try {
|
||||
while (SocialFeedCache.transactionQueue.length > 0) {
|
||||
// Wait until we can acquire the lock
|
||||
if (!SocialFeedCache.acquireTransactionLock()) {
|
||||
// If we can't acquire the lock, wait and try again
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get the next transaction
|
||||
const transaction = SocialFeedCache.transactionQueue.shift();
|
||||
if (!transaction) {
|
||||
SocialFeedCache.releaseTransactionLock();
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// Execute the transaction
|
||||
await transaction();
|
||||
} catch (error) {
|
||||
console.error('[SocialFeedCache] Error executing queued transaction:', error);
|
||||
} finally {
|
||||
// Release the lock
|
||||
SocialFeedCache.releaseTransactionLock();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
SocialFeedCache.processingQueue = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a transaction with the global lock
|
||||
* @param transaction Function that performs the transaction
|
||||
*/
|
||||
public static async executeWithLock(transaction: () => Promise<void>): Promise<void> {
|
||||
// Add the transaction to the queue
|
||||
SocialFeedCache.enqueueTransaction(transaction);
|
||||
}
|
||||
```
|
||||
|
||||
This mechanism ensures that only one transaction is active at any given time, preventing the "cannot start a transaction within a transaction" error that can occur when two services try to start transactions simultaneously.
|
||||
|
||||
The `executeWithLock` method can be used by other services to coordinate their database transactions with SocialFeedCache:
|
||||
|
||||
```typescript
|
||||
// Example usage in ContactCacheService
|
||||
async cacheContacts(ownerPubkey: string, contacts: string[]): Promise<void> {
|
||||
if (!ownerPubkey || !contacts.length) return;
|
||||
|
||||
try {
|
||||
// Use the global transaction lock to prevent conflicts with other services
|
||||
await SocialFeedCache.executeWithLock(async () => {
|
||||
try {
|
||||
// Use a transaction for better performance
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
// Database operations...
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error in transaction:', error);
|
||||
throw error; // Rethrow to ensure the transaction is marked as failed
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error caching contacts:', error);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Enhanced Write Buffer System
|
||||
|
||||
The write buffer system has been enhanced with exponential backoff and improved error handling:
|
||||
|
||||
```typescript
|
||||
private async flushWriteBuffer() {
|
||||
if (this.writeBuffer.length === 0 || this.processingTransaction) return;
|
||||
|
||||
// Check if database is available
|
||||
if (!this.isDbAvailable()) {
|
||||
console.log('[SocialFeedCache] Database not available, delaying flush');
|
||||
this.scheduleNextFlush(true); // Schedule with backoff
|
||||
return;
|
||||
}
|
||||
|
||||
// Take only a batch of operations to process at once
|
||||
const bufferCopy = [...this.writeBuffer].slice(0, this.maxBatchSize);
|
||||
this.writeBuffer = this.writeBuffer.slice(bufferCopy.length);
|
||||
|
||||
this.processingTransaction = true;
|
||||
|
||||
// Use the transaction lock to prevent conflicts
|
||||
try {
|
||||
// Check if we've exceeded the maximum retry count
|
||||
if (this.retryCount > this.maxRetryCount) {
|
||||
console.warn(`[SocialFeedCache] Exceeded maximum retry count (${this.maxRetryCount}), dropping ${bufferCopy.length} operations`);
|
||||
// Reset retry count but don't retry these operations
|
||||
this.retryCount = 0;
|
||||
this.processingTransaction = false;
|
||||
this.scheduleNextFlush();
|
||||
return;
|
||||
}
|
||||
|
||||
// Increment retry count before attempting transaction
|
||||
this.retryCount++;
|
||||
|
||||
// Execute the transaction with the global lock
|
||||
await SocialFeedCache.executeWithLock(async () => {
|
||||
try {
|
||||
// Execute the transaction
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
for (const { query, params } of bufferCopy) {
|
||||
try {
|
||||
await this.db.runAsync(query, params);
|
||||
} catch (innerError) {
|
||||
// Log individual query errors but continue with other queries
|
||||
console.error(`[SocialFeedCache] Error executing query: ${query}`, innerError);
|
||||
// Don't rethrow to allow other queries to proceed
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Success - reset retry count
|
||||
this.retryCount = 0;
|
||||
this.dbAvailable = true; // Mark database as available
|
||||
} catch (error) {
|
||||
console.error('[SocialFeedCache] Error in transaction:', error);
|
||||
|
||||
// Check for database connection errors
|
||||
if (error instanceof Error &&
|
||||
(error.message.includes('closed resource') ||
|
||||
error.message.includes('Database not available'))) {
|
||||
// Mark database as unavailable
|
||||
this.dbAvailable = false;
|
||||
console.warn('[SocialFeedCache] Database connection issue detected, marking as unavailable');
|
||||
|
||||
// Add all operations back to the buffer
|
||||
this.writeBuffer = [...bufferCopy, ...this.writeBuffer];
|
||||
} else {
|
||||
// For other errors, add operations back to the buffer
|
||||
// but only if they're not already there (avoid duplicates)
|
||||
for (const op of bufferCopy) {
|
||||
if (!this.writeBuffer.some(item =>
|
||||
item.query === op.query &&
|
||||
JSON.stringify(item.params) === JSON.stringify(op.params)
|
||||
)) {
|
||||
// Add back to the beginning of the buffer to retry sooner
|
||||
this.writeBuffer.unshift(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rethrow to ensure the transaction is marked as failed
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('[SocialFeedCache] Error flushing write buffer:', error);
|
||||
} finally {
|
||||
this.processingTransaction = false;
|
||||
this.scheduleNextFlush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule the next buffer flush with optional backoff
|
||||
*/
|
||||
private scheduleNextFlush(withBackoff: boolean = false) {
|
||||
if (this.bufferFlushTimer) {
|
||||
clearTimeout(this.bufferFlushTimer);
|
||||
this.bufferFlushTimer = null;
|
||||
}
|
||||
|
||||
if (this.writeBuffer.length > 0) {
|
||||
let delay = this.bufferFlushTimeout;
|
||||
|
||||
if (withBackoff) {
|
||||
// Use exponential backoff based on retry count
|
||||
delay = Math.min(
|
||||
this.bufferFlushTimeout * Math.pow(2, this.retryCount),
|
||||
this.maxBackoffTime
|
||||
);
|
||||
}
|
||||
|
||||
console.log(`[SocialFeedCache] Scheduling next flush in ${delay}ms (retry: ${this.retryCount})`);
|
||||
this.bufferFlushTimer = setTimeout(() => this.flushWriteBuffer(), delay);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Benefits
|
||||
|
||||
1. **Reduced Transaction Conflicts**: The write buffer system prevents transaction conflicts by batching operations.
|
||||
2. **Improved Performance**: The LRU cache reduces redundant database operations.
|
||||
3. **Better Error Handling**: The system includes robust error handling to prevent cascading failures.
|
||||
4. **Offline Support**: The cache system provides offline access to social feed data.
|
||||
5. **Reduced Network Usage**: The system reduces network usage by caching events locally.
|
||||
1. **Eliminated Transaction Conflicts**: The global transaction lock mechanism prevents transaction conflicts between different services.
|
||||
2. **Improved Reliability**: The transaction queue ensures that all transactions are processed even if they can't be executed immediately.
|
||||
3. **Enhanced Error Recovery**: The exponential backoff and retry mechanism improves recovery from temporary database errors.
|
||||
4. **Better Offline Stability**: The system handles database unavailability gracefully, enabling seamless offline operation.
|
||||
5. **Reduced Database Contention**: Coordinated transactions reduce contention on the database.
|
||||
6. **Improved Performance**: The LRU cache reduces redundant database operations.
|
||||
7. **Better Error Handling**: The system includes robust error handling to prevent cascading failures.
|
||||
8. **Offline Support**: The cache system provides offline access to social feed data.
|
||||
9. **Reduced Network Usage**: The system reduces network usage by caching events locally.
|
||||
|
||||
## Debugging
|
||||
|
||||
|
158
lib/db/services/ContactCacheService.ts
Normal file
158
lib/db/services/ContactCacheService.ts
Normal file
@ -0,0 +1,158 @@
|
||||
// lib/db/services/ContactCacheService.ts
|
||||
|
||||
import { SQLiteDatabase } from 'expo-sqlite';
|
||||
import { DbService } from '../db-service';
|
||||
import { SocialFeedCache } from './SocialFeedCache';
|
||||
|
||||
/**
|
||||
* Service for caching user contact lists
|
||||
* This service provides offline access to contact lists
|
||||
*/
|
||||
export class ContactCacheService {
|
||||
private db: DbService;
|
||||
|
||||
constructor(database: SQLiteDatabase) {
|
||||
this.db = new DbService(database);
|
||||
this.initializeTable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the contact cache table
|
||||
*/
|
||||
private async initializeTable(): Promise<void> {
|
||||
try {
|
||||
// Create contact_cache table if it doesn't exist
|
||||
await this.db.runAsync(`
|
||||
CREATE TABLE IF NOT EXISTS contact_cache (
|
||||
owner_pubkey TEXT NOT NULL,
|
||||
contact_pubkey TEXT NOT NULL,
|
||||
cached_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (owner_pubkey, contact_pubkey)
|
||||
)
|
||||
`);
|
||||
|
||||
// Create index for faster queries
|
||||
await this.db.runAsync(`
|
||||
CREATE INDEX IF NOT EXISTS idx_contact_cache_owner
|
||||
ON contact_cache (owner_pubkey)
|
||||
`);
|
||||
|
||||
console.log('[ContactCacheService] Contact cache table initialized');
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error initializing table:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cache contacts for a user
|
||||
* @param ownerPubkey The user's pubkey
|
||||
* @param contacts Array of contact pubkeys
|
||||
*/
|
||||
async cacheContacts(ownerPubkey: string, contacts: string[]): Promise<void> {
|
||||
if (!ownerPubkey || !contacts.length) return;
|
||||
|
||||
try {
|
||||
// Use the global transaction lock to prevent conflicts with other services
|
||||
await SocialFeedCache.executeWithLock(async () => {
|
||||
try {
|
||||
// Use a transaction for better performance
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
// First delete all existing contacts for this owner
|
||||
await this.db.runAsync(
|
||||
'DELETE FROM contact_cache WHERE owner_pubkey = ?',
|
||||
[ownerPubkey]
|
||||
);
|
||||
|
||||
// Then insert all contacts
|
||||
const timestamp = Date.now();
|
||||
const insertPromises = contacts.map(contactPubkey =>
|
||||
this.db.runAsync(
|
||||
'INSERT INTO contact_cache (owner_pubkey, contact_pubkey, cached_at) VALUES (?, ?, ?)',
|
||||
[ownerPubkey, contactPubkey, timestamp]
|
||||
)
|
||||
);
|
||||
|
||||
await Promise.all(insertPromises);
|
||||
});
|
||||
|
||||
console.log(`[ContactCacheService] Cached ${contacts.length} contacts for ${ownerPubkey}`);
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error in transaction:', error);
|
||||
throw error; // Rethrow to ensure the transaction is marked as failed
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error caching contacts:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cached contacts for a user
|
||||
* @param ownerPubkey The user's pubkey
|
||||
* @returns Array of contact pubkeys
|
||||
*/
|
||||
async getCachedContacts(ownerPubkey: string): Promise<string[]> {
|
||||
if (!ownerPubkey) return [];
|
||||
|
||||
try {
|
||||
const rows = await this.db.getAllAsync<{ contact_pubkey: string }>(
|
||||
'SELECT contact_pubkey FROM contact_cache WHERE owner_pubkey = ?',
|
||||
[ownerPubkey]
|
||||
);
|
||||
|
||||
return rows.map(row => row.contact_pubkey);
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error getting cached contacts:', error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear cached contacts for a user
|
||||
* @param ownerPubkey The user's pubkey
|
||||
*/
|
||||
async clearCachedContacts(ownerPubkey: string): Promise<void> {
|
||||
if (!ownerPubkey) return;
|
||||
|
||||
try {
|
||||
await this.db.runAsync(
|
||||
'DELETE FROM contact_cache WHERE owner_pubkey = ?',
|
||||
[ownerPubkey]
|
||||
);
|
||||
|
||||
console.log(`[ContactCacheService] Cleared cached contacts for ${ownerPubkey}`);
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error clearing cached contacts:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear old cached contacts
|
||||
* @param maxAgeDays Maximum age in days (default: 7)
|
||||
*/
|
||||
async clearOldCache(maxAgeDays: number = 7): Promise<void> {
|
||||
try {
|
||||
const maxAgeMs = maxAgeDays * 24 * 60 * 60 * 1000;
|
||||
const cutoffTime = Date.now() - maxAgeMs;
|
||||
|
||||
const result = await this.db.runAsync(
|
||||
'DELETE FROM contact_cache WHERE cached_at < ?',
|
||||
[cutoffTime]
|
||||
);
|
||||
|
||||
console.log(`[ContactCacheService] Cleared old contact cache entries`, result);
|
||||
} catch (error) {
|
||||
console.error('[ContactCacheService] Error clearing old cache:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create a singleton factory function
|
||||
let contactCacheService: ContactCacheService | null = null;
|
||||
|
||||
export function getContactCacheService(database: SQLiteDatabase): ContactCacheService {
|
||||
if (!contactCacheService) {
|
||||
contactCacheService = new ContactCacheService(database);
|
||||
}
|
||||
return contactCacheService;
|
||||
}
|
@ -27,9 +27,14 @@ export class SocialFeedCache {
|
||||
private maxBatchSize: number = 20; // Maximum operations per batch
|
||||
private dbAvailable: boolean = true; // Track database availability
|
||||
|
||||
// Global transaction lock to prevent transaction conflicts across services
|
||||
private static transactionLock: boolean = false;
|
||||
private static transactionQueue: (() => Promise<void>)[] = [];
|
||||
private static processingQueue: boolean = false;
|
||||
|
||||
// LRU cache for tracking known events
|
||||
private knownEventIds: LRUCache<string, number>; // Event ID -> timestamp
|
||||
|
||||
|
||||
constructor(database: SQLiteDatabase) {
|
||||
this.db = new DbService(database);
|
||||
this.eventCache = new EventCache(database);
|
||||
@ -76,6 +81,91 @@ export class SocialFeedCache {
|
||||
return this.dbAvailable && !!this.db;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire the global transaction lock
|
||||
* @returns True if lock was acquired, false otherwise
|
||||
*/
|
||||
private static acquireTransactionLock(): boolean {
|
||||
if (SocialFeedCache.transactionLock) {
|
||||
return false;
|
||||
}
|
||||
SocialFeedCache.transactionLock = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the global transaction lock
|
||||
*/
|
||||
private static releaseTransactionLock(): void {
|
||||
SocialFeedCache.transactionLock = false;
|
||||
// Process the next transaction in queue if any
|
||||
if (SocialFeedCache.transactionQueue.length > 0 && !SocialFeedCache.processingQueue) {
|
||||
SocialFeedCache.processTransactionQueue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a transaction to the queue
|
||||
* @param transaction Function that performs the transaction
|
||||
*/
|
||||
private static enqueueTransaction(transaction: () => Promise<void>): void {
|
||||
SocialFeedCache.transactionQueue.push(transaction);
|
||||
// Start processing the queue if not already processing
|
||||
if (!SocialFeedCache.processingQueue) {
|
||||
SocialFeedCache.processTransactionQueue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the transaction queue
|
||||
*/
|
||||
private static async processTransactionQueue(): Promise<void> {
|
||||
if (SocialFeedCache.processingQueue || SocialFeedCache.transactionQueue.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
SocialFeedCache.processingQueue = true;
|
||||
|
||||
try {
|
||||
while (SocialFeedCache.transactionQueue.length > 0) {
|
||||
// Wait until we can acquire the lock
|
||||
if (!SocialFeedCache.acquireTransactionLock()) {
|
||||
// If we can't acquire the lock, wait and try again
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get the next transaction
|
||||
const transaction = SocialFeedCache.transactionQueue.shift();
|
||||
if (!transaction) {
|
||||
SocialFeedCache.releaseTransactionLock();
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// Execute the transaction
|
||||
await transaction();
|
||||
} catch (error) {
|
||||
console.error('[SocialFeedCache] Error executing queued transaction:', error);
|
||||
} finally {
|
||||
// Release the lock
|
||||
SocialFeedCache.releaseTransactionLock();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
SocialFeedCache.processingQueue = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a transaction with the global lock
|
||||
* @param transaction Function that performs the transaction
|
||||
*/
|
||||
public static async executeWithLock(transaction: () => Promise<void>): Promise<void> {
|
||||
// Add the transaction to the queue
|
||||
SocialFeedCache.enqueueTransaction(transaction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush the write buffer, executing queued operations in a transaction
|
||||
*/
|
||||
@ -95,6 +185,7 @@ export class SocialFeedCache {
|
||||
|
||||
this.processingTransaction = true;
|
||||
|
||||
// Use the transaction lock to prevent conflicts
|
||||
try {
|
||||
// Check if we've exceeded the maximum retry count
|
||||
if (this.retryCount > this.maxRetryCount) {
|
||||
@ -109,48 +200,58 @@ export class SocialFeedCache {
|
||||
// Increment retry count before attempting transaction
|
||||
this.retryCount++;
|
||||
|
||||
// Execute the transaction
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
for (const { query, params } of bufferCopy) {
|
||||
try {
|
||||
await this.db.runAsync(query, params);
|
||||
} catch (innerError) {
|
||||
// Log individual query errors but continue with other queries
|
||||
console.error(`[SocialFeedCache] Error executing query: ${query}`, innerError);
|
||||
// Don't rethrow to allow other queries to proceed
|
||||
// Execute the transaction with the global lock
|
||||
await SocialFeedCache.executeWithLock(async () => {
|
||||
try {
|
||||
// Execute the transaction
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
for (const { query, params } of bufferCopy) {
|
||||
try {
|
||||
await this.db.runAsync(query, params);
|
||||
} catch (innerError) {
|
||||
// Log individual query errors but continue with other queries
|
||||
console.error(`[SocialFeedCache] Error executing query: ${query}`, innerError);
|
||||
// Don't rethrow to allow other queries to proceed
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Success - reset retry count
|
||||
this.retryCount = 0;
|
||||
this.dbAvailable = true; // Mark database as available
|
||||
} catch (error) {
|
||||
console.error('[SocialFeedCache] Error in transaction:', error);
|
||||
|
||||
// Check for database connection errors
|
||||
if (error instanceof Error &&
|
||||
(error.message.includes('closed resource') ||
|
||||
error.message.includes('Database not available'))) {
|
||||
// Mark database as unavailable
|
||||
this.dbAvailable = false;
|
||||
console.warn('[SocialFeedCache] Database connection issue detected, marking as unavailable');
|
||||
|
||||
// Add all operations back to the buffer
|
||||
this.writeBuffer = [...bufferCopy, ...this.writeBuffer];
|
||||
} else {
|
||||
// For other errors, add operations back to the buffer
|
||||
// but only if they're not already there (avoid duplicates)
|
||||
for (const op of bufferCopy) {
|
||||
if (!this.writeBuffer.some(item =>
|
||||
item.query === op.query &&
|
||||
JSON.stringify(item.params) === JSON.stringify(op.params)
|
||||
)) {
|
||||
// Add back to the beginning of the buffer to retry sooner
|
||||
this.writeBuffer.unshift(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Rethrow to ensure the transaction is marked as failed
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
// Success - reset retry count
|
||||
this.retryCount = 0;
|
||||
this.dbAvailable = true; // Mark database as available
|
||||
} catch (error) {
|
||||
console.error('[SocialFeedCache] Error flushing write buffer:', error);
|
||||
|
||||
// Check for database connection errors
|
||||
if (error instanceof Error &&
|
||||
(error.message.includes('closed resource') ||
|
||||
error.message.includes('Database not available'))) {
|
||||
// Mark database as unavailable
|
||||
this.dbAvailable = false;
|
||||
console.warn('[SocialFeedCache] Database connection issue detected, marking as unavailable');
|
||||
|
||||
// Add all operations back to the buffer
|
||||
this.writeBuffer = [...bufferCopy, ...this.writeBuffer];
|
||||
} else {
|
||||
// For other errors, add operations back to the buffer
|
||||
// but only if they're not already there (avoid duplicates)
|
||||
for (const op of bufferCopy) {
|
||||
if (!this.writeBuffer.some(item =>
|
||||
item.query === op.query &&
|
||||
JSON.stringify(item.params) === JSON.stringify(op.params)
|
||||
)) {
|
||||
// Add back to the beginning of the buffer to retry sooner
|
||||
this.writeBuffer.unshift(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.processingTransaction = false;
|
||||
this.scheduleNextFlush();
|
||||
|
@ -3,6 +3,7 @@ import { SQLiteDatabase } from 'expo-sqlite';
|
||||
import { Workout, WorkoutExercise, WorkoutSet, WorkoutSummary } from '@/types/workout';
|
||||
import { generateId } from '@/utils/ids';
|
||||
import { DbService } from '../db-service';
|
||||
import { SocialFeedCache } from './SocialFeedCache';
|
||||
|
||||
export class WorkoutService {
|
||||
private db: DbService;
|
||||
@ -16,66 +17,74 @@ export class WorkoutService {
|
||||
*/
|
||||
async saveWorkout(workout: Workout): Promise<void> {
|
||||
try {
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
// Check if workout exists (for update vs insert)
|
||||
const existingWorkout = await this.db.getFirstAsync<{ id: string }>(
|
||||
'SELECT id FROM workouts WHERE id = ?',
|
||||
[workout.id]
|
||||
);
|
||||
|
||||
const timestamp = Date.now();
|
||||
|
||||
if (existingWorkout) {
|
||||
// Update existing workout
|
||||
await this.db.runAsync(
|
||||
`UPDATE workouts SET
|
||||
title = ?, type = ?, start_time = ?, end_time = ?,
|
||||
is_completed = ?, updated_at = ?, template_id = ?,
|
||||
share_status = ?, notes = ?
|
||||
WHERE id = ?`,
|
||||
[
|
||||
workout.title,
|
||||
workout.type,
|
||||
workout.startTime,
|
||||
workout.endTime || null,
|
||||
workout.isCompleted ? 1 : 0,
|
||||
timestamp,
|
||||
workout.templateId || null,
|
||||
workout.shareStatus || 'local',
|
||||
workout.notes || null,
|
||||
workout.id
|
||||
]
|
||||
);
|
||||
|
||||
// Delete existing exercises and sets to recreate them
|
||||
await this.deleteWorkoutExercises(workout.id);
|
||||
} else {
|
||||
// Insert new workout
|
||||
await this.db.runAsync(
|
||||
`INSERT INTO workouts (
|
||||
id, title, type, start_time, end_time, is_completed,
|
||||
created_at, updated_at, template_id, source, share_status, notes
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
[
|
||||
workout.id,
|
||||
workout.title,
|
||||
workout.type,
|
||||
workout.startTime,
|
||||
workout.endTime || null,
|
||||
workout.isCompleted ? 1 : 0,
|
||||
timestamp,
|
||||
timestamp,
|
||||
workout.templateId || null,
|
||||
workout.availability?.source[0] || 'local',
|
||||
workout.shareStatus || 'local',
|
||||
workout.notes || null
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
// Save exercises and sets
|
||||
if (workout.exercises?.length) {
|
||||
await this.saveWorkoutExercises(workout.id, workout.exercises);
|
||||
// Use the global transaction lock to prevent conflicts with other services
|
||||
await SocialFeedCache.executeWithLock(async () => {
|
||||
try {
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
// Check if workout exists (for update vs insert)
|
||||
const existingWorkout = await this.db.getFirstAsync<{ id: string }>(
|
||||
'SELECT id FROM workouts WHERE id = ?',
|
||||
[workout.id]
|
||||
);
|
||||
|
||||
const timestamp = Date.now();
|
||||
|
||||
if (existingWorkout) {
|
||||
// Update existing workout
|
||||
await this.db.runAsync(
|
||||
`UPDATE workouts SET
|
||||
title = ?, type = ?, start_time = ?, end_time = ?,
|
||||
is_completed = ?, updated_at = ?, template_id = ?,
|
||||
share_status = ?, notes = ?
|
||||
WHERE id = ?`,
|
||||
[
|
||||
workout.title,
|
||||
workout.type,
|
||||
workout.startTime,
|
||||
workout.endTime || null,
|
||||
workout.isCompleted ? 1 : 0,
|
||||
timestamp,
|
||||
workout.templateId || null,
|
||||
workout.shareStatus || 'local',
|
||||
workout.notes || null,
|
||||
workout.id
|
||||
]
|
||||
);
|
||||
|
||||
// Delete existing exercises and sets to recreate them
|
||||
await this.deleteWorkoutExercises(workout.id);
|
||||
} else {
|
||||
// Insert new workout
|
||||
await this.db.runAsync(
|
||||
`INSERT INTO workouts (
|
||||
id, title, type, start_time, end_time, is_completed,
|
||||
created_at, updated_at, template_id, source, share_status, notes
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
[
|
||||
workout.id,
|
||||
workout.title,
|
||||
workout.type,
|
||||
workout.startTime,
|
||||
workout.endTime || null,
|
||||
workout.isCompleted ? 1 : 0,
|
||||
timestamp,
|
||||
timestamp,
|
||||
workout.templateId || null,
|
||||
workout.availability?.source[0] || 'local',
|
||||
workout.shareStatus || 'local',
|
||||
workout.notes || null
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
// Save exercises and sets
|
||||
if (workout.exercises?.length) {
|
||||
await this.saveWorkoutExercises(workout.id, workout.exercises);
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error in workout transaction:', error);
|
||||
throw error; // Rethrow to ensure the transaction is marked as failed
|
||||
}
|
||||
});
|
||||
|
||||
@ -252,15 +261,23 @@ export class WorkoutService {
|
||||
*/
|
||||
async deleteWorkout(id: string): Promise<void> {
|
||||
try {
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
// Delete exercises and sets first due to foreign key constraints
|
||||
await this.deleteWorkoutExercises(id);
|
||||
|
||||
// Delete the workout
|
||||
await this.db.runAsync(
|
||||
'DELETE FROM workouts WHERE id = ?',
|
||||
[id]
|
||||
);
|
||||
// Use the global transaction lock to prevent conflicts with other services
|
||||
await SocialFeedCache.executeWithLock(async () => {
|
||||
try {
|
||||
await this.db.withTransactionAsync(async () => {
|
||||
// Delete exercises and sets first due to foreign key constraints
|
||||
await this.deleteWorkoutExercises(id);
|
||||
|
||||
// Delete the workout
|
||||
await this.db.runAsync(
|
||||
'DELETE FROM workouts WHERE id = ?',
|
||||
[id]
|
||||
);
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error in delete workout transaction:', error);
|
||||
throw error; // Rethrow to ensure the transaction is marked as failed
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error deleting workout:', error);
|
||||
@ -513,4 +530,4 @@ export class WorkoutService {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user