복붙노트

[PYTHON] generator / iterable / iterator가있는 Python 무작위 샘플

PYTHON

generator / iterable / iterator가있는 Python 무작위 샘플

파이썬의 random.sample이 제네레이터 객체로 작업 할 수있는 방법이 있는지 알고 있습니까? 매우 큰 텍스트 코퍼스에서 무작위 샘플을 얻으려고합니다. 문제는 random.sample ()이 다음 오류를 발생시키는 것입니다.

TypeError: object of type 'generator' has no len()

itertools에서 무언가로 이것을 할 수있는 방법이 있을지도 모르지만 약간의 검색만으로는 아무 것도 찾을 수 없습니다.

다소 복잡한 예 :

import random
def list_item(ls):
    for item in ls:
        yield item

random.sample( list_item(range(100)), 20 )

최신 정보

MartinPieters의 요청에 따라 현재 제안 된 세 가지 방법 중 일부 타이밍을 수행했습니다. 결과는 다음과 같습니다.

Sampling 1000 from 10000
Using iterSample 0.0163 s
Using sample_from_iterable 0.0098 s
Using iter_sample_fast 0.0148 s

Sampling 10000 from 100000
Using iterSample 0.1786 s
Using sample_from_iterable 0.1320 s
Using iter_sample_fast 0.1576 s

Sampling 100000 from 1000000
Using iterSample 3.2740 s
Using sample_from_iterable 1.9860 s
Using iter_sample_fast 1.4586 s

Sampling 200000 from 1000000
Using iterSample 7.6115 s
Using sample_from_iterable 3.0663 s
Using iter_sample_fast 1.4101 s

Sampling 500000 from 1000000
Using iterSample 39.2595 s
Using sample_from_iterable 4.9994 s
Using iter_sample_fast 1.2178 s

Sampling 2000000 from 5000000
Using iterSample 798.8016 s
Using sample_from_iterable 28.6618 s
Using iter_sample_fast 6.6482 s

따라서 array.insert에는 큰 샘플 크기의 경우 심각한 단점이 있습니다. 메소드 시간을 지정하는 데 사용한 코드

from heapq import nlargest
import random
import timeit


def iterSample(iterable, samplesize):
    results = []
    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable, samplesize):
    return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    for _ in xrange(samplesize):
        results.append(iterator.next())
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3),int(10e+4),int(10e+5),int(10e+5),int(10e+5),int(10e+5)*5]
    k_sizes = [int(10e+2),int(10e+3),int(10e+4),int(10e+4)*2,int(10e+4)*5,int(10e+5)*2]

    for pop_size, k_size in zip(pop_sizes, k_sizes):
        pop = xrange(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop, %i)'%(k_size), setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)'%(k_size), setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)'%(k_size), setup='from __main__ import iter_sample_fast,pop')

        print 'Sampling', k, 'from', pop_size
        print 'Using iterSample', '%1.4f s'%(t1.timeit(number=100) / 100.0)
        print 'Using sample_from_iterable', '%1.4f s'%(t2.timeit(number=100) / 100.0)
        print 'Using iter_sample_fast', '%1.4f s'%(t3.timeit(number=100) / 100.0)
        print ''

나는 또한 모든 방법이 정말로 발전기의 불편한 샘플을 취하는 지 확인하기위한 테스트를 실시했다. 그래서 모든 방법에 대해 나는 10000 번에서 10000 번까지 1000 개의 요소를 샘플링하고 모든 세 가지 방법에 대해 기대했던 것처럼 인구에서 각 항목의 평균 발생 빈도를 ~ .1로 계산했습니다.

해결법

  1. ==============================

    1.Martijn Pieters의 대답은 정확하지만 샘플 크기가 커지면 loop.list에 list.insert를 사용하면 2 차적인 복잡성이 발생할 수 있으므로 속도가 느려집니다.

    Martijn Pieters의 대답은 정확하지만 샘플 크기가 커지면 loop.list에 list.insert를 사용하면 2 차적인 복잡성이 발생할 수 있으므로 속도가 느려집니다.

    필자의 의견으로는 성능을 높이면서 일관성을 유지할 수있는 대안이 있습니다.

    def iter_sample_fast(iterable, samplesize):
        results = []
        iterator = iter(iterable)
        # Fill in the first samplesize elements:
        try:
            for _ in xrange(samplesize):
                results.append(iterator.next())
        except StopIteration:
            raise ValueError("Sample larger than population.")
        random.shuffle(results)  # Randomize their positions
        for i, v in enumerate(iterator, samplesize):
            r = random.randint(0, i)
            if r < samplesize:
                results[r] = v  # at a decreasing rate, replace random items
        return results
    

    그 차이는 10000을 넘는 샘플 화 값에 대해 서서히 차이가 나기 시작합니다. (1000000, 100000)로 통화 시간 :

  2. ==============================

    2.당신은 할 수 없습니다.

    당신은 할 수 없습니다.

    다음과 같은 두 가지 옵션이 있습니다. 전체 생성기를 목록으로 읽어 들인 다음 해당 목록에서 샘플을 추출하거나 생성기를 하나씩 읽는 방법을 사용하여 샘플을 선택합니다.

    import random
    
    def iterSample(iterable, samplesize):
        results = []
    
        for i, v in enumerate(iterable):
            r = random.randint(0, i)
            if r < samplesize:
                if i < samplesize:
                    results.insert(r, v) # add first samplesize items in random order
                else:
                    results[r] = v # at a decreasing rate, replace random items
    
        if len(results) < samplesize:
            raise ValueError("Sample larger than population.")
    
        return results
    

    이 메서드는 지금까지 반복 가능한 항목의 수를 기준으로 다음 항목이 샘플에 포함될 확률을 조정합니다. 메모리에 샘플화된 항목 이상을 보관할 필요는 없습니다.

    해결책은 내 것이 아닙니다. 그것은 여기에 또 다른 대답의 일부로 제공되었습니다.

  3. ==============================

    3.단지 그것의 지옥을 위해, 여기에서 O (n lg k) 시간 안에 생성 된 n 개의 항목들로부터 대체하지 않고 k 개의 원소들을 샘플링하는 하나의 라이너가 있습니다 :

    단지 그것의 지옥을 위해, 여기에서 O (n lg k) 시간 안에 생성 된 n 개의 항목들로부터 대체하지 않고 k 개의 원소들을 샘플링하는 하나의 라이너가 있습니다 :

    from heapq import nlargest
    
    def sample_from_iterable(it, k):
        return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))
    
  4. ==============================

    4.이터레이터의 항목 수가 알려지면 (항목을 계산하는 다른 곳에서) 다른 접근법은 다음과 같습니다.

    이터레이터의 항목 수가 알려지면 (항목을 계산하는 다른 곳에서) 다른 접근법은 다음과 같습니다.

    def iter_sample(iterable, iterlen, samplesize):
        if iterlen < samplesize:
            raise ValueError("Sample larger than population.")
        indexes = set()
        while len(indexes) < samplesize:
            indexes.add(random.randint(0,iterlen))
        indexesiter = iter(sorted(indexes))
        current = indexesiter.next()
        ret = []
        for i, item in enumerate(iterable):
            if i == current:
                ret.append(item)
                try:
                    current = indexesiter.next()
                except StopIteration:
                    break
        random.shuffle(ret)
        return ret
    

    나는 iterlen과 관련하여 sampsize가 작을 때 이것을 더 빨리 발견한다. 전체 또는 전체에 가까운 샘플을 요구할 때 문제가 있습니다.

    iter_sample (iterlen = 10000, samplesize = 100) 시간 : (1, 'ms') iter_sample_fast (iterlen = 10000, samplesize = 100) 시간 : (15, 'ms')

    iter_sample (iterlen = 1000000, samplesize = 100) 시간 : (65, 'ms') iter_sample_fast (iterlen = 1000000, samplesize = 100) 시간 : (1477, 'ms')

    iter_sample (iterlen = 1000000, samplesize = 1000) 시간 : (64, 'ms') iter_sample_fast (iterlen = 1000000, samplesize = 1000) 시간 : (1459, 'ms')

    iter_sample (iterlen = 1000000, samplesize = 10000) 시간 : (86, 'ms') iter_sample_fast (iterlen = 1000000, samplesize = 10000) 시간 : (1480, 'ms')

    iter_sample (iterlen = 1000000, samplesize = 100000) 시간 : (388, 'ms') iter_sample_fast (iterlen = 1000000, samplesize = 100000) 시간 : (1521, 'ms')

    iter_sample (iterlen = 1000000, samplesize = 1000000) 시간 : (25359, 'ms') iter_sample_fast (iterlen = 1000000, samplesize = 1000000) 시간 : (2178, 'ms')

  5. ==============================

    5.발전기가 얼마나 오래되는지 (그리고 점근선 형태로 균일하게 분포 될 것임)에 대한 아이디어가있을 때 다른 방법으로 입증 될 때까지 가장 빠른 방법 :

    발전기가 얼마나 오래되는지 (그리고 점근선 형태로 균일하게 분포 될 것임)에 대한 아이디어가있을 때 다른 방법으로 입증 될 때까지 가장 빠른 방법 :

    def gen_sample(generator_list, sample_size, iterlen):
        num = 0
        inds = numpy.random.random(iterlen) <= (sample_size * 1.0 / iterlen)
        results = []
        iterator = iter(generator_list)
        gotten = 0
        while gotten < sample_size: 
            try:
                b = iterator.next()
                if inds[num]: 
                    results.append(b)
                    gotten += 1
                num += 1    
            except: 
                num = 0
                iterator = iter(generator_list)
                inds = numpy.random.random(iterlen) <= ((sample_size - gotten) * 1.0 / iterlen)
        return results
    

    작은 iterable과 거대한 iterable 모두에서 가장 빠릅니다 (그리고 그 사이에 아마도 모두)

    # Huge
    res = gen_sample(xrange(5000000), 200000, 5000000)
    timing: 1.22s
    
    # Small
    z = gen_sample(xrange(10000), 1000, 10000) 
    timing: 0.000441    
    
  6. from https://stackoverflow.com/questions/12581437/python-random-sample-with-a-generator-iterable-iterator by cc-by-sa and MIT license