viewgit/inc/functions.php:22 Function utf8_encode() is deprecated [8192]
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2023 Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Sarika Padmashali padmashalisarika@gmail.com
 *  (Reworked so could scale for yioop.com by Chris Pollett)
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2023
 * @filesource
 */
namespace seekquarry\yioop\library\media_jobs;

use seekquarry\yioop\configs as C;
use seekquarry\yioop\library as L;
use seekquarry\yioop\library\LinearAlgebra as LinearAlgebra;
use seekquarry\yioop\library\LRUCache as LRUCache;
use seekquarry\yioop\library\PhraseParser as PhraseParser;
use seekquarry\yioop\models\CronModel;

/**
 * Media job that computes recommendations of group threads, groups and
 * wiki page resources for users. Recommendations are similarity scores
 * between embeddings of content and embeddings of each user's viewing
 * history; users who have no recommendations yet are given the most
 * popular existing ones.
 */
class RecommendationJob extends MediaJob
{
    /**
     * Unix timestamp at which checkPrerequisites last allowed the job to
     * run. It is written as the timestamp of inserted recommendation rows
     * and as the "item_group_recommendations" cron time.
     * @var int
     */
    public $update_time;
    /**
     * Recommendation timestamp read from the cron model
     * ("item_group_recommendations") at the start of each run
     * @var int
     */
    public $active_time;
    /**
     * Associative array of the number of items a term appears in
     * NOTE(review): not referenced in the visible part of this class
     * @var array
     */
    public $item_idf;
    /**
     * Associative array of the number of user views a term appears in
     * NOTE(review): not referenced in the visible part of this class
     * @var array
     */
    public $user_idf;
    /**
     * LRUCache for term
embeddings, keyed by the ID column of RECOMMENDATION_TERM_EMBEDDING.
     * Recreated with capacity MAX_TERM_EMBEDDINGS each time term
     * embeddings are computed.
     */
    public $lru_cache;
    /**
     * Maximum number of group items used in making recommendations
     */
    const MAX_GROUP_ITEMS = 50000;
    /**
     * Maximum number of terms used in making recommendations
     */
    const MAX_TERMS = 20000;
    /**
     * File whose lines are group_id###page_id###folder entries naming the
     * description folders of wiki page resources used as the corpus for
     * computing resource recommendations
     */
    const RECOMMENDATION_FILE = C\APP_DIR . "/resources/recommendation.txt";
    /**
     * Number of preceding terms whose embeddings a term updates (and is
     * updated by) when calculating term embeddings
     */
    const CONTEXT_WINDOW_LENGTH = 5;
    /**
     * Update period used when selecting records from the
     * ITEM_IMPRESSION_SUMMARY table
     */
    const UPDATE_PERIOD = C\ONE_MONTH;
    /**
     * Stop words to exclude from the descriptions fetched by the
     * DescriptionUpdate media job
     */
    const DESCRIPTION_STOP_WORDS = ["author", "authors", "plot", "genre",
        "genres", "star", "stars", "credits", "rating", "ratings", "year",
        "director", "cast", "runtime"];
    /**
     * Hash algorithm used to choose the embedding coordinate of a term in
     * the Hash2Vec embedding
     */
    const HASH_ALGORITHM = "md5";
    /**
     * Hash algorithm used to choose the sign of a term's contribution in
     * the Hash2Vec term embedding
     */
    const SIGN_HASH_ALGORITHM = "crc32";
    /**
     * Maximum number of term embeddings fetched from the database to
     * initialize the LRUCache (also the cache's capacity)
     */
    const MAX_TERM_EMBEDDINGS = 500;
    /**
     * Maximum number of wiki resources used per run when making resource
     * recommendations, and the number of group items held in memory at
     * one time when computing item term embeddings
     */
    const MAX_BATCH_SIZE = 200;
    /**
     * Sets up the database connection so the job can access the tables
     * related to recommendations, and initializes the job's timing info.
     */
    public function init()
    {
        $this->update_time = 0;
        $this->active_time = 0;
        $this->name_server_does_client_tasks = true;
        $this->name_server_does_client_tasks_only = true;
        $this->cron_model = new CronModel();
        $db_class = C\NS_DATASOURCES . ucfirst(C\DBMS) . "Manager";
        $this->db = new $db_class();
        $this->db->connect();
        $this->size = C\EMBEDDING_VECTOR_SIZE;
    }
    /**
     * Only lets the job run if more than C\ONE_DAY seconds have passed
     * since the last update. When the job is allowed to run, the current
     * time becomes the new update_time.
     *
     * @return bool whether more than a day has passed since the last update
     */
    public function checkPrerequisites()
    {
        $time = time();
        $delta = $time - $this->update_time;
        if ($delta > C\ONE_DAY) {
            $this->update_time = $time;
            L\crawlLog("Prerequisites for Recommendation Media Job met");
            return true;
        }
        L\crawlLog("Time since last update not exceeded, skipping" .
            " Recommendation MediaJob $delta");
        return false;
    }
    /**
     * Computes thread/group, wiki resource and new user recommendations,
     * then stores update_time as the "item_group_recommendations" cron
     * time. For now this is only done on the name server, as Yioop
     * currently only supports one DBMS at a time.
     */
    public function nondistributedTasks()
    {
        L\crawlLog("Performing the Recommendation Media Job");
        $this->active_time = $this->cron_model->getCronTime(
            "item_group_recommendations");
        L\crawlLog("Current Active Recommendation Timestamp: " .
            $this->active_time);
        L\crawlLog("...Start computing similarity-based group and item " .
            "recommendations...");
        $this->computeThreadGroupRecommendations();
        L\crawlLog("...Finished computing similarity-based group and item " .
            "recommendations.");
        L\crawlLog("...Start computing similarity-based wiki resource " .
            "recommendations...");
        $this->computeWikiResourceRecommendations();
        L\crawlLog("...Finished computing similarity-based wiki " .
            "resource recommendations...");
        L\crawlLog("...Start computing new user recommendations...");
        $this->initializeNewUserRecommendations();
        L\crawlLog("...Finished computing new user recommendations...");
        $this->cron_model->updateCronTime(
            "item_group_recommendations", $this->update_time);
    }
    /**
     * Gives users who have no rows yet in GROUP_ITEM_RECOMMENDATION a copy
     * of the top C\MAX_RECOMMENDATIONS thread and group recommendations,
     * ranked by total score over all users. Similarly, it gives users with
     * no rows in GROUP_RESOURCE_RECOMMENDATION the top
     * C\MAX_RECOMMENDATIONS resource recommendations.
     */
    public function initializeNewUserRecommendations()
    {
        $db = $this->db;
        $popular_recommendations = [
            C\THREAD_RECOMMENDATION => [],
            C\GROUP_RECOMMENDATION => []];
        $sql = "SELECT ITEM_ID, SUM(SCORE) AS TOTAL_SCORE FROM " .
            "GROUP_ITEM_RECOMMENDATION WHERE ITEM_TYPE = ? " .
            "GROUP BY ITEM_ID ORDER BY TOTAL_SCORE DESC " .
            $db->limitOffset(C\MAX_RECOMMENDATIONS);
        foreach (array_keys($popular_recommendations) as $type) {
            $results = $db->execute($sql, [$type]);
            while ($row = $db->fetchArray($results)) {
                $popular_recommendations[$type][] = $row;
            }
        }
        $new_user_sql = "SELECT USER_ID AS USER_ID " .
            "FROM USERS WHERE USER_ID NOT IN " .
            "(SELECT USER_ID FROM GROUP_ITEM_RECOMMENDATION)";
        $new_user_results = $db->execute($new_user_sql);
        $base_recommend_sql = "INSERT INTO GROUP_ITEM_RECOMMENDATION VALUES ";
        $insert_recommend_sql = $base_recommend_sql;
        $comma = "";
        $insert_count = 0;
        while ($row = $db->fetchArray($new_user_results)) {
            $user_id = $row['USER_ID'];
            foreach ($popular_recommendations as $type => $recommendations) {
                foreach ($recommendations as $recommendation) {
                    $insert_recommend_sql .= "$comma ($user_id, " .
                        "{$recommendation['ITEM_ID']}, $type, " .
                        "{$recommendation['TOTAL_SCORE']}," .
                        $this->update_time . ")";
                    $comma = ",";
                    $insert_count++;
                }
                if ($insert_count >= C\BATCH_SQL_INSERT_NUM) {
                    $db->execute($insert_recommend_sql);
                    $insert_recommend_sql = $base_recommend_sql;
                    $insert_count = 0;
                    $comma = "";
                }
            }
        }
        if ($insert_count > 0) {
            $db->execute($insert_recommend_sql);
        }
        // Same top-N cut off as for threads and groups above, so each new
        // user does not get a copy of every resource recommendation row
        $sql = "SELECT GROUP_ID, PAGE_ID, RESOURCE_PATH, RESOURCE_ID," .
            " SUM(SCORE) AS TOTAL_SCORE FROM" .
            " GROUP_RESOURCE_RECOMMENDATION GROUP BY GROUP_ID," .
            " PAGE_ID, RESOURCE_PATH, RESOURCE_ID ORDER BY TOTAL_SCORE DESC " .
            $db->limitOffset(C\MAX_RECOMMENDATIONS);
        $results = $db->execute($sql);
        $popular_resources = [];
        while ($row = $db->fetchArray($results)) {
            $popular_resources[] = $row;
        }
        $base_recommend_sql = "INSERT INTO GROUP_RESOURCE_RECOMMENDATION" .
            " VALUES ";
        $insert_recommend_sql = $base_recommend_sql;
        $comma = "";
        $insert_count = 0;
        $new_user_sql = "SELECT USER_ID FROM USERS WHERE USER_ID NOT IN " .
            "(SELECT USER_ID FROM GROUP_RESOURCE_RECOMMENDATION)";
        $new_user_results = $db->execute($new_user_sql);
        while ($row = $db->fetchArray($new_user_results)) {
            $user_id = $row['USER_ID'];
            foreach ($popular_resources as $recommendation) {
                // Paths come from file names and may contain a quote;
                // double it so the SQL string literal stays well formed
                $resource_path = str_replace("'", "''",
                    $recommendation['RESOURCE_PATH']);
                $insert_recommend_sql .=
                    "$comma ($user_id, {$recommendation['GROUP_ID']}, " .
                    "{$recommendation['PAGE_ID']}, " .
                    "'$resource_path', " .
                    "{$recommendation['TOTAL_SCORE']}, {$this->update_time}, " .
                    "{$recommendation['RESOURCE_ID']})";
                $comma = ",";
                $insert_count++;
                if ($insert_count >= C\BATCH_SQL_INSERT_NUM) {
                    $db->execute($insert_recommend_sql);
                    $insert_recommend_sql = $base_recommend_sql;
                    $insert_count = 0;
                    $comma = "";
                }
            }
        }
        if ($insert_count > 0) {
            $db->execute($insert_recommend_sql);
        }
    }
    /**
     * Manages the whole process of computing thread and group
     * recommendations for users.
* Makes a series of calls to handle parts of this computation (term,
     * item and user embeddings, then item recommendations, then group and
     * group-user embeddings and group recommendations) before
     * synthesizing the result. Large intermediate arrays are released as
     * soon as they are no longer needed.
     */
    public function computeThreadGroupRecommendations()
    {
        L\crawlLog("...Start computing Item Term Embeddings...");
        $item_terms = $this->computeItemTermEmbeddings();
        L\crawlLog("...Finished computing Item Term Embeddings...");
        L\crawlLog("...Start computing Item Embeddings...");
        $item_embeddings = $this->computeItemEmbeddings($item_terms);
        L\crawlLog("...Finished computing Item Embeddings...");
        L\crawlLog("...Start write back term embeddings from cache to db");
        $this->saveTermEmbeddingsCacheToDb(C\THREAD_RECOMMENDATION);
        L\crawlLog("...Finished write back term embeddings from cache to db");
        L\crawlLog("...Start computing Item User Embeddings...");
        [$item_user_embeddings, $user_items] =
            $this->computeItemUserEmbeddings($item_embeddings);
        L\crawlLog("...Finished computing Item User Embeddings...");
        L\crawlLog("...Start computing Item User Recommendations...");
        $user_groups = $this->computeItemUserRecommendations(
            $item_embeddings, $item_user_embeddings, $user_items);
        L\crawlLog("...Finished computing Item User Recommendations...");
        unset($item_user_embeddings);
        unset($user_items);
        L\crawlLog("...Start computing Group Embeddings...");
        $group_embeddings = $this->computeGroupEmbeddings($item_embeddings);
        L\crawlLog("...Finished computing Group Embeddings...");
        // Item embeddings are not needed past this point; free them
        unset($item_embeddings);
        L\crawlLog("...Start computing Group User Embeddings...");
        [$group_user_embeddings, $user_group_impression] =
            $this->computeGroupUserEmbeddings($group_embeddings);
        L\crawlLog("...Finished computing Group User Embeddings...");
        L\crawlLog("...Start computing Group User Recommendations...");
        $this->computeGroupUserRecommendations($group_embeddings,
            $group_user_embeddings, $user_groups, $user_group_impression);
        L\crawlLog("...Finished computing Group User Recommendations...");
        unset($group_embeddings);
        unset($group_user_embeddings);
        unset($user_group_impression);
        unset($user_groups);
    }
    /**
     * Computes the
term embeddings for individual items (main thread only and not
     * comments) in group feeds from the terms of their title and
     * description text. Each term and each of the up to
     * CONTEXT_WINDOW_LENGTH terms before it update each other's
     * embeddings Hash2Vec style. At most MAX_GROUP_ITEMS items are
     * processed, most recently edited first, MAX_BATCH_SIZE at a time.
     *
     * @return array $item_terms maps item id => [terms, group id], where
     *      terms is the list of [term id, term] pairs produced by
     *      cleanRemoveStopWords for the item's title and description
     */
    public function computeItemTermEmbeddings()
    {
        $db = $this->db;
        $this->lru_cache = new LRUCache(self::MAX_TERM_EMBEDDINGS);
        $select_sql = "SELECT * FROM RECOMMENDATION_TERM_EMBEDDING WHERE" .
            " ITEM_TYPE = ? " . $db->limitOffset(self::MAX_TERM_EMBEDDINGS);
        $results = $db->execute($select_sql, [C\THREAD_RECOMMENDATION]);
        $item_terms = [];
        L\crawlLog("Start Populating LRUCache of Embeddings...");
        while ($row = $db->fetchArray($results)) {
            if (is_string($row['VECTOR'])) {
                $this->lru_cache->put($row['ID'],
                    base64_decode($row['VECTOR'], true));
            } else {
                // Log instead of var_dump-ing raw column data to the output
                L\crawlLog("Skipping non-string embedding vector for term " .
                    $row['ID']);
            }
        }
        L\crawlLog("Finish Populating LRUCache of Embeddings");
        $std_deviation = $this->contextWindowStdDeviation();
        $item_count_sql = "SELECT COUNT(*) AS NUM_ITEMS FROM GROUP_ITEM " .
            "WHERE ID = PARENT_ID AND TITLE NOT LIKE '%Page%'";
        $results = $db->execute($item_count_sql);
        $num_items = 0;
        if ($results && ($row = $db->fetchArray($results))) {
            $num_items = min(intval($row['NUM_ITEMS'] ?? 0),
                self::MAX_GROUP_ITEMS);
        }
        $num_batches = ceil($num_items / self::MAX_BATCH_SIZE);
        L\crawlLog("Number of group items will consider: " . $num_items);
        $item_count = 0;
        for ($item_batch = 0; $item_batch < $num_batches; $item_batch++) {
            // ID breaks ties between equal EDIT_DATEs so LIMIT/OFFSET paging
            // is deterministic and no item is skipped or repeated
            $group_item_sql = "SELECT * FROM GROUP_ITEM WHERE ID = PARENT_ID" .
                " AND TITLE NOT LIKE '%Page%'" .
                " ORDER BY EDIT_DATE DESC, ID DESC " .
                $db->limitOffset($item_batch * self::MAX_BATCH_SIZE,
                self::MAX_BATCH_SIZE);
            L\crawlTimeoutLog("Have processed $item_count many group items");
            $results = $db->execute($group_item_sql);
            $batch_items = [];
            while ($row = $db->fetchArray($results)) {
                $text_corpus = mb_strtolower($row['TITLE'] . " " .
                    $row['DESCRIPTION']);
                $batch_items[$row['ID']] = [
                    $this->cleanRemoveStopWords($text_corpus),
                    $row['GROUP_ID']];
            }
            foreach ($batch_items as $item_id => $term_data) {
                $item_terms[$item_id] = $term_data;
                $this->applyHash2VecContextUpdates($term_data[0],
                    C\THREAD_RECOMMENDATION, "thread_term_update",
                    "thread_context_term_update", $std_deviation);
            }
            $item_count += count($batch_items);
        }
        return $item_terms;
    }
    /**
     * Computes the embedding of each item (main thread only, not comments)
     * as the normalized sum of the embeddings of its terms. It then
     * replaces the THREAD_RECOMMENDATION rows for those items in
     * RECOMMENDATION_ITEM_EMBEDDING with the new base64 encoded values.
     *
     * @param array $item_terms maps item id => [terms, group id] as
     *      returned by computeItemTermEmbeddings
     * @return array $updated_item_embeddings maps item id =>
     *      [packed ("E*") embedding string, group id]
     */
    public function computeItemEmbeddings($item_terms)
    {
        $updated_item_embeddings = [];
        $item_count = 0;
        foreach ($item_terms as $item_id => [$terms, $group_id]) {
            L\crawlTimeoutLog("Have done $item_count many group items");
            $item_embedding = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0);
            foreach ($terms as [$term_id]) {
                $term_embedding = $this->getTermEmbedding($term_id,
                    C\THREAD_RECOMMENDATION, true);
                $item_embedding = LinearAlgebra::add($item_embedding,
                    unpack("E*", $term_embedding));
            }
            $item_embedding = LinearAlgebra::normalize($item_embedding);
            $updated_item_embeddings[$item_id] =
                [pack("E*", ...$item_embedding), $group_id];
            $item_count++;
        }
        $this->replaceRecommendationEmbeddings($updated_item_embeddings,
            C\THREAD_RECOMMENDATION,
            function ($embedding) {
                return base64_encode($embedding);
            }, "many group items");
        return $updated_item_embeddings;
    }
    /**
     * Computes a user embedding for each user (other than USER_ID 2) with
     * THREAD_IMPRESSION rows in ITEM_IMPRESSION_SUMMARY, either for the
     * current UPDATE_PERIOD bucket or with UPDATE_PERIOD equal to
     * C\MOST_RECENT_VIEW. The user embedding is the normalized sum of the
     * embeddings of the viewed items that appear in $item_embeddings.
     *
     * @param array $item_embeddings maps item id => [packed embedding,
     *      group id]
     * @return array [$item_user_embeddings, $user_items] where the first
     *      maps user id => packed embedding and the second maps user id =>
     *      list of viewed item ids that have embeddings
     */
    public function computeItemUserEmbeddings($item_embeddings)
    {
        $results = $this->executeUserImpressionQuery(C\THREAD_IMPRESSION);
        $item_user_embeddings = [];
        $user_items = [];
        $user_count = 0;
        while ($row = $this->db->fetchArray($results)) {
            L\crawlTimeoutLog("Have done $user_count many user embeddings");
            $user_id = $row['USER_ID'];
            $item_ids = array_unique(explode(",", (string)$row['ITEM_IDS']));
            $user_embedding = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0);
            $user_items[$user_id] = [];
            foreach ($item_ids as $item_id) {
                if (array_key_exists($item_id, $item_embeddings)) {
                    $user_embedding = LinearAlgebra::add($user_embedding,
                        unpack("E*", $item_embeddings[$item_id][0]));
                    $user_items[$user_id][] = $item_id;
                }
            }
            $item_user_embeddings[$user_id] = pack("E*",
                ...LinearAlgebra::normalize($user_embedding));
            $user_count++;
        }
        return [$item_user_embeddings, $user_items];
    }
    /**
     * Computes item recommendations for users as the similarity between
     * the user's embedding and each item embedding. Only items the user
     * has not viewed and that belong to groups the user is a member of
     * (per USER_GROUP) are scored. All existing THREAD_RECOMMENDATION rows
     * of GROUP_ITEM_RECOMMENDATION are replaced with the new scores.
     *
     * @param array $item_embeddings maps item id => [packed embedding,
     *      group id]
     * @param array $item_user_embeddings maps user id => packed embedding
     * @param array $user_items maps user id => list of viewed item ids
     * @return array $user_groups maps user id => list of group ids the user
     *      is a member of
     */
    public function computeItemUserRecommendations($item_embeddings,
        $item_user_embeddings, $user_items)
    {
        L\crawlLog("...Computing User Item Similarity Scores.");
        $db = $this->db;
        $user_group_sql = "SELECT USER_ID, " .
            $this->listAggregateSql("GROUP_ID") .
            " AS GROUP_IDS FROM USER_GROUP GROUP BY USER_ID";
        $results = $db->execute($user_group_sql);
        $user_groups = [];
        while ($row = $db->fetchArray($results)) {
            $user_groups[$row['USER_ID']] =
                explode(",", (string)$row['GROUP_IDS']);
        }
        $item_user_recommendations = [];
        $user_count = 0;
        foreach ($item_user_embeddings as $user_id => $embedding) {
            L\crawlTimeoutLog("Have done $user_count many user recommendation");
            $user_count++;
            if (!array_key_exists($user_id, $user_groups)) {
                continue;
            }
            $embedding = unpack("E*", $embedding);
            // Sets (flipped arrays) give O(1) membership in the item loop
            $viewed_items = array_flip($user_items[$user_id] ?? []);
            $member_groups = array_flip($user_groups[$user_id]);
            $item_count = 0;
            foreach ($item_embeddings as $item_id =>
                [$item_embedding, $parent_id]) {
                L\crawlTimeoutLog("Have done $item_count many items");
                if (isset($viewed_items[$item_id]) ||
                    !isset($member_groups[$parent_id])) {
                    continue;
                }
                $similarity = LinearAlgebra::similarity(
                    unpack("E*", $item_embedding), $embedding);
                $item_user_recommendations[] =
                    [$user_id, $item_id, $similarity];
                $item_count++;
            }
        }
        $this->replaceGroupItemRecommendations($item_user_recommendations,
            C\THREAD_RECOMMENDATION);
        return $user_groups;
    }
    /**
     * Computes each group's embedding as the normalized sum of the
     * embeddings of its items. It then replaces the GROUP_RECOMMENDATION
     * rows for those groups in RECOMMENDATION_ITEM_EMBEDDING.
     *
     * @param array $item_embeddings maps item id => [packed embedding,
     *      group id]
     * @return array $updated_group_embeddings maps group id => packed
     *      ("E*") embedding string
     */
    public function computeGroupEmbeddings($item_embeddings)
    {
        $updated_group_embeddings = [];
        $group_count = 0;
        foreach ($item_embeddings as [$embedding, $group_id]) {
            L\crawlTimeoutLog("Have done $group_count many groups");
            if (array_key_exists($group_id, $updated_group_embeddings)) {
                $updated_group_embeddings[$group_id] = pack("E*",
                    ...LinearAlgebra::add(unpack("E*", $embedding),
                    unpack("E*", $updated_group_embeddings[$group_id])));
            } else {
                $updated_group_embeddings[$group_id] = $embedding;
            }
            $group_count++;
        }
        $rows = [];
        foreach ($updated_group_embeddings as $group_id => $embedding) {
            $embedding = pack("E*",
                ...LinearAlgebra::normalize(unpack("E*", $embedding)));
            $updated_group_embeddings[$group_id] = $embedding;
            $rows[$group_id] = [$embedding, $group_id];
        }
        // NOTE(review): group rows are stored serialize()d while item rows
        // are base64 encoded; kept as is in case readers elsewhere rely on it
        $this->replaceRecommendationEmbeddings($rows, C\GROUP_RECOMMENDATION,
            function ($embedding) {
                return serialize(unpack("E*", $embedding));
            }, "group embeddings");
        return $updated_group_embeddings;
    }
    /**
     * Computes a user embedding for each user (other than USER_ID 2) with
     * GROUP_IMPRESSION rows in ITEM_IMPRESSION_SUMMARY, for the current
     * UPDATE_PERIOD bucket or with UPDATE_PERIOD equal to
     * C\MOST_RECENT_VIEW. The user embedding is the normalized sum of the
     * embeddings of the viewed groups that appear in $group_embeddings.
     *
     * @param array $group_embeddings maps group id => packed embedding
     * @return array [$group_user_embeddings, $user_groups] where the first
     *      maps user id => packed embedding and the second maps user id =>
     *      list of viewed group ids that have embeddings
     */
    public function computeGroupUserEmbeddings($group_embeddings)
    {
        $results = $this->executeUserImpressionQuery(C\GROUP_IMPRESSION);
        $group_user_embeddings = [];
        $user_groups = [];
        $user_count = 0;
        while ($row = $this->db->fetchArray($results)) {
            L\crawlTimeoutLog("Have done $user_count many user embeddings");
            $user_id = $row['USER_ID'];
            $group_ids = array_unique(explode(",", (string)$row['ITEM_IDS']));
            $user_embedding = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0);
            $user_groups[$user_id] = [];
            $group_count = 0;
            foreach ($group_ids as $group_id) {
                L\crawlTimeoutLog("Have done $group_count many groups");
                if (array_key_exists($group_id, $group_embeddings)) {
                    $user_embedding = LinearAlgebra::add($user_embedding,
                        unpack("E*", $group_embeddings[$group_id]));
                    $user_groups[$user_id][] = $group_id;
                }
                $group_count++;
            }
            $group_user_embeddings[$user_id] = pack("E*",
                ...LinearAlgebra::normalize($user_embedding));
            $user_count++;
        }
        return [$group_user_embeddings, $user_groups];
    }
    /**
     * Computes group recommendations for users as the similarity between
     * the user's embedding and each group embedding. Invite-only groups,
     * groups the user is a member of, and groups the user has already
     * viewed are skipped. All existing GROUP_RECOMMENDATION rows of
     * GROUP_ITEM_RECOMMENDATION are replaced with the new scores.
     *
     * @param array $group_embeddings maps group id => packed embedding
     * @param array $group_user_embeddings maps user id => packed embedding
     * @param array $user_groups maps user id => group ids the user is a
     *      member of
     * @param array $user_group_impression maps user id => group ids the
     *      user has viewed
     */
    public function computeGroupUserRecommendations($group_embeddings,
        $group_user_embeddings, $user_groups, $user_group_impression)
    {
        $db = $this->db;
        $invite_groups_sql = "SELECT GROUP_ID FROM SOCIAL_GROUPS" .
            " WHERE REGISTER_TYPE = ?";
        $results = $db->execute($invite_groups_sql, [C\INVITE_ONLY_JOIN]);
        $exclude_groups = [];
        while ($row = $db->fetchArray($results)) {
            $exclude_groups[$row['GROUP_ID']] = true;
        }
        $group_user_recommendations = [];
        $user_count = 0;
        foreach ($group_user_embeddings as $user_id => $embedding) {
            L\crawlTimeoutLog("Have done $user_count many user");
            $embedding = unpack("E*", $embedding);
            // Union of all group ids this user shouldn't be recommended
            $skip_groups = $exclude_groups +
                array_flip($user_groups[$user_id] ?? []) +
                array_flip($user_group_impression[$user_id] ?? []);
            $group_count = 0;
            foreach ($group_embeddings as $group_id => $group_embedding) {
                L\crawlTimeoutLog("Have done $group_count many groups");
                if (isset($skip_groups[$group_id])) {
                    continue;
                }
                $similarity = LinearAlgebra::similarity($embedding,
                    unpack("E*", $group_embedding));
                $group_user_recommendations[] =
                    [$user_id, $group_id, $similarity];
                $group_count++;
            }
            $user_count++;
        }
        $this->replaceGroupItemRecommendations($group_user_recommendations,
            C\GROUP_RECOMMENDATION);
    }
    /**
     * Returns the standard deviation of the distances 1 ..
     * CONTEXT_WINDOW_LENGTH, used to scale the distance weighting of
     * context terms.
     *
     * @return float standard deviation of the context window distances
     */
    private function contextWindowStdDeviation()
    {
        $context_distance_sum = (self::CONTEXT_WINDOW_LENGTH *
            (self::CONTEXT_WINDOW_LENGTH + 1)) / 2.0;
        $mean = $context_distance_sum / self::CONTEXT_WINDOW_LENGTH;
        $carry = 0.0;
        for ($i = 1; $i <= self::CONTEXT_WINDOW_LENGTH; $i++) {
            $difference = $i - $mean;
            $carry += $difference * $difference;
        }
        return sqrt($carry / self::CONTEXT_WINDOW_LENGTH);
    }
    /**
     * Returns the Hash2Vec coordinate and sign for a term. The coordinate
     * is the first four bytes (big-endian) of the HASH_ALGORITHM digest
     * reduced into 1 .. EMBEDDING_VECTOR_SIZE. The sign is -1 when the
     * SIGN_HASH_ALGORITHM digest read the same way is even, else 1.
     *
     * @param string $term term to hash
     * @return array [coordinate, sign]
     */
    private function hash2VecCoordinateAndSign($term)
    {
        $coordinate = unpack('N', hash(self::HASH_ALGORITHM, $term,
            true))[1] % C\EMBEDDING_VECTOR_SIZE + 1;
        $sign = unpack('N', hash(self::SIGN_HASH_ALGORITHM, $term,
            true))[1] % 2 == 0 ? -1 : 1;
        return [$coordinate, $sign];
    }
    /**
     * Applies the Hash2Vec context update to a sequence of terms. Term i
     * and each of the up to CONTEXT_WINDOW_LENGTH terms before it add a
     * signed weight exp(-((i - j) / sd)^2) at each other's hash coordinate.
     * Embeddings are read with getTermEmbedding and written back with
     * updateTermEmbeddingCache: each context term is written right after
     * its update, and the center term after its window.
     *
     * @param array $terms list of [term id, term] pairs in text order
     * @param int $item_type recommendation type the embeddings belong to
     * @param string $term_label label used when saving the center term
     * @param string $context_label label used when saving a context term
     * @param float $std_deviation spread of the distance weighting
     */
    private function applyHash2VecContextUpdates($terms, $item_type,
        $term_label, $context_label, $std_deviation)
    {
        $num_terms = count($terms);
        // Hash each term once instead of once per window it appears in
        $hash_signs = [];
        for ($k = 0; $k < $num_terms; $k++) {
            $hash_signs[$k] = $this->hash2VecCoordinateAndSign($terms[$k][1]);
        }
        for ($i = 0; $i < $num_terms; $i++) {
            L\crawlTimeoutLog("Have processed $i of $num_terms terms");
            $term_id = $terms[$i][0];
            [$term_hash, $term_sign] = $hash_signs[$i];
            $term_embedding = unpack("E*",
                $this->getTermEmbedding($term_id, $item_type));
            $window_start = max(0, $i - self::CONTEXT_WINDOW_LENGTH);
            for ($j = $i - 1; $j >= $window_start; $j--) {
                $context_term_id = $terms[$j][0];
                [$context_term_hash, $context_term_sign] = $hash_signs[$j];
                $context_term_embedding = unpack("E*",
                    $this->getTermEmbedding($context_term_id, $item_type));
                $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2));
                $term_embedding[$context_term_hash] +=
                    $context_term_sign * $weight;
                $context_term_embedding[$term_hash] += $term_sign * $weight;
                $this->updateTermEmbeddingCache($context_term_id,
                    pack("E*", ...$context_term_embedding), $item_type,
                    $context_label);
            }
            $this->updateTermEmbeddingCache($term_id,
                pack("E*", ...$term_embedding), $item_type, $term_label);
        }
    }
    /**
     * Returns the SQL aggregate that joins a column's values into a
     * comma-separated string for the current DBMS. MySQL gets
     * GROUP_CONCAT(col SEPARATOR ','), because MySQL neither accepts
     * CAST(... AS VARCHAR) nor treats a second GROUP_CONCAT argument as
     * the separator. SQLite uses GROUP_CONCAT(CAST(col AS VARCHAR), ','),
     * and other databases (Postgres) use STRING_AGG.
     *
     * @param string $column column to aggregate
     * @return string SQL aggregate expression
     */
    private function listAggregateSql($column)
    {
        $dbms = $this->db->to_upper_dbms;
        if ($dbms == "MYSQL") {
            return "GROUP_CONCAT($column SEPARATOR ',')";
        }
        $list_function = ($dbms == "SQLITE3") ? "GROUP_CONCAT" : "STRING_AGG";
        return "$list_function(CAST($column AS VARCHAR), ',')";
    }
    /**
     * Runs the per-user impression list query on ITEM_IMPRESSION_SUMMARY
     * for one impression type. It selects rows for the current
     * UPDATE_PERIOD bucket or with UPDATE_PERIOD = C\MOST_RECENT_VIEW, and
     * excludes USER_ID 2 (NOTE(review): presumably the public account).
     *
     * @param int $impression_type value matched against ITEM_TYPE
     * @return mixed result handle with USER_ID and comma-separated ITEM_IDS
     */
    private function executeUserImpressionQuery($impression_type)
    {
        $timestamp = floor(time() / self::UPDATE_PERIOD) *
            self::UPDATE_PERIOD;
        $condition = "ITEM_TYPE = ? AND USER_ID <> 2 AND" .
            " ((UPDATE_PERIOD = ? AND UPDATE_TIMESTAMP = ?) OR" .
            " (UPDATE_PERIOD = ?))";
        $impression_sql = "SELECT USER_ID, " .
            $this->listAggregateSql("ITEM_ID") . " AS " .
            "ITEM_IDS FROM ITEM_IMPRESSION_SUMMARY WHERE $condition " .
            "GROUP BY USER_ID";
        return $this->db->execute($impression_sql, [$impression_type,
            self::UPDATE_PERIOD, $timestamp, C\MOST_RECENT_VIEW]);
    }
    /**
     * Replaces rows of RECOMMENDATION_ITEM_EMBEDDING in batches of
     * C\BATCH_SQL_INSERT_NUM. For each batch, rows with the batch's ids and
     * the given ITEM_TYPE are deleted, then the new rows are inserted with
     * insertIgnore.
     *
     * @param array $embeddings maps id => [packed embedding, parent id]
     * @param int $item_type value of the ITEM_TYPE column
     * @param callable $encode turns a packed embedding into the stored
     *      string; the result must not contain single quotes
     * @param string $log_label what the rows are, for progress messages
     */
    private function replaceRecommendationEmbeddings($embeddings,
        $item_type, $encode, $log_label)
    {
        $db = $this->db;
        $base_delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" .
            " WHERE ITEM_TYPE = ? AND ID IN (";
        $base_insert_sql = "INSERT INTO RECOMMENDATION_ITEM_EMBEDDING VALUES ";
        $delete_sql = $base_delete_sql;
        $insert_sql = $base_insert_sql;
        $comma = "";
        $insert_count = 0;
        $total_insert = 0;
        foreach ($embeddings as $id => [$embedding, $parent_id]) {
            L\crawlTimeoutLog("Have inserted $total_insert $log_label");
            $encoded = $encode($embedding);
            $insert_sql .= "$comma($id, $item_type, '$encoded', $parent_id)";
            $delete_sql .= "$comma $id";
            $comma = ",";
            $insert_count++;
            $total_insert++;
            if ($insert_count == C\BATCH_SQL_INSERT_NUM) {
                $db->execute($delete_sql . ")", [$item_type]);
                $db->execute($db->insertIgnore($insert_sql));
                $insert_count = 0;
                $comma = "";
                $delete_sql = $base_delete_sql;
                $insert_sql = $base_insert_sql;
            }
        }
        if ($insert_count > 0) {
            $db->execute($delete_sql . ")", [$item_type]);
            $db->execute($db->insertIgnore($insert_sql));
        }
    }
    /**
     * Deletes all GROUP_ITEM_RECOMMENDATION rows of the given ITEM_TYPE.
     * It then inserts the given scores, in batches of
     * C\BATCH_SQL_INSERT_NUM, stamped with update_time.
     *
     * @param array $recommendations list of [user id, item or group id,
     *      similarity score]
     * @param int $item_type value of the ITEM_TYPE column
     */
    private function replaceGroupItemRecommendations($recommendations,
        $item_type)
    {
        $db = $this->db;
        $delete_sql = "DELETE FROM GROUP_ITEM_RECOMMENDATION WHERE" .
            " ITEM_TYPE = ?";
        $db->execute($delete_sql, [$item_type]);
        $base_insert_sql = "INSERT INTO GROUP_ITEM_RECOMMENDATION VALUES ";
        $insert_sql = $base_insert_sql;
        $comma = "";
        $insert_count = 0;
        $total_insert = 0;
        foreach ($recommendations as [$user_id, $id, $similarity]) {
            L\crawlTimeoutLog("Have inserted $total_insert recommendations");
            $insert_sql .= "$comma($user_id, $id" .
                ", $item_type, $similarity, {$this->update_time})";
            $comma = ",";
            $insert_count++;
            $total_insert++;
            if ($insert_count == C\BATCH_SQL_INSERT_NUM) {
                $db->execute($db->insertIgnore($insert_sql));
                $insert_count = 0;
                $comma = "";
                $insert_sql = $base_insert_sql;
            }
        }
        if ($insert_count > 0) {
            $db->execute($db->insertIgnore($insert_sql));
        }
    }
    /**
     * Manages the whole process of computing wiki resource
     * recommendations for users.
* Makes a series of calls to handle parts of this computation
     * (descriptions, term embeddings, resource embeddings, user
     * embeddings, then similarity scores) before synthesizing the result
     */
    public function computeWikiResourceRecommendations()
    {
        L\crawlLog("...Start fetching descriptions for the wiki page " .
            "resources...");
        [$descriptions, $resource_metadata] =
            $this->getWikiResourceDescriptions();
        L\crawlLog("...Finished fetching descriptions for the wiki page " .
            "resources...");
        L\crawlLog("...Start computing wiki term embeddings...");
        [$resource_terms, $meta_details_terms] =
            $this->computeWikiTermEmbeddings($descriptions);
        L\crawlLog("...Finished computing wiki term embeddings...");
        L\crawlLog("...Start computing wiki resource embeddings...");
        $item_embeddings = $this->computeWikiResourceEmbeddings(
            $resource_terms, $meta_details_terms);
        L\crawlLog("...Finished computing wiki resource embeddings...");
        unset($resource_terms);
        unset($meta_details_terms);
        L\crawlLog("...Start write back term embeddings from cache to db");
        $this->saveTermEmbeddingsCacheToDb(C\RESOURCE_RECOMMENDATION);
        L\crawlLog("...Finished write back term embeddings from cache to db");
        L\crawlLog("...Start computing wiki user embeddings...");
        [$user_embeddings, $user_items] = $this->computeWikiUserEmbeddings(
            $item_embeddings);
        L\crawlLog("...Finished computing wiki user embeddings...");
        L\crawlLog("...Start computing wiki resource recommendations...");
        $this->computeWikiUserRecommendations($item_embeddings,
            $user_embeddings, $user_items, $resource_metadata);
        L\crawlLog("...Done computing wiki resource recommendations...");
        unset($user_embeddings);
        unset($user_items);
        unset($item_embeddings);
        unset($resource_metadata);
    }
    /**
     * Reads RECOMMENDATION_FILE, whose lines have the form
     * group_id###page_id###folder. It collects the contents of the
     * description files found (recursively) under each folder, skipping
     * malformed lines, unreadable files and files holding the "failed to
     * find description" placeholder. Collection stops once MAX_BATCH_SIZE
     * descriptions are gathered. RECOMMENDATION_FILE is then rewritten to
     * hold only the lines after the folder being processed.
     *
     * @return array [$descriptions, $resource_metadata] where $descriptions
     *      maps resource id => description text and $resource_metadata
     *      maps resource id => [group id, page id, description file path
     *      without its last four characters]
     */
    public function getWikiResourceDescriptions()
    {
        $thumb_folders = [];
        if (file_exists(self::RECOMMENDATION_FILE)) {
            $thumb_folders = explode("\n",
                file_get_contents(self::RECOMMENDATION_FILE));
        }
        $thumb_folders = array_unique($thumb_folders);
        // Lines still left to process after the current one
        $remaining_folders = $thumb_folders;
        $descriptions = [];
        $resource_metadata = [];
        foreach ($thumb_folders as $thumb_folder) {
            array_shift($remaining_folders);
            if (empty($thumb_folder)) {
                continue;
            }
            $folder_parts = explode("###", $thumb_folder);
            if (count($folder_parts) < 3) {
                L\crawlLog("Skipping malformed recommendation folder line: " .
                    $thumb_folder);
                continue;
            }
            [$group_id, $page_id, $folder] = $folder_parts;
            $folder = trim($folder, " \n\r\t\v\x00");
            foreach ($this->getDescriptionFiles($folder) as $file) {
                $resource_file = substr($file, 0, strlen($file) - 4);
                // NOTE(review): 'n' keeps only 16 bits of the md5, so
                // distinct resources can collide (the later one is then
                // skipped); widening would change ids already stored
                $resource_id = unpack('n', md5($group_id . $page_id .
                    $resource_file, true))[1];
                if (array_key_exists($resource_id, $descriptions)) {
                    continue;
                }
                $description = file_get_contents($file);
                if ($description === false ||
                    strcmp($description, "Description search sources" .
                    " failed to find description.") == 0) {
                    continue;
                }
                $descriptions[$resource_id] = $description;
                $resource_metadata[$resource_id] = [$group_id, $page_id,
                    $resource_file];
                if (count($descriptions) >= self::MAX_BATCH_SIZE) {
                    L\crawlLog("Reached max resources limit");
                    file_put_contents(self::RECOMMENDATION_FILE,
                        implode(PHP_EOL, $remaining_folders));
                    return [$descriptions, $resource_metadata];
                }
            }
        }
        return [$descriptions, $resource_metadata];
    }
    /**
     * Returns the paths of all files in a thumb folder, recursing into
     * subfolders. The entries ".", "..", "needs_description.txt",
     * "subfolder_counts.txt" and ".DS_Store" are excluded.
     *
     * @param string $thumb_folder path of a thumb folder
     * @return array $file_paths list of description file paths under the
     *      folder (empty if it is not a readable directory)
     */
    public function getDescriptionFiles($thumb_folder)
    {
        if (!is_dir($thumb_folder)) {
            return [];
        }
        $exclude_files = [".", "..", "needs_description.txt",
            "subfolder_counts.txt", ".DS_Store"];
        $files = scandir($thumb_folder);
        if ($files === false) {
            return [];
        }
        $file_paths = [];
        foreach ($files as $file) {
            if (in_array($file, $exclude_files, true)) {
                continue;
            }
            $to_process = $thumb_folder . "/" . $file;
            if (is_dir($to_process)) {
                L\crawlLog("...$to_process is a folder," .
                    " looking files inside it...");
                $file_paths = array_merge($file_paths,
                    $this->getDescriptionFiles($to_process));
            } else {
                $file_paths[] = $to_process;
            }
        }
        return $file_paths;
    }
    /** * Computes the embedding for new terms in the description of wiki * resources and updates the embedding of existing terms using Hash2Vec * approach * * @param array $descriptions of resources * @return array [$resource_terms, $meta_details_term] */ public function computeWikiTermEmbeddings($descriptions) { $db = $this->db; $this->lru_cache = new LRUCache(self::MAX_TERM_EMBEDDINGS); $select_sql = "SELECT * FROM RECOMMENDATION_TERM_EMBEDDING WHERE " . "ITEM_TYPE = ? " . $db->limitOffset(self::MAX_TERM_EMBEDDINGS); $results = $db->execute($select_sql, [C\RESOURCE_RECOMMENDATION]); $resource_terms = []; $meta_details_terms = []; while ($row = $db->fetchArray($results)) { if (is_string($row['VECTOR'])) { $this->lru_cache->put($row['ID'], base64_decode($row['VECTOR'], true)); } else { var_dump($row); } } $context_distance_sum = (self::CONTEXT_WINDOW_LENGTH * (self::CONTEXT_WINDOW_LENGTH + 1)) / 2.0; $mean = $context_distance_sum / self::CONTEXT_WINDOW_LENGTH; $carry = 0.0; for ($i = 1; $i <= self::CONTEXT_WINDOW_LENGTH; $i++) { $difference = $i - $mean; $carry += $difference * $difference; } $std_deviation = sqrt($carry / self::CONTEXT_WINDOW_LENGTH); $resource_count = 0; foreach ($descriptions as $resource_id => $description) { L\crawlTimeoutLog("Have processed $resource_count many resources"); $resource_terms[$resource_id] = []; $meta_details_terms[$resource_id] = []; $description_parts = explode("\n", $description); foreach ($description_parts as $description_part) { $description_part = mb_strtolower($description_part); $terms = $this->cleanRemoveStopWords($description_part, true); if (count($terms) < self::CONTEXT_WINDOW_LENGTH) { $meta_details_terms[$resource_id] = array_merge($terms, $meta_details_terms[$resource_id]); } else {
$resource_terms[$resource_id] = array_merge($terms, $resource_terms[$resource_id]); } } if (count($resource_terms[$resource_id]) > 0) { $terms = $resource_terms[$resource_id]; $num_terms = count($terms); for ($i = 0; $i < $num_terms; $i++) { L\crawlTimeoutLog("Have processed $i of $num_terms terms"); [$term_id, $term] = $terms[$i]; $term_hash = unpack('N', hash(self::HASH_ALGORITHM, $term, true))[1] % C\EMBEDDING_VECTOR_SIZE + 1; $term_sign_hash = hash(self::SIGN_HASH_ALGORITHM, $term, true); $term_sign = unpack('N', $term_sign_hash)[1] % 2 == 0 ? -1 : 1; $term_embedding = $this->getTermEmbedding($term_id, C\RESOURCE_RECOMMENDATION); $term_embedding = unpack("E*", $term_embedding); for ($j = $i - 1; $j >= 0 && $j >= $i - self::CONTEXT_WINDOW_LENGTH; $j--) { [$context_term_id, $context_term] = $terms[$j]; $context_term_embedding = $this->getTermEmbedding( $context_term_id, C\RESOURCE_RECOMMENDATION); $context_term_embedding = unpack("E*", $context_term_embedding); $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2)); $context_term_hash = unpack('N', hash( self::HASH_ALGORITHM, $context_term, true))[1] % C\EMBEDDING_VECTOR_SIZE + 1; $context_term_sign_hash = hash( self::SIGN_HASH_ALGORITHM, $context_term, true); $context_term_sign = unpack('N', $context_term_sign_hash)[1] % 2 == 0 ? 
-1 : 1; $term_embedding[$context_term_hash] += $context_term_sign * $weight; $context_term_embedding[$term_hash] += $term_sign * $weight; $context_term_embedding = pack("E*", ...$context_term_embedding); $this->updateTermEmbeddingCache($context_term_id, $context_term_embedding, C\RESOURCE_RECOMMENDATION, "resource_context_term_update"); } $term_embedding = pack("E*", ...$term_embedding); $this->updateTermEmbeddingCache($term_id, $term_embedding, C\RESOURCE_RECOMMENDATION, "resource_term_update"); } $resource_count++; } } return [$resource_terms, $meta_details_terms]; } /** * Split the given text into terms, clean the terms by removing non * alphanumeric characters and remove the stop terms in order to reduce the * noise while calculating the embeddings * * @param string $text which needs to be processed * @param boolean $description_stop_word_flag to remove * words present in DESCRIPTION_STOP_WORDS * @return array $terms [term_id, term] term_id calculated using md5 hash * for the term */ public function cleanRemoveStopWords($text, $description_stop_word_flag = false) { $raw_terms = preg_split("/[\s,\/\._-]+/", $text); $terms = []; foreach ($raw_terms as $term) { $term = preg_replace("/\W/", "", $term); $term = preg_replace("/&rsquo/", "'", $term); $term = str_replace(['"', "'"], "", $term); if (strlen($term) > 0) { $terms[] = $term; } } $text_locale = L\guessLocaleFromString($text); $stop_obj = PhraseParser::getTokenizer($text_locale); if ($stop_obj && method_exists($stop_obj, "stoptermsRemover")) { $terms = $stop_obj->stoptermsRemover($terms); } $term_ids = []; foreach ($terms as $term) { if ($description_stop_word_flag && in_array($term, self::DESCRIPTION_STOP_WORDS)) { continue; } $term_id = L\canonicalTerm($term); $term_ids[] = [$term_id, $term]; } return $term_ids; } /** * Computes the embeddings for wiki page resources using the calculated * term embeddings and add the metadata details separately to the embeddings * * @param array $resource_terms of processed 
terms from resource description * @param array $meta_details_terms of raw resource descriptions * @return array $updated_item_embeddings array of updated wiki resource * embeddings */ public function computeWikiResourceEmbeddings($resource_terms, $meta_details_terms) { $db = $this->db; $updated_item_embeddings = []; $resource_count = 0; foreach ($resource_terms as $resource_id => $terms) { L\crawlTimeoutLog("Have processed $resource_count many resources"); $item_embedding = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0); foreach ($terms as [$term_id, $term]) { $term_embedding = $this->getTermEmbedding($term_id, C\RESOURCE_RECOMMENDATION, true); $term_embedding = unpack("E*", $term_embedding); $item_embedding = LinearAlgebra::add($item_embedding, $term_embedding); } $updated_item_embeddings[$resource_id] = pack("E*", ...$item_embedding); $resource_count++; } foreach ($meta_details_terms as $resource_id => $meta_terms) { if (!array_key_exists($resource_id, $updated_item_embeddings)) { $item_embedding = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0); } else { $item_embedding = unpack("E*", $updated_item_embeddings[$resource_id]); } foreach ($meta_terms as [$meta_term_id, $meta_term]) { if (strlen($meta_term) <= 1) { continue; } $meta_term_hash = unpack('N', hash(self::HASH_ALGORITHM, $meta_term, true))[1] % C\EMBEDDING_VECTOR_SIZE + 1; $sign_hash = hash(self::SIGN_HASH_ALGORITHM, $meta_term, true); $sign = unpack('N', $sign_hash)[1] % 2 == 0 ? -1 : 1; $item_embedding[$meta_term_hash] += $sign * 1.0; } $updated_item_embeddings[$resource_id] = pack("E*", ...$item_embedding); } foreach ($updated_item_embeddings as $item_id => $embedding) { $embedding = unpack("E*", $embedding); $updated_item_embeddings[$item_id] = pack("E*", ...LinearAlgebra::normalize($embedding)); } $delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING WHERE" . 
" ITEM_TYPE = ?"; $db->execute($delete_sql, [C\RESOURCE_RECOMMENDATION]); $base_insert_sql = "INSERT INTO RECOMMENDATION_ITEM_EMBEDDING VALUES "; $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; $total_insert = 0; $item_type = C\RESOURCE_RECOMMENDATION; foreach ($updated_item_embeddings as $resource_id => $embedding) { L\crawlTimeoutLog("Have inserted $total_insert many resources"); $embedding = base64_encode($embedding); $insert_sql .= "$comma($resource_id, $item_type," . " '$embedding', $resource_id)"; $comma = ","; $insert_count++; $total_insert++; if ($insert_count == C\BATCH_SQL_INSERT_NUM) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); $insert_count = 0; $comma = ""; $insert_sql = $base_insert_sql; } } if ($insert_count > 0) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); } return $updated_item_embeddings; } /** * Computes user embeddings for wiki resources based on the user's resources * impression logged in ITEM_IMPRESSION_SUMMARY table for the defined update * period * * @param array $item_embeddings of wiki page resources embedding * @return array [$user_embeddings, $user_items] of user embeddings * for wiki resources and the user resource impression */ public function computeWikiUserEmbeddings($item_embeddings) { $db = $this->db; //SQLITE and MYSQL use GROUP_CONCAT, Postgres uses STRING_AGG $db_list_function = in_array($db->to_upper_dbms, ["SQLITE3", "MYSQL"]) ? "GROUP_CONCAT" : "STRING_AGG"; $timestamp = floor(time() / self::UPDATE_PERIOD ) * self::UPDATE_PERIOD; $condition = "ITEM_TYPE = ? AND USER_ID <> 2 AND" . " ((UPDATE_PERIOD = ? AND UPDATE_TIMESTAMP = ?) OR" . " (UPDATE_PERIOD = ?))"; $impression_sql = "SELECT USER_ID, $db_list_function( " . "CAST(ITEM_ID AS VARCHAR), ',') AS " . "ITEM_IDS FROM ITEM_IMPRESSION_SUMMARY WHERE $condition " . 
"GROUP BY USER_ID"; $results = $db->execute($impression_sql, [C\RESOURCE_IMPRESSION, self::UPDATE_PERIOD, $timestamp, C\MOST_RECENT_VIEW]); $user_embeddings = []; $user_items = []; $user_count = 0; while ($row = $db->fetchArray($results)) { L\crawlTimeoutLog("Have processed $user_count many users"); $user_id = $row['USER_ID']; $item_ids = explode(",", $row['ITEM_IDS']); $item_ids = array_unique($item_ids); $user_embeddings[$user_id] = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0); $user_items[$user_id] = []; foreach ($item_ids as $item_id) { if (array_key_exists($item_id, $item_embeddings)) { $embedding = unpack("E*", $item_embeddings[$item_id]); $user_embeddings[$user_id] = LinearAlgebra::add( $user_embeddings[$user_id], $embedding); $user_items[$user_id][] = $item_id; } } $user_embeddings[$user_id] = pack("E*", ...LinearAlgebra::normalize($user_embeddings[$user_id])); $user_count++; } return [$user_embeddings, $user_items]; } /** * Computes the wiki resource recommendations based on cosine similarity * between resource embeddings and user embeddings * * @param array $item_embeddings of wiki resources embeddings * @param array $user_embeddings of users consumed wiki resources * embeddings * @param array $user_items of users consumed wiki resources */ public function computeWikiUserRecommendations($item_embeddings, $user_embeddings, $user_items, $resource_metadata) { $db = $this->db; $recommendations = []; $user_count = 0; foreach ($user_embeddings as $user_id => $user_embedding) { L\crawlTimeoutLog("Have processed $user_count many users"); $user_embedding = unpack("E*", $user_embedding); $resource_count = 0; foreach ($item_embeddings as $item_id => $item_embedding) { L\crawlTimeoutLog("Have processed $resource_count resources"); if (in_array($item_id, $user_items[$user_id]) || !array_key_exists($item_id, $resource_metadata)) { continue; } $item_embedding = unpack("E*", $item_embedding); $similarity = LinearAlgebra::similarity($user_embedding, $item_embedding); 
list($group_id, $page_id, $resource_path) = $resource_metadata[$item_id]; unset($resource_metadata[$item_id]); $recommendations[] = [$user_id, $group_id, $page_id, $resource_path, $similarity, $item_id]; $resource_count++; } $user_count++; } $delete_sql = "DELETE FROM GROUP_RESOURCE_RECOMMENDATION"; $db->execute($delete_sql); $base_insert_sql = "INSERT INTO GROUP_RESOURCE_RECOMMENDATION " . "VALUES "; $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; $total_insert = 0; foreach ($recommendations as $recommendation) { L\crawlTimeoutLog("Have inserted $total_insert recommendations"); list($user_id, $group_id, $page_id, $resource_path, $score, $item_id) = $recommendation; $time = $this->update_time; $insert_sql .= "$comma($user_id, $group_id, $page_id, " . "'$resource_path', $score, $time, $item_id)"; $comma = ","; $insert_count++; $total_insert++; if ($insert_count == C\BATCH_SQL_INSERT_NUM) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); $insert_count = 0; $comma = ""; $insert_sql = $base_insert_sql; } } if ($insert_count > 0) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); } } /** * Returns the term embedding either from LRU cache or database * * @param int $term_id * @param int $item_type * @param boolean $update indicates whether to update the cache * @return string $term_embedding */ public function getTermEmbedding($term_id, $item_type, $update = false) { $db = $this->db; $term_embedding = $this->lru_cache->get($term_id); if (!isset($term_embedding)) { $sql = "SELECT VECTOR FROM RECOMMENDATION_TERM_EMBEDDING " . "WHERE ITEM_TYPE = ? AND ID = ? " . 
$db->limitOffset(1); $result = $db->execute($sql, [$item_type, $term_id]); $row = null; if ($result) { $row = $db->fetchArray($result); } if (!$row || !is_string($row['VECTOR'])) { $term_embedding = pack("E*", ...array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0.0)); } else { $db->closeCursor($result); $term_embedding = base64_decode($row['VECTOR'], true); } } if ($update) { $this->updateTermEmbeddingCache($term_id, $term_embedding, $item_type, "get_term_embedding"); } return $term_embedding; } /** * Updates LRU cache of term embeddings and save the evicted * embedding back to database * * @param int $term_id * @param string $term_embedding * @param int $item_type * @param string $string */ public function updateTermEmbeddingCache($term_id, $term_embedding, $item_type, $message = "") { $db = $this->db; $evicted_item = $this->lru_cache->put($term_id, $term_embedding); if (isset($evicted_item)) { $on_conflict = in_array($db->to_upper_dbms, ["MYSQL"]) ? " ON DUPLICATE KEY " : " ON CONFLICT (ITEM_TYPE, ID) DO UPDATE "; $sql = "INSERT INTO RECOMMENDATION_TERM_EMBEDDING VALUES ". "(?, ?, ?) $on_conflict SET VECTOR = ?"; $vector = base64_encode($evicted_item[1]); $db->pre_message = "$message {$evicted_item[0]} was evicted"; $db->execute($sql, [$evicted_item[0], $item_type, $vector, $vector]); } } /** * Writes back the term embeddings in cache to database and free up memory * * @param int $item_type value for ITEM_TYPE column */ public function saveTermEmbeddingsCacheToDb($item_type) { L\crawlLog("Doing final persistence flush of LRU cache for ". "$item_type"); $db = $this->db; $base_delete_sql = "DELETE FROM RECOMMENDATION_TERM_EMBEDDING" . " WHERE ITEM_TYPE = ? 
AND ID IN ("; $delete_sql = $base_delete_sql; $base_insert_sql = "INSERT INTO RECOMMENDATION_TERM_EMBEDDING VALUES "; $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; $total_insert = 0; foreach ($this->lru_cache->getAll() as $id => $embedding) { L\crawlTimeoutLog("Have inserted $total_insert many embeddings"); $embedding = base64_encode($embedding); $insert_sql .= "$comma('$id', $item_type, '$embedding')"; $delete_sql .= "$comma '$id'"; $comma = ","; $insert_count++; $total_insert++; if ($insert_count == C\BATCH_SQL_INSERT_NUM) { $delete_sql .= ")"; $db->execute($delete_sql, [$item_type]); $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); $insert_count = 0; $comma = ""; $delete_sql = $base_delete_sql; $insert_sql = $base_insert_sql; } } if ($insert_count > 0) { $delete_sql .= ")"; $db->execute($delete_sql, [$item_type]); $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); } unset($this->lru_cache); } }