| ############################################################################## |
| # |
| # Copyright (c) 2002 Zope Foundation and Contributors. |
| # All Rights Reserved. |
| # |
| # This software is subject to the provisions of the Zope Public License, |
| # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. |
| # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED |
| # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
| # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS |
| # FOR A PARTICULAR PURPOSE. |
| # |
| ############################################################################## |
| """HTTP Request Parser tests |
| """ |
| import unittest |
| |
| from waitress.adjustments import Adjustments |
| from waitress.parser import ( |
| HTTPRequestParser, |
| ParsingError, |
| TransferEncodingNotImplemented, |
| crack_first_line, |
| get_header_lines, |
| split_uri, |
| unquote_bytes_to_wsgi, |
| ) |
| from waitress.utilities import ( |
| BadRequest, |
| RequestEntityTooLarge, |
| RequestHeaderFieldsTooLarge, |
| ServerNotImplemented, |
| ) |
| |
| |
| class TestHTTPRequestParser(unittest.TestCase): |
| def setUp(self): |
| my_adj = Adjustments() |
| self.parser = HTTPRequestParser(my_adj) |
| |
| def test_get_body_stream_None(self): |
| self.parser.body_recv = None |
| result = self.parser.get_body_stream() |
| self.assertEqual(result.getvalue(), b"") |
| |
| def test_get_body_stream_nonNone(self): |
| body_rcv = DummyBodyStream() |
| self.parser.body_rcv = body_rcv |
| result = self.parser.get_body_stream() |
| self.assertEqual(result, body_rcv) |
| |
| def test_received_get_no_headers(self): |
| data = b"HTTP/1.0 GET /foobar\r\n\r\n" |
| result = self.parser.received(data) |
| self.assertEqual(result, 24) |
| self.assertTrue(self.parser.completed) |
| self.assertDictEqual(self.parser.headers, {}) |
| |
| def test_received_bad_host_header(self): |
| data = b"HTTP/1.0 GET /foobar\r\n Host: foo\r\n\r\n" |
| result = self.parser.received(data) |
| self.assertEqual(result, 36) |
| self.assertTrue(self.parser.completed) |
| self.assertIsInstance(self.parser.error, BadRequest) |
| |
| def test_received_bad_transfer_encoding(self): |
| data = ( |
| b"GET /foobar HTTP/1.1\r\n" |
| b"Transfer-Encoding: foo\r\n" |
| b"\r\n" |
| b"1d;\r\n" |
| b"This string has 29 characters\r\n" |
| b"0\r\n\r\n" |
| ) |
| result = self.parser.received(data) |
| self.assertEqual(result, 48) |
| self.assertTrue(self.parser.completed) |
| self.assertIsInstance(self.parser.error, ServerNotImplemented) |
| |
| def test_received_nonsense_nothing(self): |
| data = b"\r\n\r\n" |
| result = self.parser.received(data) |
| self.assertEqual(result, 4) |
| self.assertTrue(self.parser.completed) |
| self.assertDictEqual(self.parser.headers, {}) |
| |
| def test_received_no_doublecr(self): |
| data = b"GET /foobar HTTP/8.4\r\n" |
| result = self.parser.received(data) |
| self.assertEqual(result, 22) |
| self.assertFalse(self.parser.completed) |
| self.assertDictEqual(self.parser.headers, {}) |
| |
| def test_received_already_completed(self): |
| self.parser.completed = True |
| result = self.parser.received(b"a") |
| self.assertEqual(result, 0) |
| |
| def test_received_cl_too_large(self): |
| self.parser.adj.max_request_body_size = 2 |
| data = b"GET /foobar HTTP/8.4\r\nContent-Length: 10\r\n\r\n" |
| result = self.parser.received(data) |
| self.assertEqual(result, 44) |
| self.assertTrue(self.parser.completed) |
| self.assertIsInstance(self.parser.error, RequestEntityTooLarge) |
| |
| def test_received_headers_not_too_large_multiple_chunks(self): |
| data = b"GET /foobar HTTP/8.4\r\nX-Foo: 1\r\n" |
| data2 = b"X-Foo-Other: 3\r\n\r\n" |
| self.parser.adj.max_request_header_size = len(data) + len(data2) + 1 |
| result = self.parser.received(data) |
| self.assertEqual(result, 32) |
| result = self.parser.received(data2) |
| self.assertEqual(result, 18) |
| self.assertTrue(self.parser.completed) |
| self.assertFalse(self.parser.error) |
| |
| def test_received_headers_too_large(self): |
| self.parser.adj.max_request_header_size = 2 |
| data = b"GET /foobar HTTP/8.4\r\nX-Foo: 1\r\n\r\n" |
| result = self.parser.received(data) |
| self.assertEqual(result, 34) |
| self.assertTrue(self.parser.completed) |
| self.assertIsInstance(self.parser.error, RequestHeaderFieldsTooLarge) |
| |
| def test_received_body_too_large(self): |
| self.parser.adj.max_request_body_size = 2 |
| data = ( |
| b"GET /foobar HTTP/1.1\r\n" |
| b"Transfer-Encoding: chunked\r\n" |
| b"X-Foo: 1\r\n" |
| b"\r\n" |
| b"1d;\r\n" |
| b"This string has 29 characters\r\n" |
| b"0\r\n\r\n" |
| ) |
| |
| result = self.parser.received(data) |
| self.assertEqual(result, 62) |
| self.parser.received(data[result:]) |
| self.assertTrue(self.parser.completed) |
| self.assertIsInstance(self.parser.error, RequestEntityTooLarge) |
| |
| def test_received_error_from_parser(self): |
| data = ( |
| b"GET /foobar HTTP/1.1\r\n" |
| b"Transfer-Encoding: chunked\r\n" |
| b"X-Foo: 1\r\n" |
| b"\r\n" |
| b"garbage\r\n" |
| ) |
| # header |
| result = self.parser.received(data) |
| # body |
| result = self.parser.received(data[result:]) |
| self.assertEqual(result, 9) |
| self.assertTrue(self.parser.completed) |
| self.assertIsInstance(self.parser.error, BadRequest) |
| |
| def test_received_chunked_completed_sets_content_length(self): |
| data = ( |
| b"GET /foobar HTTP/1.1\r\n" |
| b"Transfer-Encoding: chunked\r\n" |
| b"X-Foo: 1\r\n" |
| b"\r\n" |
| b"1d\r\n" |
| b"This string has 29 characters\r\n" |
| b"0\r\n\r\n" |
| ) |
| result = self.parser.received(data) |
| self.assertEqual(result, 62) |
| data = data[result:] |
| result = self.parser.received(data) |
| self.assertTrue(self.parser.completed) |
| self.assertIsNone(self.parser.error) |
| self.assertEqual(self.parser.headers["CONTENT_LENGTH"], "29") |
| |
| def test_parse_header_gardenpath(self): |
| data = b"GET /foobar HTTP/8.4\r\nfoo: bar\r\n" |
| self.parser.parse_header(data) |
| self.assertEqual(self.parser.first_line, b"GET /foobar HTTP/8.4") |
| self.assertEqual(self.parser.headers["FOO"], "bar") |
| |
| def test_parse_header_no_cr_in_headerplus(self): |
| data = b"GET /foobar HTTP/8.4" |
| |
| try: |
| self.parser.parse_header(data) |
| except ParsingError: |
| pass |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_bad_content_length(self): |
| data = b"GET /foobar HTTP/8.4\r\ncontent-length: abc\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Content-Length is invalid", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_bad_content_length_plus(self): |
| data = b"GET /foobar HTTP/8.4\r\ncontent-length: +10\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Content-Length is invalid", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_bad_content_length_minus(self): |
| data = b"GET /foobar HTTP/8.4\r\ncontent-length: -10\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Content-Length is invalid", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_multiple_content_length(self): |
| data = b"GET /foobar HTTP/8.4\r\ncontent-length: 10\r\ncontent-length: 20\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Content-Length is invalid", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_11_te_chunked(self): |
| # NB: test that capitalization of header value is unimportant |
| data = b"GET /foobar HTTP/1.1\r\ntransfer-encoding: ChUnKed\r\n" |
| self.parser.parse_header(data) |
| self.assertEqual(self.parser.body_rcv.__class__.__name__, "ChunkedReceiver") |
| |
| def test_parse_header_transfer_encoding_invalid(self): |
| data = b"GET /foobar HTTP/1.1\r\ntransfer-encoding: gzip\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except TransferEncodingNotImplemented as e: |
| self.assertIn("Transfer-Encoding requested is not supported.", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_transfer_encoding_invalid_multiple(self): |
| data = b"GET /foobar HTTP/1.1\r\ntransfer-encoding: gzip\r\ntransfer-encoding: chunked\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except TransferEncodingNotImplemented as e: |
| self.assertIn("Transfer-Encoding requested is not supported.", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_transfer_encoding_invalid_whitespace(self): |
| data = b"GET /foobar HTTP/1.1\r\nTransfer-Encoding:\x85chunked\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except TransferEncodingNotImplemented as e: |
| self.assertIn("Transfer-Encoding requested is not supported.", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_transfer_encoding_invalid_unicode(self): |
| # This is the binary encoding for the UTF-8 character |
| # https://www.compart.com/en/unicode/U+212A "unicode character "K"" |
| # which if waitress were to accidentally do the wrong thing get |
| # lowercased to just the ascii "k" due to unicode collisions during |
| # transformation |
| data = b"GET /foobar HTTP/1.1\r\nTransfer-Encoding: chun\xe2\x84\xaaed\r\n" |
| |
| try: |
| self.parser.parse_header(data) |
| except TransferEncodingNotImplemented as e: |
| self.assertIn("Transfer-Encoding requested is not supported.", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_11_expect_continue(self): |
| data = b"GET /foobar HTTP/1.1\r\nexpect: 100-continue\r\n" |
| self.parser.parse_header(data) |
| self.assertTrue(self.parser.expect_continue) |
| |
| def test_parse_header_connection_close(self): |
| data = b"GET /foobar HTTP/1.1\r\nConnection: close\r\n" |
| self.parser.parse_header(data) |
| self.assertTrue(self.parser.connection_close) |
| |
| def test_close_with_body_rcv(self): |
| body_rcv = DummyBodyStream() |
| self.parser.body_rcv = body_rcv |
| self.parser.close() |
| self.assertTrue(body_rcv.closed) |
| |
| def test_close_with_no_body_rcv(self): |
| self.parser.body_rcv = None |
| self.parser.close() # doesn't raise |
| |
| def test_parse_header_lf_only(self): |
| data = b"GET /foobar HTTP/8.4\nfoo: bar" |
| |
| try: |
| self.parser.parse_header(data) |
| except ParsingError: |
| pass |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_cr_only(self): |
| data = b"GET /foobar HTTP/8.4\rfoo: bar" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError: |
| pass |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_extra_lf_in_header(self): |
| data = b"GET /foobar HTTP/8.4\r\nfoo: \nbar\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Bare CR or LF found in header line", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_extra_lf_in_first_line(self): |
| data = b"GET /foobar\n HTTP/8.4\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Bare CR or LF found in HTTP message", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_invalid_whitespace(self): |
| data = b"GET /foobar HTTP/8.4\r\nfoo : bar\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Invalid header", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_invalid_whitespace_vtab(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo:\x0bbar\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Invalid header", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_invalid_no_colon(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nnotvalid\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Invalid header", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_invalid_folding_spacing(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\n\t\x0bbaz\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Invalid header", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_invalid_chars(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nfoo: \x0bbaz\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Invalid header", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_other_whitespace(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: \xa0something\x85\r\n" |
| self.parser.parse_header(data) |
| self.assertEqual(self.parser.headers["FOO"], "\xa0something\x85") |
| |
| def test_parse_header_empty(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nempty:\r\n" |
| self.parser.parse_header(data) |
| |
| self.assertIn("EMPTY", self.parser.headers) |
| self.assertIn("FOO", self.parser.headers) |
| self.assertEqual(self.parser.headers["EMPTY"], "") |
| self.assertEqual(self.parser.headers["FOO"], "bar") |
| |
| def test_parse_header_multiple_values(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar, whatever, more, please, yes\r\n" |
| self.parser.parse_header(data) |
| |
| self.assertIn("FOO", self.parser.headers) |
| self.assertEqual(self.parser.headers["FOO"], "bar, whatever, more, please, yes") |
| |
| def test_parse_header_multiple_values_header_folded(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar, whatever,\r\n more, please, yes\r\n" |
| self.parser.parse_header(data) |
| |
| self.assertIn("FOO", self.parser.headers) |
| self.assertEqual(self.parser.headers["FOO"], "bar, whatever, more, please, yes") |
| |
| def test_parse_header_multiple_values_header_folded_multiple(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar, whatever,\r\n more\r\nfoo: please, yes\r\n" |
| self.parser.parse_header(data) |
| |
| self.assertIn("FOO", self.parser.headers) |
| self.assertEqual(self.parser.headers["FOO"], "bar, whatever, more, please, yes") |
| |
| def test_parse_header_multiple_values_extra_space(self): |
| # Tests errata from: https://www.rfc-editor.org/errata_search.php?rfc=7230&eid=4189 |
| data = b"GET /foobar HTTP/1.1\r\nfoo: abrowser/0.001 (C O M M E N T)\r\n" |
| self.parser.parse_header(data) |
| |
| self.assertIn("FOO", self.parser.headers) |
| self.assertEqual(self.parser.headers["FOO"], "abrowser/0.001 (C O M M E N T)") |
| |
| def test_parse_header_invalid_backtrack_bad(self): |
| data = b"GET /foobar HTTP/1.1\r\nfoo: bar\r\nfoo: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\x10\r\n" |
| try: |
| self.parser.parse_header(data) |
| except ParsingError as e: |
| self.assertIn("Invalid header", e.args[0]) |
| else: # pragma: nocover |
| self.assertTrue(False) |
| |
| def test_parse_header_short_values(self): |
| data = b"GET /foobar HTTP/1.1\r\none: 1\r\ntwo: 22\r\n" |
| self.parser.parse_header(data) |
| |
| self.assertIn("ONE", self.parser.headers) |
| self.assertIn("TWO", self.parser.headers) |
| self.assertEqual(self.parser.headers["ONE"], "1") |
| self.assertEqual(self.parser.headers["TWO"], "22") |
| |
| |
| class Test_split_uri(unittest.TestCase): |
| def _callFUT(self, uri): |
| ( |
| self.proxy_scheme, |
| self.proxy_netloc, |
| self.path, |
| self.query, |
| self.fragment, |
| ) = split_uri(uri) |
| |
| def test_split_uri_unquoting_unneeded(self): |
| self._callFUT(b"http://localhost:8080/abc def") |
| self.assertEqual(self.path, "/abc def") |
| |
| def test_split_uri_unquoting_needed(self): |
| self._callFUT(b"http://localhost:8080/abc%20def") |
| self.assertEqual(self.path, "/abc def") |
| |
| def test_split_url_with_query(self): |
| self._callFUT(b"http://localhost:8080/abc?a=1&b=2") |
| self.assertEqual(self.path, "/abc") |
| self.assertEqual(self.query, "a=1&b=2") |
| |
| def test_split_url_with_query_empty(self): |
| self._callFUT(b"http://localhost:8080/abc?") |
| self.assertEqual(self.path, "/abc") |
| self.assertEqual(self.query, "") |
| |
| def test_split_url_with_fragment(self): |
| self._callFUT(b"http://localhost:8080/#foo") |
| self.assertEqual(self.path, "/") |
| self.assertEqual(self.fragment, "foo") |
| |
| def test_split_url_https(self): |
| self._callFUT(b"https://localhost:8080/") |
| self.assertEqual(self.path, "/") |
| self.assertEqual(self.proxy_scheme, "https") |
| self.assertEqual(self.proxy_netloc, "localhost:8080") |
| |
| def test_split_uri_unicode_error_raises_parsing_error(self): |
| # See https://github.com/Pylons/waitress/issues/64 |
| |
| # Either pass or throw a ParsingError, just don't throw another type of |
| # exception as that will cause the connection to close badly: |
| try: |
| self._callFUT(b"/\xd0") |
| except ParsingError: |
| pass |
| |
| def test_split_uri_path(self): |
| self._callFUT(b"//testing/whatever") |
| self.assertEqual(self.path, "//testing/whatever") |
| self.assertEqual(self.proxy_scheme, "") |
| self.assertEqual(self.proxy_netloc, "") |
| self.assertEqual(self.query, "") |
| self.assertEqual(self.fragment, "") |
| |
| def test_split_uri_path_query(self): |
| self._callFUT(b"//testing/whatever?a=1&b=2") |
| self.assertEqual(self.path, "//testing/whatever") |
| self.assertEqual(self.proxy_scheme, "") |
| self.assertEqual(self.proxy_netloc, "") |
| self.assertEqual(self.query, "a=1&b=2") |
| self.assertEqual(self.fragment, "") |
| |
| def test_split_uri_path_query_fragment(self): |
| self._callFUT(b"//testing/whatever?a=1&b=2#fragment") |
| self.assertEqual(self.path, "//testing/whatever") |
| self.assertEqual(self.proxy_scheme, "") |
| self.assertEqual(self.proxy_netloc, "") |
| self.assertEqual(self.query, "a=1&b=2") |
| self.assertEqual(self.fragment, "fragment") |
| |
| |
| class Test_get_header_lines(unittest.TestCase): |
| def _callFUT(self, data): |
| return get_header_lines(data) |
| |
| def test_get_header_lines(self): |
| result = self._callFUT(b"slam\r\nslim") |
| self.assertListEqual(result, [b"slam", b"slim"]) |
| |
| def test_get_header_lines_folded(self): |
| # From RFC2616: |
| # HTTP/1.1 header field values can be folded onto multiple lines if the |
| # continuation line begins with a space or horizontal tab. All linear |
| # white space, including folding, has the same semantics as SP. A |
| # recipient MAY replace any linear white space with a single SP before |
| # interpreting the field value or forwarding the message downstream. |
| |
| # We are just preserving the whitespace that indicates folding. |
| result = self._callFUT(b"slim\r\n slam") |
| self.assertListEqual(result, [b"slim slam"]) |
| |
| def test_get_header_lines_tabbed(self): |
| result = self._callFUT(b"slam\r\n\tslim") |
| self.assertListEqual(result, [b"slam\tslim"]) |
| |
| def test_get_header_lines_malformed(self): |
| # https://corte.si/posts/code/pathod/pythonservers/index.html |
| self.assertRaises(ParsingError, self._callFUT, b" Host: localhost\r\n\r\n") |
| |
| |
| class Test_crack_first_line(unittest.TestCase): |
| def _callFUT(self, line): |
| return crack_first_line(line) |
| |
| def test_crack_first_line_matchok(self): |
| result = self._callFUT(b"GET / HTTP/1.0") |
| self.assertTupleEqual(result, (b"GET", b"/", b"1.0")) |
| |
| def test_crack_first_line_lowercase_method(self): |
| self.assertRaises(ParsingError, self._callFUT, b"get / HTTP/1.0") |
| |
| def test_crack_first_line_nomatch(self): |
| result = self._callFUT(b"GET / bleh") |
| self.assertTupleEqual(result, (b"", b"", b"")) |
| |
| result = self._callFUT(b"GET /info?txtAirPlay&txtRAOP RTSP/1.0") |
| self.assertTupleEqual(result, (b"", b"", b"")) |
| |
| def test_crack_first_line_missing_version(self): |
| result = self._callFUT(b"GET /") |
| self.assertTupleEqual(result, (b"GET", b"/", b"")) |
| |
| def test_crack_first_line_bad_method(self): |
| result = self._callFUT(b"GE\x00 /foobar HTTP/8.4") |
| self.assertTupleEqual(result, (b"", b"", b"")) |
| |
| def test_crack_first_line_bad_version(self): |
| result = self._callFUT(b"GET /foobar HTTP/.1.") |
| self.assertTupleEqual(result, (b"", b"", b"")) |
| |
| |
| class TestHTTPRequestParserIntegration(unittest.TestCase): |
| def setUp(self): |
| my_adj = Adjustments() |
| self.parser = HTTPRequestParser(my_adj) |
| |
| def feed(self, data): |
| parser = self.parser |
| |
| for n in range(100): # make sure we never loop forever |
| consumed = parser.received(data) |
| data = data[consumed:] |
| |
| if parser.completed: |
| return |
| raise ValueError("Looping") # pragma: no cover |
| |
| def testSimpleGET(self): |
| data = ( |
| b"GET /foobar HTTP/8.4\r\n" |
| b"FirstName: mickey\r\n" |
| b"lastname: Mouse\r\n" |
| b"content-length: 6\r\n" |
| b"\r\n" |
| b"Hello." |
| ) |
| parser = self.parser |
| self.feed(data) |
| self.assertTrue(parser.completed) |
| self.assertEqual(parser.version, "8.4") |
| self.assertFalse(parser.empty) |
| self.assertDictEqual( |
| parser.headers, |
| { |
| "FIRSTNAME": "mickey", |
| "LASTNAME": "Mouse", |
| "CONTENT_LENGTH": "6", |
| }, |
| ) |
| self.assertEqual(parser.path, "/foobar") |
| self.assertEqual(parser.command, "GET") |
| self.assertEqual(parser.query, "") |
| self.assertEqual(parser.proxy_scheme, "") |
| self.assertEqual(parser.proxy_netloc, "") |
| self.assertEqual(parser.get_body_stream().getvalue(), b"Hello.") |
| |
| def testComplexGET(self): |
| data = ( |
| b"GET /foo/a+%2B%2F%C3%A4%3D%26a%3Aint?d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6 HTTP/8.4\r\n" |
| b"FirstName: mickey\r\n" |
| b"lastname: Mouse\r\n" |
| b"content-length: 10\r\n" |
| b"\r\n" |
| b"Hello mickey." |
| ) |
| parser = self.parser |
| self.feed(data) |
| self.assertEqual(parser.command, "GET") |
| self.assertEqual(parser.version, "8.4") |
| self.assertFalse(parser.empty) |
| self.assertDictEqual( |
| parser.headers, |
| {"FIRSTNAME": "mickey", "LASTNAME": "Mouse", "CONTENT_LENGTH": "10"}, |
| ) |
| # path should be utf-8 encoded |
| self.assertEqual( |
| parser.path.encode("latin-1").decode("utf-8"), |
| b"/foo/a++/\xc3\xa4=&a:int".decode("utf-8"), |
| ) |
| # parser.request_uri should preserve the % escape sequences and the query string. |
| self.assertEqual( |
| parser.request_uri, |
| "/foo/a+%2B%2F%C3%A4%3D%26a%3Aint?d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6", |
| ) |
| self.assertEqual( |
| parser.query, "d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6" |
| ) |
| self.assertEqual(parser.get_body_stream().getvalue(), b"Hello mick") |
| |
| def testProxyGET(self): |
| data = ( |
| b"GET https://example.com:8080/foobar HTTP/8.4\r\n" |
| b"content-length: 6\r\n" |
| b"\r\n" |
| b"Hello." |
| ) |
| parser = self.parser |
| self.feed(data) |
| self.assertTrue(parser.completed) |
| self.assertEqual(parser.version, "8.4") |
| self.assertFalse(parser.empty) |
| self.assertEqual(parser.headers, {"CONTENT_LENGTH": "6"}) |
| self.assertEqual(parser.path, "/foobar") |
| self.assertEqual(parser.command, "GET") |
| self.assertEqual(parser.proxy_scheme, "https") |
| self.assertEqual(parser.proxy_netloc, "example.com:8080") |
| self.assertEqual(parser.command, "GET") |
| self.assertEqual(parser.query, "") |
| self.assertEqual(parser.get_body_stream().getvalue(), b"Hello.") |
| |
| def testDuplicateHeaders(self): |
| # Ensure that headers with the same key get concatenated as per |
| # RFC2616. |
| data = ( |
| b"GET /foobar HTTP/8.4\r\n" |
| b"x-forwarded-for: 10.11.12.13\r\n" |
| b"x-forwarded-for: unknown,127.0.0.1\r\n" |
| b"X-Forwarded_for: 255.255.255.255\r\n" |
| b"content-length: 6\r\n" |
| b"\r\n" |
| b"Hello." |
| ) |
| self.feed(data) |
| self.assertTrue(self.parser.completed) |
| self.assertDictEqual( |
| self.parser.headers, |
| { |
| "CONTENT_LENGTH": "6", |
| "X_FORWARDED_FOR": "10.11.12.13, unknown,127.0.0.1", |
| }, |
| ) |
| |
| def testSpoofedHeadersDropped(self): |
| data = ( |
| b"GET /foobar HTTP/8.4\r\n" |
| b"x-auth_user: bob\r\n" |
| b"content-length: 6\r\n" |
| b"\r\n" |
| b"Hello." |
| ) |
| self.feed(data) |
| self.assertTrue(self.parser.completed) |
| self.assertDictEqual( |
| self.parser.headers, |
| { |
| "CONTENT_LENGTH": "6", |
| }, |
| ) |
| |
| |
| class Test_unquote_bytes_to_wsgi(unittest.TestCase): |
| def _callFUT(self, v): |
| return unquote_bytes_to_wsgi(v) |
| |
| def test_highorder(self): |
| val = b"/a%C5%9B" |
| result = self._callFUT(val) |
| # PEP 3333 urlunquoted-latin1-decoded-bytes |
| self.assertEqual(result, "/aÅ\x9b") |
| |
| |
| class DummyBodyStream: |
| def getfile(self): |
| return self |
| |
| def getbuf(self): |
| return self |
| |
| def close(self): |
| self.closed = True |