viewgit/inc/functions.php:22 Function utf8_encode() is deprecated [8192]
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2023  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Chris Pollett chris@pollett.org
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2023
 * @filesource
 */
namespace seekquarry\yioop\library;

use seekquarry\yioop\configs as C;

/** For remoteAddress and crawlHash function and Yioop constants */
require_once __DIR__ . "/../library/Utility.php";
/**
 * Encapsulates the temporary storage of messages sent between QueueServers
 * and Fetchers during the course of a Crawl
 *
 * @author Chris Pollett
 */
class MessagesBundle implements CrawlConstants
{
    /**
     * Reference to a database object. Used since it has directory
     * manipulation functions
     * @var object
     */
    public $db;
    /**
     * The folder name of this MessagesBundle
     * @var string
     */
    public $dir_name;
    /**
     * Largest sequence number currently in use for sliding window protocol
     * messages used for fetch batches of urls and downloaded web page
     * for those urls responses
     * @var int
     */
    public $max_sequence_number;
    /**
     * Contains sequence_number => filename associations about which fetch
     * batches have been successfully downloaded, received and should be
     * moved for further processing. These will be moved once all earlier
     * sequence numbers have been similarly received.
     * @var array
     */
    public $receive;
    /**
     * Contains sequence_number =>
     * ['file' => ..., 'schedule_time' => ..., 'tries' => ...] associations
     * for fetch batches of urls to crawl which have been produced. Here
     * file is the name of the file that has the data for the batch,
     * schedule_time is the last time the batch was scheduled to a Fetcher,
     * and tries is the number of times it has been scheduled.
     * @var array
     */
    public $send;
    /**
     * Used in a crawl to set the maximum number of schedules generated at a
     * time (if these many schedules are created already, queue server pauses
     * on schedule creation until one or more of them have been successfully
     * downloaded)
     * In the QueueServer the window size is set to be the total number of
     * currently active fetchers
     * @var int
     */
    public $window_size;
    /**
     * String prefix to be used before filenames of message files in the
     * ETAG_EXPIRES_FOLDER, INDEX_DATA_FOLDER, ROBOTS_FOLDER, and
     * SCHEDULES_FOLDER folders.
     */
    const MESSAGE_PREFIX = "At";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with etag and expires header information waiting to be
     * processed by the scheduler
     */
    const ETAG_EXPIRES_FOLDER = "etag_expires";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with web page summaries and downloaded page information
     * waiting to be processed by the indexer (i.e., to make an inverted
     * index)
     */
    const INDEX_DATA_FOLDER = "index_data";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with robots.txt information waiting to be
     * processed by the scheduler
     */
    const ROBOTS_FOLDER = "robots";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with to crawl url information waiting to be
     * processed by the scheduler
     */
    const SCHEDULES_FOLDER = "schedules";
    /**
     * Makes a MessagesBundle with the provided parameters. This method
     * does not initialize send and receive properties related to
     * the fetch batch window.
     *
     * @param string $dir_name folder name used by this MessagesBundle
     * @param int $window_size number of outstanding url fetch batch
     *  files to allow before stopping accepting new ones
     */
    public function __construct($dir_name, $window_size = 1)
    {
        $this->dir_name = $dir_name;
        if (!file_exists($dir_name)) {
            mkdir($dir_name);
            $this->makeWorldWritable($dir_name);
        }
        $this->window_size = $window_size;
        $db_class = C\NS_DATASOURCES . ucfirst(C\DBMS) . "Manager";
        $this->db = new $db_class();
    }
    /**
     * Best-effort attempt to set the permissions of $path to 0777. The
     * error handler is cleared for the duration of the call and the chmod
     * is error-suppressed, so a failure is silently ignored; afterwards the
     * Yioop error handler is installed again.
     *
     * @param string $path file or folder whose permissions should be opened
     */
    private function makeWorldWritable($path)
    {
        set_error_handler(null);
        @chmod($path, 0777);
        set_error_handler(C\NS_CONFIGS . "yioop_error_handler");
    }
    /**
     * Used to initialize the send and receive property arrays for the
     * send and receive fetch batch window from the .txt files currently in
     * the send and receive subfolders (which are created if missing).
     * Also sets max_sequence_number to the largest sequence number seen
     * in either folder (0 if none).
     *
     * @param bool $reset_schedule_times whether to reset to 0 the schedule
     *  time of each batch currently in the send folder (i.e., not yet
     *  acknowledged), forcing them to be rescheduled.
     */
    public function initWindows($reset_schedule_times = false)
    {
        $window_dirs = [
            'receive' => $this->dir_name . "/receive",
            'send' => $this->dir_name . "/send",
        ];
        $max_sequence_number = 0;
        foreach ($window_dirs as $window_name => $dir) {
            if (!file_exists($dir)) {
                mkdir($dir);
                $this->makeWorldWritable($dir);
            }
            // glob() may return false on error; treat that as no files
            $files = glob("$dir/*.txt") ?: [];
            $window_data = [];
            $len_prefix = strlen("$dir/");
            $len_suffix = strlen(".txt");
            /* receive files are named SEQ.txt, send files are named
               SEQ-SCHEDULE_TIME-TRIES.txt */
            $expected_num_parts = ($window_name == 'receive') ? 1 : 3;
            foreach ($files as $file) {
                $base_name = substr($file, $len_prefix, -$len_suffix);
                $base_parts = explode("-", $base_name);
                if (count($base_parts) != $expected_num_parts) {
                    continue;
                }
                $sequence_number = intval($base_parts[0]);
                if ($sequence_number > $max_sequence_number) {
                    $max_sequence_number = $sequence_number;
                }
                if ($window_name == 'send') {
                    $tries = intval($base_parts[2]);
                    $item = [
                        'file' => $file,
                        'schedule_time' => intval($base_parts[1]),
                        'tries' => $tries,
                    ];
                    if ($reset_schedule_times) {
                        $reset_file = $this->makeSendWindowFilename(
                            $sequence_number, 0, $tries);
                        /* Only record the reset if the file on disk
                           really carries the new name, so that the
                           in-memory entry and the filename never
                           disagree about the schedule time. */
                        if ($reset_file === $file ||
                            rename($file, $reset_file)) {
                            $item['file'] = $reset_file;
                            $item['schedule_time'] = 0;
                        }
                    }
                } else {
                    $item = $file;
                }
                $window_data[$sequence_number] = $item;
            }
            $this->$window_name = $window_data;
        }
        $this->max_sequence_number = $max_sequence_number;
    }
    /**
     * Returns whether the sliding window is full, that is, whether the
     * number of batches in the receive window plus the number of batches
     * in the send window has reached the window size.
     *
     * @return bool true if no further fetch batch can be added, false
     *  if there is still room for another fetch batch
     */
    public function isSendFull()
    {
        return (count($this->receive) + count($this->send) >=
            $this->window_size);
    }
    /**
     * Returns the data from the least sequence numbered file in the send
     * window that either has not previously been returned (tries 0) or
     * that has been previously returned but more than timeout seconds
     * earlier. In both cases the try count of the file is bumped. If a
     * timed-out file is found in the search process with a try count of
     * MAX_RESCHEDULE_ATTEMPTS or more, it is moved instead to the receive
     * window but with no data received.
     *
     * @return string|bool file contents of found file, false if no file
     *  was eligible
     */
    public function extractSendWindow()
    {
        ksort($this->send);
        $data = false;
        $time = time();
        /* foreach iterates by value, so the modifications made to
           $this->send below do not disturb the iteration */
        foreach ($this->send as $sequence_number => $item) {
            $tries = $item['tries'];
            $timeout = ($item['schedule_time'] + C\SCHEDULE_TIMEOUT) < $time;
            if ($tries == 0 ||
                ($timeout && $tries < C\MAX_RESCHEDULE_ATTEMPTS)) {
                $data = file_get_contents($item['file']);
                $tries++;
                $schedule_time = ($timeout) ? $time :
                    $item['schedule_time'];
                $filename = $this->makeSendWindowFilename(
                    $sequence_number, $schedule_time, $tries);
                rename($item['file'], $filename);
                /* Keep the in-memory entry in step with the new filename.
                   In particular the schedule time must be refreshed,
                   otherwise a rescheduled batch would look timed out
                   again on the very next call. */
                $this->send[$sequence_number] = [
                    'file' => $filename,
                    'schedule_time' => $schedule_time,
                    'tries' => $tries,
                ];
                break;
            } else if ($timeout) {
                // out of attempts: acknowledge the batch with empty data
                $this->updateReceiveWindow($sequence_number, "");
            }
        }
        return $data;
    }
    /**
     * Add a fetch batch to the send folder of the messages bundle if there
     * is space in the folder given the current window size
     *
     * @param string $tmp_send_filename filename of the file with the
     *  fetch batch. If this operation succeeds, the file at this location
     *  will be moved to the send folder
     * @return bool whether operation was successful
     */
    public function insertSendWindow($tmp_send_filename)
    {
        if ($this->isSendFull()) {
            return false;
        }
        $schedule_time = time();
        $tries = 0;
        $sequence_number = $this->max_sequence_number + 1;
        $filename = $this->makeSendWindowFilename($sequence_number,
            $schedule_time, $tries);
        if (!rename($tmp_send_filename, $filename)) {
            /* nothing was moved, so do not consume a sequence number or
               register a file that does not exist */
            return false;
        }
        $this->max_sequence_number = $sequence_number;
        $this->makeWorldWritable($filename);
        $this->send[$sequence_number] = [
            'file' => $filename,
            'schedule_time' => $schedule_time,
            'tries' => $tries,
        ];
        return true;
    }
    /**
     * Used to output the filename of a file storing a fetch batch of urls in
     * the send folder
     *
     * @param int $sequence_number sequence number used for sliding window
     *  algorithm
     * @param int $schedule_time time at which the given file was last
     *  scheduled for download
     * @param int $tries number of times a fetcher has been scheduled to try
     *  to download the given list of urls
     * @return string the filename to use for the fetch batch with the given
     *  parameters
     */
    public function makeSendWindowFilename($sequence_number, $schedule_time,
        $tries)
    {
        return $this->dir_name .
            "/send/{$sequence_number}-$schedule_time-$tries.txt";
    }
    /**
     * Used to output the filename of a file returned from a fetcher
     * containing downloaded pages for a set of urls
     *
     * @param int $sequence_number sequence number used for sliding window
     *  algorithm
     * @return string the filename to use for the fetch batch with the given
     *  parameters
     */
    public function makeReceiveWindowFilename($sequence_number)
    {
        return $this->dir_name . "/receive/{$sequence_number}.txt";
    }
    /**
     * Adds a new messages file to the receive subfolder of the
     * MessagesBundle with the given sequence number and containing the
     * passed data, then removes the corresponding batch from the send
     * window (both its file and its in-memory entry). Does nothing if the
     * sequence number is not currently in the send window.
     *
     * @param int $sequence_number sequence number to use in making the
     *  filename of the new file to add to the receive folder
     * @param string $data message contents of the file to add
     */
    public function updateReceiveWindow($sequence_number, $data)
    {
        if (empty($this->send[$sequence_number])) {
            return;
        }
        $target_name = $this->makeReceiveWindowFilename($sequence_number);
        file_put_contents($target_name, $data);
        $this->makeWorldWritable($target_name);
        $this->receive[$sequence_number] = $target_name;
        unlink($this->send[$sequence_number]['file']);
        unset($this->send[$sequence_number]);
    }
    /**
     * Searches for the file in the receive subfolder of this MessagesBundle
     * of least sequence number. If there is no file in the send subfolder
     * of the MessagesBundle with smaller sequence number, then the file
     * is read, deleted and its contents returned. Otherwise, this function
     * returns false.
     *
     * @return string|bool data of found file or false
     */
    public function extractReceiveWindow()
    {
        ksort($this->receive);
        $first_receive_number = array_key_first($this->receive);
        if ($first_receive_number === null) {
            // nothing has been received yet
            return false;
        }
        ksort($this->send);
        $first_send_number = array_key_first($this->send);
        if ($first_send_number !== null &&
            $first_send_number < $first_receive_number) {
            // an earlier batch is still outstanding, so keep waiting
            return false;
        }
        $filename = $this->receive[$first_receive_number] ?? "";
        $data = false;
        if (!empty($filename) && file_exists($filename)) {
            $data = file_get_contents($filename);
            unlink($filename);
        }
        unset($this->receive[$first_receive_number]);
        return $data;
    }
    /**
     * Adds a file with contents $data_string and with name containing the
     * remote address, $time and a hash of the data to a subfolder $day
     * (the number of whole days in $time) of the $messages_type subfolder
     * of the MessagesBundle. Missing folders are created.
     *
     * @param string $messages_type the kind of messages being saved
     * @param string &$data_string encoded, compressed, serialized data the
     *  schedule is to contain
     * @param int $time timestamp to use in the file name and to pick the
     *  day subfolder; 0 means use the current time
     */
    public function addMessages($messages_type, &$data_string, $time = 0)
    {
        // make the address safe for use inside a file name
        $address = strtr(remoteAddress(), ["." => "-", ":" => "_"]);
        $time = ($time == 0) ? time() : $time;
        $day = floor($time/C\ONE_DAY);
        $dir = $this->dir_name . "/$messages_type";
        foreach ([$dir, "$dir/$day"] as $folder) {
            if (!file_exists($folder)) {
                mkdir($folder);
                $this->makeWorldWritable($folder);
            }
        }
        $data_hash = crawlHash($data_string);
        file_put_contents("$dir/$day/" . self::MESSAGE_PREFIX . $time .
            "From" . $address . "WithHash$data_hash.txt", $data_string);
    }
    /**
     * Get the next messages file by timestamp stored in the MessagesBundle
     * subfolder of the given type (etag_expires, robots, schedules,
     * index_data) and either return it as a string or as a deserialized
     * array or object. This subfolder is typically organized into days
     * subsubfolders with the appropriately timestamped files in a given
     * days folder. After reading its data, but before returning it from
     * this method the file itself is deleted.
     *
     * @param string $type kind of data to look for messages about. This
     *  kind should correspond to a folder in the messages bundle. Typically,
     *  these folders will be (etag_expires, robots, schedules, index_data)
     * @param bool $unpack whether to unpack (via unpackMessages)
     *  the first found file and return its data, or just return its string
     *  contents after webdecode-ing them.
     * @param string $message_prefix prefix string of filenames in
     *  given folder (etag_expires, robots, schedules, index_data) that have
     *  messages data
     * @return string|array|object|null the message data, or null if no
     *  message file was found
     */
    public function nextMessages($type, $unpack = true,
        $message_prefix = self::MESSAGE_PREFIX)
    {
        // glob() may return false on error; treat that as nothing found
        $dirs = glob("{$this->dir_name}/$type/*", GLOB_ONLYDIR) ?: [];
        $len_prefix = strlen($message_prefix);
        foreach ($dirs as $dir) {
            $files = glob($dir . '/*.txt') ?: [];
            if (isset($old_dir)) {
                crawlLog("Deleting $old_dir\n");
                $this->db->unlinkRecursive($old_dir);
                /*
                   The idea is that we only go through the outer loop more
                   than once if the earlier data directory was empty.
                   Note: older directories should only have data dirs
                   or deleting like this might cause problems!
                 */
            }
            /* the code below returns the data from the lex-first file
               beginning with $message_prefix */
            foreach ($files as $file) {
                $base_name = pathinfo($file, PATHINFO_BASENAME);
                if (strncmp($base_name, $message_prefix, $len_prefix)
                    !== 0) {
                    continue;
                }
                $message_data = ($unpack) ? $this->unpackMessages($file) :
                    webdecode(file_get_contents($file));
                unlink($file);
                return $message_data;
            }
            $old_dir = $dir;
        }
        return null;
    }
    /**
     * Returns whether there are any as yet unprocessed to crawl schedule
     * files in the given MessagesBundle's schedules folder, that is,
     * whether some subfolder of it has an entry whose name begins with
     * MESSAGE_PREFIX.
     *
     * @param string $messages_bundle_dir folder of the MessagesBundle whose
     *  schedules subfolder should be checked
     * @return bool whether any found (true, if yes)
     */
    public static function isResumable($messages_bundle_dir)
    {
        $schedules_dir_name = "$messages_bundle_dir/" .
            self::SCHEDULES_FOLDER;
        if (!is_dir($schedules_dir_name)) {
            return false;
        }
        $schedules_dir = opendir($schedules_dir_name);
        if ($schedules_dir === false) {
            return false;
        }
        $len_prefix = strlen(self::MESSAGE_PREFIX);
        $found = false;
        /* A flag is used rather than returning from inside the loops so
           that every directory handle opened here is closed again. */
        while (!$found && ($name = readdir($schedules_dir)) !== false) {
            $sub_path = "$schedules_dir_name/$name";
            if ($name == '.' || $name == '..' || !is_dir($sub_path)) {
                continue;
            }
            $sub_dir = opendir($sub_path);
            if ($sub_dir === false) {
                continue;
            }
            while (($sub_name = readdir($sub_dir)) !== false) {
                if (strncmp($sub_name, self::MESSAGE_PREFIX, $len_prefix)
                    === 0) {
                    $found = true;
                    break;
                }
            }
            closedir($sub_dir);
        }
        closedir($schedules_dir);
        return $found;
    }
    /**
     * Decodes the web-string safety encoding and then gunzips and
     * deserializes the result for a messages file used for communication
     * between a Fetcher and the QueueServer
     *
     * @param string $file name of the messages file to unpack
     * @return array|object|bool unserialized data array or object contained
     *  in the messages file to be processed, false if the file could not
     *  be read or decoded
     */
    public function unpackMessages($file)
    {
        crawlLog("Processing File: $file");
        $contents = file_get_contents($file);
        if ($contents === false) {
            return false;
        }
        $decode = webdecode($contents);
        /* Malformed data is expected to happen occasionally, so the
           decompress/unserialize warnings are suppressed and failure is
           reported through a false return value instead. */
        set_error_handler(null);
        $decode = @gzuncompress($decode);
        /* NOTE(review): unserialize() is applied to data that presumably
           originated on another machine -- confirm that senders are
           authenticated before the file is written. */
        $sites = @unserialize($decode);
        set_error_handler(C\NS_CONFIGS . "yioop_error_handler");
        return $sites;
    }
}