Source code for src.data_interface.downloader

""" This module contains wrappers to download various movies datasets.
So far only MovieLens is supported, but more datasets can be added.

Every dataset should have its wrapper class that inherits from ``Downloader``.
"""

import csv
import json
import os
import requests
import re
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from collections import defaultdict

from src.data_interface import model
from config import DATASETS_PATH


class Downloader(ABC):
    """Abstract base for dataset downloaders that enrich items via an HTTP API.

    Concrete subclasses must implement :meth:`download_to_file` and
    :meth:`insert_in_db`. The constructor reads the API key from the file
    ``omdb.key`` (resolved against the current working directory) and stores
    the API endpoint in ``self.url``.
    """

    # Seconds to wait for the API server before giving up. Without a timeout
    # ``requests`` waits forever on a stalled connection, which would freeze a
    # long batch download on a single bad request.
    request_timeout = 30

    def __init__(self):
        # OMDB API related conf
        self.api_key = self.read_api_key("omdb.key")
        self.url = "http://private.omdbapi.com/"

    @abstractmethod
    def download_to_file(self):
        """Fetch the remote data and persist it to a local file."""

    @abstractmethod
    def insert_in_db(self):
        """Load the previously downloaded file into the database."""

    def read_api_key(self, key_filepath):
        """Return the content of ``key_filepath`` with surrounding whitespace removed.

        :param key_filepath: path of the text file holding the API key.
        :returns: the key as a string.
        :raises OSError: if the file cannot be opened.
        """
        with open(key_filepath, "r") as f:
            return f.read().strip()

    def item_from_api(self, id):
        """Request one item from the API and return the decoded JSON payload.

        :param id: identifier sent as the ``i`` query parameter.
        :returns: the parsed JSON body of the response.
        :raises requests.RequestException: on network failure or when the
            server does not answer within ``request_timeout`` seconds.
        :raises json.JSONDecodeError: if the body is not valid JSON.
        """
        params = {"i": "{0}".format(id), "apikey": self.api_key}
        response = requests.get(
            url=self.url, params=params, timeout=self.request_timeout
        )
        return response.json()
class MovielensDownloader(Downloader):
    """Fetch API metadata for the MovieLens 20M titles and load it into the DB.

    ``movielens/ml-20m/links.csv`` supplies the (MovieLens id, IMDb id) pairs.
    Every successful API response is appended to ``movielens/omdb.csv`` as one
    JSON document per line (despite the ``.csv`` extension the file holds
    JSON lines, not comma-separated values).
    """

    def __init__(self):
        super(MovielensDownloader, self).__init__()
        self.input_filepath = os.path.join(
            DATASETS_PATH, "movielens/ml-20m/links.csv"
        )
        self.output_filepath = os.path.join(
            DATASETS_PATH, "movielens/omdb.csv"
        )

    def download_to_file(self):
        """Query the API for every row of the links file and append the hits.

        The output file is opened in append mode, so a second run adds to the
        existing content instead of replacing it. A title whose request fails
        or whose response is not flagged as successful is logged and skipped;
        it never aborts the remaining downloads.
        """
        with open(self.output_filepath, "a") as output, \
                open(self.input_filepath, "r", encoding="latin1") as links:
            reader = csv.reader(links, delimiter=",", quotechar="\"")
            next(reader, None)  # skip header
            for row in reader:
                if len(row) < 2:
                    # blank or truncated row: there is no IMDb id to look up
                    continue
                movie_id, imdb_id = row[0], row[1]
                try:
                    movie_json = self.item_from_api("tt{0}".format(imdb_id))
                except json.decoder.JSONDecodeError:
                    logging.error("can't get item from API")
                    continue
                except requests.RequestException:
                    # A network failure on one title should not lose the rest
                    # of the batch; record it and move on.
                    logging.exception(
                        "request failed for movie {0}".format(imdb_id)
                    )
                    continue
                # "Response" arrives as text ("True" on success). Compare it
                # with the literal instead of eval()-ing it: eval would run
                # whatever expression the remote server chose to send.
                if movie_json.get("Response") == "True":
                    movie_json["id"] = movie_id
                    output.write(json.dumps(movie_json) + "\n")
                    logging.info("got movie {0}".format(imdb_id))
                else:
                    logging.error("failed for movie {0}".format(imdb_id))

    @staticmethod
    def _parse_duration(runtime):
        """Convert a runtime string into a number of minutes.

        Handles ``"142 min"``, ``"2 h"`` and combined ``"1 h 30 min"`` values.

        :param runtime: raw ``Runtime`` field of a movie record.
        :returns: total minutes as ``int``, or ``None`` when the text has
            neither an hour nor a minute component (e.g. ``"N/A"``).
        """
        # Both groups are optional, so the pattern always matches; an
        # unparseable value simply leaves both groups empty.
        hours, minutes = re.match(
            r"(?:(\d+)\s*h)?\s*(?:(\d+)\s*min)?", runtime.strip()
        ).groups()
        if hours is None and minutes is None:
            return None
        return int(hours or 0) * 60 + int(minutes or 0)

    def insert_in_db(self):
        """Read the downloaded JSON lines and insert genres and movies.

        Genres get sequential ids (starting at 1) in order of first
        appearance; each movie references its genres through those ids.
        ``"N/A"`` release dates and ratings are stored as ``None``.
        """
        movies_to_insert = []
        genre_ids = {}  # genre name -> numeric id
        with open(self.output_filepath, "r") as f:
            for line in f:  # stream instead of loading the whole file
                if not line.strip():
                    continue  # tolerate blank lines in the dump
                movie = json.loads(line)
                if movie["id"] == "movieId":
                    # record produced from the CSV header row, not a movie
                    continue

                # genres
                genre_names = movie["Genre"].replace(" ", "").split(",")
                for genre in genre_names:
                    if genre not in genre_ids:
                        genre_ids[genre] = len(genre_ids) + 1

                # fix fields
                released = movie["Released"]
                year = (
                    datetime.strptime(released, "%d %b %Y")
                    if released != "N/A" else None
                )
                rating = movie["imdbRating"]
                if rating == "N/A":
                    rating = None

                movies_to_insert.append(model.Movie(
                    id=movie["id"],
                    image=movie["Poster"],
                    name=movie["Title"],
                    genres=[genre_ids[name] for name in genre_names],
                    description=movie["Plot"],
                    year=year,
                    rating=rating,
                    director=movie["Director"],
                    actors=movie["Actors"],
                    awards=movie["Awards"],
                    language=movie["Language"],
                    country=movie["Country"],
                    duration=self._parse_duration(movie["Runtime"]),
                ))

        genres = [
            model.Genre(id=genre_id, name=name)
            for name, genre_id in genre_ids.items()
        ]
        # Genres go in first, then the movies that reference their ids.
        model.insert(genres)
        model.insert(movies_to_insert)
        logging.info("inserted {0} movies".format(len(movies_to_insert)))
        logging.info("inserted {0} genres".format(len(genres)))