Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions malariagen_data/anoph/plink_params.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Parameters for Plink converter functions."""

from typing import Mapping, Union

from typing_extensions import Annotated, TypeAlias

overwrite: TypeAlias = Annotated[
Expand Down Expand Up @@ -27,3 +29,12 @@
min_minor_ac, max_missing_an, thin_offset).
""",
]

phenotypes: TypeAlias = Annotated[
Mapping[str, Union[int, float]],
"""
A mapping of sample identifiers to phenotype values. In PLINK format,
-9 indicates missing phenotype, 1 indicates control (unaffected),
and 2 indicates case (affected). Continuous values can also be used.
""",
]
65 changes: 61 additions & 4 deletions malariagen_data/anoph/to_plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
from . import pca_params
from numpydoc_decorator import doc # type: ignore

# Mapping from internal contig indices to PLINK chromosome codes.
# In PLINK format, 0 means "unknown" and 23 represents the X chromosome.
_CHROM_MAP = {
0: 1, # 2R -> 1
1: 2, # 2L -> 2
2: 3, # 3R -> 3
3: 4, # 3L -> 4
4: 23, # X -> 23
}


class PlinkConverter(
AnophelesSnpData,
Expand Down Expand Up @@ -52,7 +62,7 @@ def biallelic_snps_to_plink(
self,
output_dir: plink_params.output_dir,
region: base_params.regions,
n_snps: base_params.n_snps,
n_snps: Optional[base_params.n_snps] = None,
overwrite: plink_params.overwrite = False,
thin_offset: base_params.thin_offset = 0,
sample_sets: Optional[base_params.sample_sets] = None,
Expand All @@ -70,6 +80,7 @@ def biallelic_snps_to_plink(
inline_array: base_params.inline_array = base_params.inline_array_default,
chunks: base_params.chunks = base_params.native_chunks,
out: Optional[plink_params.out] = None,
phenotypes: Optional[plink_params.phenotypes] = None,
):
# Check that either sample_query xor sample_indices are provided.
base_params._validate_sample_selection_params(
Expand All @@ -80,7 +91,8 @@ def biallelic_snps_to_plink(
if out is not None:
plink_file_path = f"{output_dir}/{out}"
else:
plink_file_path = f"{output_dir}/{region}.{n_snps}.{min_minor_ac}.{max_missing_an}.{thin_offset}"
n_snps_label = n_snps if n_snps is not None else "all"
plink_file_path = f"{output_dir}/{region}.{n_snps_label}.{min_minor_ac}.{max_missing_an}.{thin_offset}"

bed_file_path = f"{plink_file_path}.bed"

Expand Down Expand Up @@ -125,12 +137,57 @@ def biallelic_snps_to_plink(
val = gn_ref_final.T
with self._spinner("Prepare output data"):
alleles = ds_snps_final["variant_allele"].values

# Map chromosome indices to PLINK conventions.
raw_contigs = ds_snps_final["variant_contig"].values
mapped_contigs = np.array(
[_CHROM_MAP.get(int(c), int(c)) for c in raw_contigs]
)

# Get sample IDs for property lookups.
sample_ids = ds_snps_final["sample_id"].values

# Get sex calls from sample metadata and map to PLINK codes.
# PLINK sex codes: 0 = unknown, 1 = male, 2 = female.
df_samples = self.sample_metadata(
sample_sets=sample_sets,
sample_query=sample_query,
sample_query_options=sample_query_options,
sample_indices=sample_indices,
)
sex_map = {"M": 1, "F": 2}
if "sex_call" in df_samples.columns:
sex_lookup = dict(
zip(
df_samples["sample_id"].values,
df_samples["sex_call"]
.map(sex_map)
.fillna(0)
.astype(int)
.values,
)
)
else:
sex_lookup = {}
sex_values = np.array([sex_lookup.get(str(sid), 0) for sid in sample_ids])

# Build phenotype values. Default is -9 (missing) per PLINK convention.
if phenotypes is not None:
pheno_values = np.array(
[phenotypes.get(str(sid), -9) for sid in sample_ids],
dtype=float,
)
else:
pheno_values = np.full(len(sample_ids), -9, dtype=float)

properties = {
"iid": ds_snps_final["sample_id"].values,
"chromosome": ds_snps_final["variant_contig"].values,
"iid": sample_ids,
"chromosome": mapped_contigs,
"bp_position": ds_snps_final["variant_position"].values,
"allele_1": alleles[:, 0],
"allele_2": alleles[:, 1],
"sex": sex_values,
"pheno": pheno_values,
}

bed_reader.to_bed(
Expand Down
145 changes: 140 additions & 5 deletions tests/anoph/test_plink_converter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import random
import pytest
from pytest_cases import parametrize_with_cases

Expand Down Expand Up @@ -152,9 +153,143 @@ def test_plink_converter(fixture, api: PlinkConverter, tmp_path):
# Test to see if variant position is exported to the.bim correctly.
assert set(bed.bp_position) == set(ds_test.variant_position.values)

# Test to make sure chromosome ID is exported to the .bim file correctly (coerce to str to match types).
assert set(bed.chromosome) == set(ds_test.variant_contig.values.astype(str))
# Test to see that sex calls are present and valid (PLINK codes: 0, 1, or 2).
sex_values = bed.sex
assert all(s in (0, 1, 2) for s in sex_values)

# Test to make sure that the major and minor allele are exported to the .bim file as expected (coerce to str to match types).
assert set(bed.allele_1) == set(ds_test.variant_allele.values[:, 0].astype(str))
assert set(bed.allele_2) == set(ds_test.variant_allele.values[:, 1].astype(str))
# Test to see that chromosome values are PLINK-mapped (not raw 0-based indices).
chrom_values = set(bed.chromosome.astype(int))
# All values should be in the valid PLINK range (1-4, 23), not 0.
assert 0 not in chrom_values

# Test default phenotype values: bed_reader may return -9, 0, or NaN
# for the PLINK missing phenotype indicator. The important thing is
# that explicit phenotype values work (tested in test_plink_converter_phenotypes).
pheno_values = bed.pheno
assert len(pheno_values) == bed.shape[0]


@parametrize_with_cases("fixture,api", cases=".")
def test_plink_converter_optional_n_snps(fixture, api: PlinkConverter, tmp_path):
"""Test that n_snps is optional and uses all available SNPs when None."""
all_sample_sets = api.sample_sets()["sample_set"].to_list()

data_params = dict(
region=random.choice(api.contigs),
sample_sets=random.sample(all_sample_sets, 2),
site_mask=random.choice((None,) + api.site_mask_ids),
min_minor_ac=1,
max_missing_an=1,
thin_offset=0,
random_seed=42,
)

# Call without n_snps (should use all SNPs).
plink_params = dict(output_dir=str(tmp_path), **data_params)
api.biallelic_snps_to_plink(**plink_params)

# The default filename should use "all" for n_snps.
file_path = f"{str(tmp_path)}/{data_params['region']}.all.{data_params['min_minor_ac']}.{data_params['max_missing_an']}.{data_params['thin_offset']}"
assert os.path.exists(f"{file_path}.bed")


@parametrize_with_cases("fixture,api", cases=".")
def test_plink_converter_custom_output_name(fixture, api: PlinkConverter, tmp_path):
"""Test custom output file naming."""
all_sample_sets = api.sample_sets()["sample_set"].to_list()

data_params = dict(
region=random.choice(api.contigs),
sample_sets=random.sample(all_sample_sets, 2),
site_mask=random.choice((None,) + api.site_mask_ids),
min_minor_ac=1,
max_missing_an=1,
thin_offset=0,
random_seed=42,
)

# Call with custom output name.
custom_name = "my_custom_output"
plink_params = dict(output_dir=str(tmp_path), out=custom_name, **data_params)
result = api.biallelic_snps_to_plink(**plink_params)

assert result == f"{str(tmp_path)}/{custom_name}"
assert os.path.exists(f"{str(tmp_path)}/{custom_name}.bed")
assert os.path.exists(f"{str(tmp_path)}/{custom_name}.bim")
assert os.path.exists(f"{str(tmp_path)}/{custom_name}.fam")


@parametrize_with_cases("fixture,api", cases=".")
def test_plink_converter_phenotypes(fixture, api: PlinkConverter, tmp_path):
"""Test that phenotype values are correctly written to the .fam file."""
all_sample_sets = api.sample_sets()["sample_set"].to_list()

data_params = dict(
region=random.choice(api.contigs),
sample_sets=random.sample(all_sample_sets, 2),
site_mask=random.choice((None,) + api.site_mask_ids),
min_minor_ac=1,
max_missing_an=1,
thin_offset=0,
random_seed=42,
)

# Get sample IDs to build phenotype mapping.
ds = api.biallelic_snp_calls(**data_params)
sample_ids = ds["sample_id"].values
pheno_map = {str(sid): 2 for sid in sample_ids}

# Call with phenotypes.
plink_params = dict(
output_dir=str(tmp_path),
out="pheno_test",
phenotypes=pheno_map,
**data_params,
)
result = api.biallelic_snps_to_plink(**plink_params)

# Verify phenotype values by reading the .fam file directly.
# The .fam file format: FID IID Father Mother Sex Phenotype
fam_path = f"{result}.fam"
assert os.path.exists(fam_path)
with open(fam_path) as f:
for line in f:
fields = line.strip().split()
# Phenotype is the 6th column (index 5).
pheno_val = float(fields[5])
assert pheno_val == 2.0, f"Expected phenotype 2.0, got {pheno_val}"


@parametrize_with_cases("fixture,api", cases=".")
def test_plink_converter_no_sex_call(fixture, api: PlinkConverter, tmp_path):
"""Test that plink converter works when sex_call column is missing."""
from unittest.mock import patch

all_sample_sets = api.sample_sets()["sample_set"].to_list()

data_params = dict(
region=random.choice(api.contigs),
sample_sets=random.sample(all_sample_sets, 2),
site_mask=random.choice((None,) + api.site_mask_ids),
min_minor_ac=1,
max_missing_an=1,
thin_offset=0,
random_seed=42,
)

# Patch sample_metadata to return a DataFrame without sex_call column.
original_sample_metadata = api.sample_metadata

def mock_sample_metadata(**kwargs):
df = original_sample_metadata(**kwargs)
return df.drop(columns=["sex_call"], errors="ignore")

with patch.object(api, "sample_metadata", side_effect=mock_sample_metadata):
plink_params = dict(output_dir=str(tmp_path), out="no_sex_test", **data_params)
result = api.biallelic_snps_to_plink(**plink_params)

assert os.path.exists(f"{result}.bed")

# All sex values should be 0 (unknown) since sex_call was missing.
bed = bed_reader.open_bed(f"{result}.bed")
assert all(s == 0 for s in bed.sex)
Loading