Source code for bioframe.io.fileops

import array
import io
import json
import os
import shutil
import subprocess
import tempfile
from collections import OrderedDict
from contextlib import closing

import numpy as np
import pandas as pd

try:
    import bbi
except ImportError:
    bbi = None

try:
    import pyBigWig
except ImportError:
    pyBigWig = None

from ..core.arrops import argnatsort
from ..core.stringops import parse_region
from .schemas import BAM_FIELDS, SCHEMAS

__all__ = [
    "read_table",
    "read_chromsizes",
    "read_tabix",
    "read_pairix",
    "read_alignments",
    "load_fasta",
    "read_bigwig",
    "to_bigwig",
    "read_bigbed",
    "to_bigbed",
]


[docs] def read_table(filepath_or, schema=None, schema_is_strict=False, **kwargs): """ Read a tab-delimited file into a data frame. Equivalent to :func:`pandas.read_table` but supports an additional `schema` argument to populate column names for common genomic formats. Parameters ---------- filepath_or : str, path object or file-like object Any valid string path is acceptable. The string could be a URL schema : str Schema to use for table column names. schema_is_strict : bool Whether to check if columns are filled with NAs. Returns ------- df : pandas.DataFrame of intervals """ kwargs.setdefault("sep", "\t") kwargs.setdefault("header", None) kwargs.setdefault("index_col", False) if isinstance(filepath_or, str) and filepath_or.endswith(".gz"): kwargs.setdefault("compression", "gzip") if schema is not None: try: kwargs.setdefault("names", SCHEMAS[schema]) except (KeyError, TypeError): if isinstance(schema, str): raise ValueError(f"TSV schema not found: '{schema}'") from None kwargs.setdefault("names", schema) df = pd.read_csv(filepath_or, **kwargs) if schema_is_strict: if (df.notna().sum(axis=0) == 0).any(): raise ValueError( "one or more columns are all NA," + " check agreement between number of fields in schema" + " and number of columns in input file" ) return df
[docs] def read_chromsizes( filepath_or, filter_chroms=True, chrom_patterns=(r"^chr[0-9]+$", r"^chr[XY]$", r"^chrM$"), natsort=True, as_bed=False, **kwargs, ): """ Read a ``<db>.chrom.sizes`` or ``<db>.chromInfo.txt`` file from the UCSC database, where ``db`` is a genome assembly name, as a `pandas.Series`. Parameters ---------- filepath_or : str or file-like Path or url to text file, or buffer. filter_chroms : bool, optional Filter for chromosome names given in ``chrom_patterns``. chrom_patterns : sequence, optional Sequence of regular expressions to capture desired sequence names. natsort : bool, optional Sort each captured group of names in natural order. Default is True. as_bed : bool, optional If True, return chromsizes as an interval dataframe (chrom, start, end). **kwargs : Passed to :func:`pandas.read_csv` Returns ------- Series of integer bp lengths indexed by sequence name or an interval dataframe. Notes ----- Mention name patterns See also -------- * UCSC assembly terminology: <http://genome.ucsc.edu/FAQ/FAQdownloads.html#download9> * NCBI assembly terminology: <https://www.ncbi.nlm.nih.gov/grc/help/definitions> """ if isinstance(filepath_or, str) and filepath_or.endswith(".gz"): kwargs.setdefault("compression", "gzip") chromtable = pd.read_csv( filepath_or, sep="\t", usecols=[0, 1], names=["name", "length"], dtype={"name": str}, **kwargs, ) if filter_chroms: parts = [] for pattern in chrom_patterns: if not len(pattern): continue part = chromtable[chromtable["name"].str.contains(pattern)] if natsort: part = part.iloc[argnatsort(part["name"])] parts.append(part) chromtable = pd.concat(parts, axis=0) if as_bed: chromtable["start"] = 0 chromtable = ( chromtable[["name", "start", "length"]] .rename({"name": "chrom", "length": "end"}, axis="columns") .reset_index(drop=True) ) else: chromtable.index = chromtable["name"].values chromtable = chromtable["length"] return chromtable
[docs] def read_tabix(fp, chrom=None, start=None, end=None): """ Read a tabix-indexed file into dataFrame. """ try: import pysam except ImportError: raise ImportError("pysam is required to use `read_tabix`") from None with closing(pysam.TabixFile(fp)) as f: names = list(f.header) or None df = pd.read_csv( io.StringIO("\n".join(f.fetch(chrom, start, end))), sep="\t", header=None, names=names, ) return df
[docs] def read_pairix( fp, region1, region2=None, chromsizes=None, columns=None, usecols=None, dtypes=None, **kwargs, ): """ Read a pairix-indexed file into DataFrame. """ import cytoolz as toolz import pypairix if dtypes is None: dtypes = {} f = pypairix.open(fp, "r") header = f.get_header() if len(header): header_groups = toolz.groupby(lambda x: x.split(":")[0], header) if "#chromsize" in header_groups and chromsizes is None: items = [line.split()[1:] for line in header_groups["#chromsize"]] if len(items) and chromsizes is None: names, lengths = zip(*((item[0], int(item[1])) for item in items)) chromsizes = pd.Series(index=names, data=lengths) if "#columns" in header_groups and columns is None: columns = header_groups["#columns"][0].split()[1:] chrom1, start1, end1 = parse_region(region1, chromsizes) if region2 is not None: chrom2, start2, end2 = parse_region(region2, chromsizes) else: chrom2, start2, end2 = chrom1, start1, end1 it = f.query2D(chrom1, start1, end1, chrom2, start2, end2) if usecols is not None: argusecols = [columns.index(col) for col in usecols] records = [(record[i] for i in argusecols) for record in it] columns = usecols else: records = it df = pd.DataFrame.from_records(records, columns=columns) if columns is not None: for col in columns: if col in dtypes: df[col] = df[col].astype(dtypes[col]) else: df[col] = pd.to_numeric(df[col], "ignore") return df
[docs] def read_alignments(fp, chrom=None, start=None, end=None): """ Read alignment records into a DataFrame. """ try: import pysam except ImportError: raise ImportError("pysam is required to use `read_alignments`") from None ext = os.path.splitext(fp)[1] if ext == ".sam": mode = "r" elif ext == ".bam": mode = "rb" elif ext == ".cram": mode = "rc" else: raise ValueError(f"{ext} is not a supported filetype") with closing(pysam.AlignmentFile(fp, mode)) as f: records = [] for s in f.fetch(chrom, start, end): # Needed because array.array is not json serializable tags = [ (k, v.tolist() if isinstance(v, array.array) else v) for k, v in s.tags ] records.append( ( s.qname, s.flag, s.reference_name, s.pos, s.mapq, s.cigarstring if s.mapq != 0 else np.nan, s.rnext, s.pnext, s.tlen, s.seq, s.qual, json.dumps(dict(tags)), ) ) df = pd.DataFrame(records, columns=BAM_FIELDS) return df
def read_bam(fp, chrom=None, start=None, end=None): """ Deprecated: use `read_alignment` instead. Read bam file into dataframe, """ return read_alignments(fp, chrom, start, end) class PysamFastaRecord: def __init__(self, ff, ref): self.ff = ff if ref not in ff.references: raise KeyError(f"Reference name '{ref}' not found in '{ff}'") self.ref = ref def __getitem__(self, key): if isinstance(key, slice): start, stop = key.start, key.stop else: start = key stop = key + 1 return self.ff.fetch(self.ref, start, stop)
[docs] def load_fasta(filepath_or, engine="pysam", **kwargs): """ Load lazy fasta sequences from an indexed fasta file (optionally compressed) or from a collection of uncompressed fasta files. Parameters ---------- filepath_or : str or iterable If a string, a filepath to a single `.fa` or `.fa.gz` file. Assumed to be accompanied by a `.fai` index file. Depending on the engine, the index may be created on the fly, and some compression formats may not be supported. If not a string, an iterable of fasta file paths each assumed to contain a single sequence. engine : {'pysam', 'pyfaidx'}, optional Module to use for loading sequences. kwargs : optional Options to pass to ``pysam.FastaFile`` or ``pyfaidx.Fasta``. Returns ------- OrderedDict of (lazy) fasta records. Notes ----- * pysam/samtools can read .fai and .gzi indexed files, I think. * pyfaidx can handle uncompressed and bgzf compressed files. """ is_multifile = not isinstance(filepath_or, str) records = OrderedDict() engine = engine.lower() if engine == "pysam": try: import pysam except ImportError: raise ImportError("pysam is required to use engine='pysam'") from None if is_multifile: for onefile in filepath_or: ff = pysam.FastaFile(onefile, **kwargs) name = ff.references[0] records[name] = PysamFastaRecord(ff, name) else: ff = pysam.FastaFile(filepath_or, **kwargs) for name in ff.references: records[name] = PysamFastaRecord(ff, name) elif engine == "pyfaidx": try: import pyfaidx except ImportError: raise ImportError("pyfaidx is required to use engine='pyfaidx'") from None if is_multifile: for onefile in filepath_or: ff = pyfaidx.Fasta(onefile, **kwargs) name = next(iter(ff.keys())) records[name] = ff[name] else: ff = pyfaidx.Fasta(filepath_or, **kwargs) for name in ff.keys(): records[name] = ff[name] else: raise ValueError("engine must be 'pysam' or 'pyfaidx'") return records
[docs] def read_bigwig(path, chrom, start=None, end=None, engine="auto"): """ Read intervals from a bigWig file. Parameters ---------- path : str Path or URL to a bigWig file chrom : str start, end : int, optional Start and end coordinates. Defaults to 0 and chromosome length. engine : {"auto", "pybbi", "pybigwig"} Library to use for querying the bigWig file. Returns ------- DataFrame """ engine = engine.lower() if engine == "auto": if bbi is None and pyBigWig is None: raise ImportError( "read_bigwig requires either the pybbi or pyBigWig package" ) elif bbi is not None: engine = "pybbi" else: engine = "pybigwig" if engine in ("pybbi", "bbi"): if start is None: start = 0 if end is None: end = -1 with bbi.open(path) as f: df = f.fetch_intervals(chrom, start=start, end=end) elif engine == "pybigwig": f = pyBigWig.open(path) if start is None: start = 0 if end is None: end = f.chroms()[chrom] ivals = f.intervals(chrom, start, end) df = pd.DataFrame(ivals, columns=["start", "end", "value"]) df.insert(0, "chrom", chrom) else: raise ValueError(f"engine must be 'auto', 'pybbi' or 'pybigwig'; got {engine}") return df
[docs] def read_bigbed(path, chrom, start=None, end=None, engine="auto"): """ Read intervals from a bigBed file. Parameters ---------- path : str Path or URL to a bigBed file chrom : str start, end : int, optional Start and end coordinates. Defaults to 0 and chromosome length. engine : {"auto", "pybbi", "pybigwig"} Library to use for querying the bigBed file. Returns ------- DataFrame """ engine = engine.lower() if engine == "auto": if bbi is None and pyBigWig is None: raise ImportError( "read_bigbed requires either the pybbi or pyBigWig package" ) elif bbi is not None: engine = "pybbi" else: engine = "pybigwig" if engine in ("pybbi", "bbi"): if start is None: start = 0 if end is None: end = -1 with bbi.open(path) as f: df = f.fetch_intervals(chrom, start=start, end=end) elif engine == "pybigwig": f = pyBigWig.open(path) if start is None: start = 0 if end is None: end = f.chroms()[chrom] ivals = f.entries(chrom, start, end) df = pd.DataFrame(ivals, columns=["start", "end", "rest"]) df.insert(0, "chrom", chrom) else: raise ValueError(f"engine must be 'auto', 'pybbi' or 'pybigwig'; got {engine}") return df
def _find_ucsc_binary(path, cmd): if path is None: try: assert shutil.which(cmd) is not None except Exception: raise ValueError( f"{cmd} is not present in the current environment. " f"Pass it as 'path_to_binary' parameter to bioframe.to_bigwig or " f"install it with, for example, conda install -y -c bioconda " f"ucsc-{cmd.lower()} " ) from None elif path.endswith(cmd): if not os.path.isfile(path) and os.access(path, os.X_OK): raise ValueError( f"{cmd} is absent in the provided path or cannot be " f"executed: {path}. " ) cmd = path else: cmd = os.path.join(path, cmd) if not os.path.isfile(cmd) and os.access(cmd, os.X_OK): raise ValueError( f"{cmd} is absent in the provided path or cannot be " f"executed: {path}. " ) return cmd
[docs] def to_bigwig( df, chromsizes, outpath, value_field=None, engine="ucsc", path_to_binary=None ): """Save a bedGraph-like dataframe as a binary BigWig file. Parameters ---------- df : pandas.DataFrame Data frame with columns 'chrom', 'start', 'end' and one or more value columns chromsizes : pandas.Series Series indexed by chromosome name mapping to their lengths in bp outpath : str The output BigWig file path value_field : str, optional Select the column label of the data frame to generate the track. Default is to use the fourth column. path_to_binary : str, optional Provide system path to the bedGraphToBigWig binary. engine : {'ucsc', 'bigtools'}, optional Engine to use for creating the BigWig file. """ is_bedgraph = True for col in ["chrom", "start", "end"]: if col not in df.columns: is_bedgraph = False if len(df.columns) < 4: is_bedgraph = False if not is_bedgraph: raise ValueError(f"A bedGraph-like DataFrame is required, got {df.columns}") if value_field is None: value_field = df.columns[3] columns = ["chrom", "start", "end", value_field] bg = df[columns].copy() bg["chrom"] = bg["chrom"].astype(str) bg = bg.sort_values(["chrom", "start", "end"]) if engine.lower() == "ucsc": cmd = _find_ucsc_binary(path_to_binary, "bedGraphToBigWig") with tempfile.NamedTemporaryFile(suffix=".bg") as f, \ tempfile.NamedTemporaryFile("wt", suffix=".chrom.sizes") as cs: # fmt: skip # noqa: E501 pd.Series(chromsizes).to_csv(cs, sep="\t", header=False) cs.flush() bg.to_csv( f.name, sep="\t", columns=columns, index=False, header=False, na_rep="nan", ) p = subprocess.run( [cmd, f.name, cs.name, outpath], capture_output=True, ) return p elif engine.lower() == "bigtools": try: import pybigtools except ImportError: raise ImportError( "pybigtools is required to use engine='bigtools'" ) from None f = pybigtools.open(outpath, "w") if issubclass(type(chromsizes), pd.Series): chromsizes = chromsizes.astype(int).to_dict() bg = bg.astype({"chrom": str, "start": int, "end": int, value_field: float}) f.write(chroms=chromsizes, vals=bg.itertuples(index=False)) f.close()
[docs] def to_bigbed( df, chromsizes, outpath, schema="infer", engine="ucsc", path_to_binary=None ): """Save a BED-like dataframe as a binary BigBed file. Parameters ---------- df : pandas.DataFrame Data frame with columns 'chrom', 'start', 'end' and one or more value columns chromsizes : pandas.Series Series indexed by chromosome name mapping to their lengths in bp outpath : str The output BigWig file path value_field : str, optional Select the column label of the data frame to generate the track. Default is to use the fourth column. path_to_binary : str, optional Provide system path to the bedToBigBed binary. """ from bioframe.io.bed import infer_bed_schema, parse_bed_schema, to_bed_dataframe if schema == "infer": n, _ = infer_bed_schema(df) else: n, _ = parse_bed_schema(schema) bed = to_bed_dataframe(df, schema=schema) m = len(bed.columns) - n schema = f"bed{n}+{m}" if m > 0 else f"bed{n}" if engine.lower() == "ucsc": if path_to_binary is None: cmd = _find_ucsc_binary(path_to_binary, "bedToBigBed") with tempfile.NamedTemporaryFile(suffix=".bed") as f, \ tempfile.NamedTemporaryFile("wt", suffix=".chrom.sizes") as cs: # fmt: skip # noqa: E501 pd.Series(chromsizes).to_csv(cs, sep="\t", header=False) cs.flush() bed.to_csv( f.name, sep="\t", columns=bed.columns, index=False, header=False, na_rep="nan", ) p = subprocess.run( [cmd, f"-type={schema}", f.name, cs.name, outpath], capture_output=True, ) return p elif engine.lower() == "bigtools": try: import pybigtools except ImportError: raise ImportError( "pybigtools is required to use engine='bigtools'" ) from None f = pybigtools.open(outpath, "w") if issubclass(type(chromsizes), pd.Series): chromsizes = chromsizes.astype(int).to_dict() bed = bed.astype({"chrom": str, "start": int, "end": int}) record_iter = ( (row[0], row[1], row[2], "\t".join(str(x) for x in row[3:])) for row in bed.itertuples(index=False) ) f.write(chroms=chromsizes, vals=record_iter) f.close()