#!/usr/bin/python3 from sys import argv import logging from time import localtime, mktime, time import requests from base64 import b64decode try: from sqlalchemy import Table, MetaData, Integer, BigInteger, String, Column, Table, ForeignKey, create_engine, select from sqlalchemy.orm import declarative_base, relationship, Session except ModuleNotFoundError: raise ModuleNotFoundError("emerge dev-python/sqlalchemy or pip install SQLAlchemy") try: from bs4 import BeautifulSoup, FeatureNotFound except ModuleNotFoundError: raise ModuleNotFoundError("emerge dev-python/beautifulsoup4 or pip install beautifulsoup4") if len(argv) != 1+2: raise ValueError("1st argument specifies the db URL in sqlalchemy format. example for sqlite: sqlite:///db, 2nd argument is operator contact that's sent via http in user-agent, for example mailto:email@address") operator_contact = argv[2] engine = create_engine(argv[1], echo=True, future=True) Base = declarative_base() class Book(Base): __tablename__ = "books" isbn = Column(BigInteger, primary_key=True, nullable=False, doc="book isbn. found in URL http://www/isbn/987 and in acsm: resource, dc:identifier (sometimes not), thumbnailURL") title = Column(String, nullable=True, doc="title of the book, dcc:title in acsm") creator = Column(String, nullable=True, doc="author of the book, dc:creator in acsm") publisher = Column(String, nullable=True, doc="publisher of the book, dc:publisher in acsm") identifier = Column(String, nullable=True, doc="if dc:identifier can't be derived from isbn, it's stored here. if dc:identifier element is missing, a literal string noidentifier is stored.") thumbnail_extension = Column(String, nullable=True, doc="thumbnails come in predictable URLs, derived from ISBN, apart from the extension. I've observed both jpg and png, may be None if there's no thumbnailURL element") format = Column(String, nullable=True, doc="format of the file. I've seen application/pdf and application/epub+zip") language = Column(String, nullable=True, doc="language of the book. I've seen sl.") borrows = relationship("Borrow", back_populates="book"); def __repr__(self): return f"Book(isbn={self.isbn!r}, title={self.title!r}, creator={self.creator!r}, publisher={self.publisher!r})" class Borrow(Base): __tablename__ = "borrows" id = Column(Integer, primary_key=True, nullable=False, doc="id in transaction element of acsm or in filename of acsm on http") isbn = Column(ForeignKey("books.isbn"), nullable=False, doc="foreign key that leads to a book") purchase = Column(String, nullable=True, doc="acsm purchase element: iso8601 of purchase of book, including timezone") expiration = Column(String, nullable=True, doc="acsm expiration element: iso8601 of expiration of acsm, including timezone") obtained = Column(BigInteger, nullable=False, doc="UNIX timestamp when this borrow was obtained as acsm from http") book = relationship("Book", back_populates="borrows") def __repr__(self): return f"Borrow(id={self.id!r}, isbn={self.isbn!r}, purchase={self.purchase!r}, expiration={self.expiration!r}, obtained=mktime({localtime(self.obtained)!r}), book={self.book!r})" logging.basicConfig(level=logging.NOTSET) logger = logging.getLogger(argv[0]) logger.debug("welcome to %s", argv[0]) Base.metadata.create_all(engine) starting_acsm_id = 177238 guaranteed_large_acsm_id = 1170487 logger.debug(f"created metadata.") force_acsm_id = 0 valid_acsms = 0 only_isbn_acsms = 0 failed_acsms = 0 failed_acsms_not200 = 0 failed_acsms_not200_in_a_row = 0 hmfan2iarts = 100 # how many failed acsms not 200 in a row to stop try: with Session(engine) as session: while True: if force_acsm_id != 0: acsm_id = force_acsm_id force_acsm_id = 0 else: borrow = session.scalars(select(Borrow).order_by(Borrow.id.desc()).limit(1)).first() acsm_id = starting_acsm_id if borrow is None: logger.info(f"oooh, it looks like this is a fresh start, db contains no borrows. I'll start with hardcoded acsm id {starting_acsm_id}") else: logger.info(f"continuing from latest {borrow}") acsm_id = borrow.id+1 r = requests.get(f"https://www.biblos.si/izposoja/prenesi/{acsm_id}.acsm", headers={"User-Agent": f"python-requests/{requests.__version__} (biblos-stat acsm scraper, contact operator: {operator_contact})"}) r.encoding = "UTF-8" if (r.status_code == 200): failed_acsms_not200_in_a_row = 0 if r.status_code != 200: logger.warning(f"received http response with error code not 200 (it is {r.status_code}). if this continues for {hmfan2iarts-failed_acsms_not200_in_a_row} more requests, I'll assume there are no more borrows on the server.") failed_acsms_not200 += 1 failed_acsms_not200_in_a_row += 1 force_acsm_id = acsm_id+1 if failed_acsms_not200_in_a_row == hmfan2iarts: logger.info(f"we are done for now, as server responded with {r.status_code} for queried acsm id {acsm_id}, which means {hmfan2iarts} concurrent responses that are not 200.") if acsm_id < guaranteed_large_acsm_id: logger.error(f"this shouldn't happen. I have a hardcoded value that tells me that at time of program writing, acsm id {guaranteed_large_acsm_id} did exist on the server. dying anyways.") break elif r.text.startswith("Napaka pri prenosu"): logger.warning(f"'napaka pri prenosu' received from http for acsm id {acsm_id}, skipping") force_acsm_id = acsm_id+1 elif r.text.startswith(''): logger.warning(f"received urllink parameter syntax error with no usable data for acsm {acsm_id}, so I did not store anything") force_acsm_id = acsm_id+1 if acsm_id >= 199999 and acsm_id <= 999999: logger.warning(f"on 2022-11-07, library removed access for acsms 200000-999999. skipping to 1000000") force_acsm_id = 1000000 failed_acsms += 1 else: try: acsm = BeautifulSoup(r.text, "xml", from_encoding="UTF-8") except FeatureNotFound: raise FeatureNotFound("pip3 install lxml") ft = acsm.fulfillmentToken expected = f"ACS-BIBL-L-{acsm_id}" if ft.transaction.string != expected: raise ValueError(f"expected {expected} in transaction.string, but instead received {ft.transaction.string} in acsm {acsm_id}") isbn = int(ft.resourceItemInfo.resource.string.split("-").pop())+int(9e12) identifier_is_isbn = True identifier_to_isbn = 0 identifier = "noidentifier" try: identifier = ft.resourceItemInfo.metadata.identifier.string identifier_to_isbn = int(identifier.split(":").pop().replace("-", "")) except (ValueError, AttributeError): identifier_is_isbn = False if identifier_to_isbn == 0: identifier_is_isbn = False expected = ft.resourceItemInfo.resource.string if ft.licenseToken.resource.string != expected: raise ValueError(f"expected {expected} in ft.resourceItemInfo.licenseToken.resource.string but instead received {ft.resourceItemInfo.licenseToken.resource.string} in acsm {acsm_id}") uuid = expected.split(":").pop() expected = f"https://cs.alliance.inkbook.eu/books/{uuid}." try: if ft.resourceItemInfo.metadata.thumbnailURL.string.startswith(expected) != True: raise ValueError(f"expected {expected} in ft.resourceItemInfo.metadata.thumbnailURL.string but instead received {ft.resourceItemInfo.metadata.thumbnailURL.string} in acsm {acsm_id}") thumbnail_extension = ft.resourceItemInfo.metadata.thumbnailURL.string.split(".").pop() except AttributeError: thumbnail_extension = None if ft.resourceItemInfo.metadata.thumbnailURL != None: raise ValueError(f"thumbnailURL actually exists, but it failed to be parsed in acsm {acsm_id}") duration = int(ft.resourceItemInfo.licenseToken.permissions.display.duration.string) if duration != int(ft.resourceItemInfo.licenseToken.permissions.play.duration.string): raise ValueError(f"expected {duration} in fr.int(resourceItemInfo.licenseToken.permissions.play.duration.string) but instead received {int(resourceItemInfo.licenseToken.permissions.play.duration.string)} in acsm {acsm_id}") hmac = b64decode(ft.hmac.string, validate=True) title = ft.resourceItemInfo.metadata.find(name="dc:title").string creator = ft.resourceItemInfo.metadata.creator.string publisher = ft.resourceItemInfo.metadata.publisher.string language = ft.resourceItemInfo.metadata.language.string format = ft.resourceItemInfo.metadata.format.string purchase = ft.purchase.string expiration = ft.expiration.string if identifier_is_isbn: identifier = None book = session.get(Book, isbn) if book == None: book = Book(identifier=identifier, isbn=isbn, title=title, creator=creator, publisher=publisher, thumbnail_extension=thumbnail_extension, language=language, format=format) else: book.identifier = identifier book.isbn = isbn book.title = title book.creator = creator book.publisher = publisher book.thumbnail_extension = thumbnail_extension book.language = language book.format = format borrow = Borrow(id=acsm_id, isbn=isbn, purchase=purchase, expiration=expiration, obtained=int(time()), book=book) logger.info(f"found a new {borrow!r}") session.add(borrow) session.commit() valid_acsms += 1 except KeyboardInterrupt: logger.warning(f"Keyboard interrupt. Exiting. I hope this terminated cleanly. Last requested acsm was discarded.") logger.info(f"In this session, {valid_acsms} valid acsms were stored, {only_isbn_acsms} acsms had only isbn and no other data available and {failed_acsms} acsms failed to be received with response code 200 and {failed_acsms_not200} acsms failed to be received but did not return 200. Last valid requested acsm was {acsm_id}. Thank you for cooperation.") """ metadata = MetaData() books = Table( "books", metadata, Column("title", String, nullable=False, doc="title of the book, dcc:title in acsm"), Column("creator", String, nullable=False, doc="author of the book, dc:creator in acsm"), Column("publisher", String, nullable=False, doc="publisher of the book, dc:publisher in acsm") ) borrows = Table( "borrows", metadata, Column("id", Integer, primary_key=True, nullable=False, doc="id in transaction element of acsm or in filename of acsm on http"), Column("isbn", ForeignKey(books.c.isbn), nullable=False, doc="foreign key that leads to a book"), Column("purchase", String, nullable=False, doc="acsm purchase element: iso8601 of purchase of book, including timezone"), Column("expiration", String, nullable=False, doc="acsm expiration element: iso8601 of expiration of acsm, including timezone"), Column("duration", String, nullable=False, doc="acsm duration element, specifying borrow time of the book") ) metadata.create_all(engine) """ """ with sqlalchemy.orm.Session(engine) as session: result = session.execute(sqlalchemy.text("CREATE TABLE IF NOT EXISTS borrows (id INT PRIMARY KEY, isbn INT NOT NULL, purchase TEXT NOT NULL, expiration TEXT NOT NULL, title TEXT NOT NULL, creator TEXT NOT NULL, publisher TEXT NOT NULL, duration INT NOT NULL, hmac BLOB NOT NULL) STRICT")) session.commit() # cur.execute("CREATE TABLE IF NOT EXISTS borrows(id INT PRIMARY KEY, isbn INT NOT NULL, purchase TEXT NOT NULL, expiration TEXT NOT NULL, title TEXT NOT NULL, creator TEXT NOT NULL, publisher TEXT NOT NULL, duration INT NOT NULL, hmac BLOB NOT NULL) STRICT") """