mindformers.models.base_tokenizer 源代码

# Copyright 2023 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file was refer to project:
# https://github.com/huggingface/transformers/blob/main/src/transformers/tokenization_utils.py
# ============================================================================
"""Base Tokenizer for the pretrained tokenizer"""
from typing import Optional, List, Union

import copy
import os
import json
import shutil
from collections import defaultdict
import yaml

import numpy as np

import mindspore
from mindspore import Tensor

from mindformers.tools import logger
from mindformers.tools.register import MindFormerRegister, MindFormerModuleType, MindFormerConfig

from .build_tokenizer import build_tokenizer

__all__ = ['BaseTokenizer', 'Tokenizer', 'SpecialTokensMixin']

from ..tools.download_tools import download_with_progress_bar
from ..tools.utils import try_sync_file
from ..mindformer_book import MindFormerBook, print_path_or_list

SPECIAL_TOKEN_FILE_NAME = 'special_tokens_map.json'
TOKENIZER_CONFIG_NAME = 'tokenizer_config.json'


class SpecialTokensMixin:
    """A class for managing the specific tokens"""
    SPECIAL_TOKENS = ['pad_token', 'cls_token', 'sep_token', 'unk_token', 'mask_token', 'bos_token', 'eos_token']

    def __init__(self,
                 **kwargs):
        self._pad_token = None
        self._sep_token = None
        self._cls_token = None
        self._unk_token = None
        self._mask_token = None
        self._bos_token = None
        self._eos_token = None
        self._pad_token_type_id = 0
        for k, v in kwargs.items():
            for item in self.SPECIAL_TOKENS:
                if item == k:
                    setattr(self, '_' + item, v)

    @property
    def pad_token(self):
        return self._pad_token

    @property
    def pad_token_id(self):
        return self._convert_tokens_to_ids(self._pad_token)

    @property
    def sep_token(self):
        return self._sep_token

    @property
    def sep_token_id(self):
        return self._convert_tokens_to_ids(self._sep_token)

    @property
    def cls_token(self):
        return self._cls_token

    @property
    def cls_token_id(self):
        return self._convert_tokens_to_ids(self._cls_token)

    @property
    def unk_token(self):
        return self._unk_token

    @property
    def unk_token_id(self):
        return self._convert_tokens_to_ids(self._unk_token)

    @property
    def eos_token(self):
        return self._eos_token

    @property
    def eos_token_id(self):
        return self._convert_tokens_to_ids(self._eos_token)

    @property
    def bos_token(self):
        return self._bos_token

    @property
    def bos_token_id(self):
        return self._convert_tokens_to_ids(self._bos_token)

    @property
    def pad_token_type_id(self):
        return self._pad_token_type_id

    @property
    def all_specifical_token_index(self):
        all_specifical_token_index = []
        for item in self.SPECIAL_TOKENS:
            if hasattr(self, '_' + item):
                cur_item = getattr(self, '_' + item)
                if cur_item:
                    all_specifical_token_index.append(self._convert_tokens_to_ids(cur_item))
        return all_specifical_token_index



class BaseTokenizer(SpecialTokensMixin):
    """The pretrained tokenize providing basic method for tokenizing."""
    MODEL_INPUT_NAME = ['input_ids', 'attention_mask', 'token_type_ids']
    VOCAB_FILES = {}
    FILE_LIST = []
    _model_type = 0
    _model_name = 1

    def __init__(self, **kwargs):
        super(BaseTokenizer, self).__init__(**kwargs)
        self.model_inputs = self.MODEL_INPUT_NAME
        self.init_kwargs = kwargs

    def __call__(self,
                 text: Optional[Union[str, List[str]]],
                 text_pair: Optional[Union[str, List[str]]] = None,
                 add_special_tokens: bool = True,
                 max_length: Optional[int] = None,
                 padding: str = False,
                 truncation: bool = False,
                 return_tensors: Optional[bool] = None,
                 **kwargs):
        r"""
        Tokenize the input string and convert them into the ids.

        Args:
            text(str, list(str)) : To be converted text strings. It can be string or a list of strings.
            text_pair(str, list(str)): To be converted text pair strings. It can be string or a list of strings.
            add_special_tokens(bool): Whether to add special tokens such as CLS and EOS to the token list. The subclass
                can determine the behavior of the adding by overriding the method `build_inputs_with_special_tokens`.
                If True, the special token will be added. Default True.
            max_length (int): max length of tokenizer's output . Default None.
            padding(False / "max_length"): padding for max_length. Default None.
            truncation(bool): To truncate the sequence if the length exceeds the max_length. Default False.
            return_tensors(str): Specific the returned tensor type. If set None, the returned tensor will be
                `numpy.ndarray`. If set `ms`, the returned tensor will be of `mindspore.Tensor`.
                Support 'ms' and None. Default None.
            **kwargs: The other kwargs passed to the internal behaivor, currently not used.

        Examples:
            >>> from mindformers import T5Tokenizer
            >>> tokenizer = T5Tokenizer.from_pretrained("t5_small")
            >>> res = tokenizer("hello world")
            >>> print(res)
            {'input_ids': [21820, 296, 1], 'attention_mask': [1, 1, 1]}
            >>> res = tokenizer("hello world", padding='max_length', max_length=10)
            >>> print(res)
            {'input_ids': [21820, 296, 1, 0, 0, 0, 0, 0, 0, 0],
             'attention_mask': [1, 1, 1, 0, 0, 0, 0, 0, 0, 0]}
            >>> res = tokenizer("hello world", add_special_tokens=False)
            >>> print(res)
            {'input_ids': [21820, 296], 'attention_mask': [1, 1]}
            >>> res = tokenizer("hello world", return_tensors='ms')
            >>> print(res)
            {'input_ids': Tensor(shape=[3], dtype=Int32, value= [21820,   296,     1]),
            'attention_mask': Tensor(shape=[3], dtype=Int32, value= [1, 1, 1])}
            >>> res = tokenizer(["hello world", "today is a good day"],
            ...                 max_length=7, padding='max_length', return_tensors='ms')
            >>> print(res)
            {'input_ids': Tensor(shape=[3], dtype=Int32, value= [21820,   296,     1]),
            'attention_mask': Tensor(shape=[3], dtype=Int32, value= [1, 1, 1])}

        Outputs:
            A dict contains the processed ids, attention_mask that specific by the member `MODEL_INPUT_NAME`
            of the subclass.
        """
        return_batch = True
        if isinstance(text, str):
            return_batch = False
        output = self.batch_encode_plus(text, text_pair=text_pair, max_length=max_length,
                                        add_special_tokens=add_special_tokens,
                                        padding=padding,
                                        truncation=truncation,
                                        return_tensors=return_tensors,
                                        return_batch=return_batch,
                                        **kwargs)
        return output

    def _batch_encode_plus(self,
                           text,
                           text_pair=None,
                           max_length=None,
                           padding_strategy="do_not_pad",
                           add_special_tokens=True,
                           truncation=False,
                           return_tensors=None,
                           return_token_type_ids=None,
                           return_attention_mask=None,
                           return_batch=True,
                           **kwargs):
        """Convert the input text into the ids"""
        raise NotImplementedError

    def truncate_sequences(self, ids, id_pairs, nums_tokens_to_remove):
        if nums_tokens_to_remove <= 0:
            return ids, id_pairs
        if id_pairs:
            raise ValueError("The id_pairs do not support truncation, please set it to be a empty list or None.")

        ids = ids[:-nums_tokens_to_remove]
        return ids, id_pairs

    def batch_encode_plus(self,
                          text: Optional[Union[str, List[str]]],
                          text_pair: Optional[Union[str, List[str]]] = None,
                          max_length: Optional[int] = None,
                          padding: Optional[str] = None,
                          add_special_tokens: bool = True,
                          truncation: bool = False,
                          return_token_type_ids: Optional[bool] = None,
                          return_attention_mask: Optional[bool] = None,
                          return_tensors: Optional[bool] = None,
                          return_batch: bool = True,
                          **kwargs):
        r"""
        The core function of the __call__ method. The method aims to tokenizer the input strings and then convert them
        into ids.

        Args:
            text(str) : To be converted text strings. It can be string or a list of strings.
            text_pair(str): To be converted text pair strings. It can be string or a list of strings.
            max_length (int): max length of tokenizers output . Default None.
            padding(bool, str): padding for max_length. Default None.
            truncation(bool): To truncate the sequence if the length exceeds the max_length. Default False.
            add_special_tokens(bool): Whether to add special tokens such as CLS and EOS to the token list. The subclass
                can determine the behavior of the adding by overriding the method `build_inputs_with_special_tokens`.
                If True, the special token will be added. Default True.
            return_token_type_ids(bool): Whether to add `token_type_ids` in the returned dict. If True,
                `token_type_ids` will be added, otherwise not. If None, it will be added if it is in the
                MODEL_INPUT_NAME. Default None.
            return_attention_mask(bool): Whether to add `return_attention_mask` in the returned dict. If True,
                `return_attention_mask` will be added, otherwise not. If None, it will be added if it is in the
                MODEL_INPUT_NAME. Default None.
            return_tensors(str): Specific the returned tensor type. If support `ms` only, the returned value in the
                dict will be converted to the mindspore.Tensor, otherwise it will return the list.
                Default None.
            return_batch(bool): Whether the returned the list should be added batch dimension. Default True.
            **kwargs: The other kwargs passed to the internal method, currently not used.

        Outputs:
            A dict contains the processed ids, attention_mask that specific by the member `MODEL_INPUT_NAME`
            of the subclass.
        """
        if padding and padding != "max_length":
            raise ValueError("padding only supports `max_length` or `None`.")
        padding_strategy = None
        if padding:
            padding_strategy = "max_length"
        if max_length and not padding:
            logger.warning("If you want to enable the padding, please set padding to `max_length`.")
        # if input text is only one list, we should prepare it into a tensor with batch size 1.
        text = self._prepare_input_to_list(text)
        text_pair = self._prepare_input_to_list(text_pair)
        return self._batch_encode_plus(text,
                                       text_pair=text_pair,
                                       max_length=max_length,
                                       padding_strategy=padding_strategy,
                                       truncation=truncation,
                                       add_special_tokens=add_special_tokens,
                                       return_tensors=return_tensors,
                                       return_token_type_ids=return_token_type_ids,
                                       return_attention_mask=return_attention_mask,
                                       return_batch=return_batch,
                                       **kwargs)

    def _prepare_input_to_list(self, inputs):
        """put the input into the list"""
        if inputs is None:
            return inputs
        if not isinstance(inputs, list):
            inputs = [inputs]
        return inputs

    def _get_token_ids(self, text):
        """Get the token_ids"""
        if not isinstance(text, list):
            tokens = self.tokenize(text)
            res = self.convert_tokens_to_ids(tokens)
            return res
        output = []
        for item in text:
            tokens = self.tokenize(item)
            res = self.convert_tokens_to_ids(tokens)
            output.append(res)
        if len(text) == 1 and isinstance(text[0], str) and output and isinstance(output[0], list):
            output = output[0]
        return output

    def encode(self,
               text: Optional[Union[str, List[str]]],
               text_pair: Optional[Union[str, List[str]]] = None,
               max_length: Optional[int] = None,
               padding: Optional[str] = None,
               return_tensors: Optional[bool] = None,
               add_special_tokens: bool = True,
               return_batch: bool = False):
        """
        Convert the input strings to the id list. The arguments are mostly same with the `batch_encode_plus`, but
        it returns the token id list rather than the python dict.

        Examples:
            >>> from mindformers import T5Tokenizer
            >>> tokenizer = T5Tokenizer.from_pretrained("t5_small")
            >>> res = tokenizer.encode("hello world!")
            >>> print(res)
            [21820, 296, 55, 1]

        Returns:
            A list of token ids mapped by the vocabulary.
        """
        res = self.batch_encode_plus(text=text,
                                     text_pair=text_pair,
                                     padding=padding,
                                     max_length=max_length,
                                     return_tensors=return_tensors,
                                     add_special_tokens=add_special_tokens,
                                     return_batch=return_batch)
        return res["input_ids"]

    @classmethod
    def from_pretrained(cls, name_or_path: str, **kwargs):
        """
        Instantiates a tokenizer by the name_or_path. User can get the name using `get_support_list` of any tokenizer,
        it will download the necessary files from the cloud. or pass a directory where contains the vocabulary file
        and tokenizers yaml configuration file.

        Args:
            name_or_path (str): It supports the following two input types: If the name_or_path is a supported tokenizer
                name, for example, `clip_vit_b_32` and `t5_small`, it will download the necessary files from the cloud.
                User can select one from the support list by call `MindFormerBook.show_tokenizer_support_list()`.
                If name_or_path is a path to the local directory where there should have vocaburary files and
                configuration file ended with `yaml`. The vocaburary file needed by the tokenizer is determined
                by `.VOCAB_FILES`.
            pretrained_model_name_or_path (Optional[str]): Equal to "name_or_path",
                if "pretrained_model_name_or_path" is set, "name_or_path" is useless.

        Examples:
            >>> from mindformers import T5Tokenizer
            >>> tokenizer = T5Tokenizer.from_pretrained("t5_small")
            >>> res = tokenizer.encode("hello world!")
            >>> print(res)
            [21820, 296, 55, 1]

        Returns:
            A instanced tokenizer.
        """
        pretrained_model_name_or_path = kwargs.pop("pretrained_model_name_or_path", None)
        if pretrained_model_name_or_path is not None:
            name_or_path = pretrained_model_name_or_path

        is_exist = os.path.exists(name_or_path)
        is_dir = os.path.isdir(name_or_path)
        if not is_exist and (name_or_path not in cls._support_list):
            raise ValueError(f'{name_or_path} does not exist,'
                             f' or it is not supported by {cls.__name__}. '
                             f'please select from {cls._support_list}.')

        if is_exist and not is_dir:
            raise ValueError(f"{name_or_path} is not a directory.")

        kwargs = dict()
        class_name = None
        loaded_kwargs = {}
        if name_or_path in MindFormerBook.get_tokenizer_url_support_list():
            config, cache_path = cls._download_using_name(name_or_path)
            class_name, loaded_kwargs = cls._get_class_name_and_args_form_config(config)
            name_or_path = cache_path

        yaml_list = None
        if os.path.isdir(name_or_path):
            yaml_list = [file for file in os.listdir(name_or_path) if file.endswith(".yaml")]
            if len(yaml_list) > 1:
                logger.warning("There should be only one yaml file under the directory %s, "
                               "but followings are found: %s", name_or_path, yaml_list)
        if yaml_list:
            yaml_file = os.path.join(name_or_path, yaml_list[0])
            logger.info("config in the yaml file %s are used for tokenizer building.", yaml_file)
            config = MindFormerConfig(yaml_file)
            class_name, loaded_kwargs = cls._get_class_name_and_args_form_config(config)

        vocab_dict, file_dict = cls.read_files_according_specific_by_tokenizer(name_or_path)
        if 'tokenizer_config.json' in file_dict:
            class_name = file_dict['tokenizer_config.json'].pop('tokenizer_class', None)
            loaded_kwargs = file_dict['tokenizer_config.json']
        else:
            logger.warning("Can't find the tokenizer_config.json in the file_dict. "
                           "The content of file_dict is : %s", file_dict)
        kwargs.update(loaded_kwargs)
        kwargs.update(vocab_dict)
        if not class_name:
            class_name = cls.__name__
        logger.info("build tokenizer class name is: %s using args %s.", class_name, kwargs)
        return build_tokenizer(class_name=class_name, **kwargs)

    @classmethod
    def _download_using_name(cls, name_or_path):
        """Given the supported model name, download it from the urls"""
        tokenizer_name = name_or_path
        if name_or_path.startswith('mindspore'):
            # Adaptation the name of tokenizer at the beginning of mindspore,
            # the relevant file will be downloaded from the Xihe platform.
            # such as "mindspore/clip_vit_b_32"
            tokenizer_name = name_or_path.split('/')[cls._model_name]
            cache_path = os.path.join(MindFormerBook.get_xihe_checkpoint_download_folder(),
                                      tokenizer_name.split('_')[cls._model_type])
        else:
            # Default the name of tokenizer,
            # the relevant file will be downloaded from the Obs platform.
            # such as "clip_vit_b_32"
            cache_path = os.path.join(MindFormerBook.get_default_checkpoint_download_folder(),
                                      name_or_path.split('_')[cls._model_type])

        if not os.path.exists(cache_path):
            os.makedirs(cache_path, exist_ok=True)

        yaml_file = os.path.join(cache_path, tokenizer_name + ".yaml")

        def get_default_yaml_file(model_name):
            default_yaml_file = ""
            for model_dict in MindFormerBook.get_trainer_support_task_list().values():
                if model_name in model_dict:
                    default_yaml_file = model_dict.get(model_name)
                    break
            return default_yaml_file

        if not os.path.exists(yaml_file):
            default_yaml_file = get_default_yaml_file(tokenizer_name)
            if os.path.realpath(default_yaml_file) and os.path.exists(default_yaml_file):
                shutil.copy(default_yaml_file, yaml_file)
                logger.info("default yaml config in %s is used.", yaml_file)
            else:
                raise FileNotFoundError(f'default yaml file path must be correct, but get {default_yaml_file}')

        # some tokenizers rely on more than one file, e.g gpt2
        tokenizer_need_files = MindFormerBook.get_tokenizer_url_support_list()[name_or_path]
        for url_file in tokenizer_need_files:
            local_file_name = url_file.split('/')[-1]
            file_path = os.path.join(cache_path, local_file_name)
            if not os.path.exists(file_path):
                logger.info("Download the vocab from the url %s to %s.", url_file, file_path)
                download_with_progress_bar(url_file, file_path)
            try_sync_file(file_path)

        config = MindFormerConfig(yaml_file)
        return config, cache_path

    @classmethod
    def cache_vocab_files(cls, name_or_path, cache_path=None):
        """Cache the vocab files to the default dir"""
        if not cache_path:
            cache_path = os.path.join(MindFormerBook.get_default_checkpoint_download_folder(),
                                      name_or_path.split("_")[cls._model_type])
            if not os.path.exists(cache_path):
                os.makedirs(cache_path, exist_ok=True)

        # some tokenizers rely on more than one file, e.g gpt2
        tokenizer_need_files = MindFormerBook.get_tokenizer_url_support_list()[name_or_path]
        for url_file in tokenizer_need_files:
            local_file_name = url_file.split('/')[-1]
            file_path = os.path.join(cache_path, local_file_name)
            if not os.path.exists(file_path):
                logger.info("Download the yaml from the url %s to %s.", url_file, file_path)
                download_with_progress_bar(url_file, file_path)
            try_sync_file(file_path)
        read_vocab_file_dict, _ = cls.read_files_according_specific_by_tokenizer(cache_path)
        return read_vocab_file_dict

    @classmethod
    def _get_class_name_and_args_form_config(cls, config):
        """Lookup the yaml files under the name_or_path"""
        class_name = None
        tokenizer_args = {}
        if config and 'processor' in config and 'tokenizer' in config['processor'] \
                and config.processor.tokenizer and'type' in config.processor.tokenizer:
            tokenizer_args = config['processor']['tokenizer']
            class_name = tokenizer_args.pop('type', None)
        else:
            logger.info("There is no matched format config['processor']['tokenizer']  in config %s", config)
        return class_name, tokenizer_args

    @classmethod
    def read_files_according_specific_by_tokenizer(cls, name_or_path):
        """Read the file path specific by the class variable in the tokenizer"""
        read_vocab_file_dict = {}
        read_tokenizer_file_dict = {}
        for k, name in cls.VOCAB_FILES.items():
            if isinstance(name, str):
                path = os.path.join(name_or_path, name)
                if os.path.isfile(path):
                    read_vocab_file_dict[k] = path
            # To support tokenizer like clip that has two types for vocab files.
            elif isinstance(name, list):
                for sub_name in name:
                    path = os.path.join(name_or_path, sub_name)
                    if os.path.isfile(path):
                        read_vocab_file_dict[k] = path

        for item in cls.FILE_LIST:
            path = os.path.join(name_or_path, item)
            if os.path.isfile(path):
                read_tokenizer_file_dict[item] = json.load(open(path, 'r'))
        return read_vocab_file_dict, read_tokenizer_file_dict

    def _pad(self, id_dict, max_length, padding_strategy="do_not_pad", return_attention_mask=None):
        """Do padding according to the max_length"""
        is_batch = False
        if isinstance(id_dict['input_ids'], list) and isinstance(id_dict['input_ids'][0], list):
            is_batch = True
            length_each = [len(line) for line in id_dict['input_ids']]
            for item in length_each:
                if length_each[0] != item and (not max_length or padding_strategy != "max_length"):
                    raise ValueError(f"You should set `max_length` to {max(length_each)} "
                                     f"and padding_strategy to `max_length`, as the length in the batch "
                                     f"is different, which should be padded.")


        if return_attention_mask is not False:
            return_attention_mask = True

        if return_attention_mask and 'attention_mask' in self.model_inputs:
            if is_batch:
                id_dict['attention_mask'] = [[1] * len(line) for line in id_dict['input_ids']]
            else:
                id_dict['attention_mask'] = [1] * len(id_dict['input_ids'])

        if not max_length or padding_strategy != "max_length":
            return id_dict

        def _pad_batch(source_ids, pad_value):
            if not is_batch:
                source_ids = [source_ids]
            for i in range(len(source_ids)):
                if max_length < len(source_ids[i]):
                    raise ValueError(f"The length of input_ids {len(source_ids[i])} "
                                     f"exceeds the max_length {max_length}, "
                                     f"please increase the `max_length` of the tokenizer.")
                source_ids[i] += [pad_value] * (max_length - len(source_ids[i]))
            if not is_batch:
                source_ids = source_ids[0]

        _pad_batch(id_dict['input_ids'], pad_value=self.pad_token_id)
        if "attention_mask" in id_dict:
            _pad_batch(id_dict['attention_mask'], pad_value=0)
        if "token_type_ids" in id_dict:
            _pad_batch(id_dict['token_type_ids'], pad_value=self.pad_token_type_id)

        return id_dict

    def postprocess_ids(self,
                        ids,
                        pair_ids=None,
                        add_special_tokens=True,
                        max_length=None,
                        truncation=False,
                        padding_strategy="do_not_pad",
                        return_tensors=None,
                        return_token_type_ids=None,
                        return_attention_mask=None):
        """
        Insert the special ids into the input ids, generate the attention mask and do padding.
        """
        if not isinstance(ids, list):
            raise ValueError("The input ids should be a list.")
        output_map = dict()

        def process_token_id(ids, par_ids=None):
            sentence_b_type_ids = []
            if par_ids:
                sentence_b_type_ids = [1] * len(par_ids)
            return [0] * len(ids) + sentence_b_type_ids

        length_of_each = len(ids)

        if max_length and truncation:
            num_sp_tokens = self.num_special_tokens_to_add() if add_special_tokens else 0
            num_tokens = length_of_each + num_sp_tokens - max_length
            ids, pair_ids = self.truncate_sequences(ids, pair_ids, num_tokens)

        if add_special_tokens:
            # add cls and sep: [cls] ids [seq] pair_ids
            input_ids_output = self.build_inputs_with_special_tokens(ids, pair_ids)
            type_ids = self.create_token_type_ids_from_sequences(ids, pair_ids)
        else:
            input_ids_output = [ids + pair_ids] if pair_ids else ids
            type_ids = process_token_id(ids, pair_ids)

        output_map['input_ids'] = input_ids_output
        if return_token_type_ids or 'token_type_ids' in self.model_inputs:
            output_map['token_type_ids'] = type_ids

        output_map = self._pad(output_map, max_length=max_length, padding_strategy=padding_strategy,
                               return_attention_mask=return_attention_mask)

        if return_tensors and return_tensors != 'ms':
            raise ValueError("You should set return_tensors to be `ms`.")
        if return_tensors:
            for k, v in output_map.items():
                v = np.array(v)
                if v.dtype == np.int64:
                    v = v.astype(np.int32)
                output_map[k] = Tensor(v)
        return output_map

    def save_pretrained(self,
                        save_directory: Optional[str] = None,
                        save_name: str = "mindspore_model",
                        file_format: str = 'yaml'):
        """
        Save the tokenizer by writing the `save_name`.yaml and vocaburary files those are determinied by `.VOCAB_FILES`
        to the disk. The kwargs passed to initialize the tokenizer will be saved.

        Args:
            save_directory(str): The output file directory. If None, the directory will be  `./checkpoint_save`,
                which can be obtained by the `MindFormerBook.get_default_checkpoint_save_folder()`. Default None.
            save_name(str): The file name of the saved files. Default mindspore_model.
            file_format(str): Support json or yaml. Default yaml.

        Examples:
            >>> from mindformers import T5Tokenizer, MindFormerBook
            >>> tokenizer = T5Tokenizer.from_pretrained("t5_small")
            >>> tokenizer.save_pretrained()
            >>> output_path = MindFormerBook.get_default_checkpoint_save_folder()
            >>> print(os.listdir(output_path))
            ['mindspore_model.yaml', 'spiece.model']

        """
        default_directory = MindFormerBook.get_default_checkpoint_save_folder()
        if save_directory is None:
            save_directory = default_directory
            os.makedirs(save_directory, exist_ok=True)
        if save_name is None:
            save_name = "mindspore_model"
        if file_format not in ('yaml', 'json'):
            raise ValueError(f"format should be one of [`yaml`, `json`], but got {file_format}.")

        kwargs = copy.deepcopy(self.init_kwargs)
        # Start to save the kwargs for the tokenizer
        if file_format == 'yaml':
            kwargs['type'] = self.__class__.__name__
            merged_dict = dict()

            yaml_file = os.path.join(save_directory, save_name + '.yaml')
            if os.path.exists(yaml_file):
                with open(yaml_file, 'r') as file_reader:
                    merged_dict = yaml.load(file_reader.read(), Loader=yaml.Loader)
                    if merged_dict is None:
                        merged_dict = dict()

            processor_name = MindFormerBook.get_tokenizer_name_to_processor()[kwargs['type']]
            merged_dict['processor'] = {"type": processor_name}
            merged_dict['processor']['tokenizer'] = kwargs
            with open(yaml_file, 'w') as file_reader:
                yaml.dump(merged_dict, file_reader)
        elif file_format == 'json':
            kwargs["tokenizer_class"] = self.__class__.__name__
            tokenizer_config_path = os.path.join(save_directory, TOKENIZER_CONFIG_NAME)
            with open(tokenizer_config_path, 'w') as fp:
                json.dump(kwargs, fp, indent=4)
        else:
            raise ValueError(f"file_format should be one of [json, yaml], but got {file_format}.")

        # some tokenizers rely on more than one file, e.g gpt2
        name_keys = self.VOCAB_FILES.keys()
        for name_key in name_keys:
            output_name = self.VOCAB_FILES[name_key]
            if isinstance(output_name, list):
                output_name = output_name[0]
            self.save_vocabulary(save_directory, output_name)

    def save_vocabulary(self, save_directory, filename_prefix):
        """Save the vocabulary to the specific path with name_prefix"""
        raise NotImplementedError

    def _convert_to_numpy_and_check(self, ids):
        if isinstance(ids, np.ndarray):
            ids = ids.tolist()
        if not isinstance(ids, list):
            raise TypeError(f"`ids` should be the list, but got type {type(ids)}.")
        return ids

    def decode(self,
               token_ids: Optional[Union[List[int], List[List[int]]]],
               skip_special_tokens: bool = False,
               **kwargs):
        """
        Convert the token ids to the string

        Args:
            token_ids(list[int], list[list[int]]):
            skip_special_tokens(bool): Whether to skip the special the token such as the CLS and EOS. Default False.
            **kwargs: Other kwargs passed to the internal function. Currently not used.

        Examples:
            >>> from mindformers import T5Tokenizer
            >>> tokenizer = T5Tokenizer.from_pretrained('t5_small')
            >>> ids = tokenizer.encode("hell world")
            >>> output = tokenizer.decode(ids)
            >>> print(output)
            hell world
            >>> ids = tokenizer.batch_encode_plus(["hello world", "nice to see you!"],
            ...                                   padding="max_length", max_length=8)["input_ids"]
            >>> output = tokenizer.decode(ids)
            >>> print(output)
            ['hello world', 'nice to see you!']


        Returns:
            A string or a list of strings.
        """
        if isinstance(token_ids, mindspore.Tensor):
            token_ids = token_ids.asnumpy().tolist()
        token_ids = self._convert_to_numpy_and_check(token_ids)
        if isinstance(token_ids[0], (list, np.ndarray)):
            output = []
            for line in token_ids:
                line = self._convert_to_numpy_and_check(line)
                new_strs = self._decode(line, skip_special_tokens=skip_special_tokens, **kwargs)
                output.append(new_strs)
        else:
            output = self._decode(token_ids, skip_special_tokens=skip_special_tokens, **kwargs)
        return output

    def _decode(self, token_ids,
                skip_special_tokens,
                **kwargs):
        """The basic function of the decode"""
        raise NotImplementedError


[文档]@MindFormerRegister.register(MindFormerModuleType.TOKENIZER) class Tokenizer(BaseTokenizer): """Pretrained Tokenizer provides detailed the tokenizer method.""" _support_list = []
[文档] def convert_ids_to_tokens(self, input_ids, skip_special_tokens=False): """Convert the ids to tokens using vocab mapping""" output = [] for item in input_ids: if skip_special_tokens and item in self.all_specifical_token_index: continue else: output.append(self._convert_ids_to_tokens(item)) return output
[文档] def convert_tokens_to_ids(self, input_tokens): """Convert the tokens to ids using vocab mapping""" return self._convert_tokens_to_ids(input_tokens)
[文档] def convert_tokens_to_string(self, tokens): """Convert the tokens to the string""" if isinstance(tokens, list): outstring = "" for token in tokens: outstring += " " + " ".join(token).strip() return outstring return " ".join(tokens).strip()
def tokenize(self, text): raise NotImplementedError def build_inputs_with_special_tokens(self, token_ids_0, token_ids_1): if token_ids_1: return token_ids_0 + token_ids_1 return token_ids_0
[文档] def num_special_tokens_to_add(self): """Return the special tokens to be added to the ids and ids_pair""" ids = [] ids_pair = [] output = self.build_inputs_with_special_tokens(ids, ids_pair) return len(output)
def create_token_type_ids_from_sequences(self, token_ids_0, token_ids_1): if token_ids_1: return [0] * (len(token_ids_0) + 1) + [1] * (len(token_ids_1) + 1) # cls and sep is 1 return [0] * (len(token_ids_0) + 1 + 1) def _batch_encode_plus(self, text, text_pair=None, max_length=None, padding_strategy="do_not_pad", add_special_tokens=True, truncation=False, return_tensors=None, return_token_type_ids=None, return_attention_mask=None, return_batch=True, **kwargs): """Convert the text into the converted id. text should be batched. For example, [["hello world"]]""" if not isinstance(text, list) and not isinstance(text[0], list): raise ValueError("For _batch_encode_plus, the input `text` should be batched, " "for example: [['hello world']].") text_ids = [self._get_token_ids(item) for item in text] text_pair_ids = [self._get_token_ids(item) for item in text_pair] if text_pair else None processed_output = self._batch_postprocess_ids(ids=text_ids, pair_ids=text_pair_ids, max_length=max_length, truncation=truncation, padding_strategy=padding_strategy, add_special_tokens=add_special_tokens, return_tensors=return_tensors, return_token_type_ids=return_token_type_ids, return_attention_mask=return_attention_mask, return_batch=return_batch) return processed_output def _batch_postprocess_ids(self, ids, pair_ids=None, add_special_tokens=True, max_length=None, truncation=False, padding_strategy="do_not_pad", return_tensors=None, return_token_type_ids=None, return_attention_mask=None, return_batch=True): """Convert the input_ids to the format of model inputs""" if return_tensors and return_tensors != 'ms': raise ValueError("You should set return_tensors to be `ms`.") if not return_batch and len(ids) != 1: raise ValueError(f"If `return_batch` is False, the length of input ids should be 1. But found {len(ids)}. " f"Input ids is: {ids}. To fix this, you can set the return_batch=True") if pair_ids: paired_ids = zip(ids, pair_ids) else: paired_ids = zip(ids, [None] * len(ids)) output = defaultdict(list) for per_ids, per_pair_ids in paired_ids: per_output = self.postprocess_ids(ids=per_ids, pair_ids=per_pair_ids, padding_strategy="do_not_pad", return_tensors=None, add_special_tokens=add_special_tokens, max_length=max_length, truncation=truncation, return_token_type_ids=return_token_type_ids, return_attention_mask=return_attention_mask) if not return_batch: output = per_output else: for k, v in per_output.items(): output[k].append(v) output_map = self._pad(output, max_length=max_length, padding_strategy=padding_strategy, return_attention_mask=return_attention_mask) if return_tensors: for k in output_map.keys(): v = np.array(output_map[k]) if v.dtype == np.int64: v = v.astype(np.int32) output_map[k] = Tensor(v) return output_map def _decode(self, token_ids, skip_special_tokens=False, **kwargs): ids = self.convert_ids_to_tokens(token_ids, skip_special_tokens=skip_special_tokens) return self.convert_tokens_to_string(ids) def _tokenize(self, text, **kwargs): """Converts the text to tokens""" raise NotImplementedError @property def vocab_size(self): """Get the vocab size of the """ raise NotImplementedError
[文档] @classmethod def show_support_list(cls): """show_support_list method""" logger.info("support list of %s is:", cls.__name__) print_path_or_list(cls._support_list)
[文档] @classmethod def get_support_list(cls): """get_support_list method""" return cls._support_list