Source code for mardi_importer.integrator.MardiEntities

import re
import sqlalchemy as db
from sqlalchemy import and_, case

from mardiclient import MardiItem, MardiProperty
from wikibaseintegrator.wbi_exceptions import ModificationFailed
from wikibaseintegrator.datatypes import ExternalID
from wikibaseintegrator.wbi_enums import ActionIfExists
from mardi_importer.importer import ImporterException

class MardiItemEntity(MardiItem):
    """``MardiItem`` that can look up matching QIDs in the local wikibase.

    The lookup goes straight to the wikibase term-store tables through the
    SQLAlchemy engine exposed as ``self.api.engine``.
    """

    def new(self, **kwargs):
        """Return a new ``MardiItemEntity`` bound to this entity's ``api``.

        Args:
            **kwargs: Forwarded unchanged to the ``MardiItemEntity``
                constructor.
        """
        return MardiItemEntity(api=self.api, **kwargs)

    def get(self, entity_id, **kwargs):
        """Fetch an entity and return it as a ``MardiItemEntity``.

        Args:
            entity_id: Identifier handed to the parent class' ``_get``; it is
                also the key used to pick the entity out of the
                ``'entities'`` mapping of the response.
            **kwargs: Forwarded unchanged to ``_get``.

        Returns:
            The result of ``from_json`` called on a new ``MardiItemEntity``.
        """
        json_data = super()._get(entity_id=entity_id, **kwargs)
        return MardiItemEntity(api=self.api).from_json(
            json_data=json_data['entities'][entity_id]
        )

    def get_QID(self, alias=False):
        """Collect the QIDs of all local items carrying this item's label.

        The English label of this entity (empty string when there is none)
        is UTF-8 encoded and compared against the term-store text. Labels
        longer than 250 bytes are cut to 250 bytes and matched as a prefix
        instead of exactly.

        Args:
            alias (bool): When true, items whose English *alias* matches are
                appended after the items whose English *label* matches.

        Returns:
            list: QID strings (``"Q<id>"``); label matches first, then alias
            matches. Empty when nothing matches.

        Raises:
            ImporterException: If reflecting the tables or running a query
                fails. The original exception is chained as ``__cause__``.
        """
        # NOTE(review): 250 presumably keeps the value inside the width of
        # the wbx_text column -- confirm against the wikibase schema.
        max_label_bytes = 250

        label = ""
        if 'en' in self.labels.values:
            label = self.labels.values['en'].value
        label = bytes(label, "utf-8")
        is_truncated = len(label) > max_label_bytes
        if is_truncated:
            label = label[:max_label_bytes]

        # wbtl_type_id = 1 : Label
        # wbtl_type_id = 2 : Alias
        # see: https://doc.wikimedia.org/Wikibase/REL1_41/php/docs_sql_wbt_type.html
        term_type_ids = [1, 2] if alias else [1]

        qids = []
        with self.api.engine.connect() as connection:
            metadata = db.MetaData()
            try:
                # Reflect the term-store tables once; they are shared by the
                # label query and the alias query.
                wbt_item_terms = db.Table(
                    "wbt_item_terms", metadata, autoload_with=connection
                )
                wbt_term_in_lang = db.Table(
                    "wbt_term_in_lang", metadata, autoload_with=connection
                )
                wbt_text_in_lang = db.Table(
                    "wbt_text_in_lang", metadata, autoload_with=connection
                )
                wbt_text = db.Table(
                    "wbt_text", metadata, autoload_with=connection
                )

                # Whether the label was truncated is known here in Python, so
                # the matching predicate is chosen up front rather than being
                # wrapped in a SQL CASE over a constant.
                text_column = wbt_text.columns.wbx_text
                if is_truncated:
                    # NOTE(review): '%' and '_' inside the label act as LIKE
                    # wildcards here; they are not escaped.
                    text_matches = text_column.like(label + b"%")
                else:
                    text_matches = text_column == label

                for term_type_id in term_type_ids:
                    query = (
                        db.select(wbt_item_terms.columns.wbit_item_id)
                        .join(
                            wbt_term_in_lang,
                            wbt_item_terms.columns.wbit_term_in_lang_id
                            == wbt_term_in_lang.columns.wbtl_id,
                        )
                        .join(
                            wbt_text_in_lang,
                            wbt_term_in_lang.columns.wbtl_text_in_lang_id
                            == wbt_text_in_lang.columns.wbxl_id,
                        )
                        .join(
                            wbt_text,
                            wbt_text.columns.wbx_id
                            == wbt_text_in_lang.columns.wbxl_text_id,
                        )
                        .where(
                            and_(
                                text_matches,
                                wbt_term_in_lang.columns.wbtl_type_id
                                == term_type_id,
                                wbt_text_in_lang.columns.wbxl_language
                                == bytes("en", "utf-8"),
                            )
                        )
                    )
                    rows = connection.execute(query).fetchall()
                    qids.extend(f"Q{row[0]}" for row in rows)
            except Exception as e:
                raise ImporterException(
                    "Error attempting to read mappings from database\n{}".format(e)
                ) from e
        return qids
class MardiPropertyEntity(MardiProperty):
    """``MardiProperty`` that can look up its PID in the local wikibase.

    The lookup goes straight to the wikibase term-store tables through the
    SQLAlchemy engine exposed as ``self.api.engine``.
    """

    def new(self, **kwargs):
        """Return a new ``MardiPropertyEntity`` bound to this entity's ``api``.

        Args:
            **kwargs: Forwarded unchanged to the ``MardiPropertyEntity``
                constructor.
        """
        return MardiPropertyEntity(api=self.api, **kwargs)

    def get(self, entity_id, **kwargs):
        """Fetch an entity and return it as a ``MardiPropertyEntity``.

        Args:
            entity_id: Identifier handed to the parent class' ``_get``; it is
                also the key used to pick the entity out of the
                ``'entities'`` mapping of the response.
            **kwargs: Forwarded unchanged to ``_get``.

        Returns:
            The result of ``from_json`` called on a new
            ``MardiPropertyEntity``.
        """
        json_data = super()._get(entity_id=entity_id, **kwargs)
        return MardiPropertyEntity(api=self.api).from_json(
            json_data=json_data['entities'][entity_id]
        )

    def get_PID(self):
        """Return the PID of the local property with the same English label.

        The English label of this entity (empty string when there is none)
        is UTF-8 encoded and matched exactly against the English labels
        (``wbtl_type_id == 1``) in the term store.

        Returns:
            str or None: ``"P<id>"`` built from the first matching row, or
            ``None`` when no property has that label.

        Raises:
            ImporterException: If reflecting the tables or running the query
                fails. The original exception is chained as ``__cause__``.
        """
        label = ""
        if 'en' in self.labels.values:
            label = self.labels.values['en'].value

        with self.api.engine.connect() as connection:
            metadata = db.MetaData()
            try:
                wbt_property_terms = db.Table(
                    "wbt_property_terms", metadata, autoload_with=connection
                )
                wbt_term_in_lang = db.Table(
                    "wbt_term_in_lang", metadata, autoload_with=connection
                )
                wbt_text_in_lang = db.Table(
                    "wbt_text_in_lang", metadata, autoload_with=connection
                )
                wbt_text = db.Table(
                    "wbt_text", metadata, autoload_with=connection
                )
                query = (
                    db.select(wbt_property_terms.columns.wbpt_property_id)
                    .join(
                        wbt_term_in_lang,
                        wbt_term_in_lang.columns.wbtl_id
                        == wbt_property_terms.columns.wbpt_term_in_lang_id,
                    )
                    .join(
                        wbt_text_in_lang,
                        wbt_term_in_lang.columns.wbtl_text_in_lang_id
                        == wbt_text_in_lang.columns.wbxl_id,
                    )
                    .join(
                        wbt_text,
                        wbt_text.columns.wbx_id
                        == wbt_text_in_lang.columns.wbxl_text_id,
                    )
                    .where(
                        and_(
                            wbt_text.columns.wbx_text == bytes(label, "utf-8"),
                            wbt_term_in_lang.columns.wbtl_type_id == 1,
                            wbt_text_in_lang.columns.wbxl_language
                            == bytes("en", "utf-8"),
                        )
                    )
                )
                results = connection.execute(query).fetchall()
                # Only the first match is reported.
                if results:
                    return f"P{results[0][0]}"
                return None
            except Exception as e:
                raise ImporterException(
                    "Error attempting to read mappings from database\n{}".format(e)
                ) from e