viewgit/inc/functions.php:22 Function utf8_encode() is deprecated [8192]
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2022  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Chris Pollett chris@pollett.org
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2022
 * @filesource
 */
namespace seekquarry\yioop\library;

use seekquarry\yioop\configs as C;

/**
 * Loads crawlLog functions if needed
 */
require_once __DIR__ . "/Utility.php";
/**
 * A partition document bundle is a collection of partitions each of which in
 * turn can hold a concatenated sequence of compressed documents and which
 * are managed together. It is a successor format to the earlier
 * WebArchiveBundle of Yioop. The partition document bundle stores individual
 * records using a record format defined via the PackedTableTools class.
 * This basic format has been extended by two new types BLOB and SERIAL (a
 * PHP serialized object represented as a blob).
 * Data for columns of these types are stored in separate files from the rest
 * of records. Offsets into these archive files for blobs and serials are
 * stored in a record as columns representing a difference list of ints
 * together with a LAST_BLOB_LEN column. Using this info, blobs and serials
 * associated with a record can be retrieved. How many documents are
 * collected into a partition can be tuned for read, write, and in-memory
 * efficiency.
 *
 * @author Chris Pollett
 */
class PartitionDocumentBundle
{
    /**
     * Compression strategy used to compress blob and serial columns
     */
    const DEFAULT_COMPRESSOR = C\NS_COMPRESSORS . "NonCompressor";
    /**
     * Default parameters to use when constructing a PartitionDocumentBundle
     */
    const DEFAULT_PARAMETERS = ["RECORD_COMPRESSOR" => self::DEFAULT_COMPRESSOR,
        "BLOB_COMPRESSOR" => self::DEFAULT_COMPRESSOR,
        "COUNT" => 0, "PARTITION_SIZE_THRESHOLD" =>
        self::PARTITION_SIZE_THRESHOLD,
        "FORMAT" => ["PRIMARY KEY" => "KEY", "VALUE" => "BLOB"],
        "MAX_ITEMS_PER_FILE" => self::MAX_ITEMS_PER_FILE,
        "SAVE_PARTITION" => 0, "ACTIVE_COUNT" => 0
    ];
    /**
     * Extension for PartitionDocumentBundle partition files used to contain
     * records
     */
    const INDEX_EXTENSION = ".ix";
    /**
     * Default maximum number of records to store in a partition
     */
    const MAX_ITEMS_PER_FILE = 16384;
    /**
     * File name of file used to store the parameters of this
     * PartitionDocumentBundle
     */
    const PARAMETERS_FILE = "pdb_parameters.txt";
    /**
     * Prefix to file names of PartitionDocumentBundle partition files
     */
    const PARTITION_PREFIX = "partition_";
    /**
     * Maximum number of bytes a partition can have before the next partition
     * is started. Notice this implies a maximum file size to store
     * in BLOB columns
     */
    const PARTITION_SIZE_THRESHOLD = 2147483648;
    /**
     * Used to store the file handle to, the partition file name, and the
     * instance time for the last time an item's blob/serial columns were
     * added to for the PartitionDocumentBundle
     * @var array
     */
    public $add_archive_cache = [null, "", -1];
    /**
     * Used to store the file handle to, the partition file name, and the
     * instance time for the last time an item's blob/serial columns were
     * accessed for the PartitionDocumentBundle
     * @var array
     */
    public $get_archive_cache = [null, "", -1];
    /**
     * Array of column names for the columns in a PartitionDocumentBundle which
     * are of type BLOB or SERIAL
     * @var array
     */
    public $blob_columns;
    /**
     * The seekquarry\yioop\library\compressors\Compressor object used to
     * compress record files.
     * @var object
     */
    public $record_compressor;
    /**
     * The seekquarry\yioop\library\compressors\Compressor object used to
     * compress blob columns.
     * @var object
     */
    public $blob_compressor;
    /**
     * Folder path where the PartitionDocumentBundle is stored
     * @var string
     */
    public $folder;
    /**
     * In memory cache of partition indexes from the PartitionDocumentBundle.
     * Each entry is a pair [index, time of last access]
     * @var array
     */
    public $index_cache;
    /**
     * Maximum number of items the partition cache is allowed to hold
     * @var int
     */
    public $index_cache_size;
    /**
     * Used to keep track of when this instance was created, as part of managing
     * file handles expiration (could be set/updated externally to reflect
     * some other instance using the bundle)
     * @var int
     */
    public $instance_time;
    /**
     * Name of primary key column for records
     * @var string
     */
    public $key_field;
    /**
     * Stores the constructor parameters used to create this
     * PartitionDocumentBundle
     * @var array
     */
    public $parameters;
    /**
     * Index of the current save partition as loaded by put() in
     * PackedTableTools::AS_STRING_MODE. put() appends packed records to it
     * and both put() and advanceSavePartition() write it back to disk.
     * Declared explicitly as assigning to an undeclared (dynamic) property
     * is deprecated as of PHP 8.2.
     * @var string|bool|null
     */
    public $save_index;
    /**
     * Array of column names for the columns in a PartitionDocumentBundle which
     * are of type SERIAL
     * @var array
     */
    public $serial_columns;
    /**
     * The PackedTableTools object used to pack and unpack records in
     * partitions
     * @var object
     */
    public $table_tools;
    /**
     * Used to create a new instance of a PartitionDocumentBundle
     *
     * @param string $folder the path to the folder to store this
     *  PartitionDocumentBundle
     * @param array $format the column names, keys and types for this
     *  PartitionDocumentBundle object
     * @param int $max_items_per_file maximum number of items to store
     *  in a partition before making the next partition
     * @param int $partition_size_threshold maximum length of a partition
     *  file in bytes before a new partition file should be started
     * @param string $record_compressor_type name of the
     *  seekquarry\yioop\library\compressors\Compressor class to instantiate
     *  and use to compress record files excluding blob columns.
     * @param string $blob_compressor_type name of the
     *  seekquarry\yioop\library\compressors\Compressor class to instantiate
     *  and use to compress blob columns.
     */
    public function __construct($folder,
        $format = self::DEFAULT_PARAMETERS["FORMAT"],
        $max_items_per_file = self::MAX_ITEMS_PER_FILE,
        $partition_size_threshold = self::PARTITION_SIZE_THRESHOLD,
        $record_compressor_type = self::DEFAULT_COMPRESSOR,
        $blob_compressor_type = self::DEFAULT_COMPRESSOR)
    {
        $initial_parameters = self::DEFAULT_PARAMETERS;
        $initial_parameters["PARTITION_SIZE_THRESHOLD"] =
            $partition_size_threshold;
        $initial_parameters["MAX_ITEMS_PER_FILE"] = $max_items_per_file;
        $initial_parameters["RECORD_COMPRESSOR"] = $record_compressor_type;
        $initial_parameters["BLOB_COMPRESSOR"] = $blob_compressor_type;
        $this->record_compressor = new $record_compressor_type();
        $this->blob_compressor = new $blob_compressor_type();
        $initial_parameters["FORMAT"] = $format;
        $this->instance_time = hrtime(true);
        /* A memory_limit of -1 means no limit. Without the check below
           the computed cache size would be negative and the eviction test
           in loadPartitionIndex() would fire on every load.
         */
        $memory_limit = metricToInt(ini_get('memory_limit'));
        $this->index_cache_size = ($memory_limit > 0) ?
            min(50, floor($memory_limit / 128000000)) : 50;
        $this->folder = $folder;
        $folder_paths = [$folder];
        $changed_parameters = false;
        foreach ($folder_paths as $folder_path) {
            if (!file_exists($folder_path)) {
                $changed_parameters = true;
                if (!mkdir($folder_path)) {
                    return;
                }
            }
        }
        $this->parameters = self::getParameterInfo($folder);
        // fill in any parameter not already stored on disk
        foreach (self::DEFAULT_PARAMETERS as $field => $value) {
            if (!isset($this->parameters[$field])) {
                $this->parameters[$field] = $initial_parameters[$field];
                $changed_parameters = true;
            }
        }
        $format = $this->parameters["FORMAT"];
        $packed_table_format = [];
        $this->blob_columns = [];
        $this->serial_columns = [];
        $this->key_field = null;
        foreach ($format as $field_name => $type) {
            $upper_field_name = strtoupper($field_name);
            if ($upper_field_name == "PRIMARY KEY") {
                $packed_table_format["PRIMARY KEY"] = $type;
                $this->key_field = (is_array($type)) ? $type[0] : $type;
                continue;
            }
            $upper_type = strtoupper($type);
            if (in_array($upper_type, ["BLOB", "SERIAL"], true)) {
                /* blob and serial data live in the archive file, the
                   record itself only keeps an int used to locate it
                 */
                $this->blob_columns[] = $field_name;
                if ($upper_type == "SERIAL") {
                    $this->serial_columns[] = $field_name;
                }
                $packed_table_format[$field_name] = "INT";
            } else if (isset(PackedTableTools::TYPE_SYNONYMS[$upper_type])) {
                $packed_table_format[$field_name] =
                    PackedTableTools::TYPE_SYNONYMS[$upper_type];
            } else {
                return;
            }
        }
        if (empty($this->key_field)) {
            return;
        }
        if (!empty($this->blob_columns)) {
            $packed_table_format["LAST_BLOB_LEN"] = "INT";
        }
        $this->table_tools = new PackedTableTools($packed_table_format,
            $record_compressor_type);
        if ($changed_parameters) {
            $this->saveParameters();
        }
    }
    /**
     * Returns $fields columns from the record associated with $key in
     * the $partition partition of this PartitionDocumentBundle if exists.
     * If $fields is empty all columns returned.
     *
     * @param string $key to look up in partition
     * @param int $partition to look for record in
     * @param array $fields names of fields in this PartitionDocumentBundle
     *  to return. Either a list of column names, or an associative array
     *  column name => name to use for that column in the output
     * @return array|false unpacked record on success, otherwise false
     */
    public function get($key, $partition, $fields = [])
    {
        $table_tools = $this->table_tools;
        $partition_filename = $this->getPartition($partition);
        $index = $this->loadPartitionIndex($partition, false,
            PackedTableTools::AS_STRING_MODE);
        if ($index === false) {
            // missing or empty partition index, so nothing to look up
            return false;
        }
        if (is_string($index)) {
            $index_data_raw = $table_tools->findRowFromKeyTableString($index,
                $key);
        } else {
            $index_data_raw = $table_tools->find($index, $key);
        }
        if (empty($index_data_raw)) {
            return false;
        }
        if (!empty($fields)) {
            // convert a plain list of names into a name => name map
            if (isset($fields[0]) && $fields == array_values($fields)) {
                $fields = array_combine($fields, $fields);
            }
        }
        $index_data = $table_tools->unpack($index_data_raw)[0];
        if (!empty($this->blob_columns)) {
            $num_blob_columns = count($this->blob_columns);
            /* first blob column holds the offset of the first blob, each
               later blob column holds the length of the preceding blob,
               and LAST_BLOB_LEN holds the length of the final blob
             */
            $offset = intval($index_data[$this->blob_columns[0]]);
            for ($i = 0; $i < $num_blob_columns; $i++) {
                $column_name = $this->blob_columns[$i];
                $len = ($i + 1 < $num_blob_columns) ?
                    intval($index_data[$this->blob_columns[$i + 1]]) :
                    $index_data["LAST_BLOB_LEN"];
                if (empty($fields) || isset($fields[$column_name])) {
                    set_error_handler(null);
                    $index_data[$column_name] = ($len == 0) ? "" :
                        @$this->getArchive($partition_filename, $offset, $len);
                    set_error_handler(C\NS_CONFIGS . "yioop_error_handler");
                }
                $offset += $len;
            }
            unset($index_data["LAST_BLOB_LEN"]);
        }
        if (empty($fields)) {
            foreach ($this->serial_columns as $field_name) {
                $index_data[$field_name] = unserialize(
                    $index_data[$field_name]);
            }
            $index_data[$this->key_field] = $key;
            $out_data = $index_data;
        } else {
            $out_data = [];
            if (isset($fields[$this->key_field])) {
                $out_data[$fields[$this->key_field]] = $key;
                unset($fields[$this->key_field]);
            }
            set_error_handler(null);
            foreach ($fields as $in_name => $out_name) {
                if (isset($index_data[$in_name])) {
                    if (in_array($in_name, $this->serial_columns)) {
                        $out_data[$out_name] = @unserialize(
                            $index_data[$in_name]);
                    } else {
                        $out_data[$out_name] = $index_data[$in_name];
                    }
                }
            }
            set_error_handler(C\NS_CONFIGS . "yioop_error_handler");
        }
        return $out_data;
    }
    /**
     * Retrieve a BLOB string in the file $archive_filename at byte position
     * $offset of length $len. It uncompresses this string using
     * the blob compressor's uncompress method and returns the result.
     * The file handle used is cached between calls and reopened when
     * the file name or the instance time changes.
     *
     * @param string $archive_filename the filename of a partition archive
     *  file to get a blob object from
     * @param int $offset a byte position in that file
     * @param int $len number of bytes from $offset to read.
     * @return string|false the result of uncompressing the string at
     *  $offset of length $len, or false if the file could not be opened,
     *  seeked, or read
     */
    public function getArchive($archive_filename, $offset, $len)
    {
        list($fh, $previous_archive_filename, $previous_instance_time) =
            $this->get_archive_cache;
        $blob_compressor = $this->blob_compressor;
        if (!$fh || $previous_archive_filename != $archive_filename ||
            $previous_instance_time != $this->instance_time) {
            if ($fh) {
                fclose($fh);
            }
            $fh = fopen($archive_filename, "r");
            if (!$fh) {
                /* fseek/fread on a failed handle would throw a TypeError
                   in PHP 8, so forget the stale handle and report failure
                 */
                $this->get_archive_cache = [null, "", -1];
                return false;
            }
            $previous_archive_filename = $archive_filename;
            $previous_instance_time = $this->instance_time;
        }
        $value = false;
        if (fseek($fh, $offset) == 0) {
            $compressed_file = fread($fh, $len);
            if ($compressed_file !== false) {
                $value = $blob_compressor->uncompress($compressed_file);
            }
        }
        $this->get_archive_cache = [$fh, $previous_archive_filename,
            $previous_instance_time];
        return $value;
    }
    /**
     * Returns the path to the archive file (used to store BLOB and SERIAL
     * columns) for the $i partition in this PartitionDocumentBundle
     *
     * @param int $i partition to get the archive file name for
     * @return string path of $i partition archive file
     */
    public function getPartition($i)
    {
        return $this->folder . "/" . self::PARTITION_PREFIX . $i .
            $this->blob_compressor->fileExtension();
    }
    /**
     * Returns the path to the index file (used to store all columns of
     * a partition record except blob and serial columns) for the $i partition
     * in this PartitionDocumentBundle
     *
     * @param int $i partition to get the index file name for
     * @return string path of $i partition index file
     */
    public function getPartitionIndex($i)
    {
        return $this->folder . "/" . self::PARTITION_PREFIX . $i .
            self::INDEX_EXTENSION;
    }
    /**
     * Returns the index for the $partition partition of
     * this PartitionDocumentBundle. If $force_load is set to true then reloads
     * from disk rather than use a cached value if present. When the cache
     * holds more than index_cache_size entries, the least recently used
     * entry is evicted before the new index is loaded.
     *
     * @param int $partition which partition index to read
     * @param bool $force_load whether to reload the index from disk or to
     *  use a cached value if present
     * @param int $mode PackedTableTools mode to use when reading in partition
     * @return mixed false if the index is empty; otherwise, either a string
     *  if $mode is AS_STRING_MODE, or an array of $key => packed record
     *  pairs where records are packed according to this
     *  PartitionDocumentBundle's signature
     */
    public function loadPartitionIndex($partition, $force_load = false,
        $mode = PackedTableTools::REPLACE_MODE)
    {
        $index_file_name = $this->getPartitionIndex($partition);
        $time = microtime(true);
        if (!empty($this->index_cache[$partition]) && !$force_load) {
            $index = $this->index_cache[$partition][0];
            $this->index_cache[$partition][1] = $time;
        } else {
            $this->index_cache ??= [];
            $cache_size = count($this->index_cache);
            if ((php_sapi_name() == 'cli') &&
                !C\nsdefined("IS_OWN_WEB_SERVER")) {
                crawlLog("PartitionDocumentBundle cache size: $cache_size");
            }
            if ($cache_size > $this->index_cache_size) {
                $oldest_partition = -1;
                $oldest_time = 2 * $time;
                foreach ($this->index_cache as $index_partition =>
                    $cache_info) {
                    if (empty($cache_info[1])) {
                        // entries without an access time are dropped
                        unset($this->index_cache[$index_partition]);
                    } else if ($cache_info[1] <= $oldest_time) {
                        $oldest_time = $cache_info[1];
                        $oldest_partition = $index_partition;
                    }
                }
                unset($this->index_cache[$oldest_partition]);
            }
            if ($force_load) {
                unset($this->index_cache[$partition]);
            }
            $this->index_cache[$partition] = [
                $this->table_tools->load($index_file_name, $mode), $time];
            $index = $this->index_cache[$partition][0];
        }
        if (empty($index)) {
            return false;
        }
        return $index;
    }
    /**
     * Used to add new records to the PartitionDocumentBundle
     *
     * @param array $row_or_rows either array of record with fields given
     *  by this PartitionDocumentBundle's signature or an array of rows.
     * @return bool success or not
     */
    public function put($row_or_rows)
    {
        $rows = (empty($row_or_rows[$this->key_field])) ? $row_or_rows :
            [$row_or_rows];
        $table_tools = $this->table_tools;
        $num_rows = count($rows);
        $i = 0;
        $save_partition = $this->parameters["SAVE_PARTITION"];
        // remove $save_partition from read cache
        unset($this->index_cache[$save_partition]);
        $save_partition_name = $this->getPartition($save_partition);
        clearstatcache();
        $save_partition_len = file_exists($save_partition_name) ?
            filesize($save_partition_name) : 0;
        $this->save_index = $this->loadPartitionIndex($save_partition, false,
            PackedTableTools::AS_STRING_MODE);
        // the copy being written to should not also be served to readers
        unset($this->index_cache[$save_partition]);
        foreach ($rows as $row) {
            crawlTimeoutLog("..still adding pages partition document bundle. " .
                " Have added %s of %s.", $i, $num_rows);
            $key = $row[$this->key_field] ?? false;
            if ($key === false) {
                crawlLog("PartitionDocumentBundle Put Failed A");
                return false;
            }
            unset($row[$this->key_field]);
            $value = $row;
            if (($this->parameters['ACTIVE_COUNT'] >
                $this->parameters["MAX_ITEMS_PER_FILE"]) || (
                $save_partition_len >
                $this->parameters["PARTITION_SIZE_THRESHOLD"])) {
                $active_count = $this->parameters['ACTIVE_COUNT'];
                crawlLog("PartitionDocumentBundle advancing partition because ".
                    "ACTIVE_COUNT = $active_count and SAVE_PARTITION_LEN = ".
                    $save_partition_len);
                $this->advanceSavePartition();
                $save_partition = $this->parameters["SAVE_PARTITION"];
                $save_partition_len = 0;
                $this->save_index = "";
            }
            $blob_columns = $this->blob_columns ?? [];
            $serial_columns = $this->serial_columns ?? [];
            $format = $this->parameters["FORMAT"];
            unset($format['PRIMARY KEY']);
            $format_keys = array_keys($format);
            // restrict the row to the columns of the bundle's format
            $out_value = [];
            foreach ($format_keys as $format_key) {
                $out_value[$format_key] = $value[$format_key] ?? null;
            }
            if (!empty($blob_columns)) {
                $old_offset = 0;
                foreach ($blob_columns as $blob_column) {
                    $blob = $out_value[$blob_column] ?? "";
                    if (in_array($blob_column, $serial_columns)) {
                        $blob = serialize($blob);
                    }
                    if (($add_info = $this->addArchive($blob)) === false) {
                        crawlLog("PartitionDocumentBundle Put Failed C");
                        return false;
                    }
                    list($offset, $len, ) = $add_info;
                    $save_partition_len += $len;
                    /* store offsets as a difference list: the first blob
                       column gets the absolute offset, later ones the
                       distance from the previous blob
                     */
                    $out_value[$blob_column] = $offset - $old_offset;
                    $old_offset = $offset;
                }
                $out_value["LAST_BLOB_LEN"] = $len;
            }
            $out_value = $table_tools->pack($out_value);
            if ($table_tools->add($this->save_index, $key, $out_value,
                $table_tools::ADD_MEM_TABLE_STRING)) {
                $this->parameters['ACTIVE_COUNT']++;
            } else {
                crawlLog("PartitionDocumentBundle Put Failed D");
                return false;
            }
            $i++;
        }
        $save_index_name = $this->getPartitionIndex($save_partition);
        if (isset($this->save_index)) {
            $table_tools->save($save_index_name, $this->save_index);
        }
        $this->saveParameters();
        return true;
    }
    /**
     * Saves the current save partition, adds one to the save partition number,
     * and starts a new save partition.
     *
     * @param int $new_save_partition number to use for the new save
     *  partition. If not positive, then this method uses the parameters
     *  "SAVE_PARTITION" value plus one.
     * @return bool false if $new_save_partition is not larger than the
     *  current save partition (in which case nothing is changed),
     *  true otherwise
     */
    public function advanceSavePartition($new_save_partition = 0)
    {
        $save_partition = $this->parameters["SAVE_PARTITION"];
        $new_save_partition = (($new_save_partition) > 0) ?
            $new_save_partition : $save_partition + 1;
        if ($new_save_partition <= $save_partition) {
            return false;
        }
        $save_index_name = $this->getPartitionIndex($save_partition);
        if (isset($this->save_index)) {
            $this->table_tools->save($save_index_name, $this->save_index);
        }
        // make sure the new partition does not start from a stale index
        $new_save_index_name = $this->getPartitionIndex($new_save_partition);
        if (file_exists($new_save_index_name)) {
            unlink($new_save_index_name);
        }
        $this->parameters["SAVE_PARTITION"] = $new_save_partition;
        $this->parameters['COUNT'] += $this->parameters['ACTIVE_COUNT'];
        $this->parameters['ACTIVE_COUNT'] = 0;
        return true;
    }
    /**
     * Creates a new counter $field to be maintained
     *
     * @param string $field field of info struct to add a counter for
     */
    public function initCountIfNotExists($field = "COUNT")
    {
        $this->parameters[$field] ??= 0;
        $this->saveParameters();
    }
    /**
     * Add $num to maintained counter $field
     *
     * @param int $num number of items to add to current count
     * @param string $field field of info struct to add to the count of
     */
    public function addCount($num, $field = "COUNT")
    {
        $this->parameters[$field] ??= 0;
        $this->parameters[$field] += $num;
        $this->saveParameters();
    }
    /**
     * Save the operating parameters of this PartitionDocumentBundle
     */
    public function saveParameters()
    {
        $parameter_path = $this->folder . "/" . self::PARAMETERS_FILE;
        file_put_contents($parameter_path, serialize($this->parameters),
            LOCK_EX);
    }
    /**
     * Returns the parameters (such as its signature, max number of
     * documents per partition and counts) used to configure the
     * PartitionDocumentBundle stored at $folder
     *
     * @param string $folder file path to a stored PartitionDocumentBundle
     * @return array configuration info about the PartitionDocumentBundle,
     *  empty if no parameters file exists in $folder
     */
    public static function getParameterInfo($folder)
    {
        $parameter_path = $folder . "/" . self::PARAMETERS_FILE;
        if (!file_exists($parameter_path)) {
            return [];
        }
        $parameters = unserialize(file_get_contents($parameter_path));
        if (!is_array($parameters)) {
            $parameters = [];
        }
        if (!empty($parameters["COMPRESSOR"])) {
            //original format didn't distinguish between compressor use
            $parameters["RECORD_COMPRESSOR"] ??= $parameters["COMPRESSOR"];
            $parameters["BLOB_COMPRESSOR"] ??= $parameters["COMPRESSOR"];
        }
        if (empty($parameters['SAVE_PARTITION'])) {
            /* recover the save partition from the number of index files
               present; glob can return false on error, which count()
               would reject in PHP 8
             */
            $index_files = glob("$folder/*" . self::INDEX_EXTENSION);
            $num_index_files = is_array($index_files) ?
                count($index_files) : 0;
            $parameters['SAVE_PARTITION'] = max($num_index_files - 1, 0);
        }
        return $parameters;
    }
    /**
     * Used to add a blob item to the current save partition file.
     * The file handle used is cached between calls and reopened when
     * the save partition or the instance time changes.
     *
     * @param string $value blob item to be added to file
     * @return array|false [offset into save partition, length stored,
     *  partition number of current save partition], or false if the
     *  partition file could not be opened or completely written
     */
    protected function addArchive($value)
    {
        list($fh, $previous_partition_filename, $previous_instance_time) =
            $this->add_archive_cache;
        $blob_compressor = $this->blob_compressor;
        $save_partition = $this->parameters["SAVE_PARTITION"];
        $partition_filename = $this->getPartition($save_partition);
        if (!is_resource($fh) ||
            $partition_filename != $previous_partition_filename ||
            $previous_instance_time != $this->instance_time) {
            if (!empty($fh) && is_resource($fh)) {
                fclose($fh);
            }
            $fh = fopen($partition_filename, "c+");
            if (!is_resource($fh)) {
                /* put() checks for a false return; continuing would pass
                   false to fseek which throws a TypeError in PHP 8
                 */
                $this->add_archive_cache = [null, "", -1];
                return false;
            }
            $previous_partition_filename = $partition_filename;
            $previous_instance_time = $this->instance_time;
        }
        fseek($fh, 0, SEEK_END);
        $offset = ftell($fh);
        $compressed_value = $blob_compressor->compress($value);
        $len = strlen($compressed_value);
        $written = fwrite($fh, $compressed_value, $len);
        $this->add_archive_cache = [$fh, $previous_partition_filename,
            $previous_instance_time];
        if ($written === false || $written < $len) {
            // a record pointing at a partially written blob would be corrupt
            return false;
        }
        return [$offset, $len, $save_partition];
    }
}