#!/usr/bin/env python3
#
# Copyright (c) 2016-2022 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Based on the software developed by:
# Copyright (c) 2008,2016 david decotigny (Pool of threads)
# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import time
import threading

from .api import *
from .pool import *


def test(arg=None):
    """Exercise the ``Pool`` API end to end.

    Runs edge-case, basic, timeout and "exception" scenarios against a
    ``Pool(4)`` instance.  The pool is always terminated and joined on exit,
    even when a check fails, so a failing run does not leave workers behind.

    Args:
        arg: Pass ``"-v"`` to print progress messages; any other value
            (including the default ``None``) silences them.  The oneTBB
            version banner is printed regardless.

    Raises:
        AssertionError: If any of the checks fails.
    """
    if arg == "-v":
        def say(*x):
            print(*x)
    else:
        def say(*x):
            pass
    say("Start Pool testing")
    print("oneTBB version is %s" % runtime_version())
    print("oneTBB interface version is %s" % runtime_interface_version())

    def get_tid():
        """Return the identifier of the calling thread (for log messages)."""
        return threading.current_thread().ident

    assert default_num_threads() == this_task_arena_max_concurrency()

    def return42():
        return 42

    def f(x):
        return x * x

    def work(mseconds):
        """Sleep for ``abs(mseconds) / 100`` seconds and return ``str(mseconds)``."""
        res = str(mseconds)
        # Negative durations are made positive before sleeping.
        if mseconds < 0:
            mseconds = -mseconds
        say("[%d] Start to work for %fms..." % (get_tid(), mseconds*10))
        time.sleep(mseconds/100.)
        say("[%d] Work done (%fms)." % (get_tid(), mseconds*10))
        return res

    # Special flag set by the thread that submits the async work: while it is
    # truthy, every timeout_work() call keeps spinning.
    spin_flag = None

    def timeout_work(param):
        """Spin until ``spin_flag`` is cleared, then return ``str(param)`` (or None)."""
        say("[%d] Spin wait work start..." % get_tid())
        while spin_flag:
            time.sleep(0.0001) # yield equivalent
        say("[%d] Work done." % get_tid())
        return str(param) if param is not None else None

    def prepare_timeout_exception():
        """Arm ``spin_flag`` so that subsequently submitted work blocks."""
        nonlocal spin_flag
        spin_flag = True # lock threads in timeout_work

    def check_timeout_exception(pool_object, func):
        """Call ``func(pool_object)`` and require it to raise ``TimeoutError``.

        ``spin_flag`` is cleared in all cases so that blocked workers are
        released even when the expectation is not met.
        """
        nonlocal spin_flag
        try:
            func(pool_object)
        except TimeoutError:
            say("Good. Got expected timeout exception.")
        else:
            # Explicit raise (not ``assert False``) so the check survives -O.
            raise AssertionError("Expected exception !")
        finally:
            spin_flag = False # unlock threads in timeout_work

    def cb(s):
        """Completion callback passed to the ``*_async`` calls."""
        say("Result ready: %s" % s)

    ### Test copy/pasted from multiprocessing
    pool = Pool(4)  # start worker threads
    try:
        # edge cases
        assert pool.map(return42, []) == []
        assert pool.apply_async(return42, []).get() == 42
        assert pool.apply(return42, []) == 42
        assert list(pool.imap(return42, iter([]))) == []
        assert list(pool.imap_unordered(return42, iter([]))) == []
        assert pool.map_async(return42, []).get() == []
        assert list(pool.imap_async(return42, iter([])).get()) == []
        assert list(pool.imap_unordered_async(return42, iter([])).get()) == []

        # basic tests
        result = pool.apply_async(f, (10,))  # evaluate "f(10)" asynchronously
        assert result.get(timeout=1) == 100  # ... unless slow computer
        assert list(pool.map(f, range(10))) == list(map(f, range(10)))
        it = pool.imap(f, range(10))
        assert next(it) == 0
        assert next(it) == 1
        assert next(it) == 4

        # Test apply_async exceptions
        prepare_timeout_exception()
        result = pool.apply_async(timeout_work, (None,))
        check_timeout_exception(result, lambda res: say(res.get(timeout=1)))
        assert result.get() is None  # timeout_work(None) returns None

        # Test imap()
        assert list(pool.imap(work, range(10, 3, -1), chunksize=4)) == list(map(
            str, range(10, 3, -1)))

        # Test imap_unordered()
        assert sorted(pool.imap_unordered(work, range(10, 3, -1))) == sorted(map(
            str, range(10, 3, -1)))

        # Test map_async()
        prepare_timeout_exception()
        result = pool.map_async(timeout_work, range(10), callback=cb)
        check_timeout_exception(result, lambda res: res.get(timeout=0.01))
        say(result.get())

        # Test imap_async()
        prepare_timeout_exception()
        result = pool.imap_async(timeout_work, range(3, 10), callback=cb)
        check_timeout_exception(result, lambda res: res.get(timeout=0.01))
        for i in result.get():
            say("Item:", i)
        say("### Loop again:")
        for i in result.get():
            say("Item2:", i)

        # Test imap_unordered_async()
        prepare_timeout_exception()
        result = pool.imap_unordered_async(timeout_work, range(10, 3, -1), callback=cb)
        check_timeout_exception(result, lambda res: res.get(timeout=0.01))
        for i in result.get():
            say("Item1:", i)
        for i in result.get():
            say("Item2:", i)
        # Iterate the very same returned object several times.
        r = result.get()
        for i in r:
            say("Item3:", i)
        for i in r:
            say("Item4:", i)
        for i in r:
            say("Item5:", i)

        #
        # The case for the exceptions
        #
        # NOTE(review): work() makes negative inputs positive before sleeping,
        # so it does not look like these runs can raise; the handlers below
        # only tolerate IOError/ValueError should result.get() surface one.

        # Exceptions in imap_unordered_async()
        result = pool.imap_unordered_async(work, range(2, -10, -1), callback=cb)
        time.sleep(3)
        try:
            for i in result.get():
                say("Got item:", i)
        except (IOError, ValueError):
            say("Good. Got expected exception")

        # Exceptions in imap_async()
        result = pool.imap_async(work, range(2, -10, -1), callback=cb)
        time.sleep(3)
        try:
            for i in result.get():
                say("Got item:", i)
        except (IOError, ValueError):
            say("Good. Got expected exception")
    finally:
        # Stop the test: need to stop the pool !!!
        # Release any worker still spinning in timeout_work() first, so that
        # shutting the pool down cannot wait on it forever.
        spin_flag = False
        pool.terminate()
        pool.join()

if __name__ == "__main__":
    # Forward an optional "-v" command-line flag so that the verbose mode of
    # test() can be requested when this module is executed directly; without
    # the flag the test runs quietly, exactly like a plain test() call.
    import sys

    test("-v" if "-v" in sys.argv[1:] else None)
