Crawler Reference

bear.crawler

strip_oa_prefix(id)

Remove the OpenAlex ID prefix.

Source code in bear/crawler.py
def strip_oa_prefix(id: str) -> str:
    """Remove the OpenAlex ID prefix."""
    # removeprefix drops only the exact URL prefix; lstrip would strip any leading
    # characters that happen to appear in the prefix string.
    return id.removeprefix("https://openalex.org/")
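
Example (a minimal usage sketch; the ID below is the same institution ID used in the query_openalex examples further down):

strip_oa_prefix("https://openalex.org/I135310074")  # returns "I135310074"
strip_oa_prefix("I135310074")                        # already bare, returned unchanged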

get_openalex_id(entity_type, name)

Get an OpenAlex ID for a given entity type and search name with retry logic.

Parameters:

entity_type (str, required)
    The type of entity to search for. Must be one of "authors" or "institutions".

name (str, required)
    The name to search for.
Example

get_openalex_id("authors", "Jason Chor Ming Lo") get_openalex_id("institutions", "University of Wisconsin-Madison")

Source code in bear/crawler.py
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=30),
    retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
    reraise=True,
)
def get_openalex_id(entity_type: str, name: str) -> str:
    """
    Get an OpenAlex ID for a given entity type and search name with retry logic.

    Args:
        entity_type: The type of entity to search for. Must be one of "authors" or "institutions".
        name: The name to search for.

    Example:
        get_openalex_id("authors", "Jason Chor Ming Lo")
        get_openalex_id("institutions", "University of Wisconsin-Madison")
    """
    if entity_type not in ("authors", "institutions"):
        raise ValueError("entity_type must be 'authors' or 'institutions'")

    url = f"https://api.openalex.org/{entity_type}?search={name}"
    if config.OPENALEX_MAILTO_EMAIL:
        url += f"&mailto={config.OPENALEX_MAILTO_EMAIL}"

    try:
        response = httpx.get(url)
        response.raise_for_status()
        results = response.json().get("results")

        if not results:
            raise ValueError(f"No {entity_type.rstrip('s')} found for query: {name}")

        logger.info(f"Found: {results[0]['display_name']} ({results[0]['id']})")
        return strip_oa_prefix(results[0]["id"])
    except (httpx.HTTPError, httpx.TimeoutException) as e:
        logger.warning(f"Error retrieving {entity_type} ID: {str(e)}. Retrying...")
        raise
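
Usage sketch (not from the module source): the institution name is the docstring example, and the returned ID is plugged into a query_openalex filter of the form shown in its examples below.

institution_id = get_openalex_id("institutions", "University of Wisconsin-Madison")
works = query_openalex("works", f"authorships.institutions.lineage:{institution_id}", limit=1)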

query_openalex(endpoint, query, limit=0, save_folder=None)

Get all results from the OpenAlex API for a given endpoint and query.

Parameters:

endpoint (str, required)
    The API endpoint to query (e.g., "works", "authors").

query (str, required)
    The filter query for the API.

limit (int, default 0)
    The maximum number of pages (round trips) to retrieve. If 0 (default), all pages are retrieved.

save_folder (Path | None, default None)
    Optional folder to save results as a Parquet file.
Example
# Get works authored by a specific institution
query_openalex("works", "authorships.institutions.lineage:I135310074,type:types/article", limit=5)

# Get authors affiliated with a specific institution
query_openalex("authors", "last_known_institutions.id:https://openalex.org/I135310074", limit=3)
Source code in bear/crawler.py
def query_openalex(endpoint: str, query: str, limit: int = 0, save_folder: Path | None = None) -> list[dict[str, Any]]:
    """Get all results from the OpenAlex API for a given endpoint and query.

    Args:
        endpoint: The API endpoint to query (e.g., "works", "authors").
        query: The filter query for the API.
        limit: The maximum number of pages (round trips) to retrieve.
               If 0 (default), all pages are retrieved.
        save_folder: Optional folder to save results as a Parquet file.

    Example:
        ```python
        # Get works authored by a specific institution
        query_openalex("works", "authorships.institutions.lineage:I135310074,type:types/article", limit=5)

        # Get authors affiliated with a specific institution
        query_openalex("authors", "last_known_institutions.id:https://openalex.org/I135310074", limit=3)
        ```
    """

    if save_folder is not None:
        save_folder.mkdir(parents=True, exist_ok=True)

    cursor = "*"
    all_results = []
    round_trips = 0
    save_counter = 0
    total_retrieved = 0  # running total across chunk flushes, for accurate progress logs
    while True:
        if limit > 0 and round_trips >= limit:
            logger.warning(f"Reached API call limit of {limit} for endpoint '{endpoint}' with query: {query}. Results will be incomplete.")
            break

        cursor, results = _get_page_results(endpoint, query, cursor)
        round_trips += 1

        if not results:
            break
        all_results.extend(results)
        total_retrieved += len(results)

        # Save results to Parquet file if specified
        if save_folder and len(all_results) >= 1000:  # Save every 1000 records
            chunk_file = save_folder / f"chunk_{save_counter}.parquet"
            logger.info(f"Saving {len(all_results)} results to {chunk_file}")
            _dump(all_results, chunk_file)
            save_counter += 1
            all_results = []  # Reset for next chunk

        logger.info(f"Retrieved {len(all_results)} results so far for query: {query}")

    if save_folder and all_results:
        chunk_file = save_folder / f"chunk_{save_counter}.parquet"
        logger.info(f"Saving final {len(all_results)} results to {chunk_file}")
        _dump(all_results, chunk_file)
    return all_results
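
When save_folder is given, results are flushed to chunk_<n>.parquet files inside that folder. A sketch of reading the chunks back with pandas, mirroring how crawl (below) consumes its authors folder; the query and limit are illustrative:

import pandas as pd
from pathlib import Path

save_folder = Path("tmp/openalex_data/authors")
query_openalex("authors", "last_known_institutions.id:I135310074", limit=3, save_folder=save_folder)
authors = pd.read_parquet(save_folder).to_dict(orient="records")  # reads every chunk in the folder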

crawl(institution, save_path=Path('tmp/openalex_data'), author_api_call_limit=0, authors_limit=0, per_author_work_api_call_limit=0, skip_pulling_authors=False, skip_existing_works=True)

Crawl the OpenAlex API and dump the results to local storage.

Source code in bear/crawler.py
def crawl(
    institution: str,
    save_path: Path = Path("tmp/openalex_data"),
    author_api_call_limit: int = 0,
    authors_limit: int = 0,
    per_author_work_api_call_limit: int = 0,
    skip_pulling_authors: bool = False,
    skip_existing_works: bool = True,
) -> None:
    """Crawl the OpenAlex API and dump the results to local storage."""

    save_path.mkdir(parents=True, exist_ok=True)

    # Collect authors whose works were already crawled, so they can be skipped below
    existing_authors: list[str] = []
    if skip_existing_works and (save_path / "works").exists():
        existing_authors = [p.name for p in (save_path / "works").glob("*/")]

    if not skip_pulling_authors:
        # Get all authors affiliated with the institution
        institution_id = get_openalex_id("institutions", institution)

        logger.info(f"Fetching authors for institution ID: {institution_id}")
        query_authors = f"last_known_institutions.id:{institution_id}"
        query_openalex(endpoint="authors", query=query_authors, limit=author_api_call_limit, save_folder=save_path / "authors")
        authors = pd.read_parquet(save_path / "authors").to_dict(orient="records")
    else:
        # If skipping pulling authors, use existing authors from previous runs
        if (save_path / "authors").exists():
            authors = pd.read_parquet(save_path / "authors").to_dict(orient="records")
        else:
            logger.warning("Skipping pulling authors, but no existing authors found.")
            authors = []

    # Get all works authored by the institution's authors
    if authors_limit > 0:
        authors = authors[:authors_limit]

    if skip_existing_works:
        authors = [a for a in authors if strip_oa_prefix(a["id"]) not in existing_authors]

    for author in tqdm(authors):
        try:
            author_id = strip_oa_prefix(author["id"])

            query_works = f"authorships.author.id:{author_id}"
            query_openalex(endpoint="works", query=query_works, limit=per_author_work_api_call_limit, save_folder=save_path / "works" / author_id)
        except Exception as e:
            logger.error(f"Error processing author {author_id}: {str(e)}")
            continue
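
A hedged end-to-end sketch; the institution name and the small limits are illustrative and keep the run to a handful of API round trips:

from pathlib import Path

crawl(
    "University of Wisconsin-Madison",
    save_path=Path("tmp/openalex_data"),
    author_api_call_limit=1,            # at most one page of author results
    authors_limit=5,                    # process only the first five authors
    per_author_work_api_call_limit=1,   # at most one page of works per author
)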

OpenAlex Data Crawler

The crawler module handles data collection from the OpenAlex API.

Features

  • Institution-specific data crawling
  • Rate limiting and retry logic
  • Parallel processing support
  • Data validation and cleaning

Usage

uv run bear/crawler.py <institution-id>

Data Output

Crawled data is saved in Parquet format under the tmp/openalex_data/ directory.
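
Works are written one folder per author under tmp/openalex_data/works/<author_id>/, each containing chunk_<n>.parquet files. A sketch of loading everything into a single DataFrame (assumes pandas with a Parquet engine such as pyarrow installed):

import pandas as pd
from pathlib import Path

works_dir = Path("tmp/openalex_data/works")
all_works = pd.concat(pd.read_parquet(author_dir) for author_dir in works_dir.glob("*/"))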