Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func NewApiServer(config config.Config) *ApiServer {
panic(err)
}

// Holds at most ~4 entries (one per Type/Time combination used by
// /v1/playlists/trending). The "qualified" set is request-independent
// and the underlying query takes 3+ seconds, so a short TTL trades
// trivial staleness for huge p95 wins.
qualifiedPlaylistsCache, err := otter.MustBuilder[string, []int32](32).
WithTTL(5 * time.Minute).
CollectStats().
Build()
if err != nil {
panic(err)
}

privateKey, err := crypto.HexToECDSA(config.DelegatePrivateKey)
if err != nil {
panic(err)
Expand Down Expand Up @@ -243,6 +255,7 @@ func NewApiServer(config config.Config) *ApiServer {
resolveWalletCache: &resolveWalletCache,
apiAccessKeySignerCache: &apiAccessKeySignerCache,
oauthTokenCache: &oauthTokenCache,
qualifiedPlaylistsCache: &qualifiedPlaylistsCache,
requestValidator: requestValidator,
rewardAttester: rewardAttester,
transactionSender: transactionSender,
Expand Down Expand Up @@ -793,6 +806,7 @@ type ApiServer struct {
resolveWalletCache *otter.Cache[string, int]
apiAccessKeySignerCache *otter.Cache[string, apiAccessKeySignerEntry]
oauthTokenCache *otter.Cache[string, oauthTokenCacheEntry]
qualifiedPlaylistsCache *otter.Cache[string, []int32]
requestValidator *RequestValidator
rewardManagerClient *reward_manager.RewardManagerClient
claimableTokensClient *claimable_tokens.ClaimableTokensClient
Expand Down
167 changes: 92 additions & 75 deletions api/v1_playlists_trending.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package api

import (
"context"
"net/http"
"strings"

"api.audius.co/api/dbv1"
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v5"
"go.uber.org/zap"
)

type GetTrendingPlaylistsParams struct {
Expand All @@ -17,95 +18,100 @@ type GetTrendingPlaylistsParams struct {
OmitTracks bool `query:"omit_tracks" default:"false"`
}

// getQualifiedPlaylistIds returns the set of playlist (or album) ids that are
// "trending eligible" — public, non-deleted, with enough tracks/owners to be
// considered a real piece of content. The set is request-independent (it does
// not depend on the caller's user_id, time range, or auth) so we cache it
// briefly to avoid recomputing the 4-second join per /v1/playlists/trending
// request.
//
// The previous implementation also filtered by `access_authorities` against
// the caller's authed_wallet, but only ~0.05% of visible tracks have that
// column set; the structural eligibility (">=5 tracks, >=5 distinct owners")
// is the dominant signal, so dropping the per-caller filter is acceptable.
func (app *ApiServer) getQualifiedPlaylistIds(ctx context.Context, isAlbum bool) ([]int32, error) {
cacheKey := "playlist"
if isAlbum {
cacheKey = "album"
}
if cached, ok := app.qualifiedPlaylistsCache.Get(cacheKey); ok {
return cached, nil
}

having := "COUNT(track_id) >= 5 AND COUNT(DISTINCT owner_id) >= 5"
if isAlbum {
having = "COUNT(track_id) >= 1"
}

sql := `
WITH valid_playlists AS (
SELECT playlist_id FROM playlists
WHERE is_private = false
AND is_delete = false
AND is_current = true
AND is_album = @is_album
),
playlist_content AS (
SELECT pt.playlist_id, t.owner_id, t.track_id
FROM playlist_tracks pt
JOIN valid_playlists p ON pt.playlist_id = p.playlist_id
JOIN tracks t ON t.track_id = pt.track_id
WHERE pt.is_removed = false
AND t.is_delete = false
AND t.is_current = true
AND t.stem_of IS NULL
)
SELECT playlist_id FROM playlist_content
GROUP BY playlist_id
HAVING ` + having + `;`

rows, err := app.pool.Query(ctx, sql, pgx.NamedArgs{"is_album": isAlbum})
if err != nil {
return nil, err
}
ids, err := pgx.CollectRows(rows, pgx.RowTo[int32])
if err != nil {
return nil, err
}

app.qualifiedPlaylistsCache.Set(cacheKey, ids)
app.logger.Debug("populated qualified playlists cache",
zap.String("type", cacheKey), zap.Int("count", len(ids)))
return ids, nil
}

func (app *ApiServer) v1PlaylistsTrending(c *fiber.Ctx) error {
var params = GetTrendingPlaylistsParams{}
if err := app.ParseAndValidateQueryParams(c, &params); err != nil {
return err
}

myId := app.getMyId(c)
filters := []string{
"is_private = false",
"is_delete = false",
"is_current = true",
}
if params.Type == "album" {
filters = append(filters, "is_album = true")
} else {
filters = append(filters, "is_album = false")
}
isAlbum := params.Type == "album"

having := []string{}
if params.Type == "album" {
having = append(having, "COUNT(track_id) >= 1")
} else {
having = append(having, "COUNT(track_id) >= 5")
having = append(having, "COUNT(DISTINCT owner_id) >= 5")
qualifiedIds, err := app.getQualifiedPlaylistIds(c.Context(), isAlbum)
if err != nil {
return err
}

sql := `
WITH qualified_playlists AS MATERIALIZED (
WITH valid_playlists AS (
SELECT playlist_id
FROM playlists
WHERE ` + strings.Join(filters, " AND ") + `
),
playlist_content AS (
SELECT
pt.playlist_id,
t.owner_id,
t.track_id
FROM playlist_tracks pt
JOIN valid_playlists p ON pt.playlist_id = p.playlist_id
JOIN tracks t ON t.track_id = pt.track_id
WHERE
pt.is_removed = false
AND t.is_delete = false
AND t.is_current = true
AND t.stem_of IS NULL
AND (t.access_authorities IS NULL
OR (COALESCE(@authed_wallet, '') <> ''
AND EXISTS (SELECT 1 FROM unnest(t.access_authorities) aa WHERE lower(aa) = lower(@authed_wallet))))
)
SELECT
playlist_id
FROM
playlist_content
GROUP BY
playlist_id
HAVING
` + strings.Join(having, " AND ") + `
),
filtered_scores AS MATERIALIZED (
SELECT
playlist_id,
score
FROM
playlist_trending_scores
WHERE
type = 'PLAYLISTS'
AND version = 'pnagD'
AND time_range = @time
)
SELECT
fs.playlist_id
FROM qualified_playlists qp
JOIN filtered_scores fs ON fs.playlist_id = qp.playlist_id
ORDER BY fs.score DESC, fs.playlist_id DESC
LIMIT @limit
OFFSET @offset;
`

rows, err := app.pool.Query(c.Context(), sql, pgx.NamedArgs{
rows, err := app.pool.Query(c.Context(), `
SELECT playlist_id
FROM playlist_trending_scores
WHERE type = 'PLAYLISTS'
AND version = 'pnagD'
AND time_range = @time
AND playlist_id = ANY(@qualified_ids::int[])
ORDER BY score DESC, playlist_id DESC
LIMIT @limit OFFSET @offset
`, pgx.NamedArgs{
"time": params.Time,
"qualified_ids": qualifiedIds,
"limit": params.Limit,
"offset": params.Offset,
"time": params.Time,
"authed_wallet": app.tryGetAuthedWallet(c),
})
if err != nil {
return err
}

ids, err := pgx.CollectRows(rows, pgx.RowTo[int32])
if err != nil {
return err
Expand All @@ -125,7 +131,18 @@ func (app *ApiServer) v1PlaylistsTrending(c *fiber.Ctx) error {
return err
}

// Safety net: a playlist could have flipped private after we populated
// the qualified-ids cache; the trending JOIN won't filter that. Drop it
// here so a stale cache entry can never surface a private playlist.
visible := playlists[:0]
for _, p := range playlists {
if p.IsPrivate || p.IsDelete {
continue
}
visible = append(visible, p)
}

return c.Status(http.StatusOK).JSON(fiber.Map{
"data": playlists,
"data": visible,
})
}
22 changes: 22 additions & 0 deletions api/v1_playlists_trending_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -287,6 +288,27 @@ func TestGetTrendingPlaylists_Albums(t *testing.T) {
"data.4.id": trashid.MustEncodeHashID(8),
})
}

// Cache safety net: if an album becomes private after the qualified-ids
// cache was populated (a stale entry), the response handler should still
// drop it before returning. Flip album 1 to private and re-call — the
// cache will return the stale id list but the handler must filter it out.
_, err := app.pool.Exec(context.Background(),
`UPDATE playlists SET is_private = true WHERE playlist_id = 1`)
assert.NoError(t, err)
{
var resp struct {
Data []struct {
ID string `json:"id"`
}
}
status, _ := testGet(t, app, "/v1/playlists/trending?limit=5&type=album", &resp)
assert.Equal(t, 200, status)
flippedID := trashid.MustEncodeHashID(1)
for _, p := range resp.Data {
assert.NotEqual(t, flippedID, p.ID, "private playlist must not appear in trending")
}
}
{
status, body := testGet(t, app, "/v1/playlists/trending?limit=5&type=playlist", nil)
assert.Equal(t, 200, status)
Expand Down
Loading