Source code for af_analysis.analysis

import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import json
import os
import logging
import itertools
import pickle

import pdb_cpp
from pdb_cpp.geom import distance_matrix

# Logging
logger = logging.getLogger(__name__)

AA_RESNAMES = [
    "ALA",
    "ARG",
    "ASN",
    "ASP",
    "CYS",
    "GLU",
    "GLN",
    "GLY",
    "HIS",
    "ILE",
    "LEU",
    "LYS",
    "MET",
    "PHE",
    "PRO",
    "SER",
    "THR",
    "TRP",
    "TYR",
    "VAL",
]
NON_CANONICAL_RESNAMES = ['PTR']
DNA_RESNAMES = ["DA", "DC", "DG", "DT", "A", "T", "G", "C", "U"]

PROTEIN_SEL = "resname " + " ".join(AA_RESNAMES + NON_CANONICAL_RESNAMES)
NA_SEL = "resname " + " ".join(DNA_RESNAMES)
TOKEN_SEL = f"({PROTEIN_SEL} and name CA) or ({NA_SEL} and name P) or ions or (not {PROTEIN_SEL} and not {NA_SEL} and noh)"
TOKEN_SEL_CB = f"name CB C3' or (resname GLY and name CA) or (not {PROTEIN_SEL} and not {NA_SEL} and noh)"
TOKEN_SEL_PDOCKQ = f"({PROTEIN_SEL} and name CB) or (resname GLY and name CA) or ({NA_SEL} and name P)"
TOKEN_SEL_IPLDDT = f"({PROTEIN_SEL} and name CB) or (resname GLY and name CA) or ({NA_SEL} and name P)  or ions or (not {PROTEIN_SEL} and not {NA_SEL} and noh)"

# Autorship information
__author__ = "Alaa Reguei"
__copyright__ = "Copyright 2023, RPBS"
__credits__ = ["Samuel Murail", "Alaa Reguei"]
__license__ = "GNU General Public License version 2"
__version__ = "0.2.1"
__maintainer__ = "Samuel Murail"
__email__ = "samuel.murail@u-paris.fr"
__status__ = "Beta"


[docs] def get_pae(data_file): """Get the PAE matrix from a json/npz file. Parameters ---------- data_file : str Path to the json/npz file. Returns ------- np.array PAE matrix. """ if data_file is None or data_file == "" or pd.isna(data_file): return None if data_file.endswith(".json"): return extract_pae_json(data_file) elif data_file.endswith(".npz"): return extract_pae_npz(data_file) elif data_file.endswith(".npy"): return extract_pae_npy(data_file) elif data_file.endswith(".pkl"): return extract_pae_pkl(data_file) else: raise ValueError("Unknown file format.")
[docs] def extract_pae_json(json_file): """Get the PAE matrix from a json file. Parameters ---------- json_file : str Path to the json file. Returns ------- np.array PAE matrix. """ with open(json_file) as f: local_json = json.load(f) if "pae" in local_json: pae_array = np.array(local_json["pae"]) elif "predicted_aligned_error" in local_json[0]: pae_array = np.array(local_json[0]["predicted_aligned_error"]) else: raise ValueError("No PAE found in the json file.") return pae_array
[docs] def extract_pae_npz(npz_file): """Get the PAE matrix from a npz file. Parameters ---------- npz_file : str Path to the npz file. Returns ------- np.array PAE matrix. """ data_npz = np.load(npz_file) pae_array = data_npz["pae"] return pae_array
[docs] def extract_pae_npy(npy_file): """Get the PAE matrix from a npy file. Parameters ---------- npy_file : str Path to the npy file. Returns ------- np.array PAE matrix. """ pae_array = np.load(npy_file) return pae_array
[docs] def extract_pae_pkl(pkl_file): """Get the PAE matrix from a pkl file. Parameters ---------- pkl_file : str Path to the pkl file. Returns ------- np.array PAE matrix. """ try: data_pkl = np.load(pkl_file, allow_pickle=True) except pickle.UnpicklingError as e: logger.error(f"Error loading PAE from {pkl_file}: {e}") return None pae_array = data_pkl["predicted_aligned_error"] return pae_array
[docs] def extract_fields_file(data_file, fields): """Get the PAE matrix from a json/pickle file. Parameters ---------- file : str Path to the json file. fields : list List of fields to extract. Returns ------- value """ if data_file is None: return None if data_file.endswith(".json"): with open(data_file) as f: local_data = json.load(f) elif data_file.endswith(".npz"): local_data = np.load(data_file) values = [] for field in fields: if field in local_data: values.append(local_data[field]) else: raise ValueError(f"No field {field} found in the json/npz file.") return values
def _flatten_1d(array_like): array = np.asarray(array_like) if array.ndim > 1: array = array.reshape(-1) return array def _unique_preserve_order(values): return list(dict.fromkeys(values)) def _scale_plddt_if_needed(values): return np.asarray(values, dtype=float) def _infer_chain_lengths(coor): model = coor.models[0] chain_arr = np.asarray(model.chain_str) chains = _unique_preserve_order(chain_arr.tolist()) uniq_resid = _flatten_1d(model.uniq_resid) chain_lengths = {} for chain in chains: chain_mask = chain_arr == chain chain_lengths[chain] = len(np.unique(uniq_resid[chain_mask])) return chain_lengths
[docs] def compute_pdockQ( coor, rec_chains=None, lig_chains=None, cutoff=8.0, L=0.724, x0=152.611, k=0.052, b=0.018, ): coor_cb = coor.select_atoms(TOKEN_SEL_PDOCKQ) chain_lengths = _infer_chain_lengths(coor_cb) if lig_chains is None: lig_chains = [min(chain_lengths.items(), key=lambda x: x[1])[0]] if isinstance(lig_chains, str): lig_chains = [lig_chains] assert len(lig_chains) >= 1, "At least one ligand chain is allowed" logger.info("Model ligand chain : %s", " ".join(lig_chains)) if rec_chains is None: rec_chains = [chain for chain in chain_lengths if chain not in lig_chains] if isinstance(rec_chains, str): rec_chains = [rec_chains] assert len(rec_chains) >= 1, "At least one receptor chain is allowed" logger.info("Model receptor chain : %s", " ".join(rec_chains)) pdockq_list = [] for frame_index in range(coor_cb.model_num): model = coor_cb.models[frame_index] chain_arr = np.asarray(model.chain_str) rec_mask = np.isin(chain_arr, rec_chains) lig_mask = np.isin(chain_arr, lig_chains) if not np.any(rec_mask) or not np.any(lig_mask): pdockq_list.append(0.0) continue rec_xyz = model.xyz[rec_mask] lig_xyz = model.xyz[lig_mask] dist_mat = distance_matrix(rec_xyz, lig_xyz) contact_idx = np.where(dist_mat < cutoff) contact_num = len(contact_idx[0]) if contact_num == 0: pdockq_list.append(0.0) continue rec_contact_idx = np.unique(contact_idx[0]) lig_contact_idx = np.unique(contact_idx[1]) rec_beta = np.asarray(model.beta)[rec_mask][rec_contact_idx] lig_beta = np.asarray(model.beta)[lig_mask][lig_contact_idx] rec_beta = _scale_plddt_if_needed(rec_beta) lig_beta = _scale_plddt_if_needed(lig_beta) avg_plddt = rec_beta.size * np.average(rec_beta) avg_plddt += lig_beta.size * np.average(lig_beta) avg_plddt /= rec_beta.size + lig_beta.size x = avg_plddt * np.log10(contact_num) pdockq = L / (1 + np.exp(-k * (x - x0))) + b pdockq_list.append(pdockq) return pdockq_list
[docs] def compute_pdockQ2( coor, pae_array, cutoff=8.0, L=1.31034849e00, x0=8.47326239e01, k=7.47157696e-02, b=5.01886443e-03, d0=10.0, sel=TOKEN_SEL, ): models_CA = coor.select_atoms(sel) models_chains = np.unique(np.asarray(models_CA.chain_str)) if pae_array.shape != (models_CA.len, models_CA.len): logger.warning( f"PAE array shape {pae_array.shape} mismatch with CA atoms number {models_CA.len}" ) # print(f"PAE array shape {pae_array.shape} CA atoms number {models_CA.len}") pdockq2_list = [[] for _ in models_chains] for frame_index in range(models_CA.model_num): model = models_CA.models[frame_index] # print("model chain_str:", model.chain_str) chain_arr = np.asarray(model.chain_str) beta_arr = _scale_plddt_if_needed(np.asarray(model.beta)) xyz = model.xyz for i, chain in enumerate(models_chains): chain_idx = np.where(chain_arr == chain)[0] # print("chain_idx:", chain_idx) other_idx = np.where(chain_arr != chain)[0] # print("other_idx:", other_idx) if len(chain_idx) == 0 or len(other_idx) == 0: pdockq2_list[i].append(0.0) logger.info( "No interface residues found for pdockq2 calculation, 0 value return." ) continue dist_mat = distance_matrix(xyz[chain_idx], xyz[other_idx]) indexes = np.where(dist_mat < cutoff) if len(indexes[0]) == 0: pdockq2_list[i].append(0.0) continue x_indexes = chain_idx[indexes[0]] y_indexes = other_idx[indexes[1]] # print("x_indexes:", x_indexes) # print("y_indexes:", y_indexes) pae_sel = pae_array[x_indexes, y_indexes] norm_if_interpae = np.mean(1 / (1 + (pae_sel / d0) ** 2)) plddt_avg = np.mean(beta_arr[x_indexes]) x_val = norm_if_interpae * plddt_avg y_val = L / (1 + np.exp(-k * (x_val - x0))) + b pdockq2_list[i].append(y_val) return pdockq2_list
[docs] def pdockq(data): r"""Compute the pDockq [1]_ from the pdb file. .. math:: pDockQ = \frac{L}{1 + e^{-k (x-x_{0})}} + b where: .. math:: x = \overline{plDDT_{interface}} \cdot log(number \: of \: interface \: contacts) :math:`L = 0.724` is the maximum value of the sigmoid, :math:`k = 0.052` is the slope of the sigmoid, :math:`x_{0} = 152.611` is the midpoint of the sigmoid, and :math:`b = 0.018` is the y-intercept of the sigmoid. Implementation was inspired from https://gitlab.com/ElofssonLab/FoldDock/-/blob/main/src/pdockq.py Parameters ---------- data : AFData object containing the data Returns ------- None The `log_pd` dataframe is modified in place. References ---------- .. [1] Bryant P, Pozzati G and Elofsson A. Improved prediction of protein-protein interactions using AlphaFold2. *Nature Communications*. vol. 13, 1265 (2022) https://www.nature.com/articles/s41467-022-28865-w """ pdockq_list = [] disable = not getattr(data, "verbose", True) for pdb in tqdm( data.df["pdb"], total=len(data.df["pdb"]), disable=disable, desc="pdockq" ): if pdb is None or pdb is np.nan: pdockq_list.append(None) continue model = pdb_cpp.Coor(pdb) pdockq_list += compute_pdockQ(model) data.df["pdockq"] = pdockq_list
[docs] def mpdockq(data): r"""Compute the mpDockq [2]_ from the pdb file. .. math:: pDockQ = \frac{L}{1 + e^{-k (x-x_{0})}} + b where: .. math:: x = \overline{plDDT_{interface}} \cdot log(number \: of \: interface \: contacts) :math:`L = 0.728`, :math:`x0 = 309.375`, :math:`k = 0.098` and :math:`b = 0.262`. Implementation was inspired from https://gitlab.com/ElofssonLab/FoldDock/-/blob/main/src/pdockq.py Parameters ---------- data : AFData object containing the data Returns ------- None The `log_pd` dataframe is modified in place. References ---------- .. [2] Bryant P, Pozzati G, Zhu W, Shenoy A, Kundrotas P & Elofsson A. Predicting the structure of large protein complexes using AlphaFold and Monte Carlo tree search. *Nature Communications*. vol. 13, 6028 (2022) https://www.nature.com/articles/s41467-022-33729-4 """ pdockq_list = [] disable = not getattr(data, "verbose", True) for pdb in tqdm( data.df["pdb"], total=len(data.df["pdb"]), disable=disable, desc="mpdockq" ): if pdb is None or pdb is np.nan: pdockq_list.append(None) continue model = pdb_cpp.Coor(pdb) pdockq_list += compute_pdockQ( model, cutoff=8.0, L=0.728, x0=309.375, k=0.098, b=0.262 ) data.df.loc[:, "mpdockq"] = pdockq_list
[docs] def pdockq2(data): r""" Compute pdockq2 from the pdb file [3]_. .. math:: pDockQ_2 = \frac{L}{1 + exp [-k*(X_i-X_0)]} + b with .. math:: X_i = \langle \frac{1}{1+(\frac{PAE_{int}}{d_0})^2} \rangle * \langle pLDDT \rangle_{int} References: .. [3] : https://academic.oup.com/bioinformatics/article/39/7/btad424/7219714 """ pdockq_list = [] max_chain_num = 0 max_chain_val = [] for query in data.chains: chain_num = len(data.chains[query]) if chain_num > max_chain_num: max_chain_num = chain_num max_chain_val = data.chains[query] for i in range(max_chain_num): pdockq_list.append([]) disable = not getattr(data, "verbose", True) if "data_file" not in data.df.columns: raise ValueError( 'No "data_file" column found in the dataframe. pae scores are required to compute pdockq2.' ) for pdb, data_path in tqdm( zip(data.df["pdb"], data.df["data_file"]), total=len(data.df["pdb"]), disable=disable, desc="pdockq2", ): if ( pdb is not None and pdb is not np.nan and data_path is not None and data_path is not np.nan ): model = pdb_cpp.Coor(pdb) # with open(json_path) as f: # local_json = json.load(f) # pae_array = np.array(local_json["pae"]) pae_array = get_pae(data_path) pdockq2 = compute_pdockQ2(model, pae_array) for i in range(max_chain_num): if i < len(pdockq2): pdockq_list[i].append(pdockq2[i][0]) else: pdockq_list[i].append(None) else: for i in range(max_chain_num): pdockq_list[i].append(None) # print(pdockq_list) for i in range(max_chain_num): data.df.loc[:, f"pdockq2_{max_chain_val[i]}"] = pdockq_list[i]
[docs] def inter_chain_pae(data, fun=np.mean): """Read the PAE matrix and extract the average inter chain PAE. Parameters ---------- data : AFData object containing the data fun : function function to apply to the PAE scores Returns ------- None """ pae_list = [] disable = not getattr(data, "verbose", True) if "data_file" not in data.df.columns: raise ValueError( "No 'data_file' column found in the dataframe. pae scores are required to compute pdockq2." ) for query, data_path in tqdm( zip(data.df["query"], data.df["data_file"]), total=len(data.df["data_file"]), disable=disable, desc="inter_chain_pae", ): if data_path is not None and data_path is not np.nan: pae_array = get_pae(data_path) chain_lens = data.chain_length[query] chain_len_sums = np.cumsum([0] + chain_lens) chain_ids = data.chains[query] pae_dict = {} for i in range(len(chain_lens)): for j in range(len(chain_lens)): pae_val = fun( pae_array[ chain_len_sums[i] : chain_len_sums[i + 1], chain_len_sums[j] : chain_len_sums[j + 1], ] ) pae_dict[f"PAE_{chain_ids[i]}_{chain_ids[j]}"] = pae_val pae_list.append(pae_dict) else: pae_list.append({}) pae_df = pd.DataFrame(pae_list) for col in pae_df.columns: data.df.loc[:, col] = pae_df.loc[:, col].to_numpy()
[docs] def compute_LIS_matrix( pae_array, chain_length, pae_cutoff=12.0, ): r"""Compute the LIS score as define in [1]_. Implementation was inspired from implementation in https://github.com/flyark/AFM-LIS Parameters ---------- pae_array : np.array array of predicted PAE chain_length : list list of chain lengths pae_cutoff : float cutoff for native contacts, default is 8.0 A Returns ------- list LIS scores References ---------- .. [1] Kim AR, Hu Y, Comjean A, Rodiger J, Mohr SE, Perrimon N. "Enhanced Protein-Protein Interaction Discovery via AlphaFold-Multimer" bioRxiv (2024). https://www.biorxiv.org/content/10.1101/2024.02.19.580970v1 """ if pae_array is None: return None chain_len_sums = np.cumsum([0] + chain_length) # Use list instead of array, because # df[column].iloc[:] = LIS_list does not work with numpy array LIS_list = [] trans_matrix = np.zeros_like(pae_array) mask = pae_array < pae_cutoff trans_matrix[mask] = 1 - pae_array[mask] / pae_cutoff for i in range(len(chain_length)): i_start = chain_len_sums[i] i_end = chain_len_sums[i + 1] local_LIS_list = [] for j in range(len(chain_length)): j_start = chain_len_sums[j] j_end = chain_len_sums[j + 1] submatrix = trans_matrix[i_start:i_end, j_start:j_end] if np.any(submatrix > 0): local_LIS_list.append(submatrix[submatrix > 0].mean()) else: local_LIS_list.append(0) LIS_list.append(local_LIS_list) return LIS_list
[docs] def LIS_matrix(data, pae_cutoff=12.0): """ Compute the LIS score as define in [2]_. Implementation was inspired from implementation in: .. [2] https://github.com/flyark/AFM-LIS Parameters ---------- data : AFData object containing the data pae_cutoff : float cutoff for PAE matrix values, default is 12.0 A Returns ------- None The dataframe is modified in place. """ LIS_matrix_list = [] disable = not getattr(data, "verbose", True) for query, data_path in tqdm( zip(data.df["query"], data.df["data_file"]), total=len(data.df["query"]), disable=disable, desc="LIS_matrix", ): if data.chain_length[query] is None: LIS_matrix_list.append(None) continue pae_array = get_pae(data_path) LIS_matrix = compute_LIS_matrix(pae_array, data.chain_length[query], pae_cutoff) LIS_matrix_list.append(LIS_matrix) assert len(LIS_matrix_list) == len(data.df["query"]) data.df.loc[:, "LIS"] = LIS_matrix_list
[docs] def cLIS_matrix(data, pae_cutoff=12.0, dist_cutoff=8.0): """Compute the cLIS score from the PAE matrix and pdb file. Implementation is based on the cLIS from the IPSAE package https://github.com/flyark/AFM-LIS Cite: .. [1] Dunbrack RL Jr. "Rēs ipSAE loquunt: What’s wrong with AlphaFold’s ipTM score and how to fix it" bioRxiv (2025). Parameters ---------- data : AFData object containing the dipSAE(ata ref_dict : dict dictionary containing the reference PAE matrix for each query Returns ------- None The dataframe is modified in place. """ LIA_matrix_list = [] disable = not getattr(data, "verbose", True) for query, data_file, pdb in tqdm( zip(data.df["query"], data.df["data_file"], data.df["pdb"]), total=len(data.df["query"]), disable=disable, desc="LIA_matrix", ): PAE_matrix = get_pae(data_file) if PAE_matrix is None: logger.warning(f"No PAE matrix found for query {query}.") LIA_matrix_list.append(None) continue LIA_matrix = compute_cLIS_matrix( pdb=pdb, pae_array=PAE_matrix, pae_cutoff=pae_cutoff, dist_cutoff=dist_cutoff, chain_ids=data.chains[query], chain_length=data.chain_length[query], ) LIA_matrix_list.append(LIA_matrix) assert len(LIA_matrix_list) == len(data.df["query"]) data.df.loc[:, "cLIS"] = LIA_matrix_list
[docs] def compute_cLIS_matrix( pdb: str, pae_array: np.ndarray, chain_ids: list, chain_length: dict, pae_cutoff: float = 12.0, dist_cutoff: float = 8.0, sel: str = TOKEN_SEL_CB, ) -> np.ndarray: """Compute the cLIS score from the PAE matrix and pdb file. Parameters ---------- pdb : str path to the pdb file pae_array : np.array array of predicted PAE pae_cutoff : float cutoff for PAE matrix values, default is 10.0 A dist_cutoff : float cutoff for distance between atoms, default is 10.0 A chain_ids : list list of chain IDs chain_length : list list of chain lengths sel : str selection string for the atoms to consider in the distance calculation, default is TOKEN_SEL_CB Returns ------- list LIA score matrix """ chain_len_sums = np.cumsum([0] + chain_length) model = pdb_cpp.Coor(pdb) model_cb = model.select_atoms(sel) distance = distance_matrix(model_cb.xyz, model_cb.xyz) if model_cb.len == pae_array.shape[0]: contact_map = (distance < dist_cutoff).astype(int) else: contact_map = np.zeros_like(pae_array, dtype=int) chain_offsets = np.cumsum([0] + chain_length[:-1]) chain_arr = np.asarray(model_cb.chain_str) uniq_resid = _flatten_1d(model_cb.uniq_resid) mapped_idx = np.full(model_cb.len, -1, dtype=int) for chain, offset, length in zip(chain_ids, chain_offsets, chain_length): chain_atom_idx = np.where(chain_arr == chain)[0] if len(chain_atom_idx) == 0: continue order = np.argsort(uniq_resid[chain_atom_idx]) chain_atom_idx = chain_atom_idx[order] for local_idx, atom_idx in enumerate(chain_atom_idx): if local_idx >= length: break mapped_idx[atom_idx] = offset + local_idx valid = mapped_idx >= 0 mapped = mapped_idx[valid] contact_map[np.ix_(mapped, mapped)] = ( distance[valid][:, valid] < dist_cutoff ).astype(int) trans_matrix = np.zeros_like(pae_array) mask = pae_array < pae_cutoff trans_matrix[mask] = 1 - pae_array[mask] / pae_cutoff trans_matrix[contact_map == 0] = 0 LIA_list = [] for i in range(len(chain_length)): i_start = chain_len_sums[i] i_end = chain_len_sums[i + 1] local_LIA_list = [] for j in range(len(chain_length)): j_start = chain_len_sums[j] j_end = chain_len_sums[j + 1] submatrix = trans_matrix[i_start:i_end, j_start:j_end] if np.any(submatrix > 0): local_LIA_list.append(submatrix[submatrix > 0].mean()) else: local_LIA_list.append(0) LIA_list.append(local_LIA_list) return LIA_list
[docs] def PAE_matrix(data, fun=np.average): """ Compute the average (or something else) PAE matrix. Parameters ---------- data : AFData object containing the data fun : function function to apply to the PAE scores Returns ------- None The dataframe is modified in place. """ PAE_avg_list = [] disable = not getattr(data, "verbose", True) for query, data_path in tqdm( zip(data.df["query"], data.df["data_file"]), total=len(data.df["query"]), disable=disable, desc="PAE_matrix", ): if data.chain_length[query] is None: PAE_avg_list.append(None) continue pae_array = get_pae(data_path) chain_len_cum = np.cumsum([data.chain_length[query]]) chain_len_cum = np.insert(chain_len_cum, 0, 0) avg_matrix = np.zeros((len(chain_len_cum) - 1, len(chain_len_cum) - 1)) for i in range(len(chain_len_cum) - 1): for j in range(len(chain_len_cum) - 1): avg_matrix[i, j] = fun( pae_array[ chain_len_cum[i] : chain_len_cum[i + 1], chain_len_cum[j] : chain_len_cum[j + 1], ] ) PAE_avg_list.append(avg_matrix) assert len(PAE_avg_list) == len(data.df["query"]) data.df.loc[:, "PAE_fun"] = PAE_avg_list
[docs] def read_ftdmp_raw_score(raw_path): """Read raw ftdmp score files Parameters ---------- raw_path : str Path to the raw score file Returns ------- raw_score : pandas.DataFrame Dataframe containing the raw score data """ with open(raw_path, "r") as filin: # extract the header header = filin.readline().strip().split() # extract the data data = filin.readlines() # extract the data raw_data = [line.strip().split() for line in data] data = [] for line in raw_data: data_line = [] for i, val in enumerate(line): if i == 0: data_line.append(val) else: data_line.append(float(val)) data.append(data_line) # convert to pandas dataframe score_df = pd.DataFrame(data, columns=header) return score_df
[docs] def extract_ftdmp( ftdmp_result_path, score_list=["raw_scoring_results_without_ranks.txt"] ): """Read ftdmp output files Parameters ---------- ftdmp_result_path : str Path to the ftdmp output directory Returns ------- my_data : AFData object containing the data """ jobs_dir = os.listdir(os.path.join(ftdmp_result_path, "jobs")) if len(jobs_dir) > 1: logger.warning(f"Two outputs directory founded in {jobs_dir}") df_list = [] for job_dir in jobs_dir: local_job_dir = os.path.join(ftdmp_result_path, "jobs", job_dir) file_list = os.listdir(local_job_dir) for file in file_list: if file in score_list: logger.info(f"Reading ftdmp score file : {file}") local_score = os.path.join(ftdmp_result_path, "jobs", job_dir, file) df_list.append(read_ftdmp_raw_score(local_score)) # my_data.df["ID"] = [ # os.path.basename(file_path) if file_path is not None else None # for file_path in my_data.df["pdb"] # ] # return df_list # for df in df_list: # if len(df) == 0: # continue # my_data.df = my_data.df.merge(df, on="ID", how="inner") return df_list
[docs] def compute_ftdmp( my_data, ftdmp_path=None, out_path="tmp_ftdmp", score_list=["raw_scoring_results_without_ranks.txt"], env=None, keep_tmp=False, ): """Compute ftdmp scores Parameters ---------- ftdmp_path : str Path to the ftdmp output directory Returns ------- my_data : AFData object containing the data """ import shutil import subprocess if ftdmp_path is None: ftdmp_exe_path = shutil.which("ftdmp-qa-all") if ftdmp_exe_path is None: logger.warning("Software ftdmp-qa-all not found in PATH") return else: ftdmp_exe_path = os.path.expanduser(os.path.join(ftdmp_path, "ftdmp-qa-all")) # Test if pytorch is installed try: import torch except ImportError: logger.warning("Pytorch not found, ftdmp will not work") return if not os.path.exists(out_path): os.makedirs(out_path) # Check that all pdb files are in the same directory if not all( os.path.dirname(pdb) == os.path.dirname(my_data.df["pdb"].iloc[0]) for pdb in my_data.df["pdb"] ): pdb_run_list = [] pdb_dir_list = [] for pdb in my_data.df["pdb"].tolist(): pdb_dirname = os.path.dirname(pdb) if pdb_dirname not in pdb_dir_list: pdb_dir_list.append(pdb_dirname) pdb_run_list.append([]) # Add the pdb file to the corresponding directory index = pdb_dir_list.index(pdb_dirname) pdb_run_list[index].append(pdb) logger.info( f"For Ftdmp, all PDB files must be in the same directory. Ftdmp will be launched {len(pdb_run_list)} times." ) else: # If all pdb files are in the same directory, we can run ftdmp on all of them at once pdb_run_list = [my_data.df["pdb"].tolist()] logger.info( "All PDB files are in the same directory. Ftdmp will be launched once." ) # If the pdb files are not in the same directory, we need to copy them to the out_path # pdb_list = [pdb for pdb in my_data.df["pdb"].tolist() if pdb is not None and not pd.isna(pdb)] df_list = [] # print(pdb_run_list) for i, pdb_list in enumerate(pdb_run_list): logger.info( f"Running ftdmp on {len(pdb_list)} PDB files, step {i+1}/{len(pdb_run_list)}" ) # ls MY_AF_DIRECTORY/*.pdb | ~/Documents/Code/ftdmp/ftdmp-qa-all --workdir ftdmp_beta_amyloid_dimer cmd = [ftdmp_exe_path, "--workdir", out_path] proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, ) # pdb_list = [pdb for pdb in my_data.df["pdb"].tolist() if pdb is not None and not pd.isna(pdb)] com_input = "\n".join(pdb_list) com_input += "\n" (stdout_data, stderr_data) = proc.communicate(com_input.encode()) # print(stdout_data) # print(stderr_data) df_list += extract_ftdmp(ftdmp_result_path=out_path, score_list=score_list) if not keep_tmp: shutil.rmtree(out_path) logger.info("Ftdmp scores computed. Merging data frames.") # print(df_list) ftdmp_df = pd.DataFrame() for df in df_list: if len(df) == 0: continue # Add the ID column to the dataframe df["ID"] = df["ID"].astype(str) ftdmp_df = pd.concat([ftdmp_df, df], ignore_index=True) my_data.df["ID"] = [ os.path.basename(file_path) if file_path is not None else None for file_path in my_data.df["pdb"] ] my_data.df = my_data.df.merge(ftdmp_df, on="ID", how="inner") return
[docs] def compute_dockq(data, ref_dict, fun=np.average, dockq_thresold=0.3): """Compute the DockQ score from the PAE matrix. Parameters ---------- data : AFData object containing the data ref_dict : dict dictionary containing the reference PAE matrix for each query fun : function function to apply to the PAE scores dockq_thresold : float threshold with multiple chain to recompute DockQ score, default is 0.3 Returns ------- None The dataframe is modified in place. """ from pdb_cpp import analysis as pdb_cpp_analysis dockq_list = [] lrmsd_list = [] fnat_list = [] old_query = "" disable = not getattr(data, "verbose", True) for query, pdb in tqdm( zip(data.df["query"], data.df["pdb"]), total=len(data.df["query"]), disable=disable, desc="dockq", ): if ( query not in ref_dict or data.chain_length[query] is None or pdb is None or pdb is np.nan ): if query not in ref_dict: logger.warning(f"No reference pdb structure found for query {query}.") if pdb is None or pdb is np.nan: logger.warning(f"No PDB file found for query {query}.") dockq_list.append(None) lrmsd_list.append(None) fnat_list.append(None) continue if query != old_query: ref_coor = pdb_cpp.Coor(ref_dict[query]) native_seq = ref_coor.get_aa_seq() old_query = query model = pdb_cpp.Coor(pdb) model_seq = model.get_aa_seq() lig_chains = [ min(model_seq.items(), key=lambda x: len(x[1].replace("-", "")))[0] ] rec_chains = [chain for chain in model_seq if chain not in lig_chains] native_lig_chains = [ min(native_seq.items(), key=lambda x: len(x[1].replace("-", "")))[0] ] native_rec_chains = [ chain for chain in native_seq if chain not in native_lig_chains ] dockq_score = pdb_cpp_analysis.dockQ( model, ref_coor, rec_chains=rec_chains, lig_chains=lig_chains, native_rec_chains=native_rec_chains, native_lig_chains=native_lig_chains, ) if len(rec_chains) > 1 and dockq_score["DockQ"][0] < dockq_thresold: # If the model has multiple chains and the DockQ score is below the threshold, # recompute the DockQ score using different alignement modes new_results = [dockq_score] for chain_perm in list(itertools.permutations(rec_chains, len(rec_chains)))[ 1: ]: new_results.append( pdb_cpp_analysis.dockQ( model, ref_coor, rec_chains=list(chain_perm), lig_chains=lig_chains, native_rec_chains=native_rec_chains, native_lig_chains=native_lig_chains, ) ) # Select the best result dockq_score = max(new_results, key=lambda x: x["DockQ"][0]) # print(dockq_score) # print(f"dockq: {dockq_score['DockQ'][0]:.3f} Lrms: {dockq_score['LRMS'][0]:.2f}") dockq_list.append(dockq_score["DockQ"][0]) lrmsd_list.append(dockq_score["LRMS"][0]) fnat_list.append(dockq_score["Fnat"][0]) assert len(dockq_list) == len(data.df["query"]) data.df.loc[:, "dockq"] = dockq_list data.df.loc[:, "lrmsd"] = lrmsd_list data.df.loc[:, "fnat"] = fnat_list
[docs] def ipTM_d0(data): """Compute the ipTM_d0 score from the PAE matrix. Implementation is based on the ipTM_d0 function from the IPSAE package https://github.com/DunbrackLab/IPSAE/blob/main/ipsae.py Cite: .. [1] Dunbrack RL Jr. "Rēs ipSAE loquunt: What’s wrong with AlphaFold’s ipTM score and how to fix it" bioRxiv (2025). Parameters ---------- data : AFData object containing the data ref_dict : dict dictionary containing the reference PAE matrix for each query Returns ------- None The dataframe is modified in place. """ iptm_d0_list = [] iptm_d0_matrix_list = [] disable = not getattr(data, "verbose", True) for query, data_file in tqdm( zip(data.df["query"], data.df["data_file"]), total=len(data.df["query"]), disable=disable, desc="ipTM_d0", ): PAE_matrix = get_pae(data_file) if PAE_matrix is None: logger.warning(f"No PAE matrix found for query {query}.") iptm_d0_list.append( {f"ipTM_d0_{data.chains[query][0]}_{data.chains[query][1]}": None} ) iptm_d0_matrix_list.append(None) continue # Check if the PAE matrix is square iptm_d0_values = compute_iptm_d0_values( pae_array=PAE_matrix, chain_ids=data.chains[query], chain_length=data.chain_length[query], chain_type=data.chain_type[query], ) iptm_d0_list.append(iptm_d0_values) # Build NxN matrix from per-pair values chain_ids = data.chains[query] matrix = [ [ iptm_d0_values.get(f"ipTM_d0_{chain_ids[i]}_{chain_ids[j]}", 0.0) for j in range(len(chain_ids)) ] for i in range(len(chain_ids)) ] iptm_d0_matrix_list.append(matrix) assert len(iptm_d0_list) == len(data.df["query"]) iptm_d0_df = pd.DataFrame(iptm_d0_list) for col in iptm_d0_df.columns: data.df.loc[:, col] = iptm_d0_df.loc[:, col].to_numpy() data.df.loc[:, "ipTM_d0_matrix"] = iptm_d0_matrix_list
[docs] def compute_iptm_d0_values(pae_array, chain_ids, chain_length, chain_type): """Compute the ipTM_d0 score from the PAE matrix. Parameters ---------- pae_array : np.array array of predicted PAE chain_ids : list list of chain IDs chain_length : list list of chain lengths chain_type : list list of chain types (e.g. "protein", "nucleic_acid") Returns ------- list ipTM_d0 score """ # Define the ptm and d0 functions def ptm_func(x, d0): return 1.0 / (1 + (x / d0) ** 2.0) ptm_func_vec = np.vectorize(ptm_func) # vector version # Define the d0 functions for numbers and arrays; minimum value = 1.0; from Yang and Skolnick, PROTEINS: Structure, Function, and Bioinformatics 57:702–710 (2004) def calc_d0(L, pair_type): L = float(L) if L < 27: L = 27 min_value = 1.0 if pair_type == "nucleic_acid": min_value = 2.0 d0 = 1.24 * (L - 15) ** (1.0 / 3.0) - 1.8 return max(min_value, d0) def fun(matrix): """Function to apply to the ipTM_d0 matrix. This function computes the mean of each row and returns the maximum value. """ matrix_res = np.zeros(matrix.shape[0]) for i in range(matrix.shape[0]): matrix_res[i] = np.mean(matrix[i, :]) max_index = np.argmax(matrix_res) return matrix_res[max_index] chain_len_sums = np.cumsum([0] + chain_length) iptm_d0_dict = {} iptm_do_sum = 0.0 iptm_size = 0.0 for i in range(len(chain_length)): for j in range(len(chain_length)): # if i != j: do_chain = chain_length[i] + chain_length[j] type = ( "nucleic_acid" if (chain_type[i] != "protein" or chain_type[j] != "protein") else "protein" ) d0 = calc_d0(do_chain, type) iptm_d0_matrix = ptm_func_vec( pae_array[ chain_len_sums[i] : chain_len_sums[i + 1], chain_len_sums[j] : chain_len_sums[j + 1], ], d0, ) iptm_d0_mean = fun(iptm_d0_matrix) iptm_d0_dict[f"ipTM_d0_{chain_ids[i]}_{chain_ids[j]}"] = iptm_d0_mean iptm_do_sum += iptm_d0_mean * chain_length[i] * chain_length[j] iptm_size += chain_length[i] * chain_length[j] iptm_d0_dict[f"ipTM_d0"] = iptm_do_sum / iptm_size if iptm_size > 0 else None return iptm_d0_dict
[docs] def ipSAE(data, pae_cutoff=10.0): """Compute the ipSAE score from the PAE matrix. Implementation is based on the ipTM_d0 function from the IPSAE package https://github.com/DunbrackLab/IPSAE/blob/main/ipsae.py Cite: .. [1] Dunbrack RL Jr. Rēs ipSAE loquunt: What\'s wrong with AlphaFold\'s ipTM score and how to fix it bioRxiv (2025). Parameters ---------- data : AFData object containing the dipSAE(ata ref_dict : dict dictionary containing the reference PAE matrix for each query Returns ------- None The dataframe is modified in place. """ ipSAE_list = [] ipSAE_matrix_list = [] disable = not getattr(data, "verbose", True) for query, data_file in tqdm( zip(data.df["query"], data.df["data_file"]), total=len(data.df["query"]), disable=disable, desc="ipSAE", ): PAE_matrix = get_pae(data_file) if PAE_matrix is None: logger.warning(f"No PAE matrix found for query {query}.") ipSAE_list.append( {f"ipSAE_{data.chains[query][0]}_{data.chains[query][1]}": None} ) ipSAE_matrix_list.append(None) continue ipSAE_values = compute_ipSAE_matrix( pae_array=PAE_matrix, pae_cutoff=pae_cutoff, chain_ids=data.chains[query], chain_length=data.chain_length[query], chain_type=data.chain_type[query], ) ipSAE_list.append(ipSAE_values) # Build NxN matrix from per-pair values chain_ids = data.chains[query] matrix = [ [ ipSAE_values.get(f"ipSAE_{chain_ids[i]}_{chain_ids[j]}", 0.0) for j in range(len(chain_ids)) ] for i in range(len(chain_ids)) ] ipSAE_matrix_list.append(matrix) assert len(ipSAE_list) == len(data.df["query"]) ipSAE_df = pd.DataFrame(ipSAE_list) for col in ipSAE_df.columns: data.df.loc[:, col] = ipSAE_df.loc[:, col].to_numpy() data.df.loc[:, "ipSAE_matrix"] = ipSAE_matrix_list
[docs] def compute_ipSAE_matrix(pae_array, pae_cutoff, chain_ids, chain_length, chain_type): """Compute the ipSAE score from the PAE matrix. Parameters ---------- pae_array : np.array array of predicted PAE pae_cutoff : float cutoff for PAE matrix values, default is 10.0 A chain_ids : list list of chain IDs chain_length : list list of chain lengths chain_type : list list of chain types (e.g. "protein", "nucleic_acid") Returns ------- list ipSAE score matrix """ # Define the ptm and d0 functions def ptm_func(x, d0): return 1.0 / (1 + (x / d0) ** 2.0) ptm_func_vec = np.vectorize(ptm_func) # vector version def calc_d0_array(L, pair_type): # Convert L to a NumPy array if it isn't already one (enables flexibility in input types) L = np.array(L, dtype=float) L = np.maximum(27, L) min_value = 1.0 if pair_type == "nucleic_acid": min_value = 2.0 # Calculate d0 using the vectorized operation return np.maximum(min_value, 1.24 * (L - 15) ** (1.0 / 3.0) - 1.8) def fun(matrix): """Function to apply to the ipTM_d0 matrix.""" # return np.mean(x) matrix_res = np.zeros(matrix.shape[0]) for i in range(matrix.shape[0]): matrix_res[i] = np.mean(matrix[i, :]) # print("matrix_res[i]", matrix_res[i]) max_index = np.argmax(matrix_res) return matrix_res[max_index] chain_len_sums = np.cumsum([0] + chain_length) ipSAE_dict = {} for i in range(len(chain_length)): for j in range(len(chain_length)): sub_pae_array = pae_array[ chain_len_sums[i] : chain_len_sums[i + 1], chain_len_sums[j] : chain_len_sums[j + 1], ] type = ( "nucleic_acid" if (chain_type[i] != "protein" or chain_type[j] != "protein") else "protein" ) valid_pairs_matrix = sub_pae_array <= pae_cutoff n0res_byres_all = np.sum(valid_pairs_matrix, axis=1) d0_res = calc_d0_array(n0res_byres_all, type) ipsae_d0res_byres = np.zeros(chain_length[i]) # print( # "sub_pae_array:", # sub_pae_array.shape, # "d0_res.shape:", # d0_res.shape, # "chain_length[i]:", # chain_length[i], # ) for k in range(chain_length[i]): ptm_row_d0res = ptm_func_vec(sub_pae_array[k], d0_res[k]) if valid_pairs_matrix[k].any(): ipsae_d0res_byres[k] = ptm_row_d0res[valid_pairs_matrix[k]].mean() else: ipsae_d0res_byres[k] = 0.0 max_index = np.argmax(ipsae_d0res_byres) ipSAE_dict[f"ipSAE_{chain_ids[i]}_{chain_ids[j]}"] = ipsae_d0res_byres[ max_index ] # print("ipsae_d0res_byres[max_index]", ipsae_d0res_byres[max_index]) return ipSAE_dict
[docs] def ipTM_d0_interface(data): """Compute the ipTM_d0 score from the PAE matrix. Implementation is based on the ipTM_d0 function from the IPSAE package https://github.com/DunbrackLab/IPSAE/blob/main/ipsae.py Cite: .. [1] Dunbrack RL Jr. "Rēs ipSAE loquunt: What’s wrong with AlphaFold’s ipTM score and how to fix it" bioRxiv (2025). Parameters ---------- data : AFData object containing the data ref_dict : dict dictionary containing the reference PAE matrix for each query Returns ------- None The dataframe is modified in place. """ iptm_d0_list = [] disable = not getattr(data, "verbose", True) for query, pdb, data_file in tqdm( zip(data.df["query"], data.df["pdb"], data.df["data_file"]), total=len(data.df["query"]), disable=disable, desc="ipTM_d0_interface", ): PAE_matrix = get_pae(data_file) if PAE_matrix is None: logger.warning(f"No PAE matrix found for query {query}.") iptm_d0_list.append( { f"ipTM_interface_{data.chains[query][0]}_{data.chains[query][1]}": None } ) continue iptm_d0_values = compute_iptm_d0_interface_values( pdb=pdb, pae_array=PAE_matrix, chain_ids=data.chains[query], chain_length=data.chain_length[query], chain_type=data.chain_type[query], ) iptm_d0_list.append(iptm_d0_values) assert len(iptm_d0_list) == len(data.df["query"]) iptm_d0_df = pd.DataFrame(iptm_d0_list) for col in iptm_d0_df.columns: data.df.loc[:, col] = iptm_d0_df.loc[:, col].to_numpy()
[docs] def compute_iptm_d0_interface_values( pdb, pae_array, chain_ids, chain_length, chain_type, sel=TOKEN_SEL_CB ): """Compute the ipTM_d0 score from the PAE matrix. Parameters ---------- pdb : str path to the pdb file pae_array : np.array array of predicted PAE chain_ids : list list of chain IDs chain_length : list list of chain lengths chain_type : list list of chain types (e.g. "protein", "nucleic_acid") sel : str selection string for the atoms to consider in the distance calculation, default is TOKEN_SEL_CB Returns ------- list ipTM_d0 score """ # Define the ptm and d0 functions def ptm_func(x, d0): return 1.0 / (1 + (x / d0) ** 2.0) ptm_func_vec = np.vectorize(ptm_func) # vector version # Define the d0 functions for numbers and arrays; minimum value = 1.0; from Yang and Skolnick, PROTEINS: Structure, Function, and Bioinformatics 57:702–710 (2004) def calc_d0(L, pair_type): L = float(L) if L < 27: L = 27 min_value = 1.0 if pair_type == "nucleic_acid": min_value = 2.0 d0 = 1.24 * (L - 15) ** (1.0 / 3.0) - 1.8 return max(min_value, d0) def fun(matrix): """Function to apply to the ipTM_d0 matrix. This function computes the mean of each row and returns the maximum value. """ if np.isnan(matrix).all(): return 0.0 matrix_res = np.zeros(matrix.shape[0]) for i in range(matrix.shape[0]): matrix_res[i] = np.nanmean(matrix[i, :]) max_index = np.nanargmax(matrix_res) return matrix_res[max_index] chain_len_sums = np.cumsum([0] + chain_length) model = pdb_cpp.Coor(pdb) model_cb = model.select_atoms(sel) assert model_cb.len == sum( chain_length ), "Number of CB atoms does not match the sum of chain lengths." assert ( model_cb.len == pae_array.shape[0] ), "Number of CB atoms does not match the number of rows in the PAE matrix." distance = distance_matrix(model_cb.xyz, model_cb.xyz) iptm_d0_dict = {} for i in range(len(chain_length)): for j in range(len(chain_length)): if i != j: do_chain = chain_length[i] + chain_length[j] type = ( "nucleic_acid" if (chain_type[i] != "protein" or chain_type[j] != "protein") else "protein" ) d0 = calc_d0(do_chain, type) iptm_d0_matrix = ptm_func_vec( pae_array[ chain_len_sums[i] : chain_len_sums[i + 1], chain_len_sums[j] : chain_len_sums[j + 1], ], d0, ) sub_distance = distance[ chain_len_sums[i] : chain_len_sums[i + 1], chain_len_sums[j] : chain_len_sums[j + 1], ] # Apply distance cutoff iptm_d0_matrix[sub_distance > 10.0] = np.nan iptm_d0_mean = fun(iptm_d0_matrix) iptm_d0_dict[ f"ipTM_interface_{chain_ids[i]}_{chain_ids[j]}" ] = iptm_d0_mean # print(f"ipTM_interface_{chain_ids[i]}_{chain_ids[j]}", iptm_d0_mean) return iptm_d0_dict
[docs] def iplddt(data, sel=TOKEN_SEL_IPLDDT, cutoff=10.0): r"""Compute the iplddt from the pdb file. Parameters ---------- data : AFData object containing the data sel : str selection string for the atoms to consider in the distance calculation, default is TOKEN_SEL_IPLDDT cutoff : float distance cutoff to define interface residues, default is 10.0 A Implementation was inspired from https://github.com/piercelab/alphafold_v2.2_customize/blob/master/get_interface_plddt.pl If contact number is zero, the iplddt score is set to 0. Returns ------- None The `data.df` dataframe is modified in place. References ---------- """ iplddt_list = [] disable = not getattr(data, "verbose", True) for pdb, query in tqdm( zip(data.df["pdb"], data.df["query"]), total=len(data.df["pdb"]), disable=disable, desc="iplddt", ): if pdb is None or pdb is np.nan: iplddt_list.append(None) continue model = pdb_cpp.Coor(pdb) # if getattr(data, "format", None) in {"AF3_webserver", "AF3_local", "boltz1"}: # coor_CA_CB = model.select_atoms( # "protein or (dna and name P) or (not protein and not dna and noh)" # ) # else: # coor_CA_CB = model.select_atoms( # "(protein and (name CB or (resname GLY and name CA))) or (dna and name P) or (not protein and not dna and noh)" # ) coor_CA_CB = model.select_atoms(sel) model_chains = data.chains.get( query, np.unique(np.asarray(coor_CA_CB.chain_str)) ) iplddt_local = {"pdb": pdb} for i, chain in enumerate(model_chains): chain_sel = coor_CA_CB.select_atoms(f"chain {chain}") for j in range(i + 1, len(model_chains)): other_chain = model_chains[j] other_chain_sel = coor_CA_CB.select_atoms(f"chain {other_chain}") if chain_sel.len == 0 or other_chain_sel.len == 0: iplddt_local[f"iplddt_{chain}_{other_chain}"] = 0.0 continue distance = distance_matrix( chain_sel.xyz, other_chain_sel.xyz, ) contact_num = np.sum(distance < cutoff) if contact_num == 0: iplddt_local[f"iplddt_{chain}_{other_chain}"] = 0.0 continue chain_indices, other_chain_indices = np.where(distance < cutoff) # chain_indices = np.unique(chain_indices) # other_chain_indices = np.unique(other_chain_indices) main_chain_beta = np.asarray(chain_sel.beta)[chain_indices] other_chain_beta = np.asarray(other_chain_sel.beta)[other_chain_indices] if main_chain_beta.size > 0 and np.nanmax(main_chain_beta) <= 1.0: main_chain_beta = main_chain_beta * 100.0 if other_chain_beta.size > 0 and np.nanmax(other_chain_beta) <= 1.0: other_chain_beta = other_chain_beta * 100.0 avg_iplddt = np.mean( np.concatenate([main_chain_beta, other_chain_beta]) ) iplddt_local[f"iplddt_{chain}_{other_chain}"] = avg_iplddt iplddt_list.append(iplddt_local) iplddt_df = pd.DataFrame(iplddt_list) data.df = data.df.merge(iplddt_df, on="pdb", how="inner")
[docs] def chain_plddt(data): r"""Compute for each chain the average plddt from the pdb file. Parameters ---------- data : AFData object containing the data Returns ------- None The `data.df` dataframe is modified in place. References ---------- """ chain_plddt_list = [] disable = not getattr(data, "verbose", True) for pos, query in tqdm( enumerate(data.df["query"]), total=len(data.df["query"]), disable=disable, desc="chain_plddt", ): plddt_dict = {} plddt = data.get_plddt(pos) chain_lens = data.chain_length[query] chain_len_sums = np.cumsum([0] + chain_lens) chain_ids = data.chains[query] for i, chain in enumerate(chain_ids): plddt_dict[f"plddt_{chain}"] = np.mean(plddt[chain_len_sums[i] : chain_len_sums[i + 1]]) chain_plddt_list.append(plddt_dict) chain_plddt_df = pd.DataFrame(chain_plddt_list) for column in chain_plddt_df.columns: data.df.loc[:, column] = chain_plddt_df.loc[:, column].to_numpy()