Я пишу функцию, которая принимает на вход 2 массива нулей и единиц ~ 8000 элементов на массив. Плотность единиц намного выше нулей. Мы можем ожидать гораздо меньше нулей, чем единиц. Моя функция eps
вычисляет статистику по этим массивам и возвращает результат. Относительно тривиальные операции — просто проверка на 0 и запись индекса, где 0 находится в массиве, а затем некоторые вычисления. Я изо всех сил старался оптимизировать скорость, но лучшее, что я мог получить, — это 4,5 ~ 5 секунд (для 18k пар массивов), используя timeit
(1000 пробегов) библиотека. Быстрое выполнение важно, так как мне нужно запустить эту функцию для миллиардов пар массивов.
#e.g. inputs
#ts_1 = [0,1,1,0,0,1,1,0,......]
#ts_2 = [1,1,1,1,1,1,1,0,......]
# tau = positive integer/float
import numpy as np
import pandas as pd
from itertools import product
def eps(ts_1, ts_2, tau):
Q_tau = 0
q_tau = 0
event_index1, = np.where(np.array(ts_1) == 0)
n1 = event_index1.shape[0]
event_index2, = np.where(np.array(ts_2) == 0)
n2 = event_index2.shape[0]
if (n1 != 0 and n2 != 0):
matching_idx = set(event_index1).intersection(event_index2)
c_ij = c_ji = 0.5 *len(matching_idx)
for x,y in product(event_index1,event_index2):
if x-y > 0 and (x-y)<= tau:
c_ij += 1
elif y-x > 0 and (y-x) <= tau:
c_ji += 1
Q_tau = (c_ij+c_ji)/math.sqrt( n1 * n2 )
q_tau = (c_ij - c_ji)/math.sqrt( n1 * n2 )
return Q_tau, q_tau
5 ответов
Во-первых, некоторые основы вашей реализации:
- Напечатайте намек на свои аргументы; Мне пришлось прочитать и исследовать кучу, чтобы сделать вывод, что
tau
положительное целое число (y-x) <= tau
не нуждается в паре- спасти
math.sqrt( n1 * n2 )
к переменной вместо ее повторения
Полная матричная реализация
Это операция n ** 2, которая составляет порядка 64000000 элементов, если ваши массивы состоят из 8000 элементов. Это за пределами той точки, где я бы отказался и использовал C и что-то вроде BLAS. если ты фактически заботиться о производительности, сейчас самое время.
Ваше решение масштабируется в соответствии с плотностью событий, медленнее по мере увеличения плотности событий. Есть решение с постоянной плотностью во времени:
- Не звони
np.where
- Не используйте заданное пересечение; использовать векторизованное сравнение
- Признайте, что вы можете представлять свои
c_ij
суммирование как сумма по частичной треугольной матрице (ij
верхнийji
ниже) с остановкой на диагонали, заданнойtau
- Сама матрица формируется трансляцией двух векторов событий и получает маску для верхней и нижней половин.
- Начальные значения для
c_**
представляют собой половину следа (диагональной суммы) этой широковещательной матрицы - Для данного
tau
и длины векторов, вы можете сэкономить время, предварительно рассчитав маски (не показаны в справочном коде). Это очень важно и может сократить время выполнения этого метода примерно на две трети. Многие виды наборов данных в дикой природе имеют постоянные размеры и могут извлечь из этого пользу.
Где-то с плотностью единиц около 85%, этот метод имеет производительность, эквивалентную исходной (для длины тестового вектора 2000), если вы не рассчитываете предварительно маску.
Редкий подход
Библиотека, поддерживающая разреженные матрицы, должна работать лучше, чем указанная выше. У Сципи они есть. Обратите внимание на этот подход:
- Входные данные должны быть инвертированы — 1 для событий, 0 для всех остальных.
- Будьте осторожны, не звоните
np.*
методы; использовать исключительно редкие методы - Вместо того, чтобы формировать треугольные полосы и затем суммировать в конце, быстрее выполнять разность сумм, используя
tau
- Это масштабируется с плотностью, в отличие от полноматричного подхода, и использует меньше памяти.
Отсортированный логарифмический обход
Немного пофантазирую над идеей @vpn:
- Это не обязательно должно быть n ** 2, если вы скажете Numpy правильно применить двоичный поиск
- Этот двоичный поиск можно векторизовать с помощью
searchsorted
- Вместо пересечения родных для Python наборов вы можете использовать
intersect1d
Для очень высоких плотностей этот метод остается самым быстрым; но имеет худшие характеристики масштабирования, чем полноматричный метод только трассировки для более низких плотностей единиц (более высоких плотностей событий).
Полная или разреженная матрица, только следы
(Еще один) подход:
- Для разреженной или полной матрицы
- немаскированный
- на основе только суммы диагональных трасс, без вызовов треугольных методов
- постоянное время в плотности
В частности, для полной матрицы этот метод очень быстр: всегда быстрее, чем разреженная, замаскированная трансляция и оригинал; и быстрее, чем логарифмический обход для плотностей ниже ~ 70%.
Накопительная сумма
Для полноты изложения нижеприведенный справочный источник включает метод совокупной суммы @ KellyBundy, который является конкурентоспособным для всех плотностей, кроме очень высоких.
Выход
All method durations (ms)
d% trace sorted cumsum old bcast sparse sparsetr
50.0 0.70 4.12 0.21 491.82 41.73 80.62 21.62
55.0 0.64 3.27 0.18 411.49 40.09 65.46 18.25
60.0 0.58 2.69 0.18 311.74 40.69 50.05 14.60
65.0 0.58 2.27 0.23 250.12 41.50 35.52 13.45
70.0 1.03 1.84 0.18 182.37 41.84 24.03 10.20
75.0 1.34 1.40 0.18 113.49 42.36 16.37 7.31
80.0 0.61 1.06 0.18 77.04 40.99 11.78 6.11
85.0 0.81 0.69 0.17 44.68 52.77 10.30 5.95
90.0 1.56 0.56 0.22 24.50 46.26 6.32 3.94
95.0 1.50 0.26 0.18 6.27 40.60 4.80 3.48
96.0 1.77 0.21 0.18 3.71 41.01 4.25 3.31
96.5 1.79 0.17 0.18 1.89 40.69 4.16 3.24
97.0 1.79 0.16 0.18 1.75 43.89 4.69 3.30
97.5 1.87 0.14 0.18 1.16 43.34 4.31 3.22
98.0 2.10 0.15 0.19 1.22 41.16 4.10 3.33
98.5 1.75 0.10 0.18 0.42 40.75 3.94 3.15
99.0 1.73 0.09 0.22 0.16 40.71 3.97 3.11
99.5 1.78 0.08 0.18 0.09 41.29 4.09 3.42
Fast method durations (us)
d% trace sorted cumsum
50.0 604 3993 124
55.0 556 3504 124
60.0 577 2895 125
65.0 569 2440 130
70.0 556 2041 126
75.0 574 1372 125
80.0 556 1078 123
85.0 562 686 126
90.0 563 477 128
95.0 561 238 126
96.0 558 176 126
96.5 569 139 124
97.0 557 137 131
97.5 568 104 124
98.0 601 116 128
98.5 565 61 128
99.0 567 51 129
99.5 566 43 127
Справочный источник
import math
from numbers import Real
from timeit import timeit
from typing import Tuple
import numpy as np
from itertools import product
import scipy.sparse
from scipy.sparse import spmatrix
def old_eps(ts_1, ts_2, tau):
Q_tau = 0
q_tau = 0
event_index1, = np.where(np.array(ts_1) == 0)
n1 = event_index1.shape[0]
event_index2, = np.where(np.array(ts_2) == 0)
n2 = event_index2.shape[0]
if (n1 != 0 and n2 != 0):
matching_idx = set(event_index1).intersection(event_index2)
c_ij = c_ji = 0.5 *len(matching_idx)
for x,y in product(event_index1,event_index2):
if x-y > 0 and (x-y)<= tau:
c_ij += 1
elif y-x > 0 and (y-x) <= tau:
c_ji += 1
Q_tau = (c_ij+c_ji)/math.sqrt( n1 * n2 )
q_tau = (c_ij - c_ji)/math.sqrt( n1 * n2 )
return Q_tau, q_tau
def bcast_eps(ts_1: np.ndarray, ts_2: np.ndarray, tau: int) -> Tuple[Real, Real]:
if ts_1.shape != ts_2.shape or len(ts_1.shape) != 1:
raise ValueError('Vectors must be flat and of the same length')
N, = ts_1.shape
events_1 = ts_1 == 0
events_2 = ts_2 == 0
n1 = np.sum(events_1)
n2 = np.sum(events_2)
if n1 == 0 or n2 == 0:
return 0, 0
all_true = np.ones((N, N), dtype=bool)
upper_mask = np.logical_or(np.tril(all_true), np.triu(all_true, k=+1+tau))
lower_mask = np.logical_or(np.triu(all_true), np.tril(all_true, k=-1-tau))
matches = np.logical_and(events_1[np.newaxis, :], events_2[:, np.newaxis])
matches_u = np.ma.array(matches, mask=upper_mask)
matches_l = np.ma.array(matches, mask=lower_mask)
n_matches = np.trace(matches)
c_ij = c_ji = n_matches / 2
c_ij += np.sum(matches_u)
c_ji += np.sum(matches_l)
den = math.sqrt(n1 * n2)
Q_tau = (c_ij + c_ji) / den
q_tau = (c_ij - c_ji) / den
return Q_tau, q_tau
def trace_eps(ts_1: np.ndarray, ts_2: np.ndarray, tau: int) -> Tuple[Real, Real]:
if ts_1.shape != ts_2.shape or len(ts_1.shape) != 1:
raise ValueError('Vectors must be flat and of the same length')
events_1 = ts_1 == 0
events_2 = ts_2 == 0
n1 = np.sum(events_1)
n2 = np.sum(events_2)
if n1 == 0 or n2 == 0:
return 0, 0
matches = np.logical_and(events_1[np.newaxis, :], events_2[:, np.newaxis])
n_matches = np.trace(matches)
c_ij = c_ji = n_matches / 2
for k in range(1, tau+1):
c_ij += np.trace(matches, k)
c_ji += np.trace(matches, -k)
den = math.sqrt(n1 * n2)
Q_tau = (c_ij + c_ji) / den
q_tau = (c_ij - c_ji) / den
return Q_tau, q_tau
def sorted_eps(ts_1: np.ndarray, ts_2: np.ndarray, tau: int) -> Tuple[Real, Real]:
if ts_1.shape != ts_2.shape or len(ts_1.shape) != 1:
raise ValueError('Vectors must be flat and of the same length')
event_index1, = np.where(np.array(ts_1) == 0)
event_index2, = np.where(np.array(ts_2) == 0)
n1, = event_index1.shape
n2, = event_index2.shape
if n1 == 0 or n2 == 0:
return 0, 0
n_matches, = np.intersect1d(
event_index1, event_index2, assume_unique=True,
).shape
c_ij = c_ji = n_matches/2
insertions = np.searchsorted(
a=event_index1, # array to pretend insertion into
v=event_index2, # values to insert
)
for insertion, y in zip(insertions, event_index2):
i1 = insertion
if i1 < n1:
if y == event_index1[i1]:
i1 += 1
for x in event_index1[i1:]:
if x - y > tau:
break
c_ij += 1
i2 = insertion - 1
if i2 >= 0:
for x in event_index1[i2::-1]:
if y - x > tau:
break
c_ji += 1
den = math.sqrt(n1 * n2)
Q_tau = (c_ij + c_ji) / den
q_tau = (c_ij - c_ji) / den
return Q_tau, q_tau
def sparse_eps(ts_1: spmatrix, ts_2: spmatrix, tau: int) -> Tuple[Real, Real]:
if ts_1.shape != ts_2.shape or len(ts_1.shape) != 2 or ts_1.shape[0] != 1:
raise ValueError('Vectors must be flat and of the same length')
n1 = ts_1.sum()
n2 = ts_2.sum()
if n1 == 0 or n2 == 0:
return 0, 0
matches = ts_2.T * ts_1
matches_u = scipy.sparse.triu(matches, +1).sum() - scipy.sparse.triu(matches, k=+1+tau).sum()
matches_l = scipy.sparse.tril(matches, -1).sum() - scipy.sparse.tril(matches, k=-1-tau).sum()
n_matches = matches.diagonal().sum()
c_ij = c_ji = n_matches / 2
c_ij += matches_u
c_ji += matches_l
den = math.sqrt(n1 * n2)
Q_tau = (c_ij + c_ji) / den
q_tau = (c_ij - c_ji) / den
return Q_tau, q_tau
def sparsetr_eps(ts_1: spmatrix, ts_2: spmatrix, tau: int) -> Tuple[Real, Real]:
if ts_1.shape != ts_2.shape or len(ts_1.shape) != 2 or ts_1.shape[0] != 1:
raise ValueError('Vectors must be flat and of the same length')
n1 = ts_1.sum()
n2 = ts_2.sum()
if n1 == 0 or n2 == 0:
return 0, 0
matches = ts_2.T * ts_1
n_matches = matches.diagonal().sum()
c_ij = c_ji = n_matches / 2
for k in range(1, tau+1):
c_ij += matches.diagonal(+k).sum()
c_ji += matches.diagonal(-k).sum()
den = math.sqrt(n1 * n2)
Q_tau = (c_ij + c_ji) / den
q_tau = (c_ij - c_ji) / den
return Q_tau, q_tau
def cumsum_eps(ts_1: spmatrix, ts_2: spmatrix, tau: int) -> Tuple[Real, Real]:
ts_1 = 1 - ts_1
ts_2 = 1 - ts_2
n1 = ts_1.sum()
n2 = ts_2.sum()
if n1 == 0 or n2 == 0:
return 0, 0
cs1 = np.pad(ts_1.cumsum(), (0, tau), 'edge')
cs2 = np.pad(ts_2.cumsum(), (0, tau), 'edge')
c_ij = c_ji = 0.5 * (ts_1 * ts_2).sum()
c_ij += ((cs1[tau:] - cs1[:-tau]) * ts_2).sum()
c_ji += ((cs2[tau:] - cs2[:-tau]) * ts_1).sum()
sqrt = math.sqrt(n1 * n2)
Q_tau = (c_ij + c_ji) / sqrt
q_tau = (c_ij - c_ji) / sqrt
return Q_tau, q_tau
def test() -> None:
shape = 2, 2000
full_ts = np.ones(shape, dtype=np.uint8)
sparse_ts = scipy.sparse.lil_matrix(shape, dtype=np.uint8)
density = 0.9
rand = np.random.default_rng(seed=0).random
events = rand(shape) > density
full_ts[events] = 0
sparse_ts[events] = 1
# Add some interesting values to test boundary conditions: on the diagonal
full_ts[:, 5] = 0
sparse_ts[:, 5] = 1
Q_exp = 1.9446638724075895
q_exp = 0.026566446344365977
methods = (
(old_eps, full_ts),
(bcast_eps, full_ts),
(trace_eps, full_ts),
(sorted_eps, full_ts),
(sparse_eps, sparse_ts),
(sparsetr_eps, sparse_ts),
(cumsum_eps, full_ts),
)
for eps, ts in methods:
Q_tau, q_tau = eps(*ts, tau=10)
assert math.isclose(Q_tau, Q_exp, abs_tol=1e-12, rel_tol=0)
assert math.isclose(q_tau, q_exp, abs_tol=1e-12, rel_tol=0)
def compare(fast: bool) -> None:
shape = 2, 2000
rand = np.random.default_rng(seed=0).random
n = 400 if fast else 1
methods = (
trace_eps,
sorted_eps,
cumsum_eps,
)
if fast:
print('Fast method durations (us)')
factor = 1e6
fmt="{:8.0f}"
else:
print('All method durations (ms)')
factor = 1e3
fmt="{:8.2f}"
methods += (
old_eps,
bcast_eps,
sparse_eps,
sparsetr_eps,
)
print(' d%', ' '.join(
f'{method.__name__.removesuffix("_eps"):>8}'
for method in methods
))
densities = np.hstack((
np.linspace(0.50, 0.95, 10),
np.linspace(0.960, 0.995, 8),
))
for density in densities:
print(f'{100*density:4.1f}', end=' ')
full_ts = np.ones(shape, dtype=np.uint8)
sparse_ts = scipy.sparse.lil_matrix(shape, dtype=np.uint8)
events = rand(shape) > density
full_ts[events] = 0
sparse_ts[events] = 1
inputs = (
full_ts,
full_ts,
full_ts,
)
if not fast:
inputs += (
full_ts,
full_ts,
sparse_ts,
sparse_ts,
)
for eps, ts in zip(methods, inputs):
t = timeit(lambda: eps(*ts, tau=10), number=n)
print(fmt.format(t/n*factor), end=' ')
print()
print()
if __name__ == '__main__':
test()
compare(False)
compare(True)
for x,y in product(event_index1,event_index2)
выглядит убийцей эффективности. Если длина event_index1, event_index2
являются L1, L2
соответственно, этот цикл имеет O(L1 * L2)
временная сложность. Вам может сойти с рук O(L1 * log L2)
(или O(L2 * log L1)
в зависимости от того, какой из них больше.
Обратите внимание, что для любого данного x
в c_ij
увеличивается на количество y
с в [x - tau, x)
range. Two binary searches over event_index2
would give you this range, along the lines of
for x in event_index1:
c_ij += bisect_left(event_index2, x) - bisect_left(event_index2, x - tau)
and ditto for c_ji
. Of course, from bisect import bisect_left
.
- 1
In my testing, your binary search suggestion is by far the fastest (though I used Numpy’s
search_sorted
implementation rather than native Python).– Reinderien
- 2
@Reinderien
search_sorted
shall indeed be better than nativebisect
. I am not too versed innumpy
to suggest it. Thanks.– vnp
@vnp I really like this solution. I never even thought it could be done using sorting instead of loop iteration.
– skynaive
You have quadratic runtime, we can reduce it to linear. You want to count the numbers of zeros in certain ranges. Let’s flip zeros and ones and count the numbers of ones in ranges by subtracting prefix sums:
def Kelly_eps(ts_1, ts_2, tau):
ts_1 = 1 - ts_1
ts_2 = 1 - ts_2
n1 = ts_1.sum()
n2 = ts_2.sum()
if n1 == 0 or n2 == 0:
return 0, 0
cs1 = np.pad(ts_1.cumsum(), (0, tau), 'edge')
cs2 = np.pad(ts_2.cumsum(), (0, tau), 'edge')
c_ij = c_ji = 0.5 * (ts_1 * ts_2).sum()
c_ij += ((cs1[tau:] - cs1[:-tau]) * ts_2) .sum () c_ji + = ((cs2[tau:] - cs2[:-tau]) * ts_1) .sum () sqrt = math.sqrt (n1 * n2) Q_tau = (c_ij + c_ji) / sqrt q_tau = (c_ij - c_ji) / sqrt return Q_tau, q_tau
Использование сценария Reinderien для сравнения с двумя самыми быстрыми решениями:
d=50% trace_eps: 1.37 ms sorted_eps: 8.33 ms Kelly_eps: 0.19 ms
d=55% trace_eps: 0.80 ms sorted_eps: 7.10 ms Kelly_eps: 0.17 ms
d=61% trace_eps: 1.20 ms sorted_eps: 5.79 ms Kelly_eps: 0.19 ms
d=66% trace_eps: 1.25 ms sorted_eps: 4.20 ms Kelly_eps: 0.22 ms
d=72% trace_eps: 0.98 ms sorted_eps: 2.50 ms Kelly_eps: 0.19 ms
d=77% trace_eps: 0.73 ms sorted_eps: 1.62 ms Kelly_eps: 0.13 ms
d=83% trace_eps: 0.73 ms sorted_eps: 1.23 ms Kelly_eps: 0.12 ms
d=88% trace_eps: 0.72 ms sorted_eps: 0.65 ms Kelly_eps: 0.12 ms
d=94% trace_eps: 0.71 ms sorted_eps: 0.37 ms Kelly_eps: 0.12 ms
d=99% trace_eps: 0.69 ms sorted_eps: 0.07 ms Kelly_eps: 0.12 ms
(Это может быть не на 100% актуально, поскольку речь идет не о Python, а только о части пересечения, а не о более сложной части согласования в пределах порогового расстояния. Но, возможно, по крайней мере, пища для размышлений.)
matching_idx
Я думаю, это список всех мест, где на обоих входах стоит 0? На самом деле вам не нужен этот список, просто подсчитайте, сколько позиций, где ts_1[i]
и ts_2[i]
оба равны нулю. Было бы очень быстро вычислить это из исходных входных данных, например count_zeros( ts_1 | ts_2 )
, вместо использования пересечения в двух списках индексов.
Это не разделит никакой работы с поиском рядом, поблизости индексы, так что вам, вероятно, все равно придется сгенерировать списки индексов с нулевыми входными данными. Тем не менее, это может быть дешевле, чем часть перекрестка.
(IDK, как получить numpy для этого (я не знаю numpy), но это просто на C с внутренними функциями SIMD или любым другим способом компиляции языка программирования или JIT для машинного кода с инструкциями SIMD, которые дают вам аналогичный уровень контроля.)
Чтобы сделать его эффективным, выполняйте операцию ИЛИ на лету (один вектор SIMD за раз, не сохраняя результат в новом массиве), а count_zeros
вероятно делать как len - count_ones(ts1|ts2)
.
Если вы храните свои входные элементы в виде упакованных растровых изображений, вы можете использовать ИЛИ 128 или 256 бит на 16 или 32-байтовый вектор SIMD и эффективно popcount (https://github.com/WojciechMula/sse-popcount/). Некоторые алгоритмы поп-подсчета SIMD будут одинаково эффективно работать для прямого подсчета нулей, и в этом случае вы можете это сделать, но из коробки вы обычно найдете оптимизированный код для подсчета единиц, отсюда и мое предложение.
Или, если вы храните свои входные массивы как распакованные 8-битные или 32-битные целочисленные элементы, вы можете просто sum += ts_1[i] | ts_2[i]
(в идеале SIMD векторизованный). Чтобы эффективно векторизовать сумму 8-битных элементов 0/1, см. https://stackoverflow.com/questions/54541129/how-to-count-character-occurrences-using-simd — проблема очень похожа на накопление _mm256_cmpeq_epi8
результаты (0 / -1).
Для 32-битных элементов вам не нужно расширять, чтобы избежать переполнения при суммировании результатов OR, но ваша плотность данных будет очень низкой (в 4 или 32 раза ниже, чем возможно), поэтому, надеюсь, вы этого не делаете.
8000 бит — это всего 31,25 вектора SIMD по 256 бит каждый, или 1000 байт, так что это должно быть очень быстро, например, может быть, 100 тактов или около того с данными, уже горячими в кэше L1d. (https://github.com/WojciechMula/sse-popcount/blob/master/results/skylake/skylake-i7-6700-gcc5.3.0-avx2.rst#input-size-1024b)
Спасибо, что разместили свой ответ. В вашем ответе говорится о вещах, о которых я никогда не слышал, и вы говорите о вещах, которые я не могу придумать, как реализовать на Python. Ваш ответ — первый убедительный аргумент, который я видел против Python (поскольку он является языком высокого уровня). Однако я не думаю, что ваш ответ неуместен, поскольку использование C является общепринятой практикой, когда вам нужна скорость, Python не может вас достать. Я бы сказал, что это идиоматический Python, но, учитывая, что я, как и многие другие программисты Python, не могу программировать на C, было бы натяжкой сказать идиоматический …
— Пейлонрайз♦
@Peilonrayz: Для ясности,
count_zeros( ts_1 | ts_2 )
— это псевдокод, а не то, что вы можете буквально написать на C. Вы не можете ИЛИ целые массивы, вам нужно перебирать их в цикле. (Ты мог сделать в основном это с C ++std::bitset<8000>
— это действительно перегружает|
оператор и имеет.count()
метод для подсчета тех, который, надеюсь, использует оптимизированный popcnt под капотом.)— Питер Кордес
- 1
FWIW Я обнаружил, что это ясно. Причина, по которой я конкретно упомянул C, заключается в том, что C — это распространенный способ ускорить Python. Например, в большей части std-lib, например делить пополам/_bisect или numpy, которые случайно упомянуты в приведенных выше ответах.
— Пейлонрайз♦
Вы ближе всего к документации говорите нам о том, что вы рассчитываете статистику. Что это за статистика? Я пробовал искать «Q statistic», «tau statistic» и «eps statistic», но не нашел ничего, что бы соответствовало тому, что вы сделали. Вы можете подумать, что знаете, что он делает, и не забудете, но через шесть месяцев вы полностью забудете, и когда вы вернетесь к этому, у вас будет столько же знаний, сколько у нас. Ваше объяснение входных переменных также может быть помещено в строку документации.
Пересечение индексов событий и вычитание x
и y
предполагает, что входные массивы имеют одинаковую длину. Это должно быть задокументировано, если это правда.
Вы должны знать, что такое переменная, просто по ее имени, не оглядываясь назад на то, как вы ее определили. eps
, Q_tau
, q_tau
, и tau
может пройти этот тест (в зависимости от того, что такое «эта статистика» на самом деле). ts_1
, ts_2
, event_index1
, event_index2
, n1
, n2
, matching_idx
, c_ij
, c_ji
, x
и y
не надо. Я бы предложил назвать их input_array_1
, input_array_2
, zeros_indices_of_1
, zero_indices_of_2
, num_zeros_1
, num_zeros_2
, zero_indices_of_both
, above_diagonal
, below_diagonal
, index_1
, и index_2
. (Вы заметили, что знаете, что все это должно быть, кроме диагоналей?)
Похоже, вы не используете pandas, поэтому не будем импортировать его.
Я люблю возвращаться пораньше, если возможно. Ты мог бы иметь
if ( n1 == 0 or n2 == 0 ): return 0,0
Это освободит вас от объявления Q_tau
и q_tau
в начале и позволяет убрать отступ в конце.
Спасибо за подробное объяснение и тесты производительности. Мне больше понравилось отсортированное решение из-за простоты, но я буду развертывать все на реальных данных для более практической производительности.
— скайнайв
Спасибо. Дисплей намного чище :-). Теперь мне интересно, на что действительно похож тау ОП. Я думаю, что ваше решение трассировки становится быстрее с меньшим тау, а мое — быстрее с большим тау. При очень высоких плотностях отсортированное решение приближается к линейному времени, как у меня всегда, поэтому они должны там отличаться на постоянный коэффициент. Возможно, любой из них можно было бы улучшить, чтобы настроить этот постоянный коэффициент, но м-м-м. Я подозреваю, что на более высоком уровне будет происходить большее дальнейшее улучшение, не за счет более быстрой обработки каждой пары векторов, а за счет уменьшения количества пар, которые нужно сделать вообще (или, может быть, этого даже не следует делать попарно).
— Келли Банди