#!/usr/bin/python

# osh
# Copyright (C) 2005 Jack Orenstein <jao@geophile.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

import traceback

from osh.api import *

def smoketest(label, pipeline, expected_output):
    """Run one osh pipeline and report any mismatch with the expected output.

    label           -- name of the test case; printed before the pipeline runs.
    pipeline        -- sequence of osh operators. It is not modified.
    expected_output -- value the result of osh() is compared against.

    return_list() is added after the given operators (presumably so that
    osh() returns the rows instead of printing them). A mismatch is printed
    to stdout as an expected/actual pair. An exception raised while running
    the pipeline is reported as a traceback on stderr and does not propagate,
    so the remaining smoke tests still run.
    """
    print(label)
    # Build a new list instead of appending in place, so the caller's
    # pipeline is left untouched and can be reused.
    commands = list(pipeline) + [return_list()]
    try:
        actual_output = osh(*commands)
        if actual_output != expected_output:
            print('expected: %s' % str(expected_output))
            print('actual:   %s' % str(actual_output))
    except Exception:
        # print_exc() writes to sys.stderr by default, so there is no need
        # to depend on a 'sys' name leaking in via 'from osh.api import *'.
        # KeyboardInterrupt and SystemExit are deliberately not swallowed.
        traceback.print_exc()
    
# Basic operators. smoketest() adds return_list() to each pipeline, runs it,
# and prints the expected/actual output only when they differ.

# gen(3) on its own is expected to yield the rows (0,), (1,), (2,).
smoketest('gen and out',
          [gen(3)],
          [(0,), (1,), (2,)])

# f: the function is given once as a lambda and once as the string
# 'x: x * 10'; both forms are expected to produce the same rows.
smoketest('f (lambda)',
          [gen(3), f(lambda x: x * 10)],
          [(0,), (10,), (20,)])
smoketest('f (string)',
          [gen(3), f('x: x * 10')],
          [(0,), (10,), (20,)])

# select: only the rows for which the predicate is true (odd x here) are
# expected in the output; again checked in lambda and string form.
smoketest('select (lambda)',
          [gen(4), select(lambda x: x % 2 == 1)],
          [(1,), (3,)])
smoketest('select (string)',
          [gen(4), select('x: x % 2 == 1')],
          [(1,), (3,)])

# agg with initial value 0 and 'sum + x' over gen(5): the expected output is
# the single row (10,), i.e. 0 + 1 + 2 + 3 + 4.
smoketest('agg (lambda)',
          [gen(5), agg(0, lambda sum, x: sum + x)],
          [(10,)])
smoketest('agg (string)',
          [gen(5), agg(0, 'sum, x: sum + x')],
          [(10,)])
# Grouped aggregation. f() first pairs each x with x / 2; this relies on
# Python 2 integer division, so the keys are 0, 0, 1, 1, 2. The expected rows
# are (key, sum of x for that key).
smoketest('agg group (lambda)',
          [gen(5),
           f(lambda x: (x / 2, x)),
           agg(0, lambda sum, halfx, x: sum + x, group = lambda halfx, x: halfx)],
          [(0, 1), (1, 5), (2, 4)])
smoketest('agg group (string)',
          [gen(5),
           f('x: (x / 2, x)'),
           agg(0, 'sum, halfx, x: sum + x', group = 'halfx, x: halfx')],
          [(0, 1), (1, 5), (2, 4)])
# consecutive = ... in place of group = ...: for this input, where equal keys
# are already adjacent, the expected rows are the same as for group.
smoketest('agg consecutive (lambda)',
          [gen(5),
           f(lambda x: (x / 2, x)),
           agg(0, lambda sum, halfx, x: sum + x, consecutive = lambda halfx, x: halfx)],
          [(0, 1), (1, 5), (2, 4)])
smoketest('agg consecutive (string)',
          [gen(5),
           f('x: (x / 2, x)'),
           agg(0, 'sum, halfx, x: sum + x', consecutive = 'halfx, x: halfx')],
          [(0, 1), (1, 5), (2, 4)])
# running = True: the expected output has one row per input row, with the
# accumulator value so far placed in front of the input row.
smoketest('agg (running)',
          [gen(5), agg(0, lambda sum, x: sum + x, running = True)],
          [(0, 0), (1, 1), (3, 2), (6, 3), (10, 4)])
# With group/consecutive plus running = True, the expected running sum
# restarts for each key: rows are (sum so far for the key, halfx, x).
smoketest('agg group (running)',
          [gen(5),
           f(lambda x: (x / 2, x)),
           agg(0, lambda sum, halfx, x: sum + x, group = lambda halfx, x: halfx, running = True)],
          [(0, 0, 0), (1, 0, 1), (2, 1, 2), (5, 1, 3), (4, 2, 4)])
smoketest('agg consecutive (running)',
          [gen(5),
           f(lambda x: (x / 2, x)),
           agg(0, lambda sum, halfx, x: sum + x, consecutive = lambda halfx, x: halfx, running = True)],
          [(0, 0, 0), (1, 0, 1), (2, 1, 2), (5, 1, 3), (4, 2, 4)])

# sort with the key -x: the rows of gen(5) are expected in descending order.
smoketest('sort (lambda)',
          [gen(5), sort(lambda x: -x)],
          [(4,), (3,), (2,), (1,), (0,)])
smoketest('sort (string)',
          [gen(5), sort('x: -x')],
          [(4,), (3,), (2,), (1,), (0,)])

# expand(): f() turns each x into a list of x copies of x; the expected
# output has one row per list element (so x = 0 contributes no rows).
smoketest('expand',
          [gen(4), f(lambda x: [x] * x), expand()],
          [(1,), (2,), (2,), (3,), (3,), (3,)])
# expand(position): each input row is (x, (5, 6)); the expected output has
# one row per element of the tuple in field 1, with x repeated.
smoketest('expand (position)',
          [gen(3), f(lambda x: (x, (5, 6))), expand(1)],
          [(0, 5), (0, 6), (1, 5), (1, 6), (2, 5), (2, 6)])
# Position -1 is expected to give the same result as position 1 for these
# two-field rows (presumably counting from the end -- TODO confirm).
smoketest('expand (negative position)',
          [gen(3), f(lambda x: (x, (5, 6))), expand(-1)],
          [(0, 5), (0, 6), (1, 5), (1, 6), (2, 5), (2, 6)])

# squish with no argument and with one argument: gen(9) is windowed in threes
# and squished; squish('+') is then expected to add the three fields of each
# row, giving 0+1+2, 3+4+5 and 6+7+8.
smoketest('squish (0-1)',
          [gen(9), window(disjoint = 3), squish(), squish('+')],
          [(3,), (12,), (21,)])
# squish with several functions: three-field rows are windowed in threes
# again, and the expected output combines the first fields with +, the
# second with max and the third with min, e.g. (0+3+6, max(1,4,7), min(2,5,8)).
smoketest('squish (>1)',
          [gen(27), window(disjoint = 3), squish(), window(disjoint = 3), squish(lambda x, y: x + y, max, min)],
          [(9, 7, 2), (36, 16, 11), (63, 25, 20)])

# NOTE(review): 'window (default)' uses exactly the same pipeline as
# 'window (disjoint)' below, so no default behaviour is exercised here --
# confirm which arguments were intended.
smoketest('window (default)',
          [gen(9), window(disjoint = 3), squish()],
          [(0, 1, 2), (3, 4, 5), (6, 7, 8)])
# disjoint = 3: expected windows are consecutive, non-overlapping triples.
smoketest('window (disjoint)',
          [gen(9), window(disjoint = 3), squish()],
          [(0, 1, 2), (3, 4, 5), (6, 7, 8)])
# overlap = 3: one window is expected per input row; the last two windows
# are expected to be padded with None.
smoketest('window (overlap)',
          [gen(9), window(overlap = 3), squish()],
          [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7), (6, 7, 8), (7, 8, None), (8, None, None)])
# Predicate form: with 'x % 3 == 0' the expected windows begin at 0, 3 and 6,
# i.e. at the rows for which the predicate is true.
smoketest('window (predicate function)',
          [gen(9), window(lambda x: x % 3 == 0), squish()],
          [(0, 1, 2), (3, 4, 5), (6, 7, 8)])
smoketest('window (predicate string)',
          [gen(9), window('x: x % 3 == 0'), squish()],
          [(0, 1, 2), (3, 4, 5), (6, 7, 8)])

# unique: f() and expand() produce x copies of each x (the expected output
# implies gen(3, 1) yields 1, 2, 3 -- TODO confirm the meaning of the second
# argument); after unique() and sort() one row per distinct value is expected.
smoketest('unique',
          [gen(3, 1),
           f(lambda x: [x for i in range(x)]),
           expand(),
           unique(),
           sort()],
          [(1,), (2,), (3,)])
# consecutive = True: the duplicates in this input are already adjacent, so
# the expected output is the same as for plain unique().
smoketest('unique (consecutive)',
          [gen(3, 1),
           f(lambda x: [x for i in range(x)]),
           expand(),
           unique(consecutive = True),
           sort()],
          [(1,), (2,), (3,)])

# The following command-line case is not run by this script, because testing
# stdin is tricky:
# ./smoketest_api "echo 'abc' | osh ^ f 's: [c for c in s]' $" "[['a', 'b', 'c']]"

# sh: the output of 'echo abc' is passed to f(), which splits the string
# into characters; a single three-field row is expected.
smoketest('sh',
          [sh('echo abc'), f(lambda s: [c for c in s])],
          [('a', 'b', 'c')])

# Builtins
# ifelse(condition, a, b) used inside f(): x == 0 is expected to be replaced
# by 999, other values are passed through unchanged.
smoketest('builtin ifelse',
          [gen(3), f(lambda x: ifelse(x == 0, 999, x))],
          [(999,), (1,), (2,)])

# Nested pipelines: operators wrapped in nested lists, in four different
# arrangements. Every arrangement is expected to produce the same rows as
# the flat pipeline [gen(3), f(lambda x: x * 10)].
smoketest('pipeline 1',
          [[gen(3)], f(lambda x: x * 10)],
          [(0,), (10,), (20,)])
smoketest('pipeline 2',
          [gen(3), [f(lambda x: x * 10)]],
          [(0,), (10,), (20,)])
smoketest('pipeline 3',
          [[gen(3), f(lambda x: x * 10)]],
          [(0,), (10,), (20,)])
smoketest('pipeline 4',
          [[gen(3), [f(lambda x: x * 10)]]],
          [(0,), (10,), (20,)])

# Command-line cases using n() in nested pipelines, kept for reference; they
# are not run by this script.
# ./smoketest_api "osh [ gen 3 ] ^ f 'x: (x, n(), n())' $" "[(0, 0, 1), (1, 2, 3), (2, 4, 5)]"
# ./smoketest_api "osh gen 3 ^ [ f 'x: (x, n(), n())' ] $" "[(0, 0, 1), (1, 2, 3), (2, 4, 5)]"
# ./smoketest_api "osh [ gen 3  ^ f 'x: (x, n(), n())' ] $" "[(0, 0, 1), (1, 2, 3), (2, 4, 5)]"
# ./smoketest_api "osh [ gen 3  ^ [ f 'x: (x, n(), n())' ] ] $" "[(0, 0, 1), (1, 2, 3), (2, 4, 5)]"

# Error handling

# Default handler: x / (x - 1) raises ZeroDivisionError for x == 1. The
# expected output omits only that row, i.e. the pipeline is expected to keep
# running after the error instead of aborting.
smoketest('error handling (default)',
          [gen(3), f(lambda x: x / (x - 1))],
          [(0,), (2,)])


# Error handling (overridden): install a handler that re-raises, then check
# that the exception reaches the caller of osh() and that only the rows
# processed before the failure were collected.
print('error handling (overridden)')
o = []
def collect(x):
    """Record x in the module-level list o."""
    o.append(x)

def raise_exception(exception, op, input, host = None):
    """Exception handler that re-raises the exception it is given.

    The parameter list is the one passed to set_exception_handler(); only
    'exception' is used here. The name 'input' is left as is because it is
    part of the handler's signature.
    """
    raise exception
set_exception_handler(raise_exception)
try:
    osh(gen(3),
        f(lambda x: x / (x - 1)),
        f(collect))
    print('ZeroDivisionError should have been thrown')
except ZeroDivisionError:
    # Expected: x == 1 divides by zero and the handler re-raises.
    pass
# x == 0 gives 0 / -1 == 0, which is collected; x == 1 raises, so x == 2 is
# never processed and exactly [0] is expected.
if o != [0]:
    print('o is wrong: %s' % (o,))
