from abc import ABC, abstractmethod
import logging
import os
import csv
from config import MAX_RECOMMENDATIONS, BATCH_UPLOAD_SIZE, ML_MODELS_PATH
from src.recommender.wrappers import Recommendations
from src.data_interface import model
from sqlalchemy.sql.expression import case
class Engine(ABC):
    """Abstract base class for all engines.

    Do not use this class directly; use the classes that inherit from it.
    """

    def __init__(self):
        # The engine type is simply the name of the concrete class.
        self.type = type(self).__name__
        # ``input_id_kind`` tells what id the engine is expecting.
        # It can be "item" if the engine is expecting an item_id,
        # or "user" if it expects a user_id.
        self.input_id_kind = None
        logging.debug("Creating instance of {0}".format(self.type))

    def init_recommendations(self, context):
        """Create a ``src.recommender.wrappers.Recommendations`` object and
        fill in the engine type, display name and priority based on the
        information stored in DB.

        The row is looked up in ``model.Engine`` by ``self.type``.
        NOTE(review): ``.one()`` presumably raises when zero or several rows
        match this engine type -- confirm against the query class in use.

        Args:
            context (src.recommender.wrappers.Context): Context wrapper,
                containing useful information for the engine.

        Returns:
            src.recommender.wrappers.Recommendations: Recommendations object
            with ``type``, ``display_name`` and ``priority`` set.
        """
        r = Recommendations()
        r.type = self.type
        name, priority = (
            model.Engine.query
            .with_entities(model.Engine.display_name, model.Engine.priority)
            .filter(model.Engine.type == self.type)
            .one()
        )
        # TODO: add dynamic name option in DB
        if context.item:
            # The stored display name is used as a format template that
            # receives the current item's name.
            r.display_name = name.format(context.item.name)
        else:
            r.display_name = name
        r.priority = priority
        return r

    @abstractmethod
    def recommend(self, context):
        """Abstract method for all engines for recommending items.

        The context wrapper stores all the information the engine might
        need to compute the recommendations, like the current item_id,
        the current user_id, the user browsing history, etc.

        Every engine must override this method. They have to call
        ``self.init_recommendations`` first to create an empty
        ``src.recommender.wrappers.Recommendations`` object and then
        enrich it with the recommended items.

        Args:
            context (src.recommender.wrappers.Context): the context

        Returns:
            src.recommender.wrappers.Recommendations: the recommendation
            object
        """
class QueryBasedEngine(Engine):
    """Abstract class for an engine based on a SQL query performed at
    every call.

    These engines require no training, for instance an engine that
    recommends random items from the DB.
    """

    def __init__(self):
        super().__init__()

    @abstractmethod
    def compute_query(self, context):
        """Abstract method that computes the SQL query using SQLAlchemy.

        Args:
            context (recommender.wrappers.Context): context wrapper

        Returns:
            query result
        """
        pass

    def recommend(self, context):
        """Recommend items by calling ``self.compute_query``.

        Args:
            context (recommender.wrappers.Context): context wrapper

        Returns:
            The value of ``to_dict()`` called on the populated
            ``Recommendations`` object.
        """
        # The base-class helper is ``init_recommendations``; the previous
        # call to ``init_recommend`` raised AttributeError on every request.
        r = self.init_recommendations(context)
        r.recommended_items = self.compute_query(context)
        logging.debug(r.to_string())
        return r.to_dict()
class OfflineEngine(QueryBasedEngine):
    """A special kind of QueryBasedEngine that requires a training.

    Most of the offline Machine Learning algorithms will inherit from
    this class.
    The recommendations are computed offline with the ``train`` method,
    then saved on disk with ``save_recommendations_to_csv``
    and finally uploaded to the DB using ``upload``.
    """

    def __init__(self):
        super().__init__()
        # CSV file shared by ``save_recommendations_to_csv`` and ``upload``.
        self.output_filepath = os.path.join(
            ML_MODELS_PATH, "csv", self.type + ".csv"
        )

    def compute_query(self, context):
        """Get the recommended items from the DB.

        Selects the ``model.Movie`` rows referenced by the
        ``model.Recommendation`` rows of this engine for the current item,
        ordered by descending score and capped at ``MAX_RECOMMENDATIONS``.

        NOTE(review): the source id is always ``context.item.id``, even when
        ``self.input_id_kind`` is "user" -- confirm this is intended.

        Args:
            context (src.recommender.wrappers.Context): Context wrapper

        Returns:
            list: result of ``.all()`` on the ``model.Movie`` query
        """
        recommendations = (
            model.Movie.query
            .filter(model.Movie.id == model.Recommendation.recommended_item_id)
            .filter(
                model.Recommendation.source_item_id_kind == self.input_id_kind
            )
            .filter(model.Recommendation.source_item_id == context.item.id)
            .filter(model.Recommendation.engine_name == self.type)
            .order_by(model.Recommendation.score.desc())
            .limit(MAX_RECOMMENDATIONS)
            .all()
        )
        return recommendations

    @abstractmethod
    def train(self):
        """Method for training the engine.

        This method should load the dataset, compute the recommendations
        and then persist them to disk using ``save_recommendations_to_csv``.
        """
        pass

    def save_recommendations_to_csv(self, recommendations):
        """Save recommendations to the CSV file at ``self.output_filepath``.

        Args:
            recommendations (list(tuple)): List of recommendation tuples
                corresponding to:
                (movie_id, recommended_movie_id, input_kind, score)
        """
        # ``newline=""`` lets the csv module control line endings itself
        # (otherwise extra blank rows can appear on some platforms).
        with open(self.output_filepath, "w", newline="") as csv_file:
            writer = csv.writer(
                csv_file,
                delimiter=",",
                quoting=csv.QUOTE_MINIMAL
            )
            writer.writerows(recommendations)

    def upload(self):
        """Upload the recommendations from the CSV file to the DB.

        Rows are read from ``self.output_filepath`` and inserted with
        ``model.insert`` in batches of ``BATCH_UPLOAD_SIZE`` so the whole
        file never has to be held in memory. Any remainder is inserted
        after the last full batch. Blank CSV rows are skipped.
        """
        batch = []
        with open(self.output_filepath, "r", newline="") as csv_file:
            reader = csv.reader(
                csv_file,
                delimiter=",",
                quoting=csv.QUOTE_MINIMAL
            )
            for line in reader:
                if not line:
                    # csv.reader yields [] for blank lines; nothing to load.
                    continue
                batch.append(
                    model.Recommendation(
                        engine_name=self.type,
                        source_item_id=line[0],
                        recommended_item_id=line[1],
                        source_item_id_kind=line[2],
                        score=line[3]
                    )
                )
                # don't burst RAM, use batch size. Counting the pending
                # rows (rather than testing the row index) keeps every
                # batch at exactly BATCH_UPLOAD_SIZE rows, including the
                # first one.
                if len(batch) >= BATCH_UPLOAD_SIZE:
                    self._insert_batch(batch)
        if batch:  # last, partially filled batch
            self._insert_batch(batch)

    @staticmethod
    def _insert_batch(batch):
        """Insert ``batch`` into the DB, log its size, then empty it in place."""
        model.insert(batch)
        logging.info("inserted {0} recommendations"
                     .format(len(batch)))
        del batch[:]
class OnlineEngine(Engine):
    """Online Machine Learning engines that do not get their
    recommendations from a SQL query but from a loaded model.

    The model is trained with the ``train`` method, and loaded at runtime
    with the ``load_model`` method.
    """

    def __init__(self):
        super().__init__()
        # The model is loaded once, when the engine is instantiated.
        self.model = self.load_model()

    @abstractmethod
    def load_model(self):
        """Load the ML model from disk and return it.

        Returns:
            The ML model to be saved as ``self.model``. This default
            implementation returns ``None``.
        """
        return None

    @abstractmethod
    def predict(self, context):
        """Predict using the loaded model and the context.

        Args:
            context (src.recommender.wrappers.Context): Context wrapper

        Returns:
            ids (list(int)): list of recommended ids
                sorted by descending score
            scores (list(float)): list of scores for each recommended item

            This default implementation returns ``(None, None)``.
        """
        return None, None

    @abstractmethod
    def train(self):
        """Train a ML model and save it to disk."""
        pass

    def recommend(self, context):
        """Recommend movies based on context.

        Args:
            context (src.recommender.wrappers.Context): Context wrapper

        Returns:
            recommendations (dict): src.recommender.wrappers.Recommendations
                as dict
        """
        r = self.init_recommendations(context)
        ids, _ = self.predict(context)  # online prediction; scores unused
        if ids:
            # considering the ids are ranked from the most relevant to
            # the least relevant, use this to keep the order of
            # the recommendations when querying the DB: each id is mapped
            # to its rank and the rows are sorted on that rank.
            ordering = case(
                {movie_id: rank for rank, movie_id in enumerate(ids)},
                value=model.Movie.id
            )
            recommendations = (
                model.Movie.query
                .filter(model.Movie.id.in_(ids))
                .order_by(ordering)
                .limit(MAX_RECOMMENDATIONS)
                .all()
            )
        else:
            # No prediction available: return an empty recommendation list.
            recommendations = []
        r.recommended_items = recommendations
        logging.debug(r.to_string())
        return r.to_dict()