Hi,
I want to do a table-table join, both of them having keys of same type, but when i apply the .join
operator, the IDE itself gives following errors:
None of the following functions can be called with the arguments supplied.
join(KTable<PaymentProcessedKey!, TypeVariable(VO)!>!, ValueJoiner<in Int!, in TypeVariable(VO)!, out TypeVariable(VR)!>!, Materialized<PaymentProcessedKey!, TypeVariable(VR)!, KeyValueStore<Bytes!, ByteArray!>!>!) where VO = TypeVariable(VO), VR = TypeVariable(VR) for fun <VO : Any!, VR : Any!> join(p0: KTable<PaymentProcessedKey!, VO!>!, p1: ValueJoiner<in Int!, in VO!, out VR!>!, p2: Materialized<PaymentProcessedKey!, VR!, KeyValueStore<Bytes!, ByteArray!>!>!): KTable<PaymentProcessedKey!, VR!>! defined in org.apache.kafka.streams.kstream.KTable
join(KTable<PaymentProcessedKey!, TypeVariable(VO)!>!, ValueJoiner<in Int!, in TypeVariable(VO)!, out TypeVariable(VR)!>!, Named!) where VO = TypeVariable(VO), VR = TypeVariable(VR) for fun <VO : Any!, VR : Any!> join(p0: KTable<PaymentProcessedKey!, VO!>!, p1: ValueJoiner<in Int!, in VO!, out VR!>!, p2: Named!): KTable<PaymentProcessedKey!, VR!>! defined in org.apache.kafka.streams.kstream.KTable
join(KTable<TypeVariable(KO)!, TypeVariable(VO)!>!, Function<Int!, TypeVariable(KO)!>!, ValueJoiner<Int!, TypeVariable(VO)!, TypeVariable(VR)!>!) where VR = TypeVariable(VR), KO = TypeVariable(KO), VO = TypeVariable(VO) for fun <VR : Any!, KO : Any!, VO : Any!> join(p0: KTable<KO!, VO!>!, p1: Function<Int!, KO!>!, p2: ValueJoiner<Int!, VO!, VR!>!): KTable<PaymentProcessedKey!, VR!>! defined in org.apache.kafka.streams.kstream.KTable
My code goes as follows (Kotlin code):
"dbserver3.sm_db.payment",
Consumed.with(JsonSerdes.key(), JsonSerdes.payment())
)
val paymentsYesterday = builder.stream(
"dbserver3.sm_db.payment",
Consumed.with(JsonSerdes.key(), JsonSerdes.payment())
)
val paymentsMerchantToday = paymentsToday.map { _, value ->
KeyValue(
PaymentProcessedKey(value.merchantId, value.lastUpdate, 0),
value.amount
)
}
val paymentsMerchantYesterday = paymentsYesterday.map {_, value ->
KeyValue(
PaymentProcessedKey(value.merchantId, value.lastUpdate, 1),
value.amount
)
}
paymentsMerchantToday.print(Printed.toSysOut<PaymentProcessedKey?, Int?>().withLabel("Today"))
paymentsMerchantYesterday.print(Printed.toSysOut<PaymentProcessedKey?, Int?>().withLabel("Yesterday"))
val paymentProcessedToday = paymentsMerchantToday.groupByKey(
Grouped.with(JsonSerdes.paymentProcessedKey(), Serdes.Integer())
).reduce(Integer::sum)
val paymentProcessedYesterday = paymentsMerchantYesterday.groupByKey(
Grouped.with(JsonSerdes.paymentProcessedKey(), Serdes.Integer())
).reduce(Integer::sum)
val tYJoiner: ValueJoiner<Int, Int, Int> = ValueJoiner<Int, Int, Int>{a: Int, b: Int -> ((a - b)/b)*100}
val tYPrams: Joined<PaymentProcessedKey, Int, Int> =
Joined.with(
JsonSerdes.paymentProcessedKey(),
Serdes.Integer(),
Serdes.Integer()
)
val growth = paymentProcessedToday.join(paymentProcessedYesterday, tYJoiner, tYPrams);```
Which parameter am I missing?