Source code for mardi_importer.cran.RPackage

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from mardi_importer.integrator import MardiIntegrator, MardiItemEntity
from mardi_importer.publications import (ArxivPublication, 
                                         CrossrefPublication,
                                         ZenodoResource, 
                                         Author)
from wikibaseintegrator.wbi_enums import ActionIfExists
from wikibaseintegrator.wbi_helpers import search_entities, remove_claims

from dataclasses import dataclass, field
from typing import Optional, Dict, List, Tuple
from io import StringIO

from bs4 import BeautifulSoup
import pandas as pd
import requests
import re

import logging
log = logging.getLogger('CRANlogger')


@dataclass
class RPackage:
    """Class to manage R package items in the local Wikibase instance.

    Attributes:
        date: Date of publication
        label: Package name
        description: Title of the R package
        long_description: Detailed description of the R package
        url: URL to the CRAN repository
        version: Version of the R package
        versions: Previously published versions
        authors: Author(s) of the package
        license_data: Software license
        dependencies: Dependencies on R and other packages
        imports: Imported R packages
        maintainer: Software maintainer
        _QID: Package QID
        api: API to the MaRDI integrator
    """
    date: str
    label: str
    description: str
    api: MardiIntegrator
    long_description: str = ""
    url: str = ""
    version: str = ""
    versions: List[Tuple[str, str]] = field(default_factory=list)
    authors: List[Author] = field(default_factory=list)
    license_data: List[Tuple[str, str]] = field(default_factory=list)
    dependencies: List[Tuple[str, str]] = field(default_factory=list)
    imports: List[Tuple[str, str]] = field(default_factory=list)
    maintainer: str = ""
    author_pool: List[Author] = field(default_factory=list)
    crossref_publications: List[CrossrefPublication] = field(default_factory=list)
    arxiv_publications: List[ArxivPublication] = field(default_factory=list)
    zenodo_resources: List[ZenodoResource] = field(default_factory=list)
    _QID: str = ""
    _item: Optional[MardiItemEntity] = None

    @property
    def QID(self) -> str:
        """Return the QID of the R package in the knowledge graph.

        Searches for an item with the package label in the Wikibase SQL tables
        and returns the QID if a matching result is found.

        Returns:
            str: The entity QID representing the R package.
        """
        self._QID = self._QID or self.item.is_instance_of('wd:Q73539779')
        return self._QID

    @property
    def item(self) -> MardiItemEntity:
        """Return the integrator item representing the R package.

        Also sets the label and description of the package.

        Returns:
            MardiItemEntity: Integrator item
        """
        if not self._item:
            self._item = self.api.item.new()
            self._item.labels.set(language="en", value=self.label)
            description = self.description
            if self.label == self.description:
                description += " (R Package)"
            self._item.descriptions.set(
                language="en",
                value=description
            )
        return self._item

    def exists(self) -> str:
        """Checks if an item corresponding to the R package already exists.

        Returns:
            str: Entity ID
        """
        if self.QID:
            self._item = self.api.item.get(entity_id=self.QID)
        return self.QID

    def is_updated(self) -> bool:
        """Checks if the Item corresponding to the R package is up to date.

        Compares the last update property in the local knowledge graph with
        the publication date imported from CRAN.

        Returns:
            bool: **True** if both dates coincide, **False** otherwise.
        """
        return self.date == self.get_last_update()

    def pull(self):
        """Imports metadata from CRAN corresponding to the R package.

        Imports **Version**, **Dependencies**, **Imports**, **Authors**,
        **Maintainer** and **License** and saves them as instance attributes.
        """
        self.url = f"https://CRAN.R-project.org/package={self.label}"

        try:
            page = requests.get(self.url)
            soup = BeautifulSoup(page.content, 'lxml')
        except Exception:
            log.warning(f"Package {self.label} not found in CRAN.")
            return None
        else:
            if soup.find_all('table'):
                self.long_description = soup.find_all('p')[0].get_text() or ""
                self.parse_publications(self.long_description)
                self.long_description = re.sub("\n", "", self.long_description).strip()
                self.long_description = re.sub("\t", "", self.long_description).strip()

                table = soup.find_all('table')[0]
                package_df = self.clean_package_list(table)

                if "Version" in package_df.columns:
                    self.version = package_df.loc[1, "Version"]
                if "Author" in package_df.columns:
                    self.authors = package_df.loc[1, "Author"]
                if "License" in package_df.columns:
                    self.license_data = package_df.loc[1, "License"]
                if "Depends" in package_df.columns:
                    self.dependencies = package_df.loc[1, "Depends"]
                if "Imports" in package_df.columns:
                    self.imports = package_df.loc[1, "Imports"]
                if "Maintainer" in package_df.columns:
                    self.maintainer = package_df.loc[1, "Maintainer"]

                self.get_versions()
            else:
                log.warning("Metadata table not found in CRAN. Package has probably been archived.")
            return self

    def create(self) -> None:
        """Create a package in the Wikibase instance.

        This function pulls the package, inserts its claims, and writes
        it to the Wikibase instance.

        Returns:
            None
        """
        package = self.pull()
        if package:
            package = package.insert_claims().write()

        if package:
            log.info(f"Package created with QID: {package['QID']}.")
            #print('package created')
        else:
            log.info("Package could not be created.")
            #print('package not created')

    def write(self) -> Optional[Dict[str, str]]:
        """Write the package item to the Wikibase instance.

        If the item has claims, it will be written to the Wikibase instance.
        If the item is successfully written, a dictionary with the QID of
        the item will be returned.

        Returns:
            Optional[Dict[str, str]]: A dictionary with the QID of the written
                item if successful, or None otherwise.
        """
        if self.item.claims:
            item = self.item.write()
            if item:
                return {'QID': item.id}

    def insert_claims(self):
        """Adds the claims of the R package to the item and returns the package."""
        # Instance of: R package
        self.item.add_claim("wdt:P31", "wd:Q73539779")

        # Programmed in: R
        self.item.add_claim("wdt:P277", "wd:Q206904")

        # Long description
        prop_nr = self.api.get_local_id_by_label("description", "property")
        self.item.add_claim(prop_nr, self.long_description)

        # Last update date
        self.item.add_claim("wdt:P5017", f"+{self.date}T00:00:00Z")

        # Software version identifiers
        for version, publication_date in self.versions:
            qualifier = [self.api.get_claim("wdt:P577", publication_date)]
            self.item.add_claim("wdt:P348", version, qualifiers=qualifier)
        if self.version:
            qualifier = [self.api.get_claim("wdt:P577", f"+{self.date}T00:00:00Z")]
            self.item.add_claim("wdt:P348", self.version, qualifiers=qualifier)

        # Disambiguate Authors and create corresponding Author items
        self.author_pool = Author.disambiguate_authors(self.author_pool)

        # Authors
        for author in self.authors:
            author.pull_QID(self.author_pool)
            self.item.add_claim("wdt:P50", author.QID)

        # Maintainer
        self.maintainer.pull_QID(self.author_pool)
        self.item.add_claim("wdt:P126", self.maintainer.QID)

        # Licenses
        if self.license_data:
            claims = self.process_claims(self.license_data, 'wdt:P275', 'wdt:P9767')
            self.item.add_claims(claims)

        # Dependencies
        if self.dependencies:
            claims = self.process_claims(self.dependencies, 'wdt:P1547', 'wdt:P348')
            self.item.add_claims(claims)

        # Imports
        if self.imports:
            prop_nr = self.api.get_local_id_by_label("imports", "property")
            claims = self.process_claims(self.imports, prop_nr, 'wdt:P348')
            self.item.add_claims(claims)

        # Related publications and sources
        cites_work = "wdt:P2860"
        for publications in [self.crossref_publications,
                             self.arxiv_publications,
                             self.zenodo_resources]:
            for publication in publications:
                for author in publication.authors:
                    author.pull_QID(self.author_pool)
                publication.create()
                self.item.add_claim(cites_work, publication.QID)

        # CRAN Project
        self.item.add_claim("wdt:P5565", self.label)

        # Wikidata QID
        wikidata_QID = self.get_wikidata_QID()
        if wikidata_QID:
            self.item.add_claim("Wikidata QID", wikidata_QID)

        return self

    def update(self):
        """Updates an existing WB item with the metadata imported from CRAN.

        The metadata corresponding to the package is first pulled from CRAN
        and saved as instance attributes through :meth:`pull`. The statements
        that do not coincide with the locally saved information are updated
        or substituted with the updated information.

        Uses :class:`mardi_importer.wikibase.WBItem` to update the item
        corresponding to the R package.

        Returns:
            str: ID of the updated R package.
        """
        if self.pull():
            # Obtain current Authors
            current_authors = self.item.get_value('wdt:P50')
            for author_qid in current_authors:
                author_item = self.api.item.get(entity_id=author_qid)
                author_label = str(author_item.labels.get('en'))
                current_author = Author(self.api, name=author_label)
                current_author._QID = author_qid
                self.author_pool += [current_author]

            # Disambiguate Authors and create corresponding Author items
            self.author_pool = Author.disambiguate_authors(self.author_pool)

            # GUID to remove
            remove_guid = []
            props_to_delete = ['wdt:P50', 'wdt:P275', 'wdt:P1547', 'imports', 'wdt:P2860']
            for prop_str in props_to_delete:
                prop_nr = self.api.get_local_id_by_label(prop_str, 'property')
                for claim in self.item.claims.get(prop_nr):
                    remove_guid.append(claim.id)
            for guid in remove_guid:
                remove_claims(guid, login=self.api.login, is_bot=True)

            # Restart item state
            self.exists()

            if self.item.descriptions.values.get('en') != self.description:
                description = self.description
                if self.label == self.description:
                    description += " (R Package)"
                self.item.descriptions.set(
                    language="en",
                    value=description
                )

            # Long description
            self.item.add_claim("description", self.long_description, action="replace_all")

            # Last update date
            self.item.add_claim("wdt:P5017", f"+{self.date}T00:00:00Z", action="replace_all")

            # Software version identifiers
            for version, publication_date in self.versions:
                qualifier = [self.api.get_claim("wdt:P577", publication_date)]
                self.item.add_claim("wdt:P348", version, qualifiers=qualifier)
            if self.version:
                qualifier = [self.api.get_claim("wdt:P577", f"+{self.date}T00:00:00Z")]
                self.item.add_claim("wdt:P348", self.version, qualifiers=qualifier)

            # Authors
            for author in self.authors:
                author.pull_QID(self.author_pool)
                self.item.add_claim("wdt:P50", author.QID)

            # Maintainer
            self.maintainer.pull_QID(self.author_pool)
            self.item.add_claim("wdt:P126", self.maintainer.QID, action="replace_all")

            # Licenses
            if self.license_data:
                claims = self.process_claims(self.license_data, 'wdt:P275', 'wdt:P9767')
                self.item.add_claims(claims)

            # Dependencies
            if self.dependencies:
                claims = self.process_claims(self.dependencies, 'wdt:P1547', 'wdt:P348')
                self.item.add_claims(claims)

            # Imports
            if self.imports:
                prop_nr = self.api.get_local_id_by_label("imports", "property")
                claims = self.process_claims(self.imports, prop_nr, 'wdt:P348')
                self.item.add_claims(claims)

            # Related publications and sources
            cites_work = "wdt:P2860"
            for publications in [self.crossref_publications,
                                 self.arxiv_publications,
                                 self.zenodo_resources]:
                for publication in publications:
                    for author in publication.authors:
                        author.pull_QID(self.author_pool)
                    publication.create()
                    self.item.add_claim(cites_work, publication.QID)

            # CRAN Project
            self.item.add_claim("wdt:P5565", self.label, action="replace_all")

            # Wikidata QID
            wikidata_QID = self.get_wikidata_QID()
            if wikidata_QID:
                self.item.add_claim("Wikidata QID", wikidata_QID, action="replace_all")

            package = self.write()
            if package:
                print(f"Package with QID updated: {package['QID']}.")
            else:
                print("Package could not be updated.")

    def process_claims(self, data, prop_nr, qualifier_nr=None):
        """Turns a list of (value, qualifier) tuples into integrator claims."""
        claims = []
        for value, qualifier_value in data:
            qualifier_prop_nr = (
                'wdt:P2699' if qualifier_value.startswith('https')
                else qualifier_nr
            )
            qualifier = (
                [self.api.get_claim(qualifier_prop_nr, qualifier_value)]
                if qualifier_value else []
            )
            claims.append(self.api.get_claim(prop_nr, value, qualifiers=qualifier))
        return claims

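    # Illustrative example with hypothetical license tuples: a call such as
    #
    #   claims = self.process_claims(
    #       [("wd:Q10513450", ""),
    #        ("wd:Q334661", "https://cran.r-project.org/web/packages/pkg/LICENSE")],
    #       'wdt:P275', 'wdt:P9767')
    #
    # would produce one unqualified claim and one claim qualified through
    # 'wdt:P2699', since the second qualifier value starts with 'https'.
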
    def parse_publications(self, description):
        """Extracts the DOI identification of related publications.

        Identifies the DOI of publications that are mentioned using the format
        *doi:*, *arXiv:* or *zenodo:* in the long description of the R package,
        and appends the corresponding publication objects to the instance
        attributes ``crossref_publications``, ``arxiv_publications`` and
        ``zenodo_resources``.
        """
        doi_references = re.findall('<doi:(.*?)>', description)
        arxiv_references = re.findall('<arXiv:(.*?)>', description)
        zenodo_references = re.findall('<zenodo:(.*?)>', description)

        doi_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, doi_references))
        arxiv_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, arxiv_references))
        zenodo_references = list(map(lambda x: x[:-1] if x.endswith('.') else x, zenodo_references))

        crossref_references = []
        for doi in doi_references:
            doi = doi.strip().lower()
            if re.search('10.48550/', doi):
                arxiv_id = doi.replace(":", ".")
                arxiv_id = arxiv_id.replace('10.48550/arxiv.', '')
                arxiv_references.append(arxiv_id.strip())
            elif re.search('10.5281/', doi):
                zenodo_id = doi.replace(":", ".")
                zenodo_id = zenodo_id.replace('10.5281/zenodo.', '')
                zenodo_references.append(zenodo_id.strip())
            else:
                crossref_references.append(doi)

        for doi in crossref_references:
            publication = CrossrefPublication(self.api, doi)
            self.author_pool += publication.authors
            self.crossref_publications.append(publication)

        for arxiv_id in arxiv_references:
            arxiv_id = arxiv_id.replace(":", ".")
            publication = ArxivPublication(self.api, arxiv_id)
            if publication.title != "Error":
                self.author_pool += publication.authors
                self.arxiv_publications.append(publication)

        for zenodo_id in zenodo_references:
            zenodo_id = zenodo_id.replace(":", ".")
            publication = ZenodoResource(self.api, zenodo_id)
            self.author_pool += publication.authors
            self.zenodo_resources.append(publication)

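    # Illustrative example with a hypothetical description text: a string such as
    #   "Implements the estimator of <doi:10.1000/example123> and <arXiv:2101.00001>."
    # sends the DOI 10.1000/example123 to CrossrefPublication and the identifier
    # 2101.00001 to ArxivPublication; DOIs under 10.48550/ or 10.5281/ are
    # rerouted to the arXiv and Zenodo lists instead of Crossref.
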
    def get_last_update(self):
        """Returns the package last update date saved in the Wikibase instance.

        Returns:
            str: Last update date in the format YYYY-MM-DD.
        """
        last_update = self.item.get_value("wdt:P5017")
        return last_update[0][1:11] if last_update else None

    def clean_package_list(self, table_html):
        """Processes raw imported data from CRAN to enable the creation of items.

        - Package dependencies are split at the comma position.
        - License information is processed using the :meth:`parse_license` method.
        - Author information is processed using the :meth:`parse_authors` method.
        - Maintainer information is processed using the :meth:`parse_maintainer` method.

        Args:
            table_html: HTML code obtained with BeautifulSoup corresponding to
                the table containing the metadata of the R package imported
                from CRAN.

        Returns:
            (Pandas dataframe): Dataframe with processed data from a single R
                package including the columns **Version**, **Author**,
                **License**, **Depends**, **Imports** and **Maintainer**.
        """
        package_df = pd.read_html(StringIO(str(table_html)))
        package_df = package_df[0].set_index(0).T
        package_df.columns = package_df.columns.str[:-1]

        if "Depends" in package_df.columns:
            package_df["Depends"] = package_df["Depends"].apply(self.parse_software)
        if "Imports" in package_df.columns:
            package_df["Imports"] = package_df["Imports"].apply(self.parse_software)
        if "License" in package_df.columns:
            package_df["License"] = package_df["License"].apply(self.parse_license)
        if "Author" in package_df.columns:
            package_df["Author"] = str(
                table_html.find("td", text="Author:").find_next_sibling("td")
            ).replace('\n', '').replace('\r', '')
            package_df["Author"] = package_df["Author"].apply(self.parse_authors)
        if "Maintainer" in package_df.columns:
            package_df["Maintainer"] = package_df["Maintainer"].apply(self.parse_maintainer)
        return package_df

    def parse_software(self, software_str: str) -> List[Tuple[str, str]]:
        """Processes the dependency and import information of each R package.

        This includes:

        - Extracting the version information of each dependency/import, if provided.
        - Providing the Item QID given the dependency/import label.
        - Creating a new Item if the dependency/import is not found in the
          local knowledge graph.

        Returns:
            List[Tuple[str, str]]: List of tuples including software QID and version.
        """
        if pd.isna(software_str):
            return []

        software_list = str(software_str).split(", ")
        software_tuples = []
        for software_string in software_list:
            software_version = re.search(r"\((.*?)\)", software_string)
            software_version = software_version.group(1) if software_version else ""
            software_name = re.sub(r"\(.*?\)", "", software_string).strip()

            # Instance of R package
            if software_name == "R":
                # Software = R
                software_QID = self.api.query("local_id", "Q206904")
            else:
                item = self.api.item.new()
                item.labels.set(language="en", value=software_name)
                software_id = item.is_instance_of("wd:Q73539779")
                if software_id:
                    # Software = R package
                    software_QID = software_id
                else:
                    # Software = New instance of R package
                    item.add_claim("wdt:P31", "wd:Q73539779")
                    item.add_claim("wdt:P277", "wd:Q206904")
                    software_QID = item.write().id

            software_tuples.append((software_QID, software_version))
        return software_tuples

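    # Illustrative example with a hypothetical "Depends" string: parsing
    #   "R (>= 3.5.0), Rcpp"
    # returns two tuples, (<local QID of R>, ">= 3.5.0") and (<QID of the Rcpp
    # item>, ""), where the Rcpp item is looked up in the local knowledge graph
    # and created as a new R package item if it does not exist yet.
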
    def parse_license(self, x: str) -> List[Tuple[str, str]]:
        """Splits a string of licenses.

        Takes into account that licenses are often not uniformly listed. The
        characters \|, + and , are used to separate licenses. Further details
        on each license are often included in square brackets.

        The concrete license is identified and linked to the corresponding item
        that has previously been imported from Wikidata. Further license
        information, when provided between round or square brackets, is added
        as a qualifier. If a file license is mentioned, the link to the file
        license in CRAN is added as a qualifier.

        Args:
            x (str): String imported from CRAN representing license information.

        Returns:
            List[Tuple[str, str]]: List of license tuples. Each tuple contains
                the license QID as the first element and the license qualifier
                as the second element.
        """
        if pd.isna(x):
            return []

        license_list = []
        licenses = str(x).split(" | ")

        i = 0
        while i in range(len(licenses)):
            if not re.findall(r"\[", licenses[i]) or (
                re.findall(r"\[", licenses[i]) and re.findall(r"\]", licenses[i])
            ):
                license_list.append(licenses[i])
                i += 1
            elif re.findall(r"\[", licenses[i]) and not re.findall(
                r"\]", licenses[i]
            ):
                j = i + 1
                license_aux = licenses[i]
                closed = False
                while j < len(licenses) and not closed:
                    license_aux += " | "
                    license_aux += licenses[j]
                    if re.findall(r"\]", licenses[j]):
                        closed = True
                    j += 1
                license_list.append(license_aux)
                i = j

        split_list = []
        for item in license_list:
            items = item.split(" + ")
            i = 0
            while i in range(len(items)):
                if not re.findall(r"\[", items[i]) or (
                    re.findall(r"\[", items[i]) and re.findall(r"\]", items[i])
                ):
                    split_list.append(items[i])
                    i += 1
                elif re.findall(r"\[", items[i]) and not re.findall(r"\]", items[i]):
                    j = i + 1
                    items_aux = items[i]
                    closed = False
                    while j < len(items) and not closed:
                        items_aux += " + "
                        items_aux += items[j]
                        if re.findall(r"\]", items[j]):
                            closed = True
                        j += 1
                    split_list.append(items_aux)
                    i = j
        license_list = list(dict.fromkeys(split_list))

        license_tuples = []
        for license_str in license_list:
            license_qualifier = ""
            if re.findall(r"\(.*?\)", license_str):
                qualifier_groups = re.search(r"\((.*?)\)", license_str)
                license_qualifier = qualifier_groups.group(1)
                license_aux = re.sub(r"\(.*?\)", "", license_str)
                if re.findall(r"\[.*?\]", license_aux):
                    qualifier_groups = re.search(r"\[(.*?)\]", license_str)
                    license_qualifier = qualifier_groups.group(1)
                    license_str = re.sub(r"\[.*?\]", "", license_aux)
                else:
                    license_str = license_aux
            elif re.findall(r"\[.*?\]", license_str):
                qualifier_groups = re.search(r"\[(.*?)\]", license_str)
                license_qualifier = qualifier_groups.group(1)
                license_str = re.sub(r"\[.*?\]", "", license_str)

            license_str = license_str.strip()
            if license_str in ["file LICENSE", "file LICENCE"]:
                license_qualifier = (
                    f"https://cran.r-project.org/web/packages/{self.label}/LICENSE"
                )
            license_QID = self.get_license_QID(license_str)
            license_tuples.append((license_QID, license_qualifier))
        return license_tuples

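    # Illustrative example with a hypothetical license string: parsing
    #   "GPL-2 | GPL-3 [expanded from: GPL (>= 2)]"
    # returns roughly
    #   [("wd:Q10513450", ""), ("wd:Q10513445", "expanded from: GPL (>= 2)")]
    # where the bracketed remark is kept as a qualifier of the second license.
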
    def parse_authors(self, x):
        """Splits the string corresponding to the authors into Author objects.

        Author information in CRAN is not registered uniformly. This function
        parses the imported string and returns just the names of the individuals
        that can be unequivocally identified as authors (i.e. they are followed
        by the *[aut]* abbreviation).

        Generally, authors in CRAN are indicated with the abbreviation *[aut]*.
        When no abbreviations are included, only the first individual is imported
        to Wikibase (otherwise it can often not be established whether information
        after the first author refers to another individual, an institution, a
        funder, etc.).

        Args:
            x (str): String imported from CRAN representing author information.

        Returns:
            (List[Author]): List of Author objects, each carrying the ORCID iD
                if provided.
        """
        td_match = re.match(r'<td>(.*?)</td>', x)
        if td_match:
            x = td_match.groups()[0]
        x = re.sub("<img alt.*?a>", "", x)  # Delete img tags
        x = re.sub(r"\(.*?\)", "", x)       # Delete text in brackets
        x = re.sub(r'"', "", x)             # Delete quotation marks
        x = re.sub("\t", "", x)             # Delete tabs
        x = re.sub("ORCID iD", "", x)       # Delete ORCID iD refs

        author_list = re.findall(r".*?\]", x)

        authors = []
        if author_list:
            for author in author_list:
                labels = re.findall(r"\[.*?\]", author)
                if labels:
                    is_author = re.findall("aut", labels[0])
                    if is_author:
                        orcid = re.findall(r"\d{4}-\d{4}-\d{4}-.{4}", author)
                        if orcid:
                            orcid = orcid[0]
                        author = re.sub(r"<a href=.*?>", "", author)
                        author = re.sub(r"\[.*?\]", "", author)
                        author = re.sub(r"^\s?,", "", author)
                        author = re.sub(r"^\s?and\s?", "", author)
                        author = re.sub(
                            r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", author
                        )
                        author = author.strip()
                        multiple_words = author.split(" ")
                        if len(multiple_words) > 1:
                            if author:
                                authors.append(Author(self.api, author, orcid))
        else:
            authors_comma = x.split(", ")
            authors_and = x.split(" and ")
            if len(authors_and) > len(authors_comma):
                author = re.sub(
                    r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", authors_and[0]
                )
            else:
                author = re.sub(
                    r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", authors_comma[0],
                )
            if len(author.split(" ")) > 5 or re.findall(r"[@\(\)\[\]&]", author):
                author = ""
            if author:
                authors.append(Author(self.api, author))

        self.author_pool += authors
        return authors

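    # Illustrative example with a hypothetical author string: for
    #   "Jane Doe [aut, cre], John Smith [ctb]"
    # only "Jane Doe" becomes an Author object, since entries are kept only
    # when their first bracketed tag contains "aut"; the [ctb] entry is skipped.
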
    def parse_maintainer(self, name: str) -> Author:
        """Removes unnecessary information from the maintainer string.

        Args:
            name (str): String imported from CRAN which may contain an e-mail
                address and comments within brackets.

        Returns:
            (Author): Author object representing the maintainer.
        """
        if pd.isna(name):
            return name
        quotes = re.match(r'"(.*?)"', name)
        if quotes:
            name = quotes.groups()[0]
        name = re.sub(r"<.*?>", "", name)
        name = re.sub(r"\(.*?\)", "", name)
        name = name.strip()
        name = name.split(',')
        maintainer = Author(self.api, name=name[0])
        self.author_pool += [maintainer]
        return maintainer

    def get_license_QID(self, license_str: str) -> str:
        """Returns the Wikidata item ID corresponding to a software license.

        The same license is often denominated in CRAN using different names.
        This function returns the Wikidata item ID corresponding to a single
        unique license that is referenced in CRAN under different names
        (e.g. *Artistic-2.0* and *Artistic License 2.0* both refer to the same
        license, corresponding to item *Q14624826*).

        Args:
            license_str (str): String corresponding to a license imported from CRAN.

        Returns:
            (str): Wikidata item ID.
        """
        def get_license(label: str) -> str:
            license_item = self.api.item.new()
            license_item.labels.set(language="en", value=label)
            return license_item.is_instance_of("wd:Q207621")

        license_mapping = {
            "ACM": get_license("ACM Software License Agreement"),
            "AGPL": "wd:Q28130012",
            "AGPL-3": "wd:Q27017232",
            "Apache License": "wd:Q616526",
            "Apache License 2.0": "wd:Q13785927",
            "Apache License version 1.1": "wd:Q17817999",
            "Apache License version 2.0": "wd:Q13785927",
            "Artistic-2.0": "wd:Q14624826",
            "Artistic License 2.0": "wd:Q14624826",
            "BSD 2-clause License": "wd:Q18517294",
            "BSD 3-clause License": "wd:Q18491847",
            "BSD_2_clause": "wd:Q18517294",
            "BSD_3_clause": "wd:Q18491847",
            "BSL": "wd:Q2353141",
            "BSL-1.0": "wd:Q2353141",
            "CC0": "wd:Q6938433",
            "CC BY 4.0": "wd:Q20007257",
            "CC BY-SA 4.0": "wd:Q18199165",
            "CC BY-NC 4.0": "wd:Q34179348",
            "CC BY-NC-SA 4.0": "wd:Q42553662",
            "CeCILL": "wd:Q1052189",
            "CeCILL-2": "wd:Q19216649",
            "Common Public License Version 1.0": "wd:Q2477807",
            "CPL-1.0": "wd:Q2477807",
            "Creative Commons Attribution 4.0 International License": "wd:Q20007257",
            "EPL": "wd:Q1281977",
            "EUPL": "wd:Q1376919",
            "EUPL-1.1": "wd:Q1376919",
            "file LICENCE": get_license("File License"),
            "file LICENSE": get_license("File License"),
            "FreeBSD": "wd:Q34236",
            "GNU Affero General Public License": "wd:Q1131681",
            "GNU General Public License": "wd:Q7603",
            "GNU General Public License version 2": "wd:Q10513450",
            "GNU General Public License version 3": "wd:Q10513445",
            "GPL": "wd:Q7603",
            "GPL-2": "wd:Q10513450",
            "GPL-3": "wd:Q10513445",
            "LGPL": "wd:Q192897",
            "LGPL-2": "wd:Q23035974",
            "LGPL-2.1": "wd:Q18534390",
            "LGPL-3": "wd:Q18534393",
            "Lucent Public License": "wd:Q6696468",
            "MIT": "wd:Q334661",
            "MIT License": "wd:Q334661",
            "Mozilla Public License 1.1": "wd:Q26737735",
            "Mozilla Public License 2.0": "wd:Q25428413",
            "Mozilla Public License Version 2.0": "wd:Q25428413",
            "MPL": "wd:Q308915",
            "MPL version 1.0": "wd:Q26737738",
            "MPL version 1.1": "wd:Q26737735",
            "MPL version 2.0": "wd:Q25428413",
            "MPL-1.1": "wd:Q26737735",
            "MPL-2.0": "wd:Q25428413",
            "Unlimited": get_license("Unlimited License"),
        }

        license_info = license_mapping.get(license_str)
        if callable(license_info):
            return license_info()
        else:
            return license_info

    def get_wikidata_QID(self) -> Optional[str]:
        """Get the Wikidata QID for the R package.

        Searches for the R package in Wikidata using its label. Retrieves the
        QID of matching entities and checks if there is an instance of an
        R package. If so, returns the QID.

        Returns:
            Optional[str]: The Wikidata QID of the R package if found,
                or None otherwise.
        """
        results = search_entities(
            search_string=self.label,
            mediawiki_api_url='https://www.wikidata.org/w/api.php'
        )

        for result in results:
            item = self.api.item.get(
                entity_id=result,
                mediawiki_api_url='https://www.wikidata.org/w/api.php'
            )
            if 'P31' in item.claims.get_json().keys():
                instance_claims = item.claims.get('P31')
                if instance_claims:
                    for claim in instance_claims:
                        claim = claim.get_json()
                        if claim['mainsnak']['datatype'] == "wikibase-item":
                            # If instance of R package
                            if 'datavalue' in claim['mainsnak'].keys():
                                if claim['mainsnak']['datavalue']['value']['id'] == "Q73539779":
                                    return result

    def get_versions(self):
        """Fetches previously published versions from the CRAN archive."""
        url = f"https://cran.r-project.org/src/contrib/Archive/{self.label}"
        try:
            page = requests.get(url)
            soup = BeautifulSoup(page.content, 'lxml')
        except Exception:
            log.warning(f"Version page for package {self.label} not found.")
        else:
            if soup.find_all('table'):
                table = soup.find_all('table')[0]
                versions_df = pd.read_html(StringIO(str(table)))
                versions_df = versions_df[0]
                versions_df = versions_df.drop(columns=['Unnamed: 0', 'Size', 'Description'])
                versions_df = versions_df.drop(index=[0, 1])
                for _, row in versions_df.iterrows():
                    name = row['Name']
                    publication_date = row['Last modified']
                    if isinstance(name, str):
                        version = re.sub(f'{self.label}_', '', name)
                        version = re.sub(r'\.tar\.gz', '', version)
                        publication_date = publication_date.split()[0]
                        publication_date = f"+{publication_date}T00:00:00Z"
                        self.versions.append((version, publication_date))
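
# Minimal usage sketch (assumes a MardiIntegrator that is already configured
# with login credentials for the target Wikibase instance; the package
# metadata below is hypothetical):
if __name__ == "__main__":
    api = MardiIntegrator()
    package = RPackage(
        date="2023-01-01",        # CRAN publication date (YYYY-MM-DD)
        label="examplePkg",       # hypothetical CRAN package name
        description="An Example Package",
        api=api,
    )
    if not package.exists():
        package.create()
    elif not package.is_updated():
        package.update()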