Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func NewApiServer(config config.Config) *ApiServer {
panic(err)
}

// Caches the candidate user-id list returned by the /v1/users/:userId/
// related collaborative-filter / genre-fallback query. The list is a
// recommendation, not authoritative state, so a 10-minute TTL is fine.
relatedUsersCache, err := otter.MustBuilder[string, []int32](20_000).
WithTTL(10 * time.Minute).
CollectStats().
Build()
if err != nil {
panic(err)
}

privateKey, err := crypto.HexToECDSA(config.DelegatePrivateKey)
if err != nil {
panic(err)
Expand Down Expand Up @@ -256,6 +267,7 @@ func NewApiServer(config config.Config) *ApiServer {
apiAccessKeySignerCache: &apiAccessKeySignerCache,
oauthTokenCache: &oauthTokenCache,
qualifiedPlaylistsCache: &qualifiedPlaylistsCache,
relatedUsersCache: &relatedUsersCache,
requestValidator: requestValidator,
rewardAttester: rewardAttester,
transactionSender: transactionSender,
Expand Down Expand Up @@ -807,6 +819,7 @@ type ApiServer struct {
apiAccessKeySignerCache *otter.Cache[string, apiAccessKeySignerEntry]
oauthTokenCache *otter.Cache[string, oauthTokenCacheEntry]
qualifiedPlaylistsCache *otter.Cache[string, []int32]
relatedUsersCache *otter.Cache[string, []int32]
requestValidator *RequestValidator
rewardManagerClient *reward_manager.RewardManagerClient
claimableTokensClient *claimable_tokens.ClaimableTokensClient
Expand Down
123 changes: 88 additions & 35 deletions api/v1_users_related.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package api

import (
"context"
"fmt"

"api.audius.co/api/dbv1"
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v5"
)
Expand All @@ -15,29 +19,89 @@ type GetUsersRelatedParams struct {
Hybrid approach:
- For artists with < 100 followers: genre-based recommendations (not enough follower data)
- For artists with >= 100 followers: collaborative filtering with small genre boost

The candidate user-id list is cached briefly per (userId, limit, offset) — and
also keyed on myId only when filter_followed is true. The list is a
recommendation surface, not authoritative state, so a 10-minute TTL is fine.
The follow-up GetUsers fetch (which carries the my-perspective fields like
`is_followed`, etc.) still runs fresh on every request.
*/
// v1UsersRelated handles GET /v1/users/:userId/related.
//
// It resolves a list of recommended artist ids for the target user via
// getRelatedUserIds (which applies the genre-fallback vs. collaborative-
// filtering split and a short-lived cache), then hydrates those ids with a
// fresh Users query so viewer-perspective fields (is_followed, etc.) are
// always current for the caller identified by myId.
func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error {
	params := GetUsersRelatedParams{}
	if err := app.ParseAndValidateQueryParams(c, &params); err != nil {
		return err
	}

	myId := app.getMyId(c)
	userId := app.getUserId(c)

	// NOTE(review): a user with no aggregate_user row makes QueryRow return
	// pgx.ErrNoRows, which surfaces as an error response here — confirm that
	// is intended rather than treating it as followerCount == 0.
	var followerCount int64
	err := app.pool.QueryRow(
		c.Context(),
		`SELECT follower_count FROM aggregate_user WHERE user_id = $1`,
		userId,
	).Scan(&followerCount)
	if err != nil {
		return err
	}
	lowFollowerCount := followerCount < 100

	limit := params.Limit
	if lowFollowerCount {
		// Clamp results to 0-10 because results are not as
		// good for low follower counts.
		limit = min(params.Limit, max(0, 10-params.Offset))
	}

	candidateIds, err := app.getRelatedUserIds(
		c.Context(),
		userId,
		myId,
		params.FilterFollowed,
		lowFollowerCount,
		limit,
		params.Offset,
	)
	if err != nil {
		return err
	}

	users, err := app.queries.Users(c.Context(), dbv1.GetUsersParams{
		MyID: myId,
		Ids:  candidateIds,
	})
	if err != nil {
		return err
	}

	return v1UsersResponse(c, users)
}

func (app *ApiServer) getRelatedUserIds(
ctx context.Context,
userId int32,
myId int32,
filterFollowed bool,
lowFollowerCount bool,
limit int,
offset int,
) ([]int32, error) {
// myId only affects the result when filter_followed is true (it's used
// to exclude users the caller already follows). Folding myId into the
// key only in that branch avoids splitting the cache by viewer when the
// recommendations are identical across viewers.
cacheMyId := int32(0)
if filterFollowed {
cacheMyId = myId
}
cacheKey := fmt.Sprintf("%d:%t:%d:%d:%d:%t",
userId, filterFollowed, cacheMyId, limit, offset, lowFollowerCount)
if hit, ok := app.relatedUsersCache.Get(cacheKey); ok {
return hit, nil
}

var sql string
if lowFollowerCount {
// Genre-based algorithm for smaller artists
sql = `
WITH inp AS (
SELECT genre,
Expand All @@ -49,9 +113,6 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error {
AND t.is_unlisted IS false
AND t.is_available IS true
AND t.stem_of IS NULL
AND (t.access_authorities IS NULL
OR (COALESCE(@authed_wallet, '') <> ''
AND EXISTS (SELECT 1 FROM unnest(t.access_authorities) aa WHERE lower(aa) = lower(@authed_wallet))))
AND owner_id = @userId
GROUP BY genre
ORDER BY count(*) DESC
Expand Down Expand Up @@ -82,15 +143,10 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error {
OFFSET @offset;
`
} else {
// simple collaborative filtering
// - get a sample of followers. as ids are random, this is a reasonable sample
// - for each follower, get the top 200 artists they follow
// - score candidates based on how many of our sample follow them with some genre boost
// - return the top n
sql = `
WITH followers_sample AS MATERIALIZED (
SELECT follower_user_id
FROM follows
FROM follows
WHERE followee_user_id = @userId
ORDER BY follower_user_id DESC
LIMIT 500
Expand All @@ -104,16 +160,13 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error {
AND is_unlisted = false
AND is_available = true
AND stem_of IS NULL
AND (access_authorities IS NULL
OR (COALESCE(@authed_wallet, '') <> ''
AND EXISTS (SELECT 1 FROM unnest(access_authorities) aa WHERE lower(aa) = lower(@authed_wallet))))
AND genre IS NOT NULL
GROUP BY genre
ORDER BY COUNT(*) DESC
LIMIT 3
),
candidate_users AS (
SELECT
SELECT
f.followee_user_id AS user_id,
COUNT(*) AS shared_followers
FROM followers_sample rf
Expand All @@ -128,11 +181,11 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error {
GROUP BY f.followee_user_id
),
scored_candidates AS (
SELECT
SELECT
cu.user_id,
cu.shared_followers,
au.follower_count,
CASE
CASE
WHEN au.dominant_genre IN (SELECT genre FROM top_genres) THEN 1
ELSE 0
END AS genre_match
Expand All @@ -159,29 +212,29 @@ func (app *ApiServer) v1UsersRelated(c *fiber.Ctx) error {
SELECT user_id
FROM scored_candidates
WHERE shared_followers >= 3
ORDER BY
ORDER BY
-- approx jaccard similarity with small genre boost
(shared_followers::float / (500 + follower_count - shared_followers)) + (genre_match * 0.05) DESC
LIMIT @limit
OFFSET @offset;
`
}

var limit int
if lowFollowerCount {
// Clamp results to 0-10 because results are not as
// good for low follower counts
limit = min(params.Limit, max(0, 10-params.Offset))
} else {
limit = params.Limit
}

return app.queryUsers(c, sql, pgx.NamedArgs{
"myId": app.getMyId(c),
"userId": app.getUserId(c),
"filterFollowed": params.FilterFollowed,
rows, err := app.pool.Query(ctx, sql, pgx.NamedArgs{
"myId": myId,
"userId": userId,
"filterFollowed": filterFollowed,
"limit": limit,
"offset": params.Offset,
"authed_wallet": app.tryGetAuthedWallet(c),
"offset": offset,
})
if err != nil {
return nil, err
}
ids, err := pgx.CollectRows(rows, pgx.RowTo[int32])
if err != nil {
return nil, err
}

app.relatedUsersCache.Set(cacheKey, ids)
return ids, nil
}
10 changes: 10 additions & 0 deletions api/v1_users_related_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,14 @@ func TestV1UsersRelated(t *testing.T) {
assert.Len(t, userResponse.Data, 1)
assert.Equal(t, "someseller", userResponse.Data[0].Handle.String)
}

// Cache must not leak across cache-key dimensions: filter_followed=false
// (just queried above) and filter_followed=true return different result
// sets, and the second call must not return a stale unfiltered list.
{
var resp struct{ Data []dbv1.User }
status, _ := testGet(t, app, "/v1/users/7eP5n/related", &resp)
assert.Equal(t, 200, status)
assert.Len(t, resp.Data, 2, "filter_followed=false branch must not be served from filter_followed=true cache entry")
}
}
Loading