diff --git a/api/server.go b/api/server.go index 4bf58abb..474b11d2 100644 --- a/api/server.go +++ b/api/server.go @@ -147,6 +147,17 @@ func NewApiServer(config config.Config) *ApiServer { panic(err) } + // Caches the candidate user-id list returned by the /v1/users/:userId/ + // related collaborative-filter / genre-fallback query. The list is a + // recommendation, not authoritative state, so a 10-minute TTL is fine. + relatedUsersCache, err := otter.MustBuilder[string, []int32](20_000). + WithTTL(10 * time.Minute). + CollectStats(). + Build() + if err != nil { + panic(err) + } + privateKey, err := crypto.HexToECDSA(config.DelegatePrivateKey) if err != nil { panic(err) @@ -256,6 +267,7 @@ func NewApiServer(config config.Config) *ApiServer { apiAccessKeySignerCache: &apiAccessKeySignerCache, oauthTokenCache: &oauthTokenCache, qualifiedPlaylistsCache: &qualifiedPlaylistsCache, + relatedUsersCache: &relatedUsersCache, requestValidator: requestValidator, rewardAttester: rewardAttester, transactionSender: transactionSender, @@ -807,6 +819,7 @@ type ApiServer struct { apiAccessKeySignerCache *otter.Cache[string, apiAccessKeySignerEntry] oauthTokenCache *otter.Cache[string, oauthTokenCacheEntry] qualifiedPlaylistsCache *otter.Cache[string, []int32] + relatedUsersCache *otter.Cache[string, []int32] requestValidator *RequestValidator rewardManagerClient *reward_manager.RewardManagerClient claimableTokensClient *claimable_tokens.ClaimableTokensClient diff --git a/api/v1_users_related.go b/api/v1_users_related.go index 308f201a..0fce5e5d 100644 --- a/api/v1_users_related.go +++ b/api/v1_users_related.go @@ -1,6 +1,10 @@ package api import ( + "context" + "fmt" + + "api.audius.co/api/dbv1" "github.com/gofiber/fiber/v2" "github.com/jackc/pgx/v5" ) @@ -15,6 +19,12 @@ type GetUsersRelatedParams struct { Hybrid approach: - For artists with < 100 followers: genre-based recommendations (not enough follower data) - For artists with >= 100 followers: collaborative filtering with 
small genre boost + +The candidate user-id list is cached per (userId, filter_followed, limit, offset, +low-follower branch); myId joins the key only when filter_followed is true. +The list is a recommendation surface, not authoritative state, so a 10-minute +TTL is fine. The follow-up GetUsers fetch (which carries my-perspective fields +such as `is_followed`) still runs fresh on every request. */ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { params := GetUsersRelatedParams{} @@ -22,22 +32,76 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { return err } + myId := app.getMyId(c) + userId := app.getUserId(c) + var followerCount int64 err := app.pool.QueryRow( c.Context(), `SELECT follower_count FROM aggregate_user WHERE user_id = $1`, - app.getUserId(c), + userId, ).Scan(&followerCount) if err != nil { return err } lowFollowerCount := followerCount < 100 - var sql string + limit := params.Limit + if lowFollowerCount { + // Clamp results to 0-10 because results are not as + // good for low follower counts. + limit = min(params.Limit, max(0, 10-params.Offset)) + } + + candidateIds, err := app.getRelatedUserIds( + c.Context(), + userId, + myId, + params.FilterFollowed, + lowFollowerCount, + limit, + params.Offset, + ) + if err != nil { + return err + } + + users, err := app.queries.Users(c.Context(), dbv1.GetUsersParams{ + MyID: myId, + Ids: candidateIds, + }) + if err != nil { + return err + } - // Use different algorithms based on follower count + return v1UsersResponse(c, users) +} + +func (app *ApiServer) getRelatedUserIds( + ctx context.Context, + userId int32, + myId int32, + filterFollowed bool, + lowFollowerCount bool, + limit int, + offset int, +) ([]int32, error) { + // myId only affects the result when filter_followed is true (it's used + // to exclude users the caller already follows). Folding myId into the + // key only in that branch avoids splitting the cache by viewer when the + // recommendations are identical across viewers. 
+ cacheMyId := int32(0) + if filterFollowed { + cacheMyId = myId + } + cacheKey := fmt.Sprintf("%d:%t:%d:%d:%d:%t", + userId, filterFollowed, cacheMyId, limit, offset, lowFollowerCount) + if hit, ok := app.relatedUsersCache.Get(cacheKey); ok { + return hit, nil + } + + var sql string if lowFollowerCount { - // Genre-based algorithm for smaller artists sql = ` WITH inp AS ( SELECT genre, @@ -49,9 +113,6 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { AND t.is_unlisted IS false AND t.is_available IS true AND t.stem_of IS NULL - AND (t.access_authorities IS NULL - OR (COALESCE(@authed_wallet, '') <> '' - AND EXISTS (SELECT 1 FROM unnest(t.access_authorities) aa WHERE lower(aa) = lower(@authed_wallet)))) AND owner_id = @userId GROUP BY genre ORDER BY count(*) DESC @@ -82,15 +143,10 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { OFFSET @offset; ` } else { - // simple collaborative filtering - // - get a sample of followers. as ids are random, this is a reasonable sample - // - for each follower, get the top 200 artists they follow - // - score candidates based on how many of our sample follow them with some genre boost - // - return the top n sql = ` WITH followers_sample AS MATERIALIZED ( SELECT follower_user_id - FROM follows + FROM follows WHERE followee_user_id = @userId ORDER BY follower_user_id DESC LIMIT 500 @@ -104,16 +160,13 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { AND is_unlisted = false AND is_available = true AND stem_of IS NULL - AND (access_authorities IS NULL - OR (COALESCE(@authed_wallet, '') <> '' - AND EXISTS (SELECT 1 FROM unnest(access_authorities) aa WHERE lower(aa) = lower(@authed_wallet)))) AND genre IS NOT NULL GROUP BY genre ORDER BY COUNT(*) DESC LIMIT 3 ), candidate_users AS ( - SELECT + SELECT f.followee_user_id AS user_id, COUNT(*) AS shared_followers FROM followers_sample rf @@ -128,11 +181,11 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { GROUP BY f.followee_user_id ), 
scored_candidates AS ( - SELECT + SELECT cu.user_id, cu.shared_followers, au.follower_count, - CASE + CASE WHEN au.dominant_genre IN (SELECT genre FROM top_genres) THEN 1 ELSE 0 END AS genre_match @@ -159,7 +212,7 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { SELECT user_id FROM scored_candidates WHERE shared_followers >= 3 - ORDER BY + ORDER BY -- approx jaccard similarity with small genre boost (shared_followers::float / (500 + follower_count - shared_followers)) + (genre_match * 0.05) DESC LIMIT @limit @@ -167,21 +220,21 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error { ` } - var limit int - if lowFollowerCount { - // Clamp results to 0-10 because results are not as - // good for low follower counts - limit = min(params.Limit, max(0, 10-params.Offset)) - } else { - limit = params.Limit - } - - return app.queryUsers(c, sql, pgx.NamedArgs{ - "myId": app.getMyId(c), - "userId": app.getUserId(c), - "filterFollowed": params.FilterFollowed, + rows, err := app.pool.Query(ctx, sql, pgx.NamedArgs{ + "myId": myId, + "userId": userId, + "filterFollowed": filterFollowed, "limit": limit, - "offset": params.Offset, - "authed_wallet": app.tryGetAuthedWallet(c), + "offset": offset, }) + if err != nil { + return nil, err + } + ids, err := pgx.CollectRows(rows, pgx.RowTo[int32]) + if err != nil { + return nil, err + } + + app.relatedUsersCache.Set(cacheKey, ids) + return ids, nil } diff --git a/api/v1_users_related_test.go b/api/v1_users_related_test.go index 0f343eed..d6c1f47d 100644 --- a/api/v1_users_related_test.go +++ b/api/v1_users_related_test.go @@ -109,4 +109,14 @@ func TestV1UsersRelated(t *testing.T) { assert.Len(t, userResponse.Data, 1) assert.Equal(t, "someseller", userResponse.Data[0].Handle.String) } + + // Cache must not leak across cache-key dimensions: filter_followed=true and + // filter_followed=false return different result sets, so the unfiltered call + // below must not be served from a filter_followed=true cache entry. 
+ { + var resp struct{ Data []dbv1.User } + status, _ := testGet(t, app, "/v1/users/7eP5n/related", &resp) + assert.Equal(t, 200, status) + assert.Len(t, resp.Data, 2, "filter_followed=false branch must not be served from filter_followed=true cache entry") + } }