YAML Unicode support
This commit is contained in:
@@ -26,19 +26,34 @@ from tests.common import build_temp_workspace
|
|||||||
from yamllint import linter
|
from yamllint import linter
|
||||||
from yamllint.config import YamlLintConfig
|
from yamllint.config import YamlLintConfig
|
||||||
|
|
||||||
|
CONFIG = """
|
||||||
|
extends: default
|
||||||
|
"""
|
||||||
|
|
||||||
GREEK = """---
|
GREEK = """---
|
||||||
greek:
|
greek:
|
||||||
8: [Θ, θ, θήτα, [тета], Т]
|
8: [Θ, θ, θήτα, [тета], Т]
|
||||||
20: [Υ, υ, ύψιλον, [ипсилон], И]
|
20: [Υ, υ, ύψιλον, [ипсилон], И]
|
||||||
"""
|
"""
|
||||||
|
GREEK_P = set([('document-end', 4)])
|
||||||
|
|
||||||
CP1252 = """---
|
CP1252 = """---
|
||||||
capitals:
|
capitals:
|
||||||
1: Reykjavík
|
1: Reykjavík
|
||||||
2: Tórshavn
|
2: Tórshavn
|
||||||
"""
|
"""
|
||||||
|
CP1252_P = set([('unicode-decode', 0)])
|
||||||
|
|
||||||
ENC = ['utf-8']
|
MINIMAL = "m:\n"
|
||||||
|
MINIMAL_P = set([('document-start', 1),
|
||||||
|
('document-end', 1)])
|
||||||
|
|
||||||
|
FIRST = """Θ:\n"""
|
||||||
|
FIRST_P = set([('unicode-first-not-ascii', 1),
|
||||||
|
('document-start', 1),
|
||||||
|
('document-end', 1)])
|
||||||
|
|
||||||
|
ENC = ['utf-8', 'utf-16le', 'utf-16be', 'utf-32le', 'utf-32be']
|
||||||
|
|
||||||
|
|
||||||
class UnicodeTestCase(unittest.TestCase):
|
class UnicodeTestCase(unittest.TestCase):
|
||||||
@@ -63,7 +78,9 @@ class UnicodeTestCase(unittest.TestCase):
|
|||||||
for enc in ENC:
|
for enc in ENC:
|
||||||
cls.create_file(GREEK, enc, True)
|
cls.create_file(GREEK, enc, True)
|
||||||
cls.create_file(GREEK, enc, False)
|
cls.create_file(GREEK, enc, False)
|
||||||
|
cls.create_file(GREEK, 'utf-7', True)
|
||||||
cls.create_file(CP1252, 'cp1252', False)
|
cls.create_file(CP1252, 'cp1252', False)
|
||||||
|
cls.create_file(MINIMAL, 'ascii', False)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls):
|
def tearDownClass(cls):
|
||||||
@@ -72,55 +89,104 @@ class UnicodeTestCase(unittest.TestCase):
|
|||||||
shutil.rmtree(cls.wd)
|
shutil.rmtree(cls.wd)
|
||||||
locale.setlocale(locale.LC_ALL, cls.slc)
|
locale.setlocale(locale.LC_ALL, cls.slc)
|
||||||
|
|
||||||
def run_fobj(self, fobj):
|
def run_fobj(self, fobj, exp):
    """Lint *fobj* and check that exactly the expected problems appear.

    Parameters
    ----------
    fobj:
        Open file object passed to ``linter.run`` together with
        ``self.cfg``.
    exp:
        Collection of ``(rule, line)`` pairs the linter must report.
        It is copied, never modified.

    Each expected pair may be matched once; a second problem with the
    same rule and line counts as unexpected.
    """
    missing = set(exp)
    unexpected = []
    for problem in linter.run(fobj, self.cfg):
        key = (problem.rule, problem.line)
        if key in missing:
            missing.remove(key)
        else:
            # Keep the description so a failure explains itself instead
            # of relying on output printed to stdout.
            unexpected.append((problem.rule, problem.line, problem.desc))
    self.assertEqual(missing, set(), 'expected problems were not reported')
    self.assertEqual(unexpected, [], 'unexpected problems were reported')
|
||||||
|
|
||||||
def run_file(self, lc, enc, bom):
|
def run_file(self, lc, enc, bom, exp):
    """Lint the fixture file for (*enc*, *bom*) under locale *lc*.

    Parameters
    ----------
    lc:
        Locale name installed for ``LC_ALL`` while the file is opened and
        linted. The test is skipped when the locale is not available.
    enc, bom:
        Forwarded to ``self.fn`` to select the fixture file.
    exp:
        Expected ``(rule, line)`` pairs, forwarded to ``run_fobj``.

    The file is opened in text mode without an explicit encoding, so the
    locale under test is what Python would use to decode it.
    """
    try:
        locale.setlocale(locale.LC_ALL, lc)
    except locale.Error:
        self.skipTest('locale ' + lc + ' not available')
    try:
        with open(self.fn(enc, bom)) as f:
            self.run_fobj(f, exp)
    finally:
        # Restore the saved locale even when the assertions fail, so one
        # failing test cannot change the locale seen by the next ones.
        locale.setlocale(locale.LC_ALL, self.slc)
|
||||||
|
|
||||||
def run_bytes(self, body, enc, bom, buf):
|
def run_bytes(self, body, enc, bom, buf, exp):
    """Lint *body* encoded in memory and compare against *exp*.

    Parameters
    ----------
    body:
        YAML text to encode.
    enc:
        Codec name used to encode *body*.
    bom:
        When true, U+FEFF is prepended to *body* before encoding.
    buf:
        When true, the ``BytesIO`` is wrapped in a ``BufferedReader``
        before being wrapped in a ``TextIOWrapper``.
    exp:
        Expected ``(rule, line)`` pairs, forwarded to ``run_fobj``.
    """
    text = "\uFEFF" + body if bom else body
    stream = io.BytesIO(text.encode(enc))
    if buf:
        stream = io.BufferedReader(stream)
    self.run_fobj(io.TextIOWrapper(stream), exp)
|
||||||
|
|
||||||
def test_file_en_US_UTF_8_utf8_nob(self):
|
def test_file_en_US_UTF_8_utf8_nob(self):
    """UTF-8 fixture, bom=False, locale en_US.UTF-8."""
    self.run_file('en_US.UTF-8', 'utf-8', bom=False, exp=GREEK_P)

def test_file_ru_RU_CP1251_utf8_nob(self):
    """UTF-8 fixture, bom=False, locale ru_RU.CP1251."""
    self.run_file('ru_RU.CP1251', 'utf-8', bom=False, exp=GREEK_P)

def test_file_en_US_utf8_cp1252(self):
    """cp1252 fixture under a UTF-8 locale; expects CP1252_P."""
    # The locale is spelled 'en_US.utf8' on Linux and 'en_US.UTF-8'
    # on other platforms.
    if sys.platform.startswith('linux'):
        lc = 'en_US.utf8'
    else:
        lc = 'en_US.UTF-8'
    self.run_file(lc, 'cp1252', bom=False, exp=CP1252_P)

def test_file_en_US_ISO8859_1_cp1252(self):
    """cp1252 fixture under en_US.ISO8859-1; expects CP1252_P."""
    self.run_file('en_US.ISO8859-1', 'cp1252', bom=False, exp=CP1252_P)

def test_file_C_utf8_nob(self):
    """UTF-8 fixture, bom=False, locale C."""
    self.run_file('C', 'utf-8', bom=False, exp=GREEK_P)

def test_file_C_utf8(self):
    """UTF-8 fixture, bom=True, locale C."""
    self.run_file('C', 'utf-8', bom=True, exp=GREEK_P)

def test_file_C_utf16le_nob(self):
    """UTF-16-LE fixture, bom=False, locale C."""
    self.run_file('C', 'utf-16le', bom=False, exp=GREEK_P)

def test_file_C_utf16le(self):
    """UTF-16-LE fixture, bom=True, locale C."""
    self.run_file('C', 'utf-16le', bom=True, exp=GREEK_P)

def test_file_C_utf16be_nob(self):
    """UTF-16-BE fixture, bom=False, locale C."""
    self.run_file('C', 'utf-16be', bom=False, exp=GREEK_P)

def test_file_C_utf16be(self):
    """UTF-16-BE fixture, bom=True, locale C."""
    self.run_file('C', 'utf-16be', bom=True, exp=GREEK_P)

def test_file_C_utf32le_nob(self):
    """UTF-32-LE fixture, bom=False, locale C."""
    self.run_file('C', 'utf-32le', bom=False, exp=GREEK_P)

def test_file_C_utf32le(self):
    """UTF-32-LE fixture, bom=True, locale C."""
    self.run_file('C', 'utf-32le', bom=True, exp=GREEK_P)

def test_file_C_utf32be_nob(self):
    """UTF-32-BE fixture, bom=False, locale C."""
    self.run_file('C', 'utf-32be', bom=False, exp=GREEK_P)

def test_file_C_utf32be(self):
    """UTF-32-BE fixture, bom=True, locale C."""
    self.run_file('C', 'utf-32be', bom=True, exp=GREEK_P)

def test_file_C_utf7(self):
    """UTF-7 fixture, bom=True, locale C."""
    self.run_file('C', 'utf-7', bom=True, exp=GREEK_P)

def test_file_minimal_nob(self):
    """ASCII fixture, bom=False, locale C; expects MINIMAL_P."""
    self.run_file('C', 'ascii', bom=False, exp=MINIMAL_P)
|
||||||
|
|
||||||
def test_bytes_utf8_nob(self):
|
def test_bytes_utf8_nob(self):
    """GREEK as UTF-8 bytes, no BOM, unbuffered BytesIO."""
    self.run_bytes(GREEK, 'utf-8', bom=False, buf=False, exp=GREEK_P)

def test_bytes_utf16(self):
    """GREEK encoded with the generic 'utf-16' codec."""
    # The 'utf-16' codec writes a BOM by itself, so bom stays False.
    self.run_bytes(GREEK, 'utf-16', bom=False, buf=False, exp=GREEK_P)

def test_bytes_utf32_buf(self):
    """GREEK encoded with the generic 'utf-32' codec, buffered."""
    # The 'utf-32' codec writes a BOM by itself, so bom stays False.
    self.run_bytes(GREEK, 'utf-32', bom=False, buf=True, exp=GREEK_P)

def test_bytes_minimal_nob(self):
    """MINIMAL as ASCII bytes, no BOM, unbuffered BytesIO."""
    self.run_bytes(MINIMAL, 'ascii', bom=False, buf=False, exp=MINIMAL_P)

def test_bytes_minimal_nob_buf(self):
    """MINIMAL as ASCII bytes, no BOM, behind a BufferedReader."""
    self.run_bytes(MINIMAL, 'ascii', bom=False, buf=True, exp=MINIMAL_P)

def test_bytes_first_nob(self):
    """FIRST as UTF-8 bytes, no BOM, unbuffered BytesIO."""
    self.run_bytes(FIRST, 'utf-8', bom=False, buf=False, exp=FIRST_P)

def test_bytes_first_nob_buf(self):
    """FIRST as UTF-8 bytes, no BOM, behind a BufferedReader."""
    self.run_bytes(FIRST, 'utf-8', bom=False, buf=True, exp=FIRST_P)
|
||||||
|
|||||||
@@ -232,11 +232,13 @@ def run(argv=None):
|
|||||||
try:
|
try:
|
||||||
with open(file, newline='') as f:
|
with open(file, newline='') as f:
|
||||||
problems = linter.run(f, conf, filepath)
|
problems = linter.run(f, conf, filepath)
|
||||||
|
prob_level = show_problems(problems,
|
||||||
|
file,
|
||||||
|
args_format=args.format,
|
||||||
|
no_warn=args.no_warnings)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
print(e, file=sys.stderr)
|
print(e, file=sys.stderr)
|
||||||
sys.exit(-1)
|
sys.exit(-1)
|
||||||
prob_level = show_problems(problems, file, args_format=args.format,
|
|
||||||
no_warn=args.no_warnings)
|
|
||||||
max_level = max(max_level, prob_level)
|
max_level = max(max_level, prob_level)
|
||||||
|
|
||||||
# read yaml from stdin
|
# read yaml from stdin
|
||||||
|
|||||||
@@ -13,14 +13,14 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import re
|
import codecs
|
||||||
import io
|
import io
|
||||||
|
import re
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from yamllint import parser
|
from yamllint import parser
|
||||||
|
|
||||||
|
|
||||||
PROBLEM_LEVELS = {
|
PROBLEM_LEVELS = {
|
||||||
0: None,
|
0: None,
|
||||||
1: 'warning',
|
1: 'warning',
|
||||||
@@ -185,14 +185,96 @@ def get_syntax_error(buffer):
|
|||||||
return problem
|
return problem
|
||||||
|
|
||||||
|
|
||||||
def _run(buffer, conf, filepath):
|
# Byte-order marks recognised at the start of a YAML stream. The UTF-32
# marks come first because the UTF-32-LE mark begins with the UTF-16-LE one.
_YAML_BOMS = (
    (codecs.BOM_UTF32_BE, 'utf-32be'),
    (codecs.BOM_UTF32_LE, 'utf-32le'),
    (codecs.BOM_UTF16_BE, 'utf-16be'),
    (codecs.BOM_UTF16_LE, 'utf-16le'),
    (codecs.BOM_UTF8, 'utf-8'),
    (b'+/v8', 'utf-7'),
)


def _peek_bytes(stream, size):
    """Return up to *size* leading bytes of *stream* without consuming them.

    ``peek()`` is tried first. When it yields fewer than *size* bytes and
    the stream is seekable, the bytes are read and the position is put
    back. On a non-seekable stream whatever ``peek()`` returned (possibly
    nothing) is used as is, because reading more would lose data.
    """
    head = b''
    if hasattr(stream, 'peek'):
        head = stream.peek(size)[:size]
        if len(head) >= size:
            return head
    if not stream.seekable():
        return head
    start = stream.tell()
    try:
        return stream.read(size) or b''
    finally:
        stream.seek(start)


def _guess_encoding(head):
    """Deduce the encoding of a BOM-less stream from its first bytes.

    The first character is assumed to be ASCII, so the position of the
    zero bytes around it reveals the code unit width and byte order.
    Anything else is treated as UTF-8.
    """
    if len(head) >= 4:
        if head[:3] == b'\x00\x00\x00' and head[3]:
            return 'utf-32be'
        if head[0] and head[1:4] == b'\x00\x00\x00':
            return 'utf-32le'
    if len(head) >= 2:
        if head[0] == 0 and head[1]:
            return 'utf-16be'
        if head[0] and head[1] == 0:
            return 'utf-16le'
    return 'utf-8'


def _read_yaml_unicode(f: io.IOBase) -> tuple:
    """Read and decode a file as in section 5.2, Character Encodings.

    Parameters
    ----------
    f:
        For CLI - file open for reading in text mode
        (TextIOWrapper(BufferedReader(FileIO)))

        For API & tests - a StringIO, TextIOWrapper(BytesIO) or
        TextIOWrapper(BufferedReader(BytesIO)). Nothing must have been
        read from a TextIOWrapper yet, since its encoding is changed.

    Returns
    -------
    tuple
        ``(text, initial_bom)``: the whole decoded content with a leading
        byte order mark removed, and whether such a mark was present.

    Raises
    ------
    UnicodeDecodeError
        If the bytes are not valid in the detected encoding (decoding is
        strict).
    """
    if not isinstance(f, io.TextIOWrapper):
        # Not a wrapper around bytes (e.g. StringIO): nothing to decode.
        return (f.read(), False)

    head = _peek_bytes(f.buffer, 4)

    for bom, encoding in _YAML_BOMS:
        if head.startswith(bom):
            f.reconfigure(encoding=encoding, errors='strict')
            text = f.read()
            # These codecs leave the mark in the text as U+FEFF; drop it.
            if text.startswith('\uFEFF'):
                text = text[1:]
            return (text, True)

    f.reconfigure(encoding=_guess_encoding(head), errors='strict')
    return (f.read(), False)
|
||||||
|
|
||||||
|
|
||||||
|
def _run(input, conf, filepath):
|
||||||
|
if isinstance(input, str):
|
||||||
|
buffer, initial_bom = input, False
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
buffer, initial_bom = _read_yaml_unicode(input)
|
||||||
|
except UnicodeDecodeError as e:
|
||||||
|
problem = LintProblem(0, 0, str(e), 'unicode-decode')
|
||||||
|
problem.level = 'error'
|
||||||
|
yield problem
|
||||||
|
return
|
||||||
|
|
||||||
first_line = next(parser.line_generator(buffer)).content
|
first_line = next(parser.line_generator(buffer)).content
|
||||||
if re.match(r'^#\s*yamllint disable-file\s*$', first_line):
|
if re.match(r'^#\s*yamllint disable-file\s*$', first_line):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if not initial_bom and first_line and not (first_line[0].isascii() and
|
||||||
|
(first_line[0].isprintable() or first_line[0].isspace())):
|
||||||
|
problem = LintProblem(1, 1,
|
||||||
|
"First Unicode character not ASCII without BOM",
|
||||||
|
'unicode-first-not-ascii')
|
||||||
|
problem.level = 'warning'
|
||||||
|
yield problem
|
||||||
|
|
||||||
# If the document contains a syntax error, save it and yield it at the
|
# If the document contains a syntax error, save it and yield it at the
|
||||||
# right line
|
# right line
|
||||||
syntax_error = get_syntax_error(buffer)
|
syntax_error = get_syntax_error(buffer)
|
||||||
@@ -226,11 +308,10 @@ def run(input, conf, filepath=None):
|
|||||||
if filepath is not None and conf.is_file_ignored(filepath):
|
if filepath is not None and conf.is_file_ignored(filepath):
|
||||||
return ()
|
return ()
|
||||||
|
|
||||||
if isinstance(input, (bytes, str)):
|
if isinstance(input, str):
|
||||||
return _run(input, conf, filepath)
|
return _run(input, conf, filepath)
|
||||||
elif isinstance(input, io.IOBase):
|
if isinstance(input, bytes):
|
||||||
# We need to have everything in memory to parse correctly
|
input = io.TextIOWrapper(io.BytesIO(input))
|
||||||
content = input.read()
|
if isinstance(input, io.IOBase):
|
||||||
return _run(content, conf, filepath)
|
return _run(input, conf, filepath)
|
||||||
else:
|
raise TypeError('input should be a string or a stream')
|
||||||
raise TypeError('input should be a string or a stream')
|
|
||||||
|
|||||||
Reference in New Issue
Block a user